kinsky.async
Clojure core.async support in kinsky. See https://github.com/pyr/kinsky for example usage.
channel-listener
(channel-listener ch)
A rebalance-listener compatible callback which produces all events onto a channel.
consumer
(consumer config)
(consumer config kd vd)
Build an async consumer. Yields a vector of record and control channels.
Arguments config, kd and vd work as for kinsky.client/consumer. The config map must be a valid consumer configuration map and may contain the following additional keys:
:input-buffer
: Maximum backlog of control channel messages.
:output-buffer
: Maximum queued consumed messages.
:timeout
: Poll interval.
:topic
: Automatically subscribe to this topic before launching the loop.
:duplex?
: Yield a duplex channel instead of a vector of two channels.
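As a rough illustration of the keys above, a configuration map might look like the following. Every value is a placeholder; `:bootstrap.servers` and `:group.id` are standard Kafka consumer settings assumed here rather than taken from this page, `async-keys` is a hypothetical helper name, and the unit of `:timeout` is not stated in this reference.

```clojure
;; Hypothetical configuration; every value is a placeholder.
(def consumer-config
  {;; Standard Kafka consumer settings (assumed, not listed on this page).
   :bootstrap.servers "localhost:9092"
   :group.id          "example-group"
   ;; Additional keys described above.
   :input-buffer      10      ; backlog of control channel messages
   :output-buffer     100     ; queued consumed messages
   :timeout           100     ; poll interval (unit not stated here)
   :topic             "foo"   ; subscribe before the loop starts
   :duplex?           false}) ; keep the vector-of-two-channels result

;; The async-specific keys can be told apart from the Kafka settings.
(def async-keys [:input-buffer :output-buffer :timeout :topic :duplex?])

(def async-options (select-keys consumer-config async-keys))
(def kafka-options (apply dissoc consumer-config async-keys))
```

Such a map would then be passed as the config argument, for example `(consumer consumer-config)`, which requires a reachable Kafka broker.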
The resulting control channel is used to interact with the consumer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:
:subscribe
: {:op :subscribe :topic "foo"}, subscribe to a topic.
:unsubscribe
: {:op :unsubscribe}, unsubscribe from all topics.
:partitions-for
: {:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a :response key is present, produce the response there instead of on the record channel.
:commit
: {:op :commit}, commit offsets. An optional :topic-offsets key may be present to commit specific offsets.
:pause
: {:op :pause}, pause consumption.
:resume
: {:op :resume}, resume consumption.
:callback
: {:op :callback :callback (fn [d ch])}, execute a function of two arguments, the consumer driver and the output channel, on a woken-up driver.
:stop
: {:op :stop}, stop and close the consumer.
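Since the commands are plain maps, they can be built with ordinary functions before being put on the control channel. The sketch below is one way to do that; all the `*-cmd` names are hypothetical helpers, not part of kinsky, and only the map shapes come from the list above.

```clojure
;; Hypothetical helpers that build the command maps listed above.
(defn subscribe-cmd [topic] {:op :subscribe :topic topic})
(defn partitions-for-cmd [topic] {:op :partitions-for :topic topic})

(defn callback-cmd
  "Wrap f, a function of the consumer driver and the output channel."
  [f]
  {:op :callback :callback f})

;; Commands without arguments are constant maps.
(def unsubscribe-cmd {:op :unsubscribe})
(def commit-cmd {:op :commit})
(def pause-cmd {:op :pause})
(def resume-cmd {:op :resume})
(def stop-cmd {:op :stop})

;; A session expressed as the ordered commands to put on the control
;; channel (for instance with clojure.core.async/put!, not shown here).
(def session
  [(subscribe-cmd "foo") pause-cmd resume-cmd commit-cmd stop-cmd])
```

Keeping the commands as data makes a session easy to inspect or log before anything is sent to the driver.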
The resulting output channel emits payloads as maps containing a :type key, where :type may be:
:record
: A consumed record.
:exception
: An exception raised.
:rebalance
: A rebalance event.
:eof
: The end of this stream.
:partitions
: The result of a :partitions-for operation.
:woken-up
: A notification that the consumer was woken up.
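A reader of the output channel will typically branch on :type. The following is a minimal sketch of such a dispatch; `describe-payload` is a hypothetical name, and only the :type key is inspected because the other keys carried by each payload are not documented on this page.

```clojure
(defn describe-payload
  "Return a short label for a payload taken from the output channel.
  Only the :type key is looked at; everything else is left alone."
  [{:keys [type]}]
  (case type
    :record     "consumed record"
    :exception  "exception raised"
    :rebalance  "rebalance event"
    :eof        "end of stream"
    :partitions "partitions-for result"
    :woken-up   "consumer woken up"
    ;; Fallback for any type not listed above.
    "unknown payload type"))

(describe-payload {:type :eof})
;; => "end of stream"
```

In a consuming loop, the :eof branch is the natural place to stop reading from the channel.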
poller-thread
(poller-thread driver inbuf outbuf timeout)
Poll for next messages, catching exceptions and yielding them.
producer
(producer config)
(producer config ks)
(producer config ks vs)
Build a producer, reading records to send from a channel.
Arguments config, ks and vs work as for kinsky.client/producer. The config map must be a valid producer configuration map and may contain the following additional keys:
:input-buffer
: Maximum backlog of control channel messages.
:output-buffer
: Maximum queued consumed messages.
Yields a vector of two values [in out], an input channel and an output channel.
The resulting input channel is used to interact with the producer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:
:record
: {:op :record :topic "foo"}, send a record out. Also performed when no :op key is present.
:flush
: {:op :flush}, flush unsent messages.
:partitions-for
: {:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a :response key is present, produce the response there instead of on the record channel.
:close
: {:op :close}, close the producer.
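The rule that a payload without an :op key is treated as a record can be pictured with a small function. This is only a sketch of the documented behaviour, not kinsky's implementation; `effective-op` is a hypothetical name, and the `:key` and `:value` fields are assumed names for the record contents, which this page does not list.

```clojure
(defn effective-op
  "Operation the producer driver is documented to apply to payload:
  the value of :op, or :record when no :op key is present."
  [payload]
  (get payload :op :record))

;; :key and :value are assumed field names, not taken from this page.
(effective-op {:topic "foo" :key "k" :value "v"})
;; => :record

(effective-op {:op :flush})
;; => :flush
```

Under that rule, a bare record map and the same map with an explicit `:op :record` are handled the same way.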
The resulting output channel emits payloads as maps containing a :type key, where :type may be:
:exception
: An exception raised.
:partitions
: The result of a :partitions-for operation.
:eof
: The producer is closed.