API Reference

kafka

kafka.codec.snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32768)[source]

Encodes the given data with snappy. If xerial_compatible is set, the stream is encoded in a fashion compatible with the xerial snappy library.

The block size (xerial_blocksize) controls how frequently the blocking occurs; 32k is the default in the xerial library.

The format winds up being:

    +-----------+------------+--------------+------------+--------------+
    |  Header   | Block1 len | Block1 data  | Blockn len | Blockn data  |
    +-----------+------------+--------------+------------+--------------+
    | 16 bytes  |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
    +-----------+------------+--------------+------------+--------------+

It is important to note that the blocksize is the amount of uncompressed data presented to snappy at each block, whereas the blocklen is the number of bytes that will be present in the stream; that is, the length will always be <= blocksize.
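
A minimal sketch of calling it (assumes the optional python-snappy dependency is installed; the payload bytes are illustrative):

from kafka.codec import snappy_encode

payload = b'some message bytes' * 4096   # illustrative payload

# Plain snappy encoding (single block)
plain = snappy_encode(payload)

# Xerial-compatible framing: a 16-byte header followed by
# (BE int32 block length, snappy-compressed block) pairs,
# one pair per 32k of uncompressed input
framed = snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32768)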

class kafka.common.BrokerMetadata(nodeId, host, port)
host

Alias for field number 1

nodeId

Alias for field number 0

port

Alias for field number 2

class kafka.common.FetchRequest(topic, partition, offset, max_bytes)
max_bytes

Alias for field number 3

offset

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.FetchResponse(topic, partition, error, highwaterMark, messages)
error

Alias for field number 2

highwaterMark

Alias for field number 3

messages

Alias for field number 4

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.KafkaMessage(topic, partition, offset, key, value)
key

Alias for field number 3

offset

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

value

Alias for field number 4

class kafka.common.Message(magic, attributes, key, value)
attributes

Alias for field number 1

key

Alias for field number 2

magic

Alias for field number 0

value

Alias for field number 3

class kafka.common.MetadataRequest(topics)
topics

Alias for field number 0

class kafka.common.MetadataResponse(brokers, topics)
brokers

Alias for field number 0

topics

Alias for field number 1

class kafka.common.OffsetAndMessage(offset, message)
message

Alias for field number 1

offset

Alias for field number 0

class kafka.common.OffsetCommitRequest(topic, partition, offset, metadata)
metadata

Alias for field number 3

offset

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.OffsetCommitResponse(topic, partition, error)
error

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.OffsetFetchRequest(topic, partition)
partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.OffsetFetchResponse(topic, partition, offset, metadata, error)
error

Alias for field number 4

metadata

Alias for field number 3

offset

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.OffsetRequest(topic, partition, time, max_offsets)
max_offsets

Alias for field number 3

partition

Alias for field number 1

time

Alias for field number 2

topic

Alias for field number 0

class kafka.common.OffsetResponse(topic, partition, error, offsets)
error

Alias for field number 2

offsets

Alias for field number 3

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.PartitionMetadata(topic, partition, leader, replicas, isr, error)
error

Alias for field number 5

isr

Alias for field number 4

leader

Alias for field number 2

partition

Alias for field number 1

replicas

Alias for field number 3

topic

Alias for field number 0

class kafka.common.ProduceRequest(topic, partition, messages)
messages

Alias for field number 2

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.ProduceResponse(topic, partition, error, offset)
error

Alias for field number 2

offset

Alias for field number 3

partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.TopicAndPartition(topic, partition)
partition

Alias for field number 1

topic

Alias for field number 0

class kafka.common.TopicMetadata(topic, error, partitions)
error

Alias for field number 1

partitions

Alias for field number 2

topic

Alias for field number 0
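
These request/response structs are plain namedtuples; a brief sketch of building a couple of them by hand (topic and partition values are illustrative):

from kafka.common import FetchRequest, OffsetRequest, TopicAndPartition

# Ask for up to 1 MB of messages from topic1/partition 0, starting at offset 0
fetch = FetchRequest(topic=b'topic1', partition=0, offset=0, max_bytes=1024 * 1024)

# time=-1 requests the latest offset, time=-2 the earliest (see OffsetRequest)
offsets = OffsetRequest(topic=b'topic1', partition=0, time=-1, max_offsets=1)

# Fields can be read by name or by position
assert fetch.topic == fetch[0]
tp = TopicAndPartition(b'topic1', 0)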

class kafka.conn.KafkaConnection(host, port, timeout=120)[source]

A socket connection to a single Kafka broker

This class is _not_ thread safe. Each call to send must be followed by a call to recv in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id.

Arguments:

    host: the host name or IP address of a kafka broker
    port: the port number the kafka broker is listening on
    timeout: default 120. The socket timeout for sending and receiving data
        in seconds. None means no timeout, so a request can block forever.
close()[source]

Shutdown and close the connection socket

copy()[source]

Create an inactive copy of the connection object. A reinit() has to be done on the copy before it can be used again. Returns a new KafkaConnection object.

recv(request_id)[source]

Get a response packet from Kafka

Arguments:
request_id: can be any int (only used for debug logging...)
Returns:
str: Encoded kafka packet response from server
reinit()[source]

Re-initialize the socket connection: close the current socket (if open) and start a fresh connection. Raises ConnectionError on error.

send(request_id, payload)[source]

Send a request to Kafka

Arguments:
    request_id (int): can be any int (used only for debug logging...)
    payload: an encoded kafka packet (see KafkaProtocol)
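
Because the connection is not thread safe and responses must be read in the same order requests were sent, the basic pattern is a strict send/recv pair per request. A rough sketch (host, port, and the pre-encoded request bytes are placeholders):

from kafka.conn import KafkaConnection

conn = KafkaConnection('localhost', 9092, timeout=120)   # placeholder broker

request_id = 1                    # any int; only used for debug logging
encoded_request = b'...'          # an encoded kafka packet (see KafkaProtocol)

conn.send(request_id, encoded_request)
response = conn.recv(request_id)  # encoded kafka packet response from server

# copy() yields an inactive clone; reinit() must be called before it is usable
other = conn.copy()
other.reinit()

conn.close()
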
kafka.conn.collect_hosts(hosts, randomize=True)[source]

Collects a comma-separated set of hosts (host:port) and optionally randomizes the returned list.
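
For example (host names are illustrative; the returned value is expected to be a list of (host, port) tuples):

from kafka.conn import collect_hosts

hosts = collect_hosts('kafka01:9092,kafka02:9092', randomize=False)
# e.g. [('kafka01', 9092), ('kafka02', 9092)]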

The kafka.context module provides a context manager to commit/rollback consumer offsets.

class kafka.context.OffsetCommitContext(consumer)[source]

Provides commit/rollback semantics around a SimpleConsumer.

Usage assumes that auto_commit is disabled, that messages are consumed in batches, and that the consuming process will record its own successful processing of each message. Both the commit and rollback operations respect a “high-water mark” to ensure that the last unsuccessfully processed message will be retried.

Example:

consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()

while some_condition:
    with OffsetCommitContext(consumer) as context:
        messages = consumer.get_messages(count, block=False)

        for partition, message in messages:
            if can_process(message):
                context.mark(partition, message.offset)
            else:
                break

        if not context:
            sleep(delay)

These semantics allow for deferred message processing (e.g. if can_process compares message time to clock time) and for repeated processing of the last unsuccessful message (until some external error is resolved).

commit()[source]

Commit this context’s offsets:

  • If the high-water mark has moved, commit up to and position the consumer at the high-water mark.
  • Otherwise, reset the consumer to the initial offsets.
commit_partition_offsets(partition_offsets)[source]

Commit explicit partition/offset pairs.

handle_out_of_range()[source]

Handle out of range condition by seeking to the beginning of valid ranges.

This assumes that an out-of-range condition doesn’t happen by seeking past the end of valid ranges – which is far less likely.

mark(partition, offset)[source]

Set the high-water mark in the current context.

In order to know the current partition, it is helpful to initialize the consumer to provide partition info via:

consumer.provide_partition_info()
rollback()[source]

Rollback this context:

  • Position the consumer at the initial offsets.
update_consumer_offsets(partition_offsets)[source]

Update consumer offsets to explicit positions.

class kafka.protocol.KafkaProtocol[source]

Class to encapsulate all of the protocol encoding/decoding. This class does not have any state associated with it; it is purely for organization.

classmethod decode_fetch_response(data)[source]

Decode bytes to a FetchResponse

Arguments:
data: bytes to decode
classmethod decode_metadata_response(data)[source]

Decode bytes to a MetadataResponse

Arguments:
data: bytes to decode
classmethod decode_offset_commit_response(data)[source]

Decode bytes to an OffsetCommitResponse

Arguments:
data: bytes to decode
classmethod decode_offset_fetch_response(data)[source]

Decode bytes to an OffsetFetchResponse

Arguments:
data: bytes to decode
classmethod decode_offset_response(data)[source]

Decode bytes to an OffsetResponse

Arguments:
data: bytes to decode
classmethod decode_produce_response(data)[source]

Decode bytes to a ProduceResponse

Arguments:
data: bytes to decode
classmethod encode_fetch_request(client_id, correlation_id, payloads=None, max_wait_time=100, min_bytes=4096)[source]

Encodes some FetchRequest structs

Arguments:

    client_id: string
    correlation_id: int
    payloads: list of FetchRequest
    max_wait_time: int, how long to block waiting on min_bytes of data
    min_bytes: int, the minimum number of bytes to accumulate before
        returning the response
classmethod encode_metadata_request(client_id, correlation_id, topics=None, payloads=None)[source]

Encode a MetadataRequest

Arguments:
    client_id: string
    correlation_id: int
    topics: list of strings
classmethod encode_offset_commit_request(client_id, correlation_id, group, payloads)[source]

Encode some OffsetCommitRequest structs

Arguments:
    client_id: string
    correlation_id: int
    group: string, the consumer group you are committing offsets for
    payloads: list of OffsetCommitRequest
classmethod encode_offset_fetch_request(client_id, correlation_id, group, payloads)[source]

Encode some OffsetFetchRequest structs

Arguments:
    client_id: string
    correlation_id: int
    group: string, the consumer group you are fetching offsets for
    payloads: list of OffsetFetchRequest
classmethod encode_produce_request(client_id, correlation_id, payloads=None, acks=1, timeout=1000)[source]

Encode some ProduceRequest structs

Arguments:

    client_id: string
    correlation_id: int
    payloads: list of ProduceRequest
    acks: How “acky” you want the request to be:
        0: immediate response
        1: written to disk by the leader
        2+: waits for this many replicas to sync
        -1: waits for all replicas to be in sync
    timeout: Maximum time the server will wait for acks from replicas.
        This is _not_ a socket timeout
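
A hedged sketch pairing an encode_* call with its decode_* counterpart over a raw KafkaConnection (broker address, client id, and payload contents are illustrative):

from kafka.common import ProduceRequest
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol, create_message

conn = KafkaConnection('localhost', 9092)   # placeholder broker

# One ProduceRequest payload carrying a single message
payloads = [ProduceRequest(b'topic1', 0, [create_message(b'hello kafka')])]

correlation_id = 1
request = KafkaProtocol.encode_produce_request(
    b'my-client', correlation_id, payloads=payloads, acks=1, timeout=1000)

conn.send(correlation_id, request)
raw = conn.recv(correlation_id)

# Yields ProduceResponse structs (topic, partition, error, offset)
for resp in KafkaProtocol.decode_produce_response(raw):
    print(resp)
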
kafka.protocol.create_gzip_message(payloads, key=None)[source]

Construct a Gzipped Message containing multiple Messages

The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.

Arguments:
    payloads: list(bytes), a list of payloads to be sent to Kafka
    key: bytes, a key used for partition routing (optional)
kafka.protocol.create_message(payload, key=None)[source]

Construct a Message

Arguments:
    payload: bytes, the payload to send to Kafka
    key: bytes, a key used for partition routing (optional)
kafka.protocol.create_message_set(messages, codec=0, key=None)[source]

Create a message set using the given codec.

If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise, return a list containing a single codec-encoded message.
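
A short sketch contrasting the two behaviours (this assumes the CODEC_NONE and CODEC_GZIP constants are importable from kafka.protocol, matching the codec=0 default above):

from kafka.protocol import CODEC_NONE, CODEC_GZIP, create_message_set

payloads = [b'msg-1', b'msg-2']

# CODEC_NONE: one raw Kafka Message per payload
plain_set = create_message_set(payloads, codec=CODEC_NONE)

# CODEC_GZIP: a single Message wrapping the gzip-compressed batch
gzip_set = create_message_set(payloads, codec=CODEC_GZIP)
assert len(gzip_set) == 1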

kafka.protocol.create_snappy_message(payloads, key=None)[source]

Construct a Snappy Message containing multiple Messages

The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka.

Arguments:
    payloads: list(bytes), a list of payloads to be sent to Kafka
    key: bytes, a key used for partition routing (optional)
class kafka.util.ReentrantTimer(t, fn, *args, **kwargs)[source]

A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)

Arguments:

    t: timer interval in milliseconds
    fn: a callable to invoke
    args: tuple of args to be passed to function
    kwargs: keyword arguments to be passed to function
kafka.util.kafka_bytestring(s)[source]

Takes a string or bytes instance. Returns bytes, encoding strings in utf-8 as necessary.
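
For example (a minimal illustration of the described behaviour):

from kafka.util import kafka_bytestring

assert kafka_bytestring(u'topic1') == b'topic1'   # text is encoded as utf-8
assert kafka_bytestring(b'topic1') == b'topic1'   # bytes pass through unchanged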

kafka.consumer

class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)[source]

Base class to be used by other consumers. Not to be used directly

This base class provides logic for

  • initialization and fetching metadata of partitions
  • Auto-commit logic
  • APIs for fetching pending message count
commit(partitions=None)[source]

Commit offsets for this consumer

Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them
pending(partitions=None)[source]

Gets the pending message count

Keyword Arguments:
partitions (list): list of partitions to check for, default is to check all
class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)[source]

A simpler kafka consumer

# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1')
for m in kafka:
  print m

# Alternate interface: next()
print kafka.next()

# Alternate interface: batch iteration
while True:
  for m in kafka.fetch_messages():
    print m
  print "Done with batch - let's do another!"
# more advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
  process_message(m)
  kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
  for m in kafka.fetch_messages():
    process_message(m)
    kafka.task_done(m)

messages (m) are namedtuples with attributes:

  • m.topic: topic name (str)
  • m.partition: partition number (int)
  • m.offset: message offset on topic-partition log (int)
  • m.key: key (bytes - can be None)
  • m.value: message (output of deserializer_class - default is raw bytes)

Configuration settings can be passed to constructor, otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

commit()[source]

Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group.

Note: this functionality requires server version >= 0.8.1.1. See this wiki page.

configure(**configs)[source]

Configuration settings can be passed to constructor, otherwise defaults will be used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

fetch_messages()[source]

Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing with the configured deserializer_class.

Refreshes metadata on errors, and resets fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy

Key configuration parameters:

  • fetch_message_max_bytes
  • fetch_max_wait_ms
  • fetch_min_bytes
  • deserializer_class
  • auto_offset_reset
get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)[source]

Request available fetch offsets for a single topic/partition

Arguments:

    topic (str)
    partition (int)
    request_time_ms (int): Used to ask for all messages before a
        certain time (ms). There are two special values. Specify -1 to
        receive the latest offset (i.e. the offset of the next coming
        message) and -2 to receive the earliest available offset. Note that
        because offsets are pulled in descending order, asking for the
        earliest offset will always return you a single element.
    max_num_offsets (int)

Returns:
offsets (list)
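
A hedged sketch of the two special request_time_ms values (topic, partition, and the broker list format are illustrative):

from kafka.consumer.kafka import KafkaConsumer

consumer = KafkaConsumer('topic1', metadata_broker_list=['localhost:9092'])

# -1: the latest offset, i.e. the offset the next produced message will get
(latest,) = consumer.get_partition_offsets('topic1', 0, -1, 1)

# -2: the earliest available offset (always a single element, as noted above)
(earliest,) = consumer.get_partition_offsets('topic1', 0, -2, 1)

print('partition 0 spans offsets %d..%d' % (earliest, latest))
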
next()[source]

Return a single message from the message iterator. If consumer_timeout_ms is set, will raise ConsumerTimeout if no message is available. Otherwise blocks indefinitely.

Note that this is also the method called internally during iteration:

for m in consumer:
    pass
offsets(group=None)[source]
Keyword Arguments:
group: Either “fetch”, “commit”, “task_done”, or “highwater”.
If no group specified, returns all groups.
Returns:
A copy of internal offsets struct
set_topic_partitions(*topics)[source]

Set the topic/partitions to consume. Optionally specify offsets to start from.

Accepts types:

  • str (utf-8): topic name (will consume all available partitions)

  • tuple: (topic, partition)

  • dict:
    • { topic: partition }
    • { topic: [partition list] }
    • { topic: (partition tuple,) }

Optionally, offsets can be specified directly:

  • tuple: (topic, partition, offset)
  • dict: { (topic, partition): offset, ... }

Example:

kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
task_done(message)[source]

Mark a fetched message as consumed. Offsets for messages marked as “task_done” will be stored back to the kafka cluster for this consumer group on commit()

class kafka.consumer.kafka.OffsetsStruct(fetch, highwater, commit, task_done)
commit

Alias for field number 2

fetch

Alias for field number 0

highwater

Alias for field number 1

task_done

Alias for field number 3

class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)[source]

A consumer implementation that consumes partitions for a topic in parallel using multiple processes

Arguments:
    client: a connected KafkaClient
    group: a name for this consumer, used for offset storage and must be unique
    topic: the topic to consume
Keyword Arguments:

    auto_commit: default True. Whether or not to auto commit the offsets
    auto_commit_every_n: default 100. How many messages to consume
        before a commit
    auto_commit_every_t: default 5000. How much time (in milliseconds) to
        wait before commit
    num_procs: Number of processes to start for consuming messages.
        The available partitions will be divided among these processes
    partitions_per_proc: Number of partitions to be allocated per process
        (overrides num_procs)

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers

get_messages(count=1, block=True, timeout=10)[source]

Fetch the specified number of messages

Keyword Arguments:

    count: Indicates the maximum number of messages to be fetched
    block: If True, the API will block until some messages are fetched.
    timeout: If block is True, the function will block for the specified
        time (in seconds) until count messages is fetched. If None, it
        will block forever.
class kafka.consumer.simple.FetchContext(consumer, block, timeout)[source]

Class for managing the state of a consumer during fetch

class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)[source]

A simple consumer implementation that consumes all/specified partitions for a topic

Arguments:
    client: a connected KafkaClient
    group: a name for this consumer, used for offset storage and must be unique
    topic: the topic to consume
Keyword Arguments:

    partitions: An optional list of partitions to consume the data from

    auto_commit: default True. Whether or not to auto commit the offsets

    auto_commit_every_n: default 100. How many messages to consume
        before a commit
    auto_commit_every_t: default 5000. How much time (in milliseconds) to
        wait before commit

    fetch_size_bytes: number of bytes to request in a FetchRequest

    buffer_size: default 4K. Initial number of bytes to tell kafka we
        have available. This will double as needed.
    max_buffer_size: default 32K. Max number of bytes to tell kafka we have
        available. None means no limit.
    iter_timeout: default None. How much time (in seconds) to wait for a
        message in the iterator before exiting. None means no timeout, so
        it will wait forever.

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers

get_messages(count=1, block=True, timeout=0.1)[source]

Fetch the specified number of messages

Keyword Arguments:

    count: Indicates the maximum number of messages to be fetched
    block: If True, the API will block until some messages are fetched.
    timeout: If block is True, the function will block for the specified
        time (in seconds) until count messages is fetched. If None, it
        will block forever.
provide_partition_info()[source]

Indicates that partition info must be returned by the consumer

seek(offset, whence)[source]

Alter the current offset in the consumer, similar to fseek

Arguments:

    offset: how much to modify the offset
    whence: where to modify it from (a short usage sketch follows the list below)

  • 0 is relative to the earliest available offset (head)
  • 1 is relative to the current offset
  • 2 is relative to the latest known offset (tail)
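
A short sketch of seek() and get_messages() together (broker address, group, and topic are placeholders; this assumes the top-level kafka package re-exports KafkaClient and SimpleConsumer, as in contemporary releases):

from kafka import KafkaClient, SimpleConsumer

client = KafkaClient('localhost:9092')
consumer = SimpleConsumer(client, 'my-group', 'topic1')

# Jump to the tail and step back 10 messages (whence=2: relative to latest)
consumer.seek(-10, 2)

# Without provide_partition_info(), each item is an OffsetAndMessage struct
for offset_and_message in consumer.get_messages(count=10, block=True, timeout=5):
    print(offset_and_message.message.value)

# Rewind to the earliest available offset (whence=0: relative to head)
consumer.seek(0, 0)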

kafka.partitioner

class kafka.partitioner.base.Partitioner(partitions)[source]

Base class for a partitioner

partition(key, partitions)[source]

Takes a string key and the list of partitions as arguments and returns a partition to be used for the message

Arguments:
partitions: The list of partitions is passed in every call. This
may look like an overhead, but it will be useful (in future) when we handle cases like rebalancing
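
For illustration only, a hypothetical subclass (not part of the library) showing the partition(key, partitions) contract:

from kafka.partitioner.base import Partitioner

class FirstBytePartitioner(Partitioner):
    """Hypothetical partitioner: route on the first byte of the key."""

    def partition(self, key, partitions):
        if not key:
            return partitions[0]
        # bytearray() yields an int for the first byte on both Python 2 and 3
        return partitions[bytearray(key)[0] % len(partitions)]
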
class kafka.partitioner.hashed.HashedPartitioner(partitions)[source]

Implements a partitioner which selects the target partition based on the hash of the key

class kafka.partitioner.roundrobin.RoundRobinPartitioner(partitions)[source]

Implements a round robin partitioner which sends data to partitions in a round robin fashion

kafka.producer

class kafka.producer.base.Producer(client, async=False, req_acks=1, ack_timeout=1000, codec=None, batch_send=False, batch_send_every_n=20, batch_send_every_t=20)[source]

Base class to be used by producers

Arguments:

    client: The Kafka client instance to use
    async: If set to true, the messages are sent asynchronously via another
        thread (process). We will not wait for a response to these.
        WARNING!!! The current implementation of the async producer does not
        guarantee message delivery. Use at your own risk! Or help us improve
        it with a PR!
    req_acks: A value indicating the acknowledgements that the server must
        receive before responding to the request
    ack_timeout: Value (in milliseconds) indicating a timeout for waiting
        for an acknowledgement
    batch_send: If True, messages are sent in batches
    batch_send_every_n: If set, messages are sent in batches of this size
    batch_send_every_t: If set, messages are sent after this timeout

send_messages(topic, partition, *msg)[source]

Helper method to send produce requests.

Arguments:
    topic: name of the topic for the produce request (str)
    partition: partition number for the produce request (int)
    *msg: one or more message payloads (bytes)

Returns:
    The ProduceResponse returned by the server; raises on error

Note that the msg payloads must be encoded to bytes by the user. Passing a unicode message will not work; for example, you should encode before calling send_messages via something like unicode_message.encode('utf-8')

All messages produced via this method will set the message ‘key’ to Null
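
A brief sketch of the encoding point above (broker address, topic, and partition are placeholders; the top-level KafkaClient export is assumed, and the base class is instantiated directly here only to show the documented signature; in practice one of the subclasses below is used):

from kafka import KafkaClient
from kafka.producer.base import Producer

client = KafkaClient('localhost:9092')   # placeholder broker address
producer = Producer(client)

# Payloads must already be bytes; encode unicode text before sending
text = u'caf\xe9 menu updated'
responses = producer.send_messages(b'topic1', 0, text.encode('utf-8'))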

stop(timeout=1)[source]

Stop the producer. Optionally wait for the specified timeout before forcefully cleaning up.

class kafka.producer.keyed.KeyedProducer(client, partitioner=None, async=False, req_acks=1, ack_timeout=1000, codec=None, batch_send=False, batch_send_every_n=20, batch_send_every_t=20)[source]

A producer which distributes messages to partitions based on the key

Arguments:
client: The kafka client instance
Keyword Arguments:
partitioner: A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement

batch_send: If True, messages are sent in batches
batch_send_every_n: If set, messages are sent in batches of this size
batch_send_every_t: If set, messages are sent after this timeout
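
A hedged sketch (broker address and topic are placeholders; the send_messages(topic, key, *msg) call and the top-level KafkaClient export are assumed from contemporary releases and are not listed above):

from kafka import KafkaClient
from kafka.partitioner.hashed import HashedPartitioner
from kafka.producer.keyed import KeyedProducer

client = KafkaClient('localhost:9092')
producer = KeyedProducer(client, partitioner=HashedPartitioner)

# Messages with the same key are routed to the same partition
producer.send_messages(b'topic1', b'user-42', b'logged in', b'clicked')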

class kafka.producer.simple.SimpleProducer(client, async=False, req_acks=1, ack_timeout=1000, codec=None, batch_send=False, batch_send_every_n=20, batch_send_every_t=20, random_start=True)[source]

A simple, round-robin producer. Each message goes to exactly one partition

Arguments:
client: The Kafka client instance to use
Keyword Arguments:
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks: A value indicating the acknowledgements that the server must
receive before responding to the request
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement

batch_send: If True, messages are sent in batches
batch_send_every_n: If set, messages are sent in batches of this size
batch_send_every_t: If set, messages are sent after this timeout
random_start: If true, randomize the initial partition to which the
    first message block will be published; otherwise, if false, the first
    message block will always be published to partition 0 before cycling
    through each partition
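
A minimal sketch of round-robin production (broker address and topic are placeholders; SimpleProducer.send_messages(topic, *msg), which omits the explicit partition argument of the base class, and the top-level KafkaClient export are assumed from contemporary releases):

from kafka import KafkaClient
from kafka.producer.simple import SimpleProducer

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client)

# The producer picks the partition itself, cycling round robin;
# only the topic and payload bytes are passed
producer.send_messages(b'topic1', b'message one', b'message two')

producer.stop()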