pykafka.balancedconsumer

class pykafka.balancedconsumer.BalancedConsumer(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False)

Bases: object

A self-balancing consumer for Kafka that uses ZooKeeper to communicate with other balancing consumers.

Maintains a single instance of SimpleConsumer, periodically using the consumer rebalancing algorithm to reassign partitions to this SimpleConsumer.
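
As a rough sketch, a consumer of this class is usually obtained from a topic rather than constructed directly. The example assumes pykafka's Topic.get_balanced_consumer helper, which forwards keyword arguments to this class; the make_consumer wrapper, the group name, and the chosen values are illustrative only.

```python
def make_consumer(topic, group=b"example-group", zk="127.0.0.1:2181"):
    """Build a BalancedConsumer from a topic (hypothetical helper).

    `topic` is expected to be a pykafka.topic.Topic. The keyword names
    mirror the constructor parameters documented in this section.
    """
    return topic.get_balanced_consumer(
        consumer_group=group,
        zookeeper_connect=zk,
        auto_commit_enable=True,        # commit fetched offsets periodically
        auto_commit_interval_ms=60000,  # once a minute (the documented default)
        consumer_timeout_ms=5000,       # give up after 5 s without messages
    )
```

With a live cluster, the topic would typically come from something like KafkaClient(hosts="127.0.0.1:9092").topics[b"my.topic"]; the host and topic name here are placeholders.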

__init__(topic, cluster, consumer_group, fetch_message_max_bytes=1048576, num_consumer_fetchers=1, auto_commit_enable=False, auto_commit_interval_ms=60000, queued_max_messages=2000, fetch_min_bytes=1, fetch_error_backoff_ms=500, fetch_wait_max_ms=100, offsets_channel_backoff_ms=1000, offsets_commit_max_retries=5, auto_offset_reset=-2, consumer_timeout_ms=-1, rebalance_max_retries=5, rebalance_backoff_ms=2000, zookeeper_connection_timeout_ms=6000, zookeeper_connect='127.0.0.1:2181', zookeeper=None, auto_start=True, reset_offset_on_start=False, post_rebalance_callback=None, use_rdkafka=False, compacted_topic=False)

Create a BalancedConsumer instance

Parameters:
  • topic (pykafka.topic.Topic) – The topic this consumer should consume
  • cluster (pykafka.cluster.Cluster) – The cluster to which this consumer should connect
  • consumer_group (bytes) – The name of the consumer group this consumer should join.
  • fetch_message_max_bytes (int) – The number of bytes of messages to attempt to fetch with each fetch request
  • num_consumer_fetchers (int) – The number of workers used to make FetchRequests
  • auto_commit_enable (bool) – If true, periodically commit to kafka the offset of messages already fetched by this consumer. This also requires that consumer_group is not None.
  • auto_commit_interval_ms (int) – The frequency (in milliseconds) at which the consumer’s offsets are committed to kafka. This setting is ignored if auto_commit_enable is False.
  • queued_max_messages (int) – The maximum number of messages buffered for consumption in the internal pykafka.simpleconsumer.SimpleConsumer
  • fetch_min_bytes (int) – The minimum amount of data (in bytes) that the server should return for a fetch request. If insufficient data is available, the request will block until sufficient data is available.
  • fetch_error_backoff_ms (int) – UNUSED. See pykafka.simpleconsumer.SimpleConsumer.
  • fetch_wait_max_ms (int) – The maximum amount of time (in milliseconds) that the server will block before answering a fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes.
  • offsets_channel_backoff_ms (int) – Backoff time to retry failed offset commits and fetches.
  • offsets_commit_max_retries (int) – The number of times the offset commit worker should retry before raising an error.
  • auto_offset_reset (pykafka.common.OffsetType) – What to do if an offset is out of range. This setting indicates how to reset the consumer’s internal offset counter when an OffsetOutOfRangeError is encountered.
  • consumer_timeout_ms (int) – Amount of time (in milliseconds) the consumer may spend without messages available for consumption before returning None.
  • rebalance_max_retries (int) – The number of times the rebalance should retry before raising an error.
  • rebalance_backoff_ms (int) – Backoff time (in milliseconds) between retries during rebalance.
  • zookeeper_connection_timeout_ms (int) – The maximum time (in milliseconds) that the consumer waits while establishing a connection to zookeeper.
  • zookeeper_connect (str) – Comma-separated (ip1:port1,ip2:port2) strings indicating the zookeeper nodes to which to connect.
  • zookeeper (kazoo.client.KazooClient) – A KazooClient connected to a Zookeeper instance. If provided, zookeeper_connect is ignored.
  • auto_start (bool) – Whether the consumer should begin communicating with zookeeper after __init__ is complete. If false, communication can be started with start().
  • reset_offset_on_start (bool) – Whether the consumer should reset its internal offset counter to self._auto_offset_reset and commit that offset immediately upon starting up
  • post_rebalance_callback (function) – A function to be called when a rebalance is in progress. It should accept three arguments: the pykafka.balancedconsumer.BalancedConsumer instance that just completed its rebalance, a dict of the partitions it owned before the rebalance, and a dict of the partitions it owns after the rebalance. These dicts map partition ids to the most recently known offsets for those partitions. The function may return a dictionary mapping partition ids to offsets; if it does, the consumer resets its offsets to the supplied values before continuing consumption. Note that the BalancedConsumer is in a poorly defined state while this callback runs, so accessing its properties (such as held_offsets or partitions) might yield confusing results. The callback should instead rely on the provided partition-id dicts, which are well-defined.
  • use_rdkafka (bool) – Use librdkafka-backed consumer if available
  • compacted_topic (bool) – Set to read from a compacted topic. Forces consumer to use less stringent message ordering logic because compacted topics do not provide offsets in strict incrementing order.
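
The post_rebalance_callback contract described above can be sketched as follows. This is one possible policy, not the library's own: the CHECKPOINTS store and the function name are hypothetical, and the callback deliberately reads only the two dicts it is handed.

```python
# Hypothetical external checkpoint store: partition id -> offset.
CHECKPOINTS = {0: 120, 2: 45}


def restore_checkpoints(consumer, old_offsets, new_offsets):
    """Example post_rebalance_callback (illustrative only).

    old_offsets / new_offsets map partition id -> most recently known
    offset before and after the rebalance. The consumer argument is not
    inspected, because it is in a poorly defined state at this point.
    """
    gained = set(new_offsets) - set(old_offsets)
    resets = {pid: CHECKPOINTS[pid] for pid in gained if pid in CHECKPOINTS}
    # Returning None leaves the consumer's offsets untouched.
    return resets or None
```

Such a function would be passed as post_rebalance_callback=restore_checkpoints when the consumer is created.
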
__iter__()

Yield an infinite stream of messages until the consumer times out

_add_partitions(partitions)

Add partitions to the zookeeper registry for this consumer.

Parameters:
  • partitions (Iterable of pykafka.partition.Partition) – The partitions to add.
_add_self()

Register this consumer in zookeeper.

_build_watch_callback(fn, proxy)

Return a function that’s safe to use as a ChildrenWatch callback

Fixes the issue from https://github.com/Parsely/pykafka/issues/345

_decide_partitions(participants, consumer_id=None)

Decide which partitions belong to this consumer.

Uses the consumer rebalancing algorithm described here https://kafka.apache.org/documentation/#impl_consumerrebalance

It is very important that the participants array is sorted, since this algorithm runs on each consumer and indexes into the same array. The same array index operation must return the same result on each consumer.

Parameters:
  • participants (Iterable of bytes) – Sorted list of ids of all other consumers in this consumer group.
  • consumer_id – The ID of the consumer for which to generate a partition assignment. Defaults to self._consumer_id
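
The range-style assignment that the linked rebalancing algorithm describes can be sketched in a few lines. This is a simplified illustration, not pykafka's actual code: it works on plain partition ids, and it assumes the participants list includes the consumer being assigned.

```python
def decide_partitions(partition_ids, participants, consumer_id):
    """Return the partition ids assigned to consumer_id (sketch).

    partition_ids -- iterable of all partition ids for the topic
    participants  -- ids of every consumer in the group, including consumer_id
    """
    parts = sorted(partition_ids)
    members = sorted(participants)   # must be in the same order on every consumer
    idx = members.index(consumer_id)

    per_consumer, remainder = divmod(len(parts), len(members))
    # The first `remainder` consumers each take one extra partition.
    start = per_consumer * idx + min(idx, remainder)
    count = per_consumer + (1 if idx < remainder else 0)
    return parts[start:start + count]
```

Because every consumer sorts the same inputs and computes its own slice, the slices are disjoint and together cover all partitions without any coordination beyond the shared membership list.
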
_get_held_partitions()

Build a set of partitions zookeeper says we own

_get_internal_consumer(partitions=None, start=True)

Instantiate a SimpleConsumer for internal use.

If there is already a SimpleConsumer instance held by this object, disable its workers and mark it for garbage collection before creating a new one.

_get_participants()

Use zookeeper to get the other consumers of this topic.

Returns: A sorted list of the ids of other consumers of this consumer’s topic
_partitions

Convenient shorthand for set of partitions internally held

_path_from_partition(p)

Given a partition, return its path in zookeeper.

_path_self

Path where this consumer should be registered in zookeeper

_raise_worker_exceptions()

Raises exceptions encountered on worker threads

_rebalance()

Start the rebalancing process for this consumer

This method is called whenever a zookeeper watch is triggered.

_remove_partitions(partitions)

Remove partitions from the zookeeper registry for this consumer.

Parameters:
  • partitions (Iterable of pykafka.partition.Partition) – The partitions to remove.
_set_watches()

Set watches in zookeeper that will trigger rebalances.

Rebalances should be triggered whenever a broker, topic, or consumer znode is changed in zookeeper. This ensures that the balance of the consumer group remains up-to-date with the current state of the cluster.

_setup_internal_consumer(partitions=None, start=True)

Instantiate an internal SimpleConsumer instance

_setup_zookeeper(zookeeper_connect, timeout)

Open a connection to a ZooKeeper host.

Parameters:
  • zookeeper_connect (str) – The ‘ip:port’ address of the zookeeper node to which to connect.
  • timeout (int) – Connection timeout (in milliseconds)
_update_member_assignment()

Decide and assign new partitions for this consumer

commit_offsets()

Commit offsets for this consumer’s partitions

Uses the offset commit/fetch API

consume(block=True)

Get one message from the consumer

Parameters:
  • block (bool) – Whether to block while waiting for a message
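
A minimal polling loop around consume() might look like the sketch below. It relies on the behaviour documented for consumer_timeout_ms, namely that None is returned when no message arrives in time; the drain helper and its arguments are hypothetical names.

```python
def drain(consumer, handle, max_messages=None):
    """Call handle() on each message until consume() returns None.

    Stops early once max_messages have been handled. Returns the number
    of messages handled.
    """
    count = 0
    while max_messages is None or count < max_messages:
        message = consumer.consume(block=True)
        if message is None:   # timed out: nothing available
            break
        handle(message)
        count += 1
    return count
```

With consumer_timeout_ms left at -1 the blocking call never times out, so a loop like this only ends through max_messages.
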
held_offsets

Return a map from partition id to held offset for each partition

partitions

A list of the partitions that this consumer consumes

reset_offsets(partition_offsets=None)

Reset offsets for the specified partitions

Issue an OffsetRequest for each partition and set the appropriate returned offset in the consumer’s internal offset counter.

Parameters:
  • partition_offsets (Sequence of tuples of the form (pykafka.partition.Partition, int)) – (partition, timestamp_or_offset) pairs to reset, where partition is the partition whose offset should be reset and timestamp_or_offset is either the timestamp of the message whose offset the partition should take or the new offset itself.

NOTE: If an instance of timestamp_or_offset is treated by kafka as an invalid offset timestamp, this function directly sets the consumer’s internal offset counter for that partition to that instance of timestamp_or_offset. On the next fetch request, the consumer attempts to fetch messages starting from that offset. See the following link for more information on what kafka treats as a valid offset timestamp: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
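
The (partition, timestamp_or_offset) pairs can be assembled with a small helper such as the one below. This is a sketch under assumptions: build_partition_offsets is a hypothetical name, and the caller is assumed to already hold a mapping from partition id to Partition object.

```python
def build_partition_offsets(partitions_by_id, target_offsets):
    """Pair Partition objects with values for reset_offsets() (sketch).

    partitions_by_id -- mapping of partition id -> Partition (assumed shape)
    target_offsets   -- mapping of partition id -> timestamp_or_offset
    """
    return [
        (partitions_by_id[pid], value)
        for pid, value in sorted(target_offsets.items())
        if pid in partitions_by_id   # skip partitions we do not hold
    ]
```

The result would then be passed along as consumer.reset_offsets(build_partition_offsets(parts, {0: 100, 1: 250})).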

start()

Open connections and join a consumer group.

stop()

Close the zookeeper connection and stop consuming.

This method should be called as part of a graceful shutdown process.
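
One way to make stop() part of a graceful shutdown is a try/finally wrapper, sketched here. The run_until_done name is hypothetical, and committing only on a clean exit is a design choice of this example rather than a requirement of the library.

```python
def run_until_done(consumer, handle):
    """Process every message, then shut the consumer down (sketch)."""
    try:
        for message in consumer:
            handle(message)
        # Clean exit only: persist progress for the messages handled.
        consumer.commit_offsets()
    finally:
        # Always release the zookeeper connection, even if handle() raised.
        consumer.stop()
```

Skipping the commit when handle() raises avoids recording an offset for a message that was never processed successfully.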

topic

The topic this consumer consumes