Class TcpTransporter
java.lang.Object
services.moleculer.service.MoleculerComponent
services.moleculer.transporter.Transporter
services.moleculer.transporter.TcpTransporter
- All Implemented Interfaces:
MetricConstants, MoleculerLifecycle
TCP Transporter with optional UDP discovery ("zero configuration") module.
TCP Transporter uses a fault-tolerant, peer-to-peer Gossip Protocol to discover the location and service information of the other nodes participating in a Moleculer Cluster. In Moleculer's P2P architecture all nodes are equal; there is no "leader" or "controller" node, so the cluster is truly horizontally scalable. This transporter aims to run on top of an infrastructure of hundreds of nodes.
Nodes can discover each other in two ways: with or without UDP packets. If the URLs of all nodes are specified in the startup parameters (in "tcp://host:port/nodeID" format), TCP Transporter works without UDP. Otherwise, TCP Transporter starts a UDP server, and the Moleculer nodes detect each other with UDP packets. Events and function calls always go through TCP channels.
TCP Transporter provides the highest speed data transfer between the nodes (e.g. a hundred thousand packets per second can be transmitted from one node to another over a high-speed LAN). Usage:
ServiceBroker broker = ServiceBroker.builder().nodeID("node1")
    .transporter(new TcpTransporter()).build();
-
Field Summary
Fields
Modifier and Type | Field | Description
protected NodeDescriptor | cachedDescriptor | Current node descriptor
protected byte[] | cachedHelloMessage |
protected int | currentPort | Current TCP port
protected ScheduledFuture<?> | gossiperTimer | Cancelable timer of gossiper
protected int | gossipPeriod | Gossiping period time, in SECONDS.
protected UDPLocator | locator | UDP broadcaster
protected int | maxConnections | Max number of keep-alive connections (-1 = unlimited, 0 = disable keep-alive connections).
protected int | maxPacketSize | Max allowed packet size (BYTES).
protected static final byte | PACKET_EVENT_ID |
protected static final byte | PACKET_GOSSIP_HELLO_ID |
protected static final byte | PACKET_GOSSIP_REQ_ID |
protected static final byte | PACKET_GOSSIP_RSP_ID |
protected static final byte | PACKET_PING_ID |
protected static final byte | PACKET_PONG_ID |
protected static final byte | PACKET_REQUEST_ID |
protected static final byte | PACKET_RESPONSE_ID |
protected int | port | TCP port (used by the Transporter and Gossiper services).
protected TcpReader | reader | Socket reader
protected final Random | rnd | Random generator.
protected String | startupHost | The local hostname specified in the "urls" parameter.
protected AtomicLong | timestamp | Current node descriptor's timestamp
protected String | udpBindAddress | UDP bind address (null = autodetect)
protected boolean | udpBroadcast | Use UDP broadcast WITH UDP multicast (false = use UDP multicast only)
protected int | udpMaxDiscovery | Maximum number of outgoing multicast packets (0 = runs forever)
protected String | udpMulticast | UDP multicast address of automatic discovery service.
protected int | udpMulticastTTL | TTL of UDP multicast packets
protected int | udpPeriod | UDP broadcast/multicast period in SECONDS
protected int | udpPort | UDP broadcast/multicast port
protected boolean | udpReuseAddr | Reuse addresses
protected String[] | urls | List of URLs ("tcp://host:port/nodeID" or "host:port/nodeID" or "host/nodeID"), when UDP discovery is disabled.
protected boolean | useHostname | Use hostnames instead of IP addresses. As the DHCP environment is dynamic, any later attempt to use IPs instead of hostnames would most likely yield false results.
protected TcpWriter | writer | Socket writer
Fields inherited from class Transporter
checkTimeoutTimer, counterTransporterPacketsReceivedBytes, counterTransporterPacketsReceivedTotal, counterTransporterPacketsSentBytes, counterTransporterPacketsSentTotal, debug, debugHeartbeats, disconnectChannel, discoverBroadcastChannel, discoverChannel, eventbus, eventChannel, executor, gaugeTransitConnected, heartbeatChannel, heartbeatInterval, heartbeatTimeout, heartBeatTimer, infoBroadcastChannel, infoChannel, infoScheduled, infoScheduledAt, instanceID, lastReceivedMessageAt, metrics, monitor, namespace, nodeID, nodes, offlineTimeout, PACKET_DISCONNECT, PACKET_DISCOVER, PACKET_EVENT, PACKET_HEARTBEAT, PACKET_INFO, PACKET_PING, PACKET_PONG, PACKET_REQUEST, PACKET_RESPONSE, pingChannel, pongChannel, preferHostname, prefix, protocolVersion, registry, requestChannel, responseChannel, scheduler, serializer, serviceInvoker, subscriptionTimeout, uidGenerator, usingJsonSerializer
Fields inherited from class MoleculerComponent
broker, logger, name
Fields inherited from interface MetricConstants
MOLECULER_CACHER_CLEAN_TIME, MOLECULER_CACHER_CLEAN_TIME_DESC, MOLECULER_CACHER_CLEAN_TOTAL, MOLECULER_CACHER_CLEAN_TOTAL_DESC, MOLECULER_CACHER_DEL_TIME, MOLECULER_CACHER_DEL_TIME_DESC, MOLECULER_CACHER_DEL_TOTAL, MOLECULER_CACHER_DEL_TOTAL_DESC, MOLECULER_CACHER_EXPIRED_TOTAL, MOLECULER_CACHER_EXPIRED_TOTAL_DESC, MOLECULER_CACHER_FOUND_TOTAL, MOLECULER_CACHER_FOUND_TOTAL_DESC, MOLECULER_CACHER_GET_TIME, MOLECULER_CACHER_GET_TIME_DESC, MOLECULER_CACHER_GET_TOTAL, MOLECULER_CACHER_GET_TOTAL_DESC, MOLECULER_CACHER_SET_TIME, MOLECULER_CACHER_SET_TIME_DESC, MOLECULER_CACHER_SET_TOTAL, MOLECULER_CACHER_SET_TOTAL_DESC, MOLECULER_CIRCUIT_BREAKER_HALF_OPENED_ACTIVE, MOLECULER_CIRCUIT_BREAKER_HALF_OPENED_ACTIVE_DESC, MOLECULER_CIRCUIT_BREAKER_OPENED_ACTIVE, MOLECULER_CIRCUIT_BREAKER_OPENED_ACTIVE_DESC, MOLECULER_CIRCUIT_BREAKER_OPENED_TOTAL, MOLECULER_CIRCUIT_BREAKER_OPENED_TOTAL_DESC, MOLECULER_EVENT_BROADCAST_TOTAL, MOLECULER_EVENT_BROADCAST_TOTAL_DESC, MOLECULER_EVENT_BROADCASTLOCAL_TOTAL, MOLECULER_EVENT_BROADCASTLOCAL_TOTAL_DESC, MOLECULER_EVENT_EMIT_TOTAL, MOLECULER_EVENT_EMIT_TOTAL_DESC, MOLECULER_EVENT_RECEIVED_ACTIVE, MOLECULER_EVENT_RECEIVED_ACTIVE_DESC, MOLECULER_EVENT_RECEIVED_ERROR_TOTAL, MOLECULER_EVENT_RECEIVED_ERROR_TOTAL_DESC, MOLECULER_EVENT_RECEIVED_TIME, MOLECULER_EVENT_RECEIVED_TIME_DESC, MOLECULER_EVENT_RECEIVED_TOTAL, MOLECULER_EVENT_RECEIVED_TOTAL_DESC, MOLECULER_EXECUTOR, MOLECULER_REQUEST_ACTIVE, MOLECULER_REQUEST_ACTIVE_DESC, MOLECULER_REQUEST_ERROR_TOTAL, MOLECULER_REQUEST_ERROR_TOTAL_DESC, MOLECULER_REQUEST_LEVELS, MOLECULER_REQUEST_LEVELS_DESC, MOLECULER_REQUEST_RETRY_ATTEMPTS_TOTAL, MOLECULER_REQUEST_RETRY_ATTEMPTS_TOTAL_DESC, MOLECULER_REQUEST_TIME, MOLECULER_REQUEST_TIME_DESC, MOLECULER_REQUEST_TIMEOUT_TOTAL, MOLECULER_REQUEST_TIMEOUT_TOTAL_DESC, MOLECULER_REQUEST_TOTAL, MOLECULER_REQUEST_TOTAL_DESC, MOLECULER_SCHEDULER, MOLECULER_TRANSIT_CONNECTED, MOLECULER_TRANSIT_CONNECTED_DESC, MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL, 
MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL_DESC, MOLECULER_TRANSIT_REQUESTS_ACTIVE, MOLECULER_TRANSIT_REQUESTS_ACTIVE_DESC, MOLECULER_TRANSIT_STREAMS_RECEIVE_ACTIVE, MOLECULER_TRANSIT_STREAMS_RECEIVE_ACTIVE_DESC, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_BYTES, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_BYTES_DESC, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_TOTAL, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_TOTAL_DESC, MOLECULER_TRANSPORTER_PACKETS_SENT_BYTES, MOLECULER_TRANSPORTER_PACKETS_SENT_BYTES_DESC, MOLECULER_TRANSPORTER_PACKETS_SENT_TOTAL, MOLECULER_TRANSPORTER_PACKETS_SENT_TOTAL_DESC -
Constructor Summary
Constructors
Constructor | Description
TcpTransporter() | Start TCP Transporter in "zero config" mode, with automatic UDP service discovery.
TcpTransporter(String... urls) | Start TCP Transporter in full TCP mode, without UDP discovery.
TcpTransporter(URL urlList) | Start TCP Transporter in full TCP mode, without UDP discovery.
-
Method Summary
Modifier and Type | Method | Description
void | broadcastInfoPacket() |
void | connect() |
protected void | disconnect() |
byte[] | generateGossipHello() | Create Gossip HELLO packet.
int | getCurrentPort() |
int | getGossipPeriod() |
int | getMaxConnections() |
int | getMaxPacketSize() |
int | getPort() |
int | getUdpMaxDiscovery() |
int | getUdpMulticastTTL() |
int | getUdpPeriod() |
int | getUdpPort() |
String[] | getUrls() |
boolean | isUdpBroadcast() |
boolean | isUdpReuseAddr() |
boolean | isUseHostname() |
protected void | processGossipHello(io.datatree.Tree data) |
protected io.datatree.Tree | processGossipRequest(io.datatree.Tree data) |
protected void | processGossipResponse(io.datatree.Tree data) |
void | publish |
void | received(byte packetID, byte[] packet) |
protected void | reconnect() |
protected void | registerAsNewNode(String sender, String host, int port) |
protected void | sendDisconnectPacket() |
protected io.datatree.Tree | sendGossipRequest() | Create and send a Gossip request packet.
protected void | sendGossipToRandomEndpoint(String[] endpoints, int size, byte[] packet, io.datatree.Tree message) |
protected byte[] | serialize(byte packetID, io.datatree.Tree message) |
void | setGossipPeriod(int gossipPeriod) |
void | setHeartbeatInterval(int heartbeatInterval) |
void | setHeartbeatTimeout(int heartbeatTimeout) |
void | setMaxConnections(int maxConnections) |
void | setMaxPacketSize(int maxPacketSize) |
void | setPort(int port) |
void | setUdpBindAddress(String udpBindAddress) |
void | setUdpBroadcast(boolean udpBroadcast) |
void | setUdpMaxDiscovery(int udpMaxDiscovery) |
void | setUdpMulticast(String udpMulticast) |
void | setUdpMulticastTTL(int udpMulticastTTL) |
void | setUdpPeriod(int udpPeriod) |
void | setUdpPort(int udpPort) |
void | setUdpReuseAddr(boolean udpReuseAddr) |
void | setUrls |
void | setUseHostname(boolean preferHostname) |
void | stopped() | Closes TCP Transporter.
io.datatree.Promise | subscribe |
void | udpPacketReceived(String sender, String host, int port) |
void | unableToSend(String nodeID, LinkedList<byte[]> packets, Throwable cause) |
Methods inherited from class Transporter
broadcastNodeConnected, broadcastNodeDisconnected, broadcastNodeUpdated, broadcastTransporterConnected, broadcastTransporterDisconnected, channel, checkTimeouts, connected, connected, createPingPacket, error, getAddress, getAllNodeIDs, getCpuUsage, getDescriptor, getHeartbeatInterval, getHeartbeatTimeout, getInstanceID, getLastHeartbeatTime, getNamespace, getOfflineTimeout, getPrefix, getSerializer, getSubscriptionTimeout, isDebug, isDebugHeartbeats, isOnline, isPreferHostname, processReceivedMessage, processReceivedMessage, publish, received, scheduleInfoPacket, sendClosePacket, sendDataPacket, sendDiscoverPacket, sendErrorPacket, sendEventPacket, sendHeartbeatPacket, sendInfoPacket, sendPongPacket, sendRequestPacket, setDebug, setDebugHeartbeats, setNamespace, setOfflineTimeout, setPreferHostname, setPrefix, setSerializer, setSubscriptionTimeout, started, subscribe, updateNodeInfo
Methods inherited from class MoleculerComponent
getBroker, getLogger, getName
-
Field Details
-
PACKET_EVENT_ID
protected static final byte PACKET_EVENT_ID
-
PACKET_REQUEST_ID
protected static final byte PACKET_REQUEST_ID
-
PACKET_RESPONSE_ID
protected static final byte PACKET_RESPONSE_ID
-
PACKET_PING_ID
protected static final byte PACKET_PING_ID
-
PACKET_PONG_ID
protected static final byte PACKET_PONG_ID
-
PACKET_GOSSIP_REQ_ID
protected static final byte PACKET_GOSSIP_REQ_ID
-
PACKET_GOSSIP_RSP_ID
protected static final byte PACKET_GOSSIP_RSP_ID
-
PACKET_GOSSIP_HELLO_ID
protected static final byte PACKET_GOSSIP_HELLO_ID
-
port
protected int port
TCP port (used by the Transporter and Gossiper services). A port number of zero will let the system pick up an ephemeral port in a bind operation. -
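The effect of a port number of zero can be illustrated with plain java.net sockets. This is a minimal sketch, not the transporter's own code: the class name EphemeralPortDemo and the method bindAndReport are hypothetical, and the only behaviour relied on is that a ServerSocket bound to port 0 receives a free port chosen by the operating system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical demo class; it does not use any Moleculer API.
public class EphemeralPortDemo {

    // Binds a server socket to the requested port and returns the port
    // that was actually bound. Requesting 0 asks the OS for a free port.
    public static int bindAndReport(int requestedPort) {
        try (ServerSocket server = new ServerSocket(requestedPort)) {
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int actual = bindAndReport(0);
        System.out.println("Requested port 0, bound to port " + actual);
    }
}
```

This suggests why the class keeps both a configured port and a separate currentPort field: when the configured value is 0, the port in use is only known after the bind.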
gossipPeriod
protected int gossipPeriod
Gossiping period time, in SECONDS. -
maxConnections
protected int maxConnections
Max number of keep-alive connections (-1 = unlimited, 0 = disable keep-alive connections). -
maxPacketSize
protected int maxPacketSize
Max allowed packet size (BYTES). -
urls
List of URLs ("tcp://host:port/nodeID" or "host:port/nodeID" or "host/nodeID"), when UDP discovery is disabled. -
useHostname
protected boolean useHostname
Use hostnames instead of IP addresses. As the DHCP environment is dynamic, any later attempt to use IPs instead of hostnames would most likely yield false results. Therefore, use hostnames if you are using DHCP. -
udpPort
protected int udpPort
UDP broadcast/multicast port -
udpBindAddress
UDP bind address (null = autodetect) -
udpPeriod
protected int udpPeriod
UDP broadcast/multicast period in SECONDS -
udpReuseAddr
protected boolean udpReuseAddr
Reuse addresses -
udpMaxDiscovery
protected int udpMaxDiscovery
Maximum number of outgoing multicast packets (0 = runs forever) -
udpMulticast
UDP multicast address of automatic discovery service. -
udpMulticastTTL
protected int udpMulticastTTL
TTL of UDP multicast packets -
udpBroadcast
protected boolean udpBroadcast
Use UDP broadcast WITH UDP multicast (false = use UDP multicast only) -
rnd
Random generator. -
gossiperTimer
Cancelable timer of gossiper -
reader
Socket reader -
writer
Socket writer -
locator
UDP broadcaster -
currentPort
protected int currentPort
Current TCP port -
startupHost
The local hostname specified in the "urls" parameter. -
cachedDescriptor
Current node descriptor -
timestamp
Current node descriptor's timestamp -
cachedHelloMessage
protected byte[] cachedHelloMessage
-
-
Constructor Details
-
TcpTransporter
public TcpTransporter()
Start TCP Transporter in "zero config" mode, with automatic UDP service discovery. -
TcpTransporter
public TcpTransporter(String... urls)
Start TCP Transporter in full TCP mode, without UDP discovery. Valid URL syntax is "tcp://host:port/nodeID" or "host:port/nodeID".
Parameters:
urls - list of URLs of ALL nodes
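The two URL forms accepted by this constructor can be taken apart with a small parser. The following is a sketch under stated assumptions, not the transporter's actual parsing code: the NodeUrl class and its parse method are hypothetical, IPv6 literals are not handled, and the shorter "host/nodeID" form mentioned for the urls field is left out because its default port is not stated here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the Moleculer API.
public class NodeUrl {

    // Optional "tcp://" prefix, then host, ":" port, "/" nodeID.
    private static final Pattern FORMAT =
            Pattern.compile("^(?:tcp://)?([^:/]+):(\\d{1,5})/(.+)$");

    public final String host;
    public final int port;
    public final String nodeID;

    private NodeUrl(String host, int port, String nodeID) {
        this.host = host;
        this.port = port;
        this.nodeID = nodeID;
    }

    // Parses "tcp://host:port/nodeID" or "host:port/nodeID".
    public static NodeUrl parse(String url) {
        Matcher m = FORMAT.matcher(url.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid node URL: " + url);
        }
        int port = Integer.parseInt(m.group(2));
        if (port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new NodeUrl(m.group(1), port, m.group(3));
    }

    public static void main(String[] args) {
        NodeUrl u = parse("tcp://host1:6000/node1");
        System.out.println(u.host + " " + u.port + " " + u.nodeID);
    }
}
```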
-
TcpTransporter
public TcpTransporter(URL urlList) throws Exception
Start TCP Transporter in full TCP mode, without UDP discovery. Loads the node list from a URL (an HTTP or file resource in JSON/XML/YAML format). Sample JSON file:
{
  "nodes":[
    "tcp://host1:port1/nodeID1",
    "tcp://host2:port2/nodeID2",
    "tcp://host3:port3/nodeID3"
  ]
}
Parameters:
urlList - a URL where the peer configuration's JSON is located
Throws:
Exception - URL format exception or any I/O error
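To show what the sample JSON file above carries, the sketch below pulls the entries of the "nodes" array out of a JSON string. It is an illustration only: the NodeListReader class is hypothetical, and because the Java standard library has no JSON parser it uses regular expressions that assume a single flat "nodes" array of strings without escape sequences. Real code would use a proper JSON library, and this says nothing about how the transporter itself reads the file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a simplification that is not a general JSON parser.
public class NodeListReader {

    // Matches the "nodes" array and captures its raw contents.
    private static final Pattern NODES_ARRAY =
            Pattern.compile("\"nodes\"\\s*:\\s*\\[(.*?)\\]", Pattern.DOTALL);

    // Matches one double-quoted string that contains no escape sequences.
    private static final Pattern QUOTED = Pattern.compile("\"([^\"\\\\]*)\"");

    // Returns the strings listed in the "nodes" array, in file order.
    public static List<String> readNodeUrls(String json) {
        List<String> urls = new ArrayList<>();
        Matcher array = NODES_ARRAY.matcher(json);
        if (array.find()) {
            Matcher item = QUOTED.matcher(array.group(1));
            while (item.find()) {
                urls.add(item.group(1));
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        String json = "{ \"nodes\":[ \"tcp://host1:6000/node1\", \"tcp://host2:6001/node2\" ] }";
        for (String url : readNodeUrls(json)) {
            System.out.println(url);
        }
    }
}
```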
-
-
Method Details
-
connect
public void connect()
Specified by:
connect in class Transporter
-
disconnect
protected void disconnect() -
reconnect
protected void reconnect() -
stopped
public void stopped()
Closes TCP Transporter.
Specified by:
stopped in interface MoleculerLifecycle
Overrides:
stopped in class Transporter
-
received
public void received(byte packetID, byte[] packet) -
unableToSend
public void unableToSend(String nodeID, LinkedList<byte[]> packets, Throwable cause) -
sendDisconnectPacket
protected void sendDisconnectPacket()
Overrides:
sendDisconnectPacket in class Transporter
-
subscribe
Specified by:
subscribe in class Transporter
-
publish
Specified by:
publish in class Transporter
-
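serialize
protected byte[] serialize(byte packetID, io.datatree.Tree message) -
getDescriptor
-
udpPacketReceived
public void udpPacketReceived(String sender, String host, int port) -
processGossipHello
protected void processGossipHello(io.datatree.Tree data) -
registerAsNewNode
protected void registerAsNewNode(String sender, String host, int port) -
sendGossipRequest
protected io.datatree.Tree sendGossipRequest()
Create and send a Gossip request packet.
Returns:
created Gossip request (used for testing)
-
sendGossipToRandomEndpoint
protected void sendGossipToRandomEndpoint(String[] endpoints, int size, byte[] packet, io.datatree.Tree message) -
processGossipRequest
protected io.datatree.Tree processGossipRequest(io.datatree.Tree data) -
processGossipResponse
protected void processGossipResponse(io.datatree.Tree data) -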
generateGossipHello
public byte[] generateGossipHello()
Create Gossip HELLO packet. Hello message is invariable, so we can cache it.
Returns:
created "hello" request
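The caching idea described for the HELLO packet (build an invariable message once, then reuse the bytes) can be sketched in isolation. The CachedPacket class below is hypothetical and is not the transporter's implementation; the payload is supplied by the caller because the real packet layout is not described here.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical helper showing "build once, reuse afterwards".
public class CachedPacket {

    private final Supplier<byte[]> builder;

    // Holds the packet after the first successful build.
    private volatile byte[] cached;

    public CachedPacket(Supplier<byte[]> builder) {
        this.builder = builder;
    }

    // Returns the cached bytes, building them on the first call only.
    public byte[] get() {
        byte[] packet = cached;
        if (packet == null) {
            synchronized (this) {
                packet = cached;
                if (packet == null) {
                    packet = builder.get();
                    cached = packet;
                }
            }
        }
        return packet;
    }

    public static void main(String[] args) {
        CachedPacket hello = new CachedPacket(
                () -> "hello".getBytes(StandardCharsets.UTF_8));
        // Both calls return the same array instance.
        System.out.println(hello.get() == hello.get());
    }
}
```

Returning the same array on every call avoids repeated serialization, at the cost that callers must treat the bytes as read-only.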
-
broadcastInfoPacket
public void broadcastInfoPacket()
Overrides:
broadcastInfoPacket in class Transporter
-
setHeartbeatInterval
public void setHeartbeatInterval(int heartbeatInterval)
Overrides:
setHeartbeatInterval in class Transporter
-
setHeartbeatTimeout
public void setHeartbeatTimeout(int heartbeatTimeout)
Overrides:
setHeartbeatTimeout in class Transporter
-
getUrls
-
setUrls
-
getPort
public int getPort() -
setPort
public void setPort(int port) -
getGossipPeriod
public int getGossipPeriod() -
setGossipPeriod
public void setGossipPeriod(int gossipPeriod) -
getMaxConnections
public int getMaxConnections() -
setMaxConnections
public void setMaxConnections(int maxConnections) -
getMaxPacketSize
public int getMaxPacketSize() -
setMaxPacketSize
public void setMaxPacketSize(int maxPacketSize) -
getCurrentPort
public int getCurrentPort() -
isUseHostname
public boolean isUseHostname() -
setUseHostname
public void setUseHostname(boolean preferHostname) -
isUdpBroadcast
public boolean isUdpBroadcast() -
setUdpBroadcast
public void setUdpBroadcast(boolean udpBroadcast) -
getUdpMaxDiscovery
public int getUdpMaxDiscovery() -
setUdpMaxDiscovery
public void setUdpMaxDiscovery(int udpMaxDiscovery) -
getUdpPeriod
public int getUdpPeriod() -
setUdpPeriod
public void setUdpPeriod(int udpPeriod) -
isUdpReuseAddr
public boolean isUdpReuseAddr() -
setUdpReuseAddr
public void setUdpReuseAddr(boolean udpReuseAddr) -
getUdpPort
public int getUdpPort() -
setUdpPort
public void setUdpPort(int udpPort) -
getUdpBindAddress
-
setUdpBindAddress
public void setUdpBindAddress(String udpBindAddress) -
getUdpMulticast
-
setUdpMulticast
public void setUdpMulticast(String udpMulticast) -
getUdpMulticastTTL
public int getUdpMulticastTTL() -
setUdpMulticastTTL
public void setUdpMulticastTTL(int udpMulticastTTL)
-