kinsky.async
Clojure core.async support in kinsky. See https://github.com/pyr/kinsky for example usage.
channel-listener
(channel-listener ch)A rebalance-listener compatible call back which produces all events onto a channel.
consumer
(consumer config)(consumer config kd vd)Build an async consumer. Yields a vector of record and control channels.
Arguments config ks and vs work as for kinsky.client/consumer. The config map must be a valid consumer configuration map and may contain the following additional keys:
:input-buffer: Maximum backlog of control channel messages.:output-buffer: Maximum queued consumed messages.:timeout: Poll interval:topic: Automatically subscribe to this topic before launching loop:duplex?: yields a duplex channel instead of a vector of two chans
The resulting control channel is used to interact with the consumer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:
:subscribe:{:op :subscribe :topic "foo"}subscribe to a topic.:unsubscribe:{:op :unsubscribe}, unsubscribe from all topics.:partitions-for:{:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a:responsekey is present, produce the response there instead of on the record channel.commit:{:op :commit}commit offsets, an optional:topic-offsetskey may be present for specific offset committing.:pause:{:op :pause}pause consumption.:resume:{:op :resume}resume consumption.:calllback:{:op :callback :callback (fn [d ch])}Execute a function of 2 arguments, the consumer driver and output channel, on a woken up driver.:stop:{:op :stop}stop and close consumer.
The resulting output channel will emit payloads with as maps containing a :type key where :type may be:
:record: A consumed record.:exception: An exception raised:rebalance: A rebalance event.:eof: The end of this stream.:partitions: The result of a:partitions-foroperation.:woken-up: A notification that the consumer was woken up.
poller-thread
(poller-thread driver inbuf outbuf timeout)Poll for next messages, catching exceptions and yielding them.
producer
(producer config)(producer config ks)(producer config ks vs)Build a producer, reading records to send from a channel.
Arguments config ks and vs work as for kinsky.client/producer. The config map must be a valid consumer configuration map and may contain the following additional keys:
:input-buffer: Maximum backlog of control channel messages.:output-buffer: Maximum queued consumed messages.
Yields a vector of two values [in out], an input channel and an output channel.
The resulting input channel is used to interact with the producer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:
:record:{:op :record :topic "foo"}send a record out, also performed when no:opkey is present.:flush:{:op :flush}, flush unsent messages.:partitions-for:{:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a:responsekey is present, produce the response there instead of on the record channel.:close:{:op :close}, close the producer.
The resulting output channel will emit payloads with as maps containing a :type key where :type may be:
:exception: An exception raised:partitions: The result of a:partitions-foroperation.:eof: The producer is closed.