kinsky.client

Small Clojure shim on top of the Kafka client API. See https://github.com/pyr/kinsky for example usage.

->deserializer

(->deserializer x)

->offset-metadata

(->offset-metadata {:keys [offset metadata]})

Yield an OffsetAndMetadata from a Clojure map.

->record

(->record payload)

Build a producer record from a Clojure map. ProducerRecord instances are left untouched.

->serializer

(->serializer x)

->topic-partition

(->topic-partition {:keys [topic partition]})

Yield a TopicPartition from a Clojure map.

->topics

(->topics topics)

Yield a valid object for subscription

consumer

(consumer config)
(consumer config callback)
(consumer config kdeserializer vdeserializer)
(consumer config callback kdeserializer vdeserializer)

Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them; otherwise, expect deserializer class names in the config map.

consumer->driver

(consumer->driver consumer)
(consumer->driver consumer run-signal)

Given a consumer and an optional callback to call when stopping, yield a consumer driver.

The consumer driver implements the following protocols:

consumer-records->data

(consumer-records->data crs)

Yield the Clojure representation of a ConsumerRecords instance.

ConsumerDriver

protocol

Driver interface for consumers

members

commit!

(commit! this)
(commit! this topic-offsets)

Commit offsets for a consumer. The topic-offsets argument must be a list of maps of the form:

{:topic     topic
 :partition partition
 :offset    offset
 :metadata  metadata}

The topic and partition tuple must be unique across the whole list.
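As a rough illustration of the shape commit! expects, the sketch below derives a topic-offsets list from the :by-partition entry of a poll! result. The helper name next-offsets is hypothetical and not part of kinsky. It assumes the usual Kafka convention of committing the offset of the next record to read (last seen offset plus one) and uses an empty string for :metadata.

```clojure
(defn next-offsets
  "Hypothetical helper (not part of kinsky): build a topic-offsets list
   from the :by-partition map of a poll! result."
  [by-partition]
  (for [[[topic partition] records] by-partition
        :when (seq records)]
    {:topic     topic
     :partition partition
     ;; Assumption: commit the position of the next record to read.
     :offset    (inc (apply max (map :offset records)))
     :metadata  ""}))

(def example-by-partition
  {["t" 0] [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}]
   ["t" 1] [{:key "k1" :offset 4 :partition 1 :topic "t" :value "v1"}]})

(def topic-offsets (next-offsets example-by-partition))

;; Each [topic partition] pair appears once, as commit! requires.
(def unique-pairs?
  (apply distinct? (map (juxt :topic :partition) topic-offsets)))
```

The resulting list could then be handed to the two-argument arity, for example (commit! driver topic-offsets), where driver stands for a consumer driver created elsewhere.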

pause!

(pause! this)
(pause! this topic-partitions)

Pause consumption.

poll!

(poll! this timeout)

Poll for new messages. Timeout in ms. The result is a data representation of a ConsumerRecords instance.

{:partitions   [["t" 0] ["t" 1]]
 :topics       ["t"]
 :count        2
 :by-partition {["t" 0] [{:key       "k0"
                          :offset    1
                          :partition 0
                          :topic     "t"
                          :value     "v0"}]
                ["t" 1] [{:key       "k1"
                          :offset    1
                          :partition 1
                          :topic     "t"
                          :value     "v1"}]}
 :by-topic     {"t" [{:key       "k0"
                      :offset    1
                      :partition 0
                      :topic     "t"
                      :value     "v0"}
                     {:key       "k1"
                      :offset    1
                      :partition 1
                      :topic     "t"
                      :value     "v1"}]}}
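Since the result is plain data, it can be navigated with ordinary Clojure functions. The following minimal sketch works on a literal copy of the map above. The helper names values-for-topic and partition-counts are hypothetical and only illustrate the two groupings.

```clojure
(def poll-result
  {:partitions   [["t" 0] ["t" 1]]
   :topics       ["t"]
   :count        2
   :by-partition {["t" 0] [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}]
                  ["t" 1] [{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}
   :by-topic     {"t" [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
                       {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}})

(defn values-for-topic
  "Hypothetical helper: all record values received for one topic."
  [result topic]
  (mapv :value (get-in result [:by-topic topic])))

(defn partition-counts
  "Hypothetical helper: number of records per [topic partition] pair."
  [result]
  (into {}
        (map (fn [[tp records]] [tp (count records)]))
        (:by-partition result)))
```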

position!

(position! this)
(position! this topic-partition)

Get the offset of the next record that will be fetched (if a record with that offset exists).

resume!

(resume! this topic-partitions)

Resume consumption.

seek!

(seek! this)
(seek! this topic-partition offset)

Overrides the fetch offsets that the consumer will use on the next poll

stop!

(stop! this)
(stop! this timeout)

Stop consumption.

subscribe!

(subscribe! this topics)
(subscribe! this topics listener)

Subscribe to a topic or list of topics. The topics argument can be:

  • A simple string when subscribing to a single topic
  • A regex pattern to subscribe to matching topics
  • A sequence of strings

The optional listener argument is either a callback function or an implementation of ConsumerRebalanceListener.

When a function is supplied, it will be called on rebalance events with a map representing the event. See kinsky.client/rebalance-listener for details on the map format.
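To make the three accepted shapes of the topics argument concrete, here is a small classification sketch. The function describe-topics is hypothetical and is not how kinsky itself dispatches; it only shows how a string, a regex pattern, and a sequence of strings can be told apart.

```clojure
(import 'java.util.regex.Pattern)

(defn describe-topics
  "Hypothetical helper: classify a topics argument by its shape."
  [topics]
  (cond
    (string? topics)           [:single topics]
    (instance? Pattern topics) [:pattern (str topics)]
    (sequential? topics)       [:many (vec topics)]
    :else (throw (ex-info "Unsupported topics argument" {:topics topics}))))
```

With a consumer driver at hand, each of these values would be passed directly, for example (subscribe! driver #"events-.*"), where driver is assumed to have been created elsewhere.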

subscription

(subscription this)

Currently assigned topics

unsubscribe!

(unsubscribe! this)

Unsubscribe from currently subscribed topics.

wake-up!

(wake-up! this)

Safely wake up a consumer which may be blocked while polling.

cr->data

(cr->data cr)

Yield a Clojure representation of a consumer record.

deserializer

(deserializer f)

Yield an instance of a deserializer from a function of two arguments: a topic and the payload to deserialize.
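A function suitable for this wrapper might look like the sketch below. It assumes the payload arrives as a byte array (possibly nil), as with Kafka's own Deserializer interface. The name edn-bytes->data is hypothetical.

```clojure
(require '[clojure.edn :as edn])

(defn edn-bytes->data
  "Hypothetical two-argument function: ignore the topic and read the
   payload bytes as UTF-8 encoded EDN. A nil payload yields nil."
  [_topic ^bytes payload]
  (when payload
    (edn/read-string (String. payload "UTF-8"))))
```

Such a function could then be wrapped with (deserializer edn-bytes->data) and passed to consumer.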

deserializers

edn-deserializer

(edn-deserializer)
(edn-deserializer reader-opts)

Deserialize EDN.

edn-serializer

(edn-serializer)

Serialize as EDN.

GenericDriver

protocol

members

close!

(close! this)
(close! this timeout)

Close this driver

json-deserializer

(json-deserializer)

Deserialize JSON.

json-serializer

(json-serializer)

Serialize as JSON through cheshire.

keyword-deserializer

(keyword-deserializer)

Deserialize a string and then keywordize it.

keyword-serializer

(keyword-serializer)

Serialize keywords to strings, useful for keys.

MetadataDriver

protocol

Common properties for all drivers

members

partitions-for

(partitions-for this topic)

Retrieve partition ownership information for a topic. The result is a data representation of a PartitionInfo list. The structure for a partition info map is:

{:topic     "t"
 :partition 0
 :isr       [{:host "x" :id 0 :port 9092}]
 :leader    {:host "x" :id 0 :port 9092}
 :replicas  [{:host "x" :id 0 :port 9092}]}
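Because the result is plain data, common checks are short. The sketch below works on a hand-written list of partition info maps in the shape shown above. The helper names under-replicated? and leaders-by-partition, and the second host "y", are made up for the example.

```clojure
(def partition-infos
  [{:topic     "t"
    :partition 0
    :isr       [{:host "x" :id 0 :port 9092}]
    :leader    {:host "x" :id 0 :port 9092}
    :replicas  [{:host "x" :id 0 :port 9092}
                {:host "y" :id 1 :port 9092}]}
   {:topic     "t"
    :partition 1
    :isr       [{:host "y" :id 1 :port 9092}]
    :leader    {:host "y" :id 1 :port 9092}
    :replicas  [{:host "y" :id 1 :port 9092}]}])

(defn under-replicated?
  "Hypothetical helper: true when fewer replicas are in sync than assigned."
  [{:keys [isr replicas]}]
  (< (count isr) (count replicas)))

(defn leaders-by-partition
  "Hypothetical helper: map of partition number to leader host."
  [infos]
  (into {} (map (juxt :partition (comp :host :leader))) infos))
```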

node->data

(node->data n)

Yield a Clojure representation of a node.

opts->props

(opts->props opts)

Kafka configs are now maps of strings to strings. Morph an arbitrary Clojure map into this representation.
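The kind of conversion described here can be sketched in a few lines. This is an illustration only, not kinsky's implementation. The name opts->string-map is hypothetical, and the treatment of keyword values (using their name) is an assumption.

```clojure
(defn opts->string-map
  "Hypothetical sketch: turn keyword or string keys and arbitrary values
   into a map of strings to strings."
  [opts]
  (let [->str (fn [x] (if (keyword? x) (name x) (str x)))]
    (into {}
          (map (fn [[k v]] [(->str k) (->str v)]))
          opts)))

(def example-config
  (opts->string-map {:bootstrap.servers  "localhost:9092"
                     :group.id           "example-group"
                     :enable.auto.commit false
                     "max.poll.records"  10}))
```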

partition-info->data

(partition-info->data pi)

Yield a Clojure representation of a partition-info.

producer

(producer config)
(producer config serializer)
(producer config kserializer vserializer)

Create a producer from a configuration and optional serializers. If a single serializer is provided, it will be used for both keys and values. If none are provided, the configuration is expected to hold serializer class names.

producer->driver

(producer->driver producer)

Yield a driver from a Kafka Producer. The producer driver implements the following protocols:

ProducerDriver

protocol

Driver interface for producers

members

flush!

(flush! this)

Ensure that produced messages are flushed.

send!

(send! this record)
(send! this topic k v)

Produce a record on a topic. When using the single arity version, a map with the following possible keys is expected: :key, :topic, :partition, and :value.

rebalance-listener

(rebalance-listener callback)

Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener. The callback is a function of one argument, a map containing the following keys: :event, :topic and :partition. :event will be either :assigned or :revoked.
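A callback of this kind could be as simple as the sketch below, which keeps the currently assigned partitions in an atom. It assumes, as the description above states, that each invocation receives a map with :event, :topic and :partition. The names assigned and track-assignment! are hypothetical.

```clojure
(def assigned
  "Set of [topic partition] pairs currently assigned (example state)."
  (atom #{}))

(defn track-assignment!
  "Hypothetical rebalance callback: record assignments and revocations."
  [{:keys [event topic partition]}]
  (case event
    :assigned (swap! assigned conj [topic partition])
    :revoked  (swap! assigned disj [topic partition])
    nil))

;; Simulated events, in place of a real consumer rebalance.
(track-assignment! {:event :assigned :topic "t" :partition 0})
(track-assignment! {:event :assigned :topic "t" :partition 1})
(track-assignment! {:event :revoked  :topic "t" :partition 0})
```

The function could be passed as the listener argument of subscribe!, or wrapped explicitly with (rebalance-listener track-assignment!).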

record-xform

A transducer to explode grouped records into individual entities.

When successful, the output of kinsky.client/poll! takes the form:

{:partitions   [["t" 0] ["t" 1]]
 :topics       #{"t"}
 :count        2
 :by-partition {["t" 0] [{:key       "k0"
                          :offset    1
                          :partition 0
                          :topic     "t"
                          :value     "v0"}]
                ["t" 1] [{:key       "k1"
                          :offset    1
                          :partition 1
                          :topic     "t"
                          :value     "v1"}]}
 :by-topic     {"t" [{:key       "k0"
                      :offset    1
                      :partition 0
                      :topic     "t"
                      :value     "v0"}
                     {:key       "k1"
                      :offset    1
                      :partition 1
                      :topic     "t"
                      :value     "v1"}]}}

To make working with the output channel easier, this transducer morphs these messages into a list of distinct records:

({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
 {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
 ...)
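The effect can be approximated with a small composed transducer. The sketch below is not necessarily how record-xform is implemented. The name explode-records is hypothetical, and it assumes records are taken from the :by-topic grouping.

```clojure
(def explode-records
  "Hypothetical transducer: turn poll! results into individual records."
  (comp (map :by-topic) ; keep the topic -> records map
        (mapcat vals)   ; emit each topic's vector of records
        cat))           ; emit each record on its own

(def example-result
  {:partitions   [["t" 0] ["t" 1]]
   :topics       #{"t"}
   :count        2
   :by-partition {["t" 0] [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}]
                  ["t" 1] [{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}
   :by-topic     {"t" [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
                       {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}})

(def exploded (into [] explode-records [example-result]))
```

A transducer like this can be applied with into or sequence, as here, or attached to a core.async channel.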

safe-poll!

(safe-poll! consumer timeout)

Implementation of poll which disregards wake-up exceptions

serializer

(serializer f)

Yield an instance of a serializer from a function of two arguments: a topic and the payload to serialize.
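A matching function for the producing side might look like this sketch, which prints the payload as EDN and encodes it as UTF-8 bytes. The name data->edn-bytes is hypothetical, and returning nil for a nil payload is an assumption.

```clojure
(defn data->edn-bytes
  "Hypothetical two-argument function: ignore the topic and encode the
   payload as UTF-8 EDN bytes. A nil payload yields nil."
  [_topic payload]
  (when (some? payload)
    (.getBytes ^String (pr-str payload) "UTF-8")))
```

It could be wrapped with (serializer data->edn-bytes) and passed to producer.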

serializers

string-deserializer

(string-deserializer)

Kafka’s own string deserializer

string-serializer

(string-serializer)

Kafka’s own string serializer.

topic-partition->data

(topic-partition->data tp)

Yield a Clojure representation of a topic-partition.