public class KafkaTransporter extends Transporter
KafkaTransporter kafka = new KafkaTransporter();
kafka.setUrls(new String[] { "192.168.51.29:9092" });
kafka.setDebug(true);
kafka.setProducerProperty("session.timeout.ms", "30000");
ServiceBroker broker = ServiceBroker.builder().transporter(kafka).build();
// broker.createService(new Service("test") {...});
broker.start();
Required dependency:| Modifier and Type | Class and Description |
|---|---|
protected static class |
KafkaTransporter.KafkaPoller |
| Modifier and Type | Field and Description |
|---|---|
protected java.util.Properties |
consumerProperties |
protected java.util.concurrent.ExecutorService |
executor
Executor of reader loop for incoming messages
|
protected KafkaTransporter.KafkaPoller |
poller |
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> |
producer |
protected java.util.Properties |
producerProperties |
protected boolean |
shutDownThreadPools |
protected java.lang.String[] |
urls |
checkTimeoutTimer, counterTransporterPacketsReceivedBytes, counterTransporterPacketsReceivedTotal, counterTransporterPacketsSentBytes, counterTransporterPacketsSentTotal, debug, debugHeartbeats, disconnectChannel, discoverBroadcastChannel, discoverChannel, eventbus, eventChannel, gaugeTransitConnected, heartbeatChannel, heartbeatInterval, heartbeatTimeout, heartBeatTimer, infoBroadcastChannel, infoChannel, infoScheduled, infoScheduledAt, instanceID, lastReceivedMessageAt, metrics, monitor, namespace, nodeID, nodes, offlineTimeout, PACKET_DISCONNECT, PACKET_DISCOVER, PACKET_EVENT, PACKET_HEARTBEAT, PACKET_INFO, PACKET_PING, PACKET_PONG, PACKET_REQUEST, PACKET_RESPONSE, pingChannel, pongChannel, preferHostname, prefix, protocolVersion, registry, requestChannel, responseChannel, scheduler, serializer, serviceInvoker, subscriptionTimeout, uidGenerator, usingJsonSerializerbroker, logger, nameMOLECULER_CACHER_CLEAN_TIME, MOLECULER_CACHER_CLEAN_TIME_DESC, MOLECULER_CACHER_CLEAN_TOTAL, MOLECULER_CACHER_CLEAN_TOTAL_DESC, MOLECULER_CACHER_DEL_TIME, MOLECULER_CACHER_DEL_TIME_DESC, MOLECULER_CACHER_DEL_TOTAL, MOLECULER_CACHER_DEL_TOTAL_DESC, MOLECULER_CACHER_EXPIRED_TOTAL, MOLECULER_CACHER_EXPIRED_TOTAL_DESC, MOLECULER_CACHER_FOUND_TOTAL, MOLECULER_CACHER_FOUND_TOTAL_DESC, MOLECULER_CACHER_GET_TIME, MOLECULER_CACHER_GET_TIME_DESC, MOLECULER_CACHER_GET_TOTAL, MOLECULER_CACHER_GET_TOTAL_DESC, MOLECULER_CACHER_SET_TIME, MOLECULER_CACHER_SET_TIME_DESC, MOLECULER_CACHER_SET_TOTAL, MOLECULER_CACHER_SET_TOTAL_DESC, MOLECULER_CIRCUIT_BREAKER_HALF_OPENED_ACTIVE, MOLECULER_CIRCUIT_BREAKER_HALF_OPENED_ACTIVE_DESC, MOLECULER_CIRCUIT_BREAKER_OPENED_ACTIVE, MOLECULER_CIRCUIT_BREAKER_OPENED_ACTIVE_DESC, MOLECULER_CIRCUIT_BREAKER_OPENED_TOTAL, MOLECULER_CIRCUIT_BREAKER_OPENED_TOTAL_DESC, MOLECULER_EVENT_BROADCAST_TOTAL, MOLECULER_EVENT_BROADCAST_TOTAL_DESC, MOLECULER_EVENT_BROADCASTLOCAL_TOTAL, MOLECULER_EVENT_BROADCASTLOCAL_TOTAL_DESC, MOLECULER_EVENT_EMIT_TOTAL, MOLECULER_EVENT_EMIT_TOTAL_DESC, MOLECULER_EVENT_RECEIVED_ACTIVE, MOLECULER_EVENT_RECEIVED_ACTIVE_DESC, MOLECULER_EVENT_RECEIVED_ERROR_TOTAL, MOLECULER_EVENT_RECEIVED_ERROR_TOTAL_DESC, MOLECULER_EVENT_RECEIVED_TIME, MOLECULER_EVENT_RECEIVED_TIME_DESC, MOLECULER_EVENT_RECEIVED_TOTAL, MOLECULER_EVENT_RECEIVED_TOTAL_DESC, MOLECULER_EXECUTOR, MOLECULER_REQUEST_ACTIVE, MOLECULER_REQUEST_ACTIVE_DESC, MOLECULER_REQUEST_ERROR_TOTAL, MOLECULER_REQUEST_ERROR_TOTAL_DESC, MOLECULER_REQUEST_LEVELS, MOLECULER_REQUEST_LEVELS_DESC, MOLECULER_REQUEST_RETRY_ATTEMPTS_TOTAL, MOLECULER_REQUEST_RETRY_ATTEMPTS_TOTAL_DESC, MOLECULER_REQUEST_TIME, MOLECULER_REQUEST_TIME_DESC, MOLECULER_REQUEST_TIMEOUT_TOTAL, MOLECULER_REQUEST_TIMEOUT_TOTAL_DESC, MOLECULER_REQUEST_TOTAL, MOLECULER_REQUEST_TOTAL_DESC, MOLECULER_SCHEDULER, MOLECULER_TRANSIT_CONNECTED, MOLECULER_TRANSIT_CONNECTED_DESC, MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL, MOLECULER_TRANSIT_ORPHAN_RESPONSE_TOTAL_DESC, MOLECULER_TRANSIT_REQUESTS_ACTIVE, MOLECULER_TRANSIT_REQUESTS_ACTIVE_DESC, MOLECULER_TRANSIT_STREAMS_RECEIVE_ACTIVE, MOLECULER_TRANSIT_STREAMS_RECEIVE_ACTIVE_DESC, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_BYTES, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_BYTES_DESC, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_TOTAL, MOLECULER_TRANSPORTER_PACKETS_RECEIVED_TOTAL_DESC, MOLECULER_TRANSPORTER_PACKETS_SENT_BYTES, MOLECULER_TRANSPORTER_PACKETS_SENT_BYTES_DESC, MOLECULER_TRANSPORTER_PACKETS_SENT_TOTAL, MOLECULER_TRANSPORTER_PACKETS_SENT_TOTAL_DESC| Constructor and Description |
|---|
KafkaTransporter() |
KafkaTransporter(java.lang.String... urls) |
| Modifier and Type | Method and Description |
|---|---|
void |
connect() |
protected void |
disconnect() |
protected void |
error(java.lang.Throwable cause)
Any I/O error occurred.
|
java.util.Properties |
getConsumerProperties() |
java.util.concurrent.ExecutorService |
getExecutor() |
java.util.Properties |
getProducerProperties() |
java.lang.String[] |
getUrls() |
boolean |
isShutDownThreadPools() |
void |
publish(java.lang.String channel,
io.datatree.Tree message) |
protected void |
reconnect(java.lang.Throwable cause) |
void |
setConsumerProperties(java.util.Properties consumerProperties) |
void |
setConsumerProperty(java.lang.String key,
java.lang.String value) |
void |
setExecutor(java.util.concurrent.ExecutorService executor) |
void |
setProducerProperties(java.util.Properties producerProperties) |
void |
setProducerProperty(java.lang.String key,
java.lang.String value) |
void |
setShutDownThreadPools(boolean shutDownThreadPools) |
void |
setUrls(java.lang.String[] urls) |
void |
stopped()
Closes transporter.
|
io.datatree.Promise |
subscribe(java.lang.String channel) |
broadcastInfoPacket, broadcastNodeConnected, broadcastNodeDisconnected, broadcastNodeUpdated, broadcastTransporterConnected, broadcastTransporterDisconnected, channel, checkTimeouts, connected, connected, createPingPacket, getAddress, getAllNodeIDs, getCpuUsage, getDescriptor, getHeartbeatInterval, getHeartbeatTimeout, getInstanceID, getLastHeartbeatTime, getNamespace, getOfflineTimeout, getPrefix, getSerializer, getSubscriptionTimeout, isDebug, isDebugHeartbeats, isOnline, isPreferHostname, processReceivedMessage, processReceivedMessage, publish, received, scheduleInfoPacket, sendClosePacket, sendDataPacket, sendDisconnectPacket, sendDiscoverPacket, sendErrorPacket, sendEventPacket, sendHeartbeatPacket, sendInfoPacket, sendPongPacket, sendRequestPacket, setDebug, setDebugHeartbeats, setHeartbeatInterval, setHeartbeatTimeout, setNamespace, setOfflineTimeout, setPreferHostname, setPrefix, setSerializer, setSubscriptionTimeout, started, subscribe, updateNodeInfogetBroker, getLogger, getNameprotected java.util.Properties producerProperties
protected java.util.Properties consumerProperties
protected java.lang.String[] urls
protected org.apache.kafka.clients.producer.KafkaProducer<byte[],byte[]> producer
protected KafkaTransporter.KafkaPoller poller
protected java.util.concurrent.ExecutorService executor
protected boolean shutDownThreadPools
public KafkaTransporter()
public KafkaTransporter(java.lang.String... urls)
public void connect()
connect in class Transporterprotected void disconnect()
protected void reconnect(java.lang.Throwable cause)
protected void error(java.lang.Throwable cause)
Transportererror in class Transportercause - I/O errorpublic void stopped()
stopped in interface MoleculerLifecyclestopped in class Transporterpublic io.datatree.Promise subscribe(java.lang.String channel)
subscribe in class Transporterpublic void publish(java.lang.String channel,
io.datatree.Tree message)
publish in class Transporterpublic void setProducerProperty(java.lang.String key,
java.lang.String value)
public void setConsumerProperty(java.lang.String key,
java.lang.String value)
public java.lang.String[] getUrls()
public void setUrls(java.lang.String[] urls)
public java.util.Properties getProducerProperties()
public void setProducerProperties(java.util.Properties producerProperties)
public java.util.Properties getConsumerProperties()
public void setConsumerProperties(java.util.Properties consumerProperties)
public java.util.concurrent.ExecutorService getExecutor()
public void setExecutor(java.util.concurrent.ExecutorService executor)
public boolean isShutDownThreadPools()
public void setShutDownThreadPools(boolean shutDownThreadPools)