kinsky.client
Small clojure shim on top of the Kafka client API. See https://github.com/pyr/kinsky for example usage.
->offset-metadata
(->offset-metadata {:keys [offset metadata]})
Yield an OffsetAndMetadata from a clojure map.
->record
(->record payload)
Build a producer record from a clojure map. Leave ProducerRecord instances untouched.
->topic-partition
(->topic-partition {:keys [topic partition]})
Yield a TopicPartition from a clojure map.
consumer
(consumer config)
(consumer config callback)
(consumer config kdeserializer vdeserializer)
(consumer config callback kdeserializer vdeserializer)
Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. If deserializers are provided, use them; otherwise, expect deserializer class names in the config map.
consumer->driver
(consumer->driver consumer)
(consumer->driver consumer run-signal)
Given a consumer and an optional callback to call when stopping, yield a consumer driver.
The consumer driver implements the following protocols:
- ConsumerDriver
- MetadataDriver
- clojure.lang.IDeref: deref to access the underlying KafkaConsumer instance.
consumer-records->data
(consumer-records->data crs)
Yield the clojure representation of a ConsumerRecords instance.
ConsumerDriver
protocol
Driver interface for consumers
members
commit!
(commit! this)
(commit! this topic-offsets)
Commit offsets for a consumer. The topic-offsets argument must be a list of maps of the form:
{:topic topic
 :partition partition
 :offset offset
 :metadata metadata}
The topic and partition tuple must be unique across the whole list.
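The uniqueness requirement can be checked before committing. The following is a minimal sketch in plain Clojure; `duplicate-topic-partitions` is a hypothetical helper, not part of kinsky.client, and the offsets are made-up values.

```clojure
;; Hypothetical helper (not part of kinsky.client): returns the
;; [topic partition] pairs that occur more than once in topic-offsets.
(defn duplicate-topic-partitions
  [topic-offsets]
  (->> topic-offsets
       (map (juxt :topic :partition))
       frequencies
       (filter (fn [[_ n]] (> n 1)))
       (mapv first)))

;; Example input in the shape described above (illustrative values).
(def topic-offsets
  [{:topic "t" :partition 0 :offset 42 :metadata ""}
   {:topic "t" :partition 1 :offset 17 :metadata ""}])

(duplicate-topic-partitions topic-offsets)
;; => []
```

An empty result means the list is safe to pass as the topic-offsets argument of commit!.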
pause!
(pause! this)
(pause! this topic-partitions)
Pause consumption.
poll!
(poll! this timeout)
Poll for new messages. Timeout in ms. The result is a data representation of a ConsumerRecords instance.
{:partitions [["t" 0] ["t" 1]]
 :topics ["t"]
 :count 2
 :by-partition {["t" 0] [{:key "k0"
                          :offset 1
                          :partition 0
                          :topic "t"
                          :value "v0"}]
                ["t" 1] [{:key "k1"
                          :offset 1
                          :partition 1
                          :topic "t"
                          :value "v1"}]}
 :by-topic {"t" [{:key "k0"
                  :offset 1
                  :partition 0
                  :topic "t"
                  :value "v0"}
                 {:key "k1"
                  :offset 1
                  :partition 1
                  :topic "t"
                  :value "v1"}]}}
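Because the result is plain data, it can be explored with core functions. A minimal sketch, assuming the map shape shown above; `polled` is a hand-written stand-in for a poll! result, not output captured from a broker.

```clojure
;; Hand-written stand-in for the value returned by poll! (assumed shape).
(def polled
  {:partitions   [["t" 0] ["t" 1]]
   :topics       ["t"]
   :count        2
   :by-partition {["t" 0] [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}]
                  ["t" 1] [{:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}
   :by-topic     {"t" [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
                       {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}})

;; All values received on topic "t", in the order they appear in the map.
(mapv :value (get-in polled [:by-topic "t"]))
;; => ["v0" "v1"]

;; Key of the first record received on partition 1 of topic "t".
(get-in polled [:by-partition ["t" 1] 0 :key])
;; => "k1"

;; Highest offset seen for each [topic partition] pair.
(into {}
      (map (fn [[tp records]] [tp (apply max (map :offset records))]))
      (:by-partition polled))
;; => {["t" 0] 1, ["t" 1] 1}
```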
position!
(position! this)
(position! this topic-partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
resume!
(resume! this topic-partitions)
Resume consumption.
seek!
(seek! this)
(seek! this topic-partition offset)
Overrides the fetch offsets that the consumer will use on the next poll
stop!
(stop! this)
(stop! this timeout)
Stop consumption.
subscribe!
(subscribe! this topics)
(subscribe! this topics listener)
Subscribe to a topic or list of topics. The topics argument can be:
- A simple string when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings
The optional listener argument is either a callback function or an implementation of ConsumerRebalanceListener.
When a function is supplied, it will be called on rebalance events with a map representing the event; see kinsky.client/rebalance-listener for details on the map format.
subscription
(subscription this)
Currently assigned topics
unsubscribe!
(unsubscribe! this)
Unsubscribe from currently subscribed topics.
wake-up!
(wake-up! this)
Safely wake-up a consumer which may be blocking during polling.
deserializer
(deserializer f)
Yield an instance of a deserializer from a function of two arguments: a topic and the payload to deserialize.
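As an illustration, here is a function with the expected two-argument shape. It is a sketch that assumes the payload arrives as a byte array, as it does in Kafka's own Deserializer interface; `utf8-decode` is a hypothetical name, not part of kinsky.client.

```clojure
;; Hypothetical decoding function: takes a topic and a payload.
;; Assumes the payload is a byte array; nil payloads are passed through.
(defn utf8-decode
  [_topic ^bytes payload]
  (when payload
    (String. payload "UTF-8")))

(utf8-decode "t" (.getBytes "v0" "UTF-8"))
;; => "v0"
```

A function like this would be the argument handed to deserializer, as in `(deserializer utf8-decode)`.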
MetadataDriver
protocol
Common properties for all drivers
members
partitions-for
(partitions-for this topic)
Retrieve partition ownership information for a topic. The result is a data representation of a PartitionInfo list. The structure for a partition info map is:
{:topic "t"
 :partition 0
 :isr [{:host "x" :id 0 :port 9092}]
 :leader {:host "x" :id 0 :port 9092}
 :replicas [{:host "x" :id 0 :port 9092}]}
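A list of such maps lends itself to simple queries. The sketch below assumes the structure shown above; `partition-infos`, `leader-by-partition` and `under-replicated` are hypothetical names used for illustration only.

```clojure
;; Hand-written stand-in for a partitions-for result (assumed shape).
(def partition-infos
  [{:topic     "t"
    :partition 0
    :isr       [{:host "x" :id 0 :port 9092}]
    :leader    {:host "x" :id 0 :port 9092}
    :replicas  [{:host "x" :id 0 :port 9092}]}])

;; Map each partition number to the id of its leader node.
(defn leader-by-partition
  [infos]
  (into {} (map (juxt :partition (comp :id :leader))) infos))

;; Keep the partitions that have fewer in-sync replicas than replicas.
(defn under-replicated
  [infos]
  (filterv #(< (count (:isr %)) (count (:replicas %))) infos))

(leader-by-partition partition-infos)
;; => {0 0}

(under-replicated partition-infos)
;; => []
```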
opts->props
(opts->props opts)
Kafka configs are now maps of strings to strings. Morph an arbitrary clojure map into this representation.
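One way such a conversion could look is sketched below. `map->props` is a hypothetical stand-in, not the library's source; in particular, turning keywords into their names and calling `str` on everything else is an assumption.

```clojure
(import 'java.util.Properties)

;; Hypothetical stand-in for opts->props (assumed behaviour):
;; keywords become their names, anything else goes through str.
(defn map->props
  ^Properties [opts]
  (let [props (Properties.)
        ->str (fn [x] (if (keyword? x) (name x) (str x)))]
    (doseq [[k v] opts]
      (.setProperty props (->str k) (->str v)))
    props))

(def props
  (map->props {:bootstrap.servers "localhost:9092"
               :linger.ms         1000}))

(.getProperty props "bootstrap.servers")
;; => "localhost:9092"

(.getProperty props "linger.ms")
;; => "1000"
```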
partition-info->data
(partition-info->data pi)
Yield a clojure representation of a partition-info.
producer
(producer config)
(producer config serializer)
(producer config kserializer vserializer)
Create a producer from a configuration and optional serializers. If a single serializer is provided, it will be used for both keys and values. If none are provided, the configuration is expected to hold serializer class names.
producer->driver
(producer->driver producer)
Yield a driver from a Kafka Producer. The producer driver implements the following protocols:
- ProducerDriver
- MetadataDriver
- clojure.lang.IDeref: deref to access the underlying KafkaProducer instance.
ProducerDriver
protocol
Driver interface for producers
members
flush!
(flush! this)
Ensure that produced messages are flushed.
send!
(send! this record)
(send! this topic k v)
Produce a record on a topic. When using the single arity version, a map with the following possible keys is expected: :key, :topic, :partition, and :value.
rebalance-listener
(rebalance-listener callback)
Wrap a callback to yield an instance of a Kafka ConsumerRebalanceListener. The callback is a function of one argument, a map containing the following keys: :event, :topic and :partition. :event will be either :assigned or :revoked.
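A callback of this kind can be an ordinary function that dispatches on :event. The sketch below assumes the map format described above; `assignments` and `track-assignment!` are hypothetical names, and the events are fed in by hand rather than by a consumer.

```clojure
;; Set of [topic partition] pairs currently believed to be assigned.
(def assignments (atom #{}))

;; Hypothetical callback: one argument, a map with :event, :topic
;; and :partition, where :event is :assigned or :revoked.
(defn track-assignment!
  [{:keys [event topic partition]}]
  (case event
    :assigned (swap! assignments conj [topic partition])
    :revoked  (swap! assignments disj [topic partition])
    nil))

;; Simulate the events a listener might deliver.
(track-assignment! {:event :assigned :topic "t" :partition 0})
(track-assignment! {:event :assigned :topic "t" :partition 1})
(track-assignment! {:event :revoked :topic "t" :partition 0})

@assignments
;; => #{["t" 1]}
```

Such a function could be passed to rebalance-listener, or directly as the listener argument of subscribe!.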
record-xform
A transducer to explode grouped records into individual entities.
When successful, the output of kinsky.client/poll! takes the form:
{:partitions [["t" 0] ["t" 1]]
 :topics #{"t"}
 :count 2
 :by-partition {["t" 0] [{:key "k0"
                          :offset 1
                          :partition 0
                          :topic "t"
                          :value "v0"}]
                ["t" 1] [{:key "k1"
                          :offset 1
                          :partition 1
                          :topic "t"
                          :value "v1"}]}
 :by-topic {"t" [{:key "k0"
                  :offset 1
                  :partition 0
                  :topic "t"
                  :value "v0"}
                 {:key "k1"
                  :offset 1
                  :partition 1
                  :topic "t"
                  :value "v1"}]}}
To make working with the output channel easier, this transducer morphs these messages into a list of distinct records:
({:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
 {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}
 ...)
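The effect can be approximated with a plain mapcat transducer. This is a sketch of the idea, not the library's definition of record-xform; `explode-records` is a hypothetical name, and reading records from :by-topic rather than :by-partition is an assumption.

```clojure
;; Hypothetical transducer: turns each grouped poll result into the
;; individual records found under :by-topic.
(def explode-records
  (mapcat (fn [{:keys [by-topic]}]
            (mapcat val by-topic))))

;; Hand-written stand-in for a poll! result (trimmed to the keys used here).
(def polled
  {:count    2
   :by-topic {"t" [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
                   {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]}})

(into [] explode-records [polled])
;; => [{:key "k0" :offset 1 :partition 0 :topic "t" :value "v0"}
;;     {:key "k1" :offset 1 :partition 1 :topic "t" :value "v1"}]
```

Being a transducer, the same value works with `into`, `sequence` or `transduce`, and with any other context that accepts transducers.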
safe-poll!
(safe-poll! consumer timeout)
Implementation of poll which disregards wake-up exceptions
serializer
(serializer f)
Yield an instance of a serializer from a function of two arguments: a topic and the payload to serialize.
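For illustration, a function with the expected two-argument shape might look like the sketch below. It assumes the result should be a byte array, as in Kafka's own Serializer interface; `edn-encode` is a hypothetical name, not part of kinsky.client.

```clojure
;; Hypothetical encoding function: takes a topic and a payload and
;; returns the payload printed as EDN, encoded as UTF-8 bytes.
;; nil payloads are passed through as nil.
(defn edn-encode
  [_topic payload]
  (when (some? payload)
    (.getBytes ^String (pr-str payload) "UTF-8")))

;; Decode the bytes again to inspect what would be sent.
(String. ^bytes (edn-encode "t" {:a 1}) "UTF-8")
;; => "{:a 1}"
```

A function like this would be the argument handed to serializer, as in `(serializer edn-encode)`.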
topic-partition->data
(topic-partition->data tp)
Yield a clojure representation of a topic-partition