pykafka.client

Author: Keith Bourgoin, Emmett Butler

class pykafka.client.KafkaClient(hosts='127.0.0.1:9092', zookeeper_hosts=None, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, use_greenlets=False, exclude_internal_topics=True, source_address='', ssl_config=None, broker_version='0.9.0')

Bases: object

A high-level, Pythonic client for Kafka.

NOTE: KafkaClient holds weak references to Topic instances via pykafka.cluster.TopicDict. To perform operations directly on these topics, such as examining their partition lists, client code must hold a strong reference to the topics it cares about. If client code doesn’t need to examine Topic instances directly, no strong references are necessary.
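The effect of weak references can be seen with plain Python. The sketch below does not use pykafka: `Topic` is a hypothetical stand-in, and a `weakref.WeakValueDictionary` is used as a simplified model of a registry that holds only weak references. It assumes CPython, where an object is freed as soon as its last strong reference disappears. In real pykafka code, the counterpart of `strong` is keeping the result of `client.topics[...]` bound to a variable for as long as you need it.

```python
import gc
import weakref


class Topic:
    """Hypothetical stand-in for pykafka.topic.Topic (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.partitions = {0: "p0", 1: "p1"}


# Simplified model of a registry that holds only weak references to topics.
registry = weakref.WeakValueDictionary()

strong = Topic("orders")              # client code keeps a strong reference
registry["orders"] = strong
registry["clicks"] = Topic("clicks")  # nothing else refers to this instance

# CPython has already freed the unreferenced Topic; collect() is a safeguard.
gc.collect()
print(sorted(registry.keys()))            # ['orders']
print(len(registry["orders"].partitions))  # 2
```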

Notes on ZooKeeper: Kafka and its clients use ZooKeeper to store several types of information, including broker host strings, partition ownership, and, depending on your Kafka version, consumer offsets. The kafka-console-* tools rely on ZooKeeper to discover brokers; this is why you can’t specify a broker directly to these tools and must give a ZooKeeper host string instead. In theory, this insulates users of the console tools from having to care which specific brokers in the Kafka cluster are accessible at any given time.

In pykafka, the paradigm is slightly different, though the approach above is also supported. When you instantiate a KafkaClient, you can specify either hosts or zookeeper_hosts: hosts is a comma-separated list of brokers to connect to, and zookeeper_hosts is a ZooKeeper connection string. If you specify zookeeper_hosts, it takes precedence over hosts. Either way, you get a KafkaClient connected to your Kafka cluster, whether you provide a ZooKeeper or a broker connection string.
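Both connection styles can be sketched as follows. The helper names `hosts_string` and `connect` and the broker names are hypothetical; only `KafkaClient(hosts=...)` and `KafkaClient(zookeeper_hosts=...)` come from the signature documented here. The pykafka import is deferred so the string helper can be used without pykafka installed or a cluster running.

```python
from typing import Iterable, Optional, Tuple


def hosts_string(brokers: Iterable[Tuple[str, int]]) -> str:
    """Join (host, port) pairs into the comma-separated form `hosts` expects."""
    return ",".join(f"{host}:{port}" for host, port in brokers)


def connect(brokers: Optional[Iterable[Tuple[str, int]]] = None,
            zookeeper: Optional[str] = None):
    """Return a KafkaClient built from a ZooKeeper string or a broker list.

    Needs pykafka and a reachable cluster, so the import is deferred.
    """
    from pykafka import KafkaClient

    if zookeeper is not None:
        # zookeeper_hosts takes precedence over hosts, so passing it alone is enough
        return KafkaClient(zookeeper_hosts=zookeeper)
    return KafkaClient(hosts=hosts_string(brokers or [("127.0.0.1", 9092)]))


print(hosts_string([("kafka1", 9092), ("kafka2", 9092)]))  # kafka1:9092,kafka2:9092
```

Against a live cluster, `connect(zookeeper="zk1:2181,zk2:2181")` and `connect([("kafka1", 9092)])` should both yield a usable client.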

Different components need knowledge of the ZooKeeper cluster for different reasons. SimpleConsumer does not require access to ZooKeeper at all: it does not perform consumption balancing, and since Kafka 0.8.2, consumer offsets are stored by the Kafka broker itself rather than in ZooKeeper. BalancedConsumer, by contrast, requires explicit knowledge of the ZooKeeper cluster because it performs consumption balancing. ZooKeeper records which consumers own which partitions and acts as a central repository of that information for all consumers to read, so BalancedConsumer cannot work without direct access to it. Note that ManagedBalancedConsumer, which works with Kafka 0.9 and above, removes this dependency on ZooKeeper from balanced consumption by storing partition ownership information in the Kafka broker.
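The choice between the three consumer types can be sketched as a small dispatch function. `make_consumer` is a hypothetical helper, and it assumes `topic` is a pykafka Topic exposing `get_simple_consumer` and `get_balanced_consumer` (with `managed` and `zookeeper_connect` keyword arguments); check these against the pykafka version you use. The `ValueError` is this sketch's own policy, not pykafka behavior.

```python
from typing import Any, Optional


def make_consumer(topic: Any, group: bytes, balanced: bool = False,
                  managed: bool = False, zookeeper_connect: Optional[str] = None):
    """Pick a consumer type according to how much it relies on ZooKeeper."""
    if not balanced:
        # SimpleConsumer: no balancing, so no ZooKeeper access is needed
        return topic.get_simple_consumer(consumer_group=group)
    if managed:
        # ManagedBalancedConsumer (Kafka 0.9+): the broker tracks partition ownership
        return topic.get_balanced_consumer(consumer_group=group, managed=True)
    if zookeeper_connect is None:
        # This check is the helper's own policy, not pykafka's
        raise ValueError("BalancedConsumer needs a ZooKeeper connection string")
    # BalancedConsumer: partition ownership lives in ZooKeeper
    return topic.get_balanced_consumer(consumer_group=group,
                                       zookeeper_connect=zookeeper_connect)
```

With a live cluster, a call might look like `make_consumer(client.topics[b"orders"], b"billing", balanced=True, managed=True)`, where the topic and group names are placeholders.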

The Producer may send messages to whichever partitions it wants. By default, pykafka chooses the partition for each message at random to spread messages evenly across partitions. The producer doesn’t do anything that requires information stored in ZooKeeper, and since the connection to the Kafka cluster is handled by the KafkaClient logic described above, it doesn’t need the ZooKeeper host string at all.
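Why random selection yields an even spread can be checked with a short simulation. This is not pykafka's partitioner code: `pick_random_partition` and `simulate` are hypothetical, and the `(partitions, key)` argument order is an assumption made for illustration. With 10,000 messages over four partitions, each partition should receive close to 2,500.

```python
import random
from collections import Counter
from typing import Optional, Sequence


def pick_random_partition(partitions: Sequence[int], key: Optional[bytes],
                          rng: random.Random) -> int:
    """Choose a partition uniformly at random; the message key is ignored."""
    return rng.choice(partitions)


def simulate(num_messages: int, partitions: Sequence[int], seed: int = 0) -> Counter:
    """Count how many of `num_messages` land on each partition."""
    rng = random.Random(seed)  # seeded so the run is repeatable
    return Counter(pick_random_partition(partitions, None, rng)
                   for _ in range(num_messages))


counts = simulate(10_000, [0, 1, 2, 3])
print(sorted(counts.items()))  # each partition gets roughly 2,500 messages
```

Because the key is ignored, messages with the same key can end up on different partitions; random selection trades per-key ordering for balance.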

__init__(hosts='127.0.0.1:9092', zookeeper_hosts=None, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, use_greenlets=False, exclude_internal_topics=True, source_address='', ssl_config=None, broker_version='0.9.0')

Create a connection to a Kafka cluster.

Documentation for source_address can be found at https://docs.python.org/2/library/socket.html#socket.create_connection
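The linked `socket.create_connection` takes `source_address` as a `(host, port)` tuple, whereas this parameter is documented as a `'host:port'` string. The sketch below shows one way to bridge the two; `parse_source_address` and `open_from` are hypothetical helpers, not pykafka internals, and treating a missing port as 0 (letting the OS choose) is this sketch's assumption. IPv6 literals are not handled.

```python
import socket
from typing import Tuple


def parse_source_address(source_address: str) -> Tuple[str, int]:
    """Convert 'host:port' into the (host, port) tuple create_connection expects."""
    if not source_address:
        return ("", 0)  # empty string: let the OS choose interface and port
    host, _, port = source_address.partition(":")  # no IPv6 support
    return (host, int(port) if port else 0)


def open_from(source_address: str, broker: Tuple[str, int], timeout_s: float = 30.0):
    """Connect to `broker`, binding the local end of the socket to `source_address`."""
    return socket.create_connection(broker, timeout=timeout_s,
                                    source_address=parse_source_address(source_address))
```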

Parameters:
  • hosts (str) – Comma-separated list of Kafka hosts to which to connect. If ssl_config is specified, the ports given here are assumed to be SSL ports.
  • zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect. If not None, this argument takes precedence over hosts.
  • socket_timeout_ms (int) – The socket timeout (in milliseconds) for network requests.
  • offsets_channel_socket_timeout_ms (int) – The socket timeout (in milliseconds) when reading responses for offset commit and offset fetch requests.
  • use_greenlets (bool) – Whether to perform parallel operations on greenlets instead of OS threads.
  • exclude_internal_topics (bool) – If True, internal topics (specifically, the offsets topic) are not exposed to the consumer.
  • source_address (str ‘host:port’) – The source address for socket connections.
  • ssl_config (pykafka.connection.SslConfig) – Config object for SSL connections.
  • broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
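A sketch of passing several of these parameters together, including an SSL connection. `client_kwargs` and `make_ssl_client` are hypothetical helpers; the `SslConfig(cafile=..., certfile=..., keyfile=...)` keyword names are assumed from pykafka's `pykafka.connection.SslConfig` and should be checked against your installed version. The pykafka imports are deferred so the keyword-building part runs without pykafka.

```python
from typing import Any, Dict, Optional


def client_kwargs(hosts: str, timeout_s: float = 30.0, broker_version: str = "0.9.0",
                  use_greenlets: bool = False) -> Dict[str, Any]:
    """Collect KafkaClient keyword arguments, converting seconds to milliseconds."""
    return {
        "hosts": hosts,
        "socket_timeout_ms": round(timeout_s * 1000),  # KafkaClient expects ms
        "use_greenlets": use_greenlets,                # True requires gevent
        "broker_version": broker_version,              # should match the cluster
    }


def make_ssl_client(hosts: str, cafile: str, certfile: Optional[str] = None,
                    keyfile: Optional[str] = None, **extra: Any):
    """Connect over SSL; the ports in `hosts` must be the brokers' SSL ports."""
    from pykafka import KafkaClient
    from pykafka.connection import SslConfig

    config = SslConfig(cafile=cafile, certfile=certfile, keyfile=keyfile)
    return KafkaClient(ssl_config=config, **client_kwargs(hosts, **extra))
```

Keeping the unit conversion in one place avoids accidentally passing seconds where `socket_timeout_ms` expects milliseconds.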
__weakref__

list of weak references to the object (if defined)

update_cluster()

Update known brokers and topics.

Updates each Topic and Broker with current metadata from the cluster, adding any new ones that are found.
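A sketch of using `update_cluster()` to detect newly created topics. `refresh_and_report` is a hypothetical helper, and it assumes `client.topics` is dict-like and keyed by topic name (in many pykafka versions the names are bytes). In application code it might be called periodically from a background task.

```python
from typing import Any, Set


def refresh_and_report(client: Any) -> Set[Any]:
    """Call client.update_cluster() and return the names of newly seen topics."""
    before = set(client.topics.keys())
    client.update_cluster()  # refresh brokers and topics from cluster metadata
    return set(client.topics.keys()) - before
```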