kinsky.async

Clojure core.async support in kinsky. See https://github.com/pyr/kinsky for example usage.

channel-listener

(channel-listener ch)

A rebalance-listener-compatible callback that puts all events onto a channel.

close-poller

(close-poller ctl out driver)

consumer

(consumer config) (consumer config kd vd)

Build an async consumer. Yields a vector of record and control channels.

Arguments config, kd and vd work as for kinsky.client/consumer. The config map must be a valid consumer configuration map and may contain the following additional keys:

  • :input-buffer: Maximum backlog of control channel messages.
  • :output-buffer: Maximum queued consumed messages.
  • :timeout: Poll interval, in milliseconds.
  • :topic: Automatically subscribe to this topic before launching the loop.
  • :duplex?: Yield a duplex channel instead of a vector of two channels.
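
As an illustration of the keys above, here is a minimal sketch of a configuration map. The :bootstrap.servers and :group.id entries are standard Kafka consumer settings with placeholder values, and every number is an arbitrary example, not a recommended default. The name consumer-config is hypothetical.

```clojure
;; Hypothetical configuration: Kafka settings plus the async-specific keys.
(def consumer-config
  {:bootstrap.servers "localhost:9092" ; placeholder broker address
   :group.id          "example-group"  ; placeholder consumer group
   :input-buffer      10               ; backlog of control channel messages
   :output-buffer     100              ; queued consumed messages
   :timeout           100              ; poll interval, assumed milliseconds
   :topic             "foo"            ; subscribe before the loop starts
   :duplex?           false})          ; ask for a vector of two channels

;; The async-specific keys can be listed separately from the Kafka settings.
(def async-keys [:input-buffer :output-buffer :timeout :topic :duplex?])

(select-keys consumer-config async-keys)
```

With a reachable broker, a map like this would be passed as the config argument of consumer.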

The resulting control channel is used to interact with the consumer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:

  • :subscribe: {:op :subscribe :topic "foo"} subscribe to a topic.
  • :unsubscribe: {:op :unsubscribe}, unsubscribe from all topics.
  • :partitions-for: {:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a :response key is present, produce the response there instead of on the record channel.
  • :commit: {:op :commit} commit offsets. An optional :topic-offsets key may be present to commit specific offsets.
  • :pause: {:op :pause} pause consumption.
  • :resume: {:op :resume} resume consumption.
  • :callback: {:op :callback :callback (fn [d ch])} execute a function of two arguments, the consumer driver and the output channel, on a woken-up driver.
  • :stop: {:op :stop} stop and close consumer.
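
Since control commands are plain maps, they can be built and checked as ordinary data before being sent. The sketch below is only illustrative: known-ops and control-command? are hypothetical helpers, not part of kinsky.

```clojure
;; The operations documented in the list above.
(def known-ops
  #{:subscribe :unsubscribe :partitions-for :commit
    :pause :resume :callback :stop})

(defn control-command?
  "Hypothetical helper: true when m is a map whose :op is a documented operation."
  [m]
  (and (map? m) (contains? known-ops (:op m))))

;; A typical sequence of commands, written as data.
(def commands
  [{:op :subscribe :topic "foo"}
   {:op :partitions-for :topic "foo"}
   {:op :commit}
   {:op :pause}
   {:op :resume}
   {:op :stop}])

;; With a live consumer, each command would be put on the control channel,
;; for example: (clojure.core.async/put! ctl {:op :subscribe :topic "foo"})
(every? control-command? commands)
```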

The resulting output channel emits payloads as maps containing a :type key, where :type may be:

  • :record: A consumed record.
  • :exception: An exception was raised.
  • :rebalance: A rebalance event.
  • :eof: The end of this stream.
  • :partitions: The result of a :partitions-for operation.
  • :woken-up: A notification that the consumer was woken up.
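
A reader of the output channel would usually dispatch on :type. The following sketch assumes nothing about the other keys in each payload, and the returned action keywords are invented for illustration. The function next-action is hypothetical.

```clojure
(defn next-action
  "Hypothetical dispatcher: map a payload's :type to an application-level action."
  [payload]
  (case (:type payload)
    :record     :process   ; a consumed record
    :partitions :inspect   ; answer to a :partitions-for command
    :rebalance  :log       ; rebalance event
    :woken-up   :log       ; the consumer was woken up
    :exception  :report    ; an exception was raised
    :eof        :stop      ; end of the stream
    :ignore))              ; anything undocumented

(mapv next-action [{:type :record} {:type :exception} {:type :eof}])
```

In a real consumer loop, a function like this would be applied to each value taken from the record channel, and the loop would end on :stop.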

default-input-buffer

Default number of messages buffered on control channels.

default-output-buffer

Default number of messages buffered on the record channel.

default-timeout

Default timeout. By default, we poll at 100 ms intervals.

duplex

(duplex up down) (duplex up down indexed)

exception?

(exception? e)

Test if a value is an instance of Exception or of one of its subclasses.
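
A predicate with this behaviour could be sketched as below. This is an assumption about how such a check is typically written, not a copy of the library source, and the name exception-value? is hypothetical.

```clojure
(defn exception-value?
  "Hypothetical equivalent: true when e is an Exception or a subclass instance."
  [e]
  (instance? Exception e))

(exception-value? (ex-info "boom" {})) ; ExceptionInfo extends RuntimeException
(exception-value? "not an exception")
```

Note that a check against Exception does not match other Throwable values such as java.lang.Error.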

get-within

(get-within ctl tm)

make-consumer

(make-consumer config kd vd)

Build a consumer, with or without deserializers

make-producer

(make-producer config ks vs)

Build a producer, with or without serializers

poller-ctl

(poller-ctl ctl out driver timeout)

poller-thread

(poller-thread driver inbuf outbuf timeout)

Poll for next messages, catching exceptions and yielding them.

producer

(producer config) (producer config ks) (producer config ks vs)

Build a producer, reading records to send from a channel.

Arguments config, ks and vs work as for kinsky.client/producer. The config map must be a valid producer configuration map and may contain the following additional keys:

  • :input-buffer: Maximum backlog of control channel messages.
  • :output-buffer: Maximum queued consumed messages.

Yields a vector of two values [in out], an input channel and an output channel.

The resulting input channel is used to interact with the producer driver and expects map payloads, whose operation is determined by their :op key. The following commands are handled:

  • :record: {:op :record :topic "foo"} send a record out, also performed when no :op key is present.
  • :flush: {:op :flush}, flush unsent messages.
  • :partitions-for: {:op :partitions-for :topic "foo"}, yield partition info for the given topic. If a :response key is present, produce the response there instead of on the record channel.
  • :close: {:op :close}, close the producer.
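
The rule that a missing :op means :record can be pictured with a small sketch. The helper effective-op is hypothetical, and the :key and :value entries are assumed record fields whose values are placeholders.

```clojure
(defn effective-op
  "Hypothetical helper: the operation a message stands for, defaulting to :record."
  [msg]
  (get msg :op :record))

(def messages
  [{:topic "foo" :key "k1" :value "v1"}             ; no :op, treated as a record
   {:op :record :topic "foo" :key "k2" :value "v2"} ; explicit record
   {:op :flush}
   {:op :close}])

(mapv effective-op messages)
;; => [:record :record :flush :close]
```

With a live producer, each of these maps would be put on the input channel in order.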

The resulting output channel emits payloads as maps containing a :type key, where :type may be:

  • :exception: An exception was raised.
  • :partitions: The result of a :partitions-for operation.
  • :eof: The producer is closed.

record-xform

Rely on the standard transducer but indicate that this is a record.

safe-poll

(safe-poll ctl recs out driver timeout)