ZeroMQ Support

Spring Integration provides components to support ZeroMQ communication in the application. The implementation is based on the well-supported Java API of the JeroMQ library. All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally making interactions with these components lock-free and thread-safe.

You need to include this dependency into your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.0.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-zeromq:6.0.13"

ZeroMQ Proxy

The ZeroMqProxy is a Spring-friendly wrapper for the built-in ZMQ.proxy() function. It encapsulates socket lifecycles and thread management. The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API. Alongside with the standard ZContext it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER. This way an appropriate pair of ZeroMQ socket types are used for the frontend and backend of the proxy. See ZeroMqProxy.Type for details.

The ZeroMqProxy implements SmartLifecycle to create, bind and configure the sockets and to start ZMQ.proxy() in a dedicated thread from an Executor (if any). The binding for frontend and backend sockets is done over the tcp:// protocol onto all of the available network interfaces with the provided ports. Otherwise, they are bound to random ports which can be obtained later via the respective getFrontendPort() and getBackendPort() API methods.

The control socket is exposed as a SocketType.PAIR with an inter-thread transport on the "inproc://" + beanName + ".control" address; it can be obtained via getControlAddress(). It should be used with the same application from another SocketType.PAIR socket to send ZMQ.PROXY_TERMINATE, ZMQ.PROXY_PAUSE and/or ZMQ.PROXY_RESUME commands. The ZeroMqProxy performs a ZMQ.PROXY_TERMINATE command when stop() is called for its lifecycle to terminate the ZMQ.proxy() loop and close all the bound sockets gracefully.

The setExposeCaptureSocket(boolean) option causes this component to bind an additional inter-thread socket with SocketType.PUB to capture and publish all the communication between the frontend and backend sockets as it states with ZMQ.proxy() implementation. This socket is bound to the "inproc://" + beanName + ".capture" address and doesn’t expect any specific subscription for filtering.

The frontend and backend sockets can be customized with additional properties, such as read/write timeout or security. This customization is available through setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) and setBackendSocketConfigurer(Consumer<ZMQ.Socket>) callbacks, respectively.

The ZeroMqProxy could be provided as simple bean like this:

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

All the client nodes should connect to the host of this proxy via tcp:// and use the respective port of their interest.

ZeroMQ Message Channel

The ZeroMqChannel is a SubscribableChannel which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction. It can work in a PUB/SUB mode (defaults to PUSH/PULL); it can also be used as a local inter-thread channel (uses PAIR sockets) - the connectUrl is not provided in this case. In distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. The connection url option is a standard ZeroMQ connection string with the protocol and host and a pair of ports over colon for frontend and backend sockets of the ZeroMQ proxy. For convenience, the channel could be supplied with the ZeroMqProxy instance instead of connection string, if it is configured in the same application as the proxy.

Both sending and receiving sockets are managed in their own dedicated threads making this channel concurrency-friendly. This way we can publish and consume to/from a ZeroMqChannel from different threads without synchronization.

By default, the ZeroMqChannel uses an EmbeddedJsonHeadersMessageMapper to (de)serialize the Message (including headers) from/to byte[] using a Jackson JSON processor. This logic can be configured via setMessageMapper(BytesMessageMapper).

Sending and receiving sockets can be customized for any options (read/write timeout, security etc.) via respective setSendSocketConfigurer(Consumer<ZMQ.Socket>) and setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) callbacks.

The internal logic of the ZeroMqChannel is based on the reactive streams via Project Reactor Flux and Mono operators. This provides easier threading control and allows lock-free concurrent publication and consumption to/from the channel. Local PUB/SUB logic is implemented as a Flux.publish() operator to allow all of the local subscribers to this channel to receive the same published message, as distributed subscribers to the PUB socket.

The following is a simple example of a ZeroMqChannel configuration:

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ Inbound Channel Adapter

The ZeroMqMessageProducer is a MessageProducerSupport implementation with reactive semantics. It constantly reads the data from a ZeroMQ socket in a non-blocking manner and publishes the messages to an infinite Flux which is subscribed to by a FluxMessageChannel or explicitly in the start() method, if the output channel is not reactive. When no data are received on the socket, a consumeDelay (defaults to 1 second) is applied before the next read attempt.

Only SocketType.PAIR, SocketType.PULL and SocketType.SUB are supported by the ZeroMqMessageProducer. This component can connect to the remote socket or bind onto TCP protocol with the provided or random port. The actual port can be obtained via getBoundPort() after this component is started and ZeroMQ socket is bound. The socket options (e.g. security or write timeout) can be configured via setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) callback.

If the receiveRaw option is set to true, a ZMsg, consumed from the socket, is sent as is in the payload of the produced Message: it’s up to the downstream flow to parse and convert the ZMsg. Otherwise, an InboundMessageMapper is used to convert the consumed data into a Message. If the received ZMsg is multi-frame, the first frame is treated as the ZeroMqHeaders.TOPIC header this ZeroMQ message was published to.

With SocketType.SUB, the ZeroMqMessageProducer uses the provided topics option for subscriptions; defaults to subscribe to all. Subscriptions can be adjusted at runtime using subscribeToTopics() and unsubscribeFromTopics() @ManagedOperation s.

Here is a sample of ZeroMqMessageProducer configuration:

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ Outbound Channel Adapter

The ZeroMqMessageHandler is a ReactiveMessageHandler implementation to produce publish messages into a ZeroMQ socket. Only SocketType.PAIR, SocketType.PUSH and SocketType.PUB are supported. The ZeroMqMessageHandler only supports connecting the ZeroMQ socket; binding is not supported. When the SocketType.PUB is used, the topicExpression is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (SocketType.SUB) must receive the topic frame first before parsing the actual data. When the payload of the request message is a ZMsg, no conversion or topic extraction is performed: the ZMsg is sent into a socket as is and it is not destroyed for possible further reuse. Otherwise, an OutboundMessageMapper<byte[]> is used to convert a request message (or just its payload) into a ZeroMQ frame to publish. By default, a ConvertingBytesMessageMapper is used supplied with a ConfigurableCompositeMessageConverter. The socket options (e.g. security or write timeout) can be configured via setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) callback.

Here is a sample of ZeroMqMessageHandler configuration:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL Support

The spring-integration-zeromq provide a convenient Java DSL fluent API via ZeroMq factory and IntegrationComponentSpec implementations for the components mentioned above.

This is a sample of Java DSL for ZeroMqChannel:

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

The Inbound Channel Adapter for ZeroMQ Java DSL is:

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

The Outbound Channel Adapter for ZeroMQ Java DSL is:

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}