KafkaInfo

class lsst.ts.salkafka.KafkaInfo(broker_url, registry_url, partitions, replication_factor, wait_for_ack, log)

Bases: object

Information and clients for using Kafka.

Parameters:
broker_url : str

Kafka broker URL, without the transport. For example: my.kafka:9000

registry_url : str

Schema Registry URL, including the transport. For example: https://registry.my.kafka/

partitions : int

Number of partitions for each Kafka topic.

replication_factor : int

Number of replicas for each Kafka partition.

wait_for_ack : int

0: do not wait (unsafe)
1: wait for first Kafka broker to respond (recommended)
2: wait for all Kafka brokers to respond

log : logging.Logger

Logger.
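
As a minimal sketch of how the constructor arguments fit together, the stand-alone helper below checks the constraints documented above before the values are handed to KafkaInfo. The names KafkaSettings and to_aiokafka_acks are hypothetical and not part of lsst.ts.salkafka. The mapping of wait_for_ack=2 to acks="all" is an assumption about how the value might be forwarded to aiokafka, not something this page states.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical container for the documented KafkaInfo arguments."""

    broker_url: str  # host:port without transport, e.g. "my.kafka:9000"
    registry_url: str  # with transport, e.g. "https://registry.my.kafka/"
    partitions: int
    replication_factor: int
    wait_for_ack: int

    def __post_init__(self):
        # The broker URL is documented as having no transport prefix.
        if "://" in self.broker_url:
            raise ValueError("broker_url must not include a transport")
        # The registry URL is documented as including the transport.
        if urlparse(self.registry_url).scheme not in ("http", "https"):
            raise ValueError("registry_url must include a transport")
        if self.partitions < 1 or self.replication_factor < 1:
            raise ValueError("partitions and replication_factor must be >= 1")
        if self.wait_for_ack not in (0, 1, 2):
            raise ValueError("wait_for_ack must be 0, 1, or 2")


def to_aiokafka_acks(wait_for_ack):
    """Assumed mapping to the ``acks`` argument of aiokafka.AIOKafkaProducer."""
    return {0: 0, 1: 1, 2: "all"}[wait_for_ack]


settings = KafkaSettings(
    broker_url="my.kafka:9000",
    registry_url="https://registry.my.kafka/",
    partitions=1,
    replication_factor=3,
    wait_for_ack=1,
)
```

The validated values would then be passed to KafkaInfo together with a logging.Logger, for example one obtained from logging.getLogger().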

Methods Summary

close() Close the Kafka clients.
make_kafka_topics(topic_names) Initialize Kafka topics that do not already exist.
make_producer(avro_schema) Make and start a Kafka producer for a topic.
start() Start the Kafka clients.

Methods Documentation

close()

Close the Kafka clients.

make_kafka_topics(topic_names)

Initialize Kafka topics that do not already exist.

Parameters:
topic_names : list [str]

List of Kafka topic names.

Returns:
new_topic_names : list [str]

List of newly created Kafka topic names.
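
The relationship between the argument and the return value can be illustrated without a broker: only names that are not already present are created and returned. The sketch below is not the library's implementation. The function select_missing_topics and the topic names are hypothetical, and the real method queries Kafka for the existing topics rather than taking them as an argument.

```python
def select_missing_topics(requested, existing):
    """Return the requested topic names that are not in ``existing``.

    Order is preserved and duplicates in ``requested`` are reported once.
    """
    seen = set(existing)
    missing = []
    for name in requested:
        if name not in seen:
            missing.append(name)
            seen.add(name)
    return missing


# Hypothetical topic names, for illustration only.
new_topic_names = select_missing_topics(
    ["lsst.sal.Test.logevent_heartbeat", "lsst.sal.Test.scalars"],
    existing=["lsst.sal.Test.scalars"],
)
```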

make_producer(avro_schema)

Make and start a Kafka producer for a topic.

Parameters:
avro_schema : dict

Avro schema for the topic.

Returns:
producer : aiokafka.AIOKafkaProducer

Kafka message producer.
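
The avro_schema argument is a plain dict in the Avro record-schema layout. The sketch below shows what such a dict might look like and runs a minimal structural check on it. The namespace, record name, and field names are hypothetical examples, and check_record_schema is an illustrative helper, not part of lsst.ts.salkafka.

```python
# Hypothetical Avro record schema for one topic.
avro_schema = {
    "type": "record",
    "namespace": "example.sal",
    "name": "Test_logevent_heartbeat",
    "fields": [
        {"name": "private_sndStamp", "type": "double"},
        {"name": "heartbeat", "type": "boolean"},
    ],
}


def check_record_schema(schema):
    """Return the full record name after a minimal structural check."""
    if schema.get("type") != "record":
        raise ValueError("schema type must be 'record'")
    if not isinstance(schema.get("name"), str):
        raise ValueError("schema must have a string 'name'")
    for field in schema.get("fields", []):
        if "name" not in field or "type" not in field:
            raise ValueError("each field needs 'name' and 'type'")
    namespace = schema.get("namespace")
    return f"{namespace}.{schema['name']}" if namespace else schema["name"]


full_name = check_record_schema(avro_schema)
```

A dict of this shape is what would be passed as make_producer(avro_schema).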

start()

Start the Kafka clients.