Interface StreamReceiver<K,V extends Record<K,?>>
- Type Parameters:
K - Stream key and Stream field type.
V - Stream value type.
Once created, a StreamReceiver can subscribe to a Redis Stream and consume incoming records.
Consider a Flux of Record infinite. Cancelling the Subscription eventually terminates background polling. Records are converted using key and value serializers to support various serialization strategies.
StreamReceiver supports three modes of stream consumption:
- Standalone
- Using a Consumer with external acknowledge
- Using a Consumer with auto-acknowledge
Depending on the initial ReadOffset, StreamReceiver applies an individual strategy to obtain the next ReadOffset:
Standalone
- ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last seen record Id.
- ReadOffset.lastConsumed() Last consumed: Start with the latest offset ($) and use the last seen record Id.
- ReadOffset.latest() Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.
Using Consumer
- ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last seen record Id.
- ReadOffset.lastConsumed() Last consumed: Start with the last consumed record by the consumer (>) and use the last consumed record by the consumer (>) for subsequent reads.
- ReadOffset.latest() Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.
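The offset rules above can be restated as a small lookup. The following is only an illustrative model in plain Java: the class `OffsetStrategySketch`, the enum `InitialOffset`, and both methods are hypothetical names introduced here and are not part of Spring Data Redis. Offsets are modelled as the raw strings Redis uses (`$`, `>`, or a record Id).

```java
/** Hypothetical model of the offset rules listed above; not a Spring Data Redis type. */
class OffsetStrategySketch {

    /** The three kinds of initial offset: from(String), lastConsumed(), latest(). */
    enum InitialOffset { FROM_ID, LAST_CONSUMED, LATEST }

    /** Offset used for the first read. startId is only consulted for FROM_ID. */
    static String firstOffset(InitialOffset initial, boolean usingConsumer, String startId) {
        switch (initial) {
            case FROM_ID:
                return startId;                       // start with the given offset
            case LAST_CONSUMED:
                return usingConsumer ? ">" : "$";     // consumer: last consumed; standalone: latest
            default:                                  // LATEST
                return "$";
        }
    }

    /** Offset used for subsequent reads, given the Id of the last record seen. */
    static String nextOffset(InitialOffset initial, boolean usingConsumer, String lastSeenId) {
        switch (initial) {
            case FROM_ID:
                return lastSeenId;                    // continue after the last seen record
            case LAST_CONSUMED:
                return usingConsumer ? ">" : lastSeenId;
            default:                                  // LATEST always re-reads from "$"
                return "$";
        }
    }
}
```

In this model only `LATEST` ignores the last seen record Id, so it is the only case where records arriving between two polls can be skipped.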
ReadOffset.latest() bears the chance of dropped records, as records can arrive while polling is suspended. Use record Ids as offset or ReadOffset.lastConsumed() to minimize the chance of record loss.
StreamReceiver propagates errors during stream reads and deserialization as a terminal error signal by default. Configuring a resume function allows conditional resumption by dropping the record or by propagating the error to terminate the subscription.
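A resume function might be wired in roughly as follows. This is a sketch under assumptions: it assumes the options builder exposes the resume function through `onErrorResume(…)`, and the policy in `resumeOrFail` (skip `SerializationException`, fail on everything else) is a hypothetical choice made for illustration. The stream key `my-stream` is a placeholder.

```java
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ResumeFunctionSketch {

    // Hypothetical policy: which errors are safe to skip is an application decision.
    static Publisher<Void> resumeOrFail(Throwable error) {
        if (error instanceof SerializationException) {
            return Mono.empty();   // suppress the error: the record is dropped
        }
        return Mono.error(error);  // propagate: the subscription terminates
    }

    static Flux<MapRecord<String, String, String>> receive(ReactiveRedisConnectionFactory factory) {
        // Assumption: onErrorResume(...) is the builder hook for the resume function.
        StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiverOptions.builder()
                        .onErrorResume(ResumeFunctionSketch::resumeOrFail)
                        .build();

        StreamReceiver<String, MapRecord<String, String, String>> receiver =
                StreamReceiver.create(factory, options);

        return receiver.receive(StreamOffset.fromStart("my-stream"));
    }
}
```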
See the following example of how to use StreamReceiver:

ReactiveRedisConnectionFactory factory = …;
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream"));
records.doOnNext(record -> …);
- Since: 2.2
- Author: Mark Paluch, Eddie McDaniel
-
Nested Class Summary
| Modifier and Type | Interface | Description |
| --- | --- | --- |
| static class | StreamReceiver.StreamReceiverOptions<K, V extends Record<K, ?>> | Options for StreamReceiver. |
| static class | StreamReceiver.StreamReceiverOptionsBuilder<K, V extends Record<K, ?>> | Builder for StreamReceiver.StreamReceiverOptions. |
-
Method Summary
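| Modifier and Type | Method | Description |
| --- | --- | --- |
| static StreamReceiver<String, MapRecord<String, String, String>> | create(ReactiveRedisConnectionFactory connectionFactory) | |
| static <K, V extends Record<K, ?>> StreamReceiver<K, V> | create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) | Create a new StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions. |
| reactor.core.publisher.Flux<V> | receive(Consumer consumer, StreamOffset<K> streamOffset) | Starts a Redis Stream consumer that consumes records from the stream. |
| reactor.core.publisher.Flux<V> | receive(StreamOffset<K> streamOffset) | Starts a Redis Stream consumer that consumes records from the stream. |
| reactor.core.publisher.Flux<V> | receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset) | Starts a Redis Stream consumer that consumes records from the stream. |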
-
Method Details
-
create
static StreamReceiver<String, MapRecord<String, String, String>> create(ReactiveRedisConnectionFactory connectionFactory)
- Parameters:
connectionFactory - must not be null.
- Returns:
the new StreamReceiver.
-
create
static <K, V extends Record<K, ?>> StreamReceiver<K, V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options)
Create a new StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions.
- Parameters:
connectionFactory - must not be null.
options - must not be null.
- Returns:
the new StreamReceiver.
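As an illustration of the two-argument factory method, the sketch below builds options and passes them to `create`. The poll timeout and batch size values are arbitrary placeholders, and the class name `CreateWithOptionsSketch` is hypothetical. It assumes the builder methods `pollTimeout(Duration)` and `batchSize(int)` are available in the version in use.

```java
import java.time.Duration;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions;

class CreateWithOptionsSketch {

    static StreamReceiver<String, MapRecord<String, String, String>> newReceiver(
            ReactiveRedisConnectionFactory factory) {

        // Placeholder values: tune the poll timeout and batch size for your workload.
        StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofMillis(100))
                        .batchSize(10)
                        .build();

        return StreamReceiver.create(factory, options);
    }
}
```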
-
receive
reactor.core.publisher.Flux<V> receive(StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record must be acknowledged using ReactiveStreamCommands.xAck(ByteBuffer, String, String...).
- Parameters:
streamOffset - the stream along with its offset.
- Returns:
Flux of inbound Records.
-
receiveAutoAck
reactor.core.publisher.Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record is acknowledged when received.
- Parameters:
consumer - consumer group, must not be null.
streamOffset - the stream along with its offset.
- Returns:
Flux of inbound Records.
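A minimal sketch of auto-acknowledged consumption within a consumer group. The group name `my-group`, consumer name `my-consumer`, and stream key `my-stream` are placeholders, and the sketch assumes the consumer group already exists on the stream.

```java
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamReceiver;

import reactor.core.publisher.Flux;

class AutoAckSketch {

    static Flux<MapRecord<String, String, String>> receive(ReactiveRedisConnectionFactory factory) {

        StreamReceiver<String, MapRecord<String, String, String>> receiver =
                StreamReceiver.create(factory);

        // Assumption: the group "my-group" was created beforehand.
        return receiver.receiveAutoAck(
                Consumer.from("my-group", "my-consumer"),
                StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
    }
}
```

Because records are acknowledged on receipt, a record whose processing fails downstream is not redelivered to the group.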
-
receive
reactor.core.publisher.Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record must be acknowledged using ReactiveStreamOperations.acknowledge(Object, String, String...) after processing.
- Parameters:
consumer - consumer group, must not be null.
streamOffset - the stream along with its offset.
- Returns:
Flux of inbound Records.
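A sketch of consumption with external acknowledgement, where each record is acknowledged only after it has been processed. The names `my-group`, `my-consumer`, and `my-stream` are placeholders, `handle` is a hypothetical processing step, and using a `ReactiveStringRedisTemplate` to reach `ReactiveStreamOperations` is one possible setup rather than a requirement.

```java
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.stream.StreamReceiver;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ManualAckSketch {

    // Hypothetical processing step; replace with real application logic.
    static Mono<Void> handle(MapRecord<String, String, String> record) {
        return Mono.fromRunnable(() -> System.out.println(record.getValue()));
    }

    /** Emits the number of acknowledged records for each processed record. */
    static Flux<Long> receiveAndAck(ReactiveRedisConnectionFactory factory) {

        ReactiveStringRedisTemplate template = new ReactiveStringRedisTemplate(factory);
        StreamReceiver<String, MapRecord<String, String, String>> receiver =
                StreamReceiver.create(factory);

        return receiver
                .receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
                // Process first, then acknowledge; a failure in handle(...) skips the ack.
                .concatMap(record -> handle(record)
                        .then(template.opsForStream()
                                .acknowledge("my-stream", "my-group", record.getId().getValue())));
    }
}
```

With this ordering, a record whose processing fails is never acknowledged and stays pending for the consumer group.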
-