PyKafka Usage Guide
This document contains prose explanations and examples of common patterns of PyKafka usage.
Setting the initial offset
This section applies to both the SimpleConsumer and the BalancedConsumer.
When a PyKafka consumer starts fetching messages from a topic, its starting position in the log is defined by two keyword arguments: auto_offset_reset and reset_offset_on_start.
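    from pykafka.common import OffsetType

    # `topic` is assumed to be a pykafka Topic obtained from a KafkaClient
    consumer = topic.get_simple_consumer(
        consumer_group="mygroup",
        auto_offset_reset=OffsetType.EARLIEST,
        reset_offset_on_start=False
    )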
The starting offset is also affected by whether or not the Kafka cluster holds any previously committed offsets for each consumer group/topic/partition set. In this document, a “new” group/topic/partition set is one for which Kafka does not hold any previously committed offsets, and an “existing” set is one for which Kafka does.
The consumer’s initial behavior can be summed up by these rules:
- For any new group/topic/partition set, consumption starts from the position given by auto_offset_reset, regardless of the value of reset_offset_on_start.
- For any existing group/topic/partition set with reset_offset_on_start=False, consumption starts from the offset immediately following the last committed offset (if the last committed offset was 4, consumption starts at 5). With reset_offset_on_start=True, consumption starts from the position given by auto_offset_reset. If there is no committed offset, the group/topic/partition set is considered new.
Put another way: if reset_offset_on_start=True, consumption always starts from the position given by auto_offset_reset. If it is False, the starting position depends on whether committed offsets exist for the group/topic/partition set in question.
    # assuming "mygroup" has no committed offsets

    # starts from the latest available offset
    consumer = topic.get_simple_consumer(
        consumer_group="mygroup",
        auto_offset_reset=OffsetType.LATEST
    )
    consumer.consume()
    consumer.commit_offsets()

    # starts from the last committed offset
    consumer_2 = topic.get_simple_consumer(
        consumer_group="mygroup"
    )

    # starts from the earliest available offset
    consumer_3 = topic.get_simple_consumer(
        consumer_group="mygroup",
        auto_offset_reset=OffsetType.EARLIEST,
        reset_offset_on_start=True
    )
This behavior is based on the auto.offset.reset section of the Kafka documentation.
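The rules in this section can also be read as a small decision function. The sketch below is only an illustration of the logic described above, not PyKafka's actual implementation: `StartPoint` and `resolve_start` are hypothetical names that do not exist in the library, and the committed offset is passed in as a plain integer (or `None` for a new group/topic/partition set) rather than fetched from a cluster.

```python
from enum import Enum
from typing import Optional, Tuple


class StartPoint(Enum):
    """Hypothetical labels for where consumption begins (not a PyKafka type)."""
    AUTO_OFFSET_RESET = "auto_offset_reset"
    AFTER_LAST_COMMIT = "after_last_commit"


def resolve_start(
    last_committed: Optional[int],
    reset_offset_on_start: bool,
) -> Tuple[StartPoint, Optional[int]]:
    """Model the starting-position rules for one group/topic/partition set.

    last_committed is None when Kafka holds no committed offset (a "new" set).
    Returns the kind of starting point and, when it is known from the
    committed offset alone, the concrete offset to start from.
    """
    # A new set, or an explicit reset, always defers to auto_offset_reset.
    if reset_offset_on_start or last_committed is None:
        return StartPoint.AUTO_OFFSET_RESET, None
    # An existing set without a reset resumes just after the last commit.
    return StartPoint.AFTER_LAST_COMMIT, last_committed + 1
```

For example, `resolve_start(4, False)` resumes at offset 5, while `resolve_start(4, True)` and `resolve_start(None, False)` both defer to whatever auto_offset_reset specifies.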