ComponentProducerSet

class lsst.ts.salkafka.ComponentProducerSet(kafka_config, log_level=20)

Bases: object

A collection of one or more ComponentProducers created from a command-line script.

The normal way to use this class is to run ComponentProducerSet.amain from a command-line script. If you wish to run it more directly (e.g. for unit tests), do one of the following:

  • To handle all topics for one or more SAL components:

    kafka_config = KafkaConfig(_kafka info_)
    producer_set = ComponentProducerSet(kafka_config=kafka_config)
    await producer_set.run_producers(components=_component names_)
    
  • To distribute production of one SAL component among multiple subprocesses:

    topic_names_set = TopicNamesSet.from_file(_filepath_)
    kafka_config = KafkaConfig(_kafka info_)
    producer_set = ComponentProducerSet(kafka_config=kafka_config)
    await producer_set.run_distributed_producer(
        topic_names_set=topic_names_set,
    )
    
Parameters:
kafka_configKafkaConfig

Kafka configuration.

log_levelint, optional

Log level, e.g. logging.INFO.
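
The default log_level=20 in the signature is the numeric value of logging.INFO in Python's standard logging module. A minimal sketch of that correspondence follows; the names DEFAULT_LOG_LEVEL, verbose_level and level_name are illustrative only and are not part of salkafka.

```python
import logging

# The default log_level=20 shown in the class signature equals logging.INFO.
DEFAULT_LOG_LEVEL = 20
assert DEFAULT_LOG_LEVEL == logging.INFO

# Any standard level constant can be passed instead, e.g. for more verbose output.
verbose_level = logging.DEBUG  # numeric value 10

# Map the numeric level back to its standard name.
level_name = logging.getLevelName(DEFAULT_LOG_LEVEL)
```

Passing the named constant (for example log_level=logging.DEBUG) rather than a bare integer tends to make calling code easier to read.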

Methods Summary

amain()

Parse command line arguments and create and run a ComponentProducerSet.

create_producer_subprocess(*, kafka_config, ...)

Create and run a producer for a subset of one SAL component's topics.

make_argument_parser()

Make a command-line argument parser.

run_distributed_producer(topic_names_set)

Produce messages for one SAL component, distributing the topics among multiple subprocesses.

run_producer_subprocess(component, index, ...)

Run a producer subprocess created by create_producer_subprocess.

run_producers(*, components[, queue_len])

Create and run component producers for one or more SAL components.

signal_handler()

wait_partial_producers_started(...)

Wait for all partial producers to report that they have started.

Methods Documentation

async classmethod amain()

Parse command line arguments and create and run a ComponentProducerSet.

async classmethod create_producer_subprocess(*, kafka_config, component, index, topic_names, log_level, started_queue, queue_len=100)

Create and run a producer for a subset of one SAL component’s topics.

Parameters:
kafka_configKafkaConfig

Kafka configuration. The partitions field is ignored, in favor of the value in topic_names.

componentstr

Name of a SAL component for which to handle a subset of topics.

indexint

Index of topic_names in TopicNamesSet; identifies this sub-producer.

topic_namesTopicNames

Topic names.

log_levelint, optional

Log level, e.g. logging.INFO.

started_queuemultiprocessing.Queue

A queue to which to publish the index when this producer has started running.

queue_lenint, optional

Length of the DDS read queue. Must be greater than or equal to salobj.domain.DDS_READ_QUEUE_LEN, which is the default.

static make_argument_parser()

Make a command-line argument parser.

async run_distributed_producer(topic_names_set)

Produce messages for one SAL component, distributing the topics among multiple subprocesses.

Parameters:
topic_names_setTopicNamesSet

Component name and topic names list.

async run_producer_subprocess(component, index, topic_names, started_queue, queue_len=100)

Run a producer subprocess created by create_producer_subprocess.

This is a separate method so it can be interrupted with the signal handler (which otherwise could not easily interrupt the creation of salobj.Domain and KafkaProducerFactory).

Parameters:
componentstr

Name of a SAL component for which to handle a subset of topics.

indexint

Index of topic_names in TopicNamesSet; identifies this sub-producer.

topic_namesTopicNames

Topic names.

started_queuemultiprocessing.Queue

A queue to which to publish the index when this producer has started running.

queue_lenint, optional

Length of the DDS read queue. Must be greater than or equal to salobj.domain.DDS_READ_QUEUE_LEN, which is the default.

Raises:
ValueError

If topic_names.partitions != self.kafka_config.partitions.
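
The description of run_producer_subprocess says it is a separate method so that the signal handler can interrupt it. One common asyncio pattern for this is to run the slow work as a task and have the handler cancel that task. The sketch below shows that pattern only; it is not taken from the salkafka source, and slow_startup, handler and main are hypothetical names. To stay portable, the handler is scheduled with call_later rather than registered for a real signal.

```python
import asyncio


async def slow_startup():
    """Hypothetical stand-in for slow setup work inside the subprocess."""
    await asyncio.sleep(10)
    return "started"


async def main():
    # Run the slow work as a task so that it can be cancelled from outside.
    task = asyncio.create_task(slow_startup())

    def handler():
        # Cancelling the task interrupts it at its next await.
        task.cancel()

    # In a real script the handler would typically be registered with
    # loop.add_signal_handler(signal.SIGTERM, handler) on Unix.
    # Here it is simply scheduled to fire shortly after startup.
    asyncio.get_running_loop().call_later(0.01, handler)

    try:
        return await task
    except asyncio.CancelledError:
        return "interrupted"


result = asyncio.run(main())
```

Keeping the interruptible work in its own coroutine means the handler only needs a reference to one task, whatever that task is doing when the signal arrives.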

async run_producers(*, components, queue_len=100)

Create and run component producers for one or more SAL components.

This version produces all topics for each component.

Parameters:
componentsList [str]

Names of SAL components for which to produce Kafka messages.

queue_lenint, optional

Length of the DDS read queue. Must be greater than or equal to salobj.domain.DDS_READ_QUEUE_LEN, which is the default.

Raises:
ValueError

If components contains duplicate names.

RuntimeError

If there is no IDL file for one of the components.
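
The argument constraints documented for run_producers can be sketched as a small pre-check. This is an illustration under stated assumptions, not the library's implementation: check_producer_args is a hypothetical helper, the constant below is a stand-in for salobj.domain.DDS_READ_QUEUE_LEN (assumed to be 100, matching the default queue_len in the signature), and raising ValueError for a too-small queue_len is an assumption, since the documentation only specifies ValueError for duplicate names.

```python
from collections import Counter

# Assumption: stand-in for salobj.domain.DDS_READ_QUEUE_LEN; 100 matches the
# default queue_len shown in the method signatures.
DDS_READ_QUEUE_LEN = 100


def check_producer_args(components, queue_len=DDS_READ_QUEUE_LEN):
    """Hypothetical pre-check mirroring the documented constraints."""
    # Collect every component name that appears more than once.
    duplicates = sorted(
        name for name, count in Counter(components).items() if count > 1
    )
    if duplicates:
        raise ValueError(f"components contains duplicate names: {duplicates}")
    # Assumption: the exception type for this case is not documented.
    if queue_len < DDS_READ_QUEUE_LEN:
        raise ValueError(
            f"queue_len={queue_len} must be >= {DDS_READ_QUEUE_LEN}"
        )
```

For example, check_producer_args(["ATDome", "ATMCS"]) returns without error, while check_producer_args(["ATDome", "ATDome"]) raises ValueError; the component names here are placeholders.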

signal_handler()

async wait_partial_producers_started(num_producers, started_queue)

Wait for all partial producers to report that they have started.

Parameters:
num_producersint

The number of partial producers to wait for.

started_queuemultiprocessing.Queue

Queue that receives started notifications, as the index of each partial producer that has started.
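
Waiting for started notifications, as described above, can be sketched by polling the queue without blocking the event loop until every expected index has arrived. This is a minimal illustration and not the salkafka implementation; wait_all_started and poll_interval are hypothetical names, and in this demonstration the indices are put on the queue by the same process rather than by real sub-producers.

```python
import asyncio
import multiprocessing
import queue


async def wait_all_started(num_producers, started_queue, poll_interval=0.01):
    """Hypothetical sketch: collect indices until all producers have reported."""
    started = set()
    while len(started) < num_producers:
        try:
            # Non-blocking read, so the event loop stays responsive.
            index = started_queue.get_nowait()
        except queue.Empty:
            # Nothing available yet; yield to other tasks and retry.
            await asyncio.sleep(poll_interval)
            continue
        started.add(index)
    return started


# Demonstration: three notifications are queued up front.
started_queue = multiprocessing.Queue()
for index in range(3):
    started_queue.put(index)
started = asyncio.run(wait_all_started(3, started_queue))
```

Collecting the indices in a set, rather than counting messages, makes it possible to report which sub-producers have not yet started if a timeout is added around the wait.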