pykafka.cluster

class pykafka.cluster.Cluster(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')

Bases: object

A Cluster is a high-level abstraction of the collection of brokers and topics that makes up a real Kafka cluster.

__init__(hosts, handler, socket_timeout_ms=30000, offsets_channel_socket_timeout_ms=10000, exclude_internal_topics=True, source_address='', zookeeper_hosts=None, ssl_config=None, broker_version='0.9.0')

Create a new Cluster instance.

Parameters:
  • hosts (str) – Comma-separated list of Kafka hosts to which to connect.
  • zookeeper_hosts (str) – KazooClient-formatted string of ZooKeeper hosts to which to connect. If not None, this argument takes precedence over hosts.
  • handler (pykafka.handlers.Handler) – The concurrency handler for network requests.
  • socket_timeout_ms (int) – The socket timeout (in milliseconds) for network requests.
  • offsets_channel_socket_timeout_ms (int) – The socket timeout (in milliseconds) when reading responses for offset commit and offset fetch requests.
  • exclude_internal_topics (bool) – If True, messages from internal topics (specifically, the offsets topic) are not exposed to consumers.
  • source_address (str ‘host:port’) – The source address for socket connections.
  • ssl_config (pykafka.connection.SslConfig) – Config object for SSL connection.
  • broker_version (str) – The protocol version of the cluster being connected to. If this parameter doesn’t match the actual broker version, some pykafka features may not work properly.
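As an illustration of the comma-separated format expected by hosts, the following minimal sketch splits such a string into (host, port) pairs. The helper name parse_hosts and the fallback port 9092 are assumptions made for this example; this is not pykafka's own parsing code, and IPv6 literals are not handled.

```python
def parse_hosts(hosts, default_port=9092):
    """Split a comma-separated host string into (host, port) pairs.

    Hypothetical helper for illustration only; default_port is an assumption.
    """
    pairs = []
    for entry in hosts.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(":")
        if not sep:
            # No colon present: the whole entry is the host name.
            host, port = entry, default_port
        pairs.append((host, int(port)))
    return pairs


broker_pairs = parse_hosts("127.0.0.1:9092, kafka2:9093")
```

Pairs in this shape match the (broker_host, broker_port) format that _request_metadata documents for its broker_connects argument.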
__weakref__

list of weak references to the object (if defined)

_get_brokers_from_zookeeper(zk_connect)

Build a list of broker connection pairs from a ZooKeeper host.

Parameters:
  • zk_connect (str) – The ZooKeeper connect string of the instance to which to connect.
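Kafka brokers conventionally register themselves in ZooKeeper under /brokers/ids/<id> as a JSON document that includes host and port fields. The sketch below shows how such registrations could be turned into connection pairs. It assumes the raw payloads have already been read from ZooKeeper; the function name and the sample payloads are hypothetical, and this is not pykafka's internal implementation.

```python
import json


def brokers_from_registrations(registrations):
    """Turn {broker_id: raw JSON bytes} into (host, port) pairs, ordered by broker id."""
    pairs = []
    for broker_id in sorted(registrations):
        info = json.loads(registrations[broker_id].decode("utf-8"))
        pairs.append((info["host"], int(info["port"])))
    return pairs


# Sample payloads are made up for this example.
sample = {
    "1": b'{"host": "kafka1.example.com", "port": 9092, "version": 1}',
    "0": b'{"host": "kafka0.example.com", "port": 9092, "version": 1}',
}
zk_broker_pairs = brokers_from_registrations(sample)
```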
_get_metadata(topics=None)

Get fresh cluster metadata from a broker.

_request_metadata(broker_connects, topics)

Request broker metadata from a set of brokers.

Returns the result of the first successful metadata request.

Parameters:
  • broker_connects (Iterable of two-element sequences of the format (broker_host, broker_port)) – The set of brokers to which to attempt to connect.
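The "first successful request" behavior can be sketched as a loop that tries each (broker_host, broker_port) pair in turn. The names first_successful and fake_fetch are hypothetical, and returning None when every broker fails is an assumption of this sketch; the documentation above does not say what pykafka does in that case.

```python
def first_successful(broker_connects, fetch):
    """Return fetch(host, port) for the first broker that answers, else None."""
    for host, port in broker_connects:
        try:
            return fetch(host, port)
        except OSError:
            # Connection-level failure: move on to the next candidate broker.
            continue
    return None


def fake_fetch(host, port):
    """Stand-in for a real metadata request (hypothetical)."""
    if host == "down.example.com":
        raise ConnectionRefusedError("broker unreachable")
    return {"answered_by": (host, port)}


candidates = [("down.example.com", 9092), ("up.example.com", 9092)]
metadata = first_successful(candidates, fake_fetch)
```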
_update_brokers(broker_metadata)

Update brokers with fresh metadata.

Parameters:
  • broker_metadata (Dict of {name: metadata} where metadata is pykafka.protocol.BrokerMetadata and name is str) – Metadata for all brokers.
brokers

The dict of known brokers for this cluster.

get_group_coordinator(consumer_group)

Get the broker designated as the group coordinator for this consumer group.

Based on Step 1 at https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

Parameters:
  • consumer_group (str) – The name of the consumer group for which to find the group coordinator (offset manager).
get_managed_group_descriptions()

Return detailed descriptions of all managed consumer groups on this cluster.

This function only returns descriptions for consumer groups created via the Group Management API, which pykafka exposes through ManagedBalancedConsumer.

handler

The concurrency handler for network requests.

topics

The dict of known topics for this cluster.

NOTE: This dict is an instance of pykafka.cluster.TopicDict, which uses weak references and lazy evaluation to avoid instantiating unnecessary pykafka.Topic objects. Thus, the values displayed when printing client.topics on a freshly created pykafka.KafkaClient will be None. This simply means that the topic instances have not yet been created, but they will be when __getitem__ is called on the dictionary.
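The combination of lazy evaluation and weak references described in the note can be sketched with the standard weakref module. The classes Topic and LazyTopicDict below are simplified stand-ins written for this example, not pykafka's TopicDict; they only show why unbuilt entries appear as None until __getitem__ is called.

```python
import weakref


class Topic:
    """Stand-in for an expensive-to-build topic object (hypothetical)."""

    def __init__(self, name):
        self.name = name


class LazyTopicDict(dict):
    """Maps names to weak references and builds Topic objects on first access."""

    def __init__(self, names):
        # Every known name starts out with no instance behind it.
        super().__init__((name, None) for name in names)

    def __getitem__(self, name):
        ref = super().__getitem__(name)  # raises KeyError for unknown names
        topic = ref() if ref is not None else None
        if topic is None:
            # Not built yet, or already garbage-collected: build it now.
            topic = Topic(name)
            super().__setitem__(name, weakref.ref(topic))
        return topic


topics = LazyTopicDict(["events", "logs"])
stored_before = dict.__getitem__(topics, "events")  # raw stored value, no instance yet
events = topics["events"]                            # first access builds the Topic
```

Because only a weak reference is stored, the instance lives as long as the caller holds on to it (here, through the events variable) and is rebuilt on a later lookup if it has been collected.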

update()

Update known brokers and topics.