Class TcpTransporter

All Implemented Interfaces:
MetricConstants, MoleculerLifecycle

public class TcpTransporter extends Transporter
TCP Transporter with optional UDP discovery ("zero configuration") module.

TCP Transporter uses fault tolerant and peer-to-peer Gossip Protocol to discover location and service information about the other nodes participating in a Moleculer Cluster. In Moleculer's P2P architecture all nodes are equal, there is no "leader" or "controller" node, so the cluster is truly horizontally scalable. This transporter aims to run on top of an infrastructure of hundreds of nodes.

Nodes can explore each other in two ways. With or without UDP packets. If the URLs of all nodes specified in startup parameters (in "tcp://host:port/nodeID" format), TCP Transporter will work without UDP. Otherwise the TCP Transporter starts an UDP server, and the Moleculer nodes detect each other with UDP packets. Events and function calls always go through TCP channels.

TCP Transporter provides the highest speed data transfer between the nodes (eg. hundred thousand packets per second can be transmitted from one node to another over a high-speed LAN). Usage:
 ServiceBroker broker = ServiceBroker.builder().nodeID("node1")
 .transporter(new TcpTransporter()).build();
 
// *
See Also:
  • Field Details

    • PACKET_EVENT_ID

      protected static final byte PACKET_EVENT_ID
      See Also:
    • PACKET_REQUEST_ID

      protected static final byte PACKET_REQUEST_ID
      See Also:
    • PACKET_RESPONSE_ID

      protected static final byte PACKET_RESPONSE_ID
      See Also:
    • PACKET_PING_ID

      protected static final byte PACKET_PING_ID
      See Also:
    • PACKET_PONG_ID

      protected static final byte PACKET_PONG_ID
      See Also:
    • PACKET_GOSSIP_REQ_ID

      protected static final byte PACKET_GOSSIP_REQ_ID
      See Also:
    • PACKET_GOSSIP_RSP_ID

      protected static final byte PACKET_GOSSIP_RSP_ID
      See Also:
    • PACKET_GOSSIP_HELLO_ID

      protected static final byte PACKET_GOSSIP_HELLO_ID
      See Also:
    • port

      protected int port
      TCP port (used by the Transporter and Gossiper services). A port number of zero will let the system pick up an ephemeral port in a bind operation.
    • gossipPeriod

      protected int gossipPeriod
      Gossiping period time, in SECONDS.
    • maxConnections

      protected int maxConnections
      Max number of keep-alive connections (-1 = unlimited, 0 = disable keep-alive connections).
    • maxPacketSize

      protected int maxPacketSize
      Max enable packet size (BYTES).
    • urls

      protected String[] urls
      List of URLs ("tcp://host:port/nodeID" or "host:port/nodeID" or "host/nodeID"), when UDP discovery is disabled.
    • useHostname

      protected boolean useHostname
      Use hostnames instead of IP addresses As the DHCP environment is dynamic, any later attempt to use IPs instead hostnames would most likely yield false results. Therefore, use hostnames if you are using DHCP.
    • udpPort

      protected int udpPort
      UDP broadcast/multicast port
    • udpBindAddress

      protected String udpBindAddress
      UDP bind address (null = autodetect)
    • udpPeriod

      protected int udpPeriod
      UDP broadcast/multicast period in SECONDS
    • udpReuseAddr

      protected boolean udpReuseAddr
      Resuse addresses
    • udpMaxDiscovery

      protected int udpMaxDiscovery
      Maximum number of outgoing multicast packets (0 = runs forever)
    • udpMulticast

      protected String udpMulticast
      UDP multicast address of automatic discovery service.
    • udpMulticastTTL

      protected int udpMulticastTTL
      TTL of UDP multicast packets
    • udpBroadcast

      protected boolean udpBroadcast
      Use UDP broadcast WITH UDP multicast (false = use UDP multicast only)
    • rnd

      protected final Random rnd
      Random generator.
    • gossiperTimer

      protected volatile ScheduledFuture<?> gossiperTimer
      Cancelable timer of gossiper
    • reader

      protected TcpReader reader
      Socket reader
    • writer

      protected TcpWriter writer
      Socket writer
    • locator

      protected UDPLocator locator
      UDP broadcaster
    • currentPort

      protected int currentPort
      Current TCP port
    • startupHost

      protected String startupHost
      The local hostname specified in the "urls" parameter.
    • cachedDescriptor

      protected NodeDescriptor cachedDescriptor
      Current node descriptor
    • timestamp

      protected AtomicLong timestamp
      Current node descriptor's timestamp
    • cachedHelloMessage

      protected byte[] cachedHelloMessage
  • Constructor Details

    • TcpTransporter

      public TcpTransporter()
      Start TCP Transporter in "zero config" mode, with automatic UDP service discovery.
    • TcpTransporter

      public TcpTransporter(String... urls)
      Start TCP Transporter in full TCP mode, without UDP discovery. Valid URL syntax is "tcp://host:port/nodeID" or "host:port/nodeID".
      Parameters:
      urls - list of urls of ALL nodes
    • TcpTransporter

      public TcpTransporter(URL urlList) throws Exception
      Start TCP Transporter in full TCP mode, without UDP discovery. Loads node list from an URL (as an http or file resource in JSON/XML/YAML format). Sample JSON file:

      {
      "nodes":[
      "tcp://host1:port1/nodeID1",
      "tcp://host2:port2/nodeID2",
      "tcp://host3:port3/nodeID3"
      ]
      }
      Parameters:
      urlList - an URL, where the peer configuration's JSON is located
      Throws:
      Exception - URL format exception or any I/O error
  • Method Details

    • connect

      public void connect()
      Specified by:
      connect in class Transporter
    • disconnect

      protected void disconnect()
    • reconnect

      protected void reconnect()
    • stopped

      public void stopped()
      Closes TCP Transporter.
      Specified by:
      stopped in interface MoleculerLifecycle
      Overrides:
      stopped in class Transporter
    • received

      public void received(byte packetID, byte[] packet)
    • unableToSend

      public void unableToSend(String nodeID, LinkedList<byte[]> packets, Throwable cause)
    • sendDisconnectPacket

      protected void sendDisconnectPacket()
      Overrides:
      sendDisconnectPacket in class Transporter
    • subscribe

      public io.datatree.Promise subscribe(String channel)
      Specified by:
      subscribe in class Transporter
    • publish

      public void publish(String channel, io.datatree.Tree message)
      Specified by:
      publish in class Transporter
    • serialize

      protected byte[] serialize(byte packetID, io.datatree.Tree message) throws Exception
      Throws:
      Exception
    • getDescriptor

      public NodeDescriptor getDescriptor()
    • udpPacketReceived

      public void udpPacketReceived(String sender, String host, int port)
    • processGossipHello

      protected void processGossipHello(io.datatree.Tree data)
    • registerAsNewNode

      protected void registerAsNewNode(String sender, String host, int port)
    • sendGossipRequest

      protected io.datatree.Tree sendGossipRequest()
      Create and send a Gossip request packet.
      Returns:
      created Gossip request (used for testing)
    • sendGossipToRandomEndpoint

      protected void sendGossipToRandomEndpoint(String[] endpoints, int size, byte[] packet, io.datatree.Tree message) throws Exception
      Throws:
      Exception
    • processGossipRequest

      protected io.datatree.Tree processGossipRequest(io.datatree.Tree data) throws Exception
      Throws:
      Exception
    • processGossipResponse

      protected void processGossipResponse(io.datatree.Tree data) throws Exception
      Throws:
      Exception
    • generateGossipHello

      public byte[] generateGossipHello()
      Create Gossip HELLO packet. Hello message is invariable, so we can cache it.
      Returns:
      created "hello" request
    • broadcastInfoPacket

      public void broadcastInfoPacket()
      Overrides:
      broadcastInfoPacket in class Transporter
    • setHeartbeatInterval

      public void setHeartbeatInterval(int heartbeatInterval)
      Overrides:
      setHeartbeatInterval in class Transporter
    • setHeartbeatTimeout

      public void setHeartbeatTimeout(int heartbeatTimeout)
      Overrides:
      setHeartbeatTimeout in class Transporter
    • getUrls

      public String[] getUrls()
    • setUrls

      public void setUrls(String... urls)
    • getPort

      public int getPort()
    • setPort

      public void setPort(int port)
    • getGossipPeriod

      public int getGossipPeriod()
    • setGossipPeriod

      public void setGossipPeriod(int gossipPeriod)
    • getMaxConnections

      public int getMaxConnections()
    • setMaxConnections

      public void setMaxConnections(int maxConnections)
    • getMaxPacketSize

      public int getMaxPacketSize()
    • setMaxPacketSize

      public void setMaxPacketSize(int maxPacketSize)
    • getCurrentPort

      public int getCurrentPort()
    • isUseHostname

      public boolean isUseHostname()
    • setUseHostname

      public void setUseHostname(boolean preferHostname)
    • isUdpBroadcast

      public boolean isUdpBroadcast()
    • setUdpBroadcast

      public void setUdpBroadcast(boolean udpBroadcast)
    • getUdpMaxDiscovery

      public int getUdpMaxDiscovery()
    • setUdpMaxDiscovery

      public void setUdpMaxDiscovery(int udpMaxDiscovery)
    • getUdpPeriod

      public int getUdpPeriod()
    • setUdpPeriod

      public void setUdpPeriod(int udpPeriod)
    • isUdpReuseAddr

      public boolean isUdpReuseAddr()
    • setUdpReuseAddr

      public void setUdpReuseAddr(boolean udpReuseAddr)
    • getUdpPort

      public int getUdpPort()
    • setUdpPort

      public void setUdpPort(int udpPort)
    • getUdpBindAddress

      public String getUdpBindAddress()
    • setUdpBindAddress

      public void setUdpBindAddress(String udpBindAddress)
    • getUdpMulticast

      public String getUdpMulticast()
    • setUdpMulticast

      public void setUdpMulticast(String udpMulticast)
    • getUdpMulticastTTL

      public int getUdpMulticastTTL()
    • setUdpMulticastTTL

      public void setUdpMulticastTTL(int udpMulticastTTL)