pykafka.connection

class pykafka.connection.SslConfig(cafile, certfile=None, keyfile=None, password=None)

Bases: object

Config object for SSL connections

This aims to pick optimal defaults for the majority of use cases. If you have special requirements (eg. you want to enable hostname checking), you may monkey-patch self._wrap_socket (see _legacy_wrap_socket() for an example) before passing the SslConfig to KafkaClient init, like so:

config = SslConfig(cafile=’/your/ca/file’) config._wrap_socket = config._legacy_wrap_socket() client = KafkaClient(‘localhost:<ssl-port>’, ssl_config=config)

Alternatively, completely supplanting this class with your own is also simple: if you are not going to be using the pykafka.rdkafka classes, only a method wrap_socket() is expected (so you can eg. simply pass in a plain ssl.SSLContext instance instead). The pykafka.rdkafka classes require four further attributes: cafile, certfile, keyfile, and password (the SslConfig.__init__ docstring explains their meaning)

__init__(cafile, certfile=None, keyfile=None, password=None)

Specify certificates for SSL connection

Parameters:
  • cafile (str) – Path to trusted CA certificate
  • certfile (str) – Path to client certificate
  • keyfile (str) – Path to client private-key file
  • password (bytes) – Password for private key
__weakref__

list of weak references to the object (if defined)

_legacy_wrap_socket()

Create socket-wrapper on a pre-2.7.9 Python interpreter

wrap_socket(sock)

Wrap a socket in an SSL context (see ssl.wrap_socket)

Parameters:socket (socket.socket) – Plain socket
class pykafka.connection.BrokerConnection(host, port, handler, buffer_size=1048576, source_host='', source_port=0, ssl_config=None)

Bases: object

BrokerConnection thinly wraps a socket.create_connection call and handles the sending and receiving of data that conform to the kafka binary protocol over that socket.

__del__()

Close this connection when the object is deleted.

__init__(host, port, handler, buffer_size=1048576, source_host='', source_port=0, ssl_config=None)

Initialize a socket connection to Kafka.

Parameters:
  • host (str) – The host to which to connect
  • port (int) – The port on the host to which to connect. Assumed to be an ssl-endpoint if (and only if) ssl_config is also provided
  • handler (pykafka.handlers.Handler) – The pykafka.handlers.Handler instance to use when creating a connection
  • buffer_size (int) – The size (in bytes) of the buffer in which to hold response data.
  • source_host (str) – The host portion of the source address for the socket connection
  • source_port (int) – The port portion of the source address for the socket connection
  • ssl_config (pykafka.connection.SslConfig) – Config object for SSL connection
__weakref__

list of weak references to the object (if defined)

connect(timeout)

Connect to the broker.

connected

Returns true if the socket connection is open.

disconnect()

Disconnect from the broker.

reconnect()

Disconnect from the broker, then reconnect

request(request)

Send a request over the socket connection

response()

Wait for a response from the broker