kinsky.client
Small clojure shim on top of the Kafka client API See https://github.com/pyr/kinsky for example usage.
->offset-metadata
(->offset-metadata {:keys [offset metadata]})Yield a OffsetAndMetadata from a clojure map.
->record
(->record payload)Build a producer record from a clojure map. Leave ProducerRecord instances untouched.
->topic-partition
(->topic-partition {:keys [topic partition]})Yield a TopicPartition from a clojure map.
consumer
(consumer config)(consumer config callback)(consumer config kdeserializer vdeserializer)(consumer config callback kdeserializer vdeserializer)Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them otherwise expect deserializer class name in the config map.
consumer->driver
(consumer->driver consumer)(consumer->driver consumer run-signal)Given a consumer-driver and an optional callback to callback to call when stopping, yield a consumer driver.
The consumer driver implements the following protocols:
- ConsumerDriver
- MetadataDriver
clojure.lang.IDeref:derefto access underlying KafkaConsumer instance.
consumer-records->data
(consumer-records->data crs)Yield the clojure representation of topic
ConsumerDriver
protocol
Driver interface for consumers
members
commit!
(commit! this)(commit! this topic-offsets)Commit offsets for a consumer. The topic-offsets argument must be a list of maps of the form:
{:topic topic
:partition partition
:offset offset
:metadata metadata}
The topic and partition tuple must be unique across the whole list.
pause!
(pause! this)(pause! this topic-partitions)Pause consumption.
poll!
(poll! this timeout)Poll for new messages. Timeout in ms. The result is a data representation of a ConsumerRecords instance.
{:partitions [["t" 0] ["t" 1]]
:topics ["t"]
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}
{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}}
position!
(position! this)(position! this topic-partition)Get the offset of the next record that will be fetched (if a record with that offset exists).
resume!
(resume! this topic-partitions)Resume consumption.
seek!
(seek! this)(seek! this topic-partition offset)Overrides the fetch offsets that the consumer will use on the next poll
stop!
(stop! this)(stop! this timeout)Stop consumption.
subscribe!
(subscribe! this topics)(subscribe! this topics listener)Subscribe to a topic or list of topics. The topics argument can be:
- A simple string when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings
The optional listener argument is either a callback function or an implementation of ConsumerRebalanceListener.
When a function is supplied, it will be called on relance events with a map representing the event, see kinsky.client/rebalance-listener for details on the map format.
subscription
(subscription this)Currently assigned topics
unsubscribe!
(unsubscribe! this)Unsubscribe from currently subscribed topics.
wake-up!
(wake-up! this)Safely wake-up a consumer which may be blocking during polling.
deserializer
(deserializer f)Yield an instance of a deserializer from a function of two arguments: a topic and the payload to deserialize.
MetadataDriver
protocol
Common properties for all drivers
members
partitions-for
(partitions-for this topic)Retrieve partition ownership information for a topic. The result is a data representation of a PartitionInfo list. The structure for a partition info map is:
{:topic "t"
:partition 0
:isr [{:host "x" :id 0 :port 9092}]
:leader {:host "x" :id 0 :port 9092}
:replicas [{:host "x" :id 0 :port 9092}]
opts->props
(opts->props opts)Kakfa configs are now maps of strings to strings. Morph an arbitrary clojure map into this representation.
partition-info->data
(partition-info->data pi)Yield a clojure representation of a partition-info.
producer
(producer config)(producer config serializer)(producer config kserializer vserializer)Create a producer from a configuration and optional serializers. If a single serializer is provided, it will be used for both keys and values. If none are provided, the configuration is expected to hold serializer class names.
producer->driver
(producer->driver producer)Yield a driver from a Kafka Producer. The producer driver implements the following protocols:
- ProducerDriver
- MetadataDriver
clojure.lang.IDeref:derefto access underlying KafkaProducer instance.
ProducerDriver
protocol
Driver interface for producers
members
flush!
(flush! this)Ensure that produced messages are flushed.
send!
(send! this record)(send! this topic k v)Produce a record on a topic. When using the single arity version, a map with the following possible keys is expected: :key, :topic, :partition, and :value.
rebalance-listener
(rebalance-listener callback)Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener. The callback is a function of one argument, a map containing the following keys: :event, :topic and :partition. :event will be either :assigned or :revoked.
record-xform
A transducer to explode grouped records into individual entities.
When sucessful, the output of kinsky.client/poll! takes the form:
{:partitions [["t" 0] ["t" 1]]
:topics #{"t"}
:count 2
:by-partition {["t" 0] [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}]
["t" 1] [{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}
:by-topic {"t" [{:key "k0"
:offset 1
:partition 0
:topic "t"
:value "v0"}
{:key "k1"
:offset 1
:partition 1
:topic "t"
:value "v1"}]}}
To make working with the output channel easier, this transducer morphs these messages into a list of distinct records:
({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
...)
safe-poll!
(safe-poll! consumer timeout)Implementation of poll which disregards wake-up exceptions
serializer
(serializer f)Yield an instance of a serializer from a function of two arguments: a topic and the payload to serialize.
topic-partition->data
(topic-partition->data tp)Yield a clojure representation of a topic-partition