ComponentProducerSet
- class lsst.ts.salkafka.ComponentProducerSet(kafka_config, log_level=20)
Bases: object
A collection of one or more ComponentProducers created from a command-line script.
The normal way to use this class is to run ComponentProducerSet.amain from a command-line script. If you wish to run it more directly (e.g. for unit tests), do one of the following:
To handle all topics for one or more SAL components:
    kafka_config = KafkaConfig(_kafka info_)
    producer_set = ComponentProducerSet(kafka_config=kafka_config)
    await producer_set.run_producers(components=_component names_)
To distribute production of one SAL component among multiple subprocesses:
    topic_names_set = TopicNamesSet.from_file(_filepath_)
    kafka_config = KafkaConfig(_kafka_info_)
    producer_set = ComponentProducerSet(kafka_config=kafka_config)
    await producer_set.run_distributed_producer(
        topic_names_set=topic_names_set,
    )
- Parameters:
- kafka_config
KafkaConfig
Kafka configuration.
- log_level
int, optional
Log level, e.g. logging.INFO.
Methods Summary
- amain()
Parse command line arguments and create and run a ComponentProducerSet.
- create_producer_subprocess(*, kafka_config, ...)
Create and run a producer for a subset of one SAL component's topics.
- make_argument_parser()
Make a command-line argument parser.
- run_distributed_producer(topic_names_set)
Produce messages for one SAL component, distributing the topics among multiple subprocesses.
- run_producer_subprocess(component, index, ...)
Run a producer subprocess created by create_producer_subprocess.
- run_producers(*, components[, queue_len])
Create and run component producers for one or more SAL components.
- wait_partial_producers_started(num_producers, started_queue)
Wait for all partial producers to report that they have started.
Methods Documentation
- async classmethod amain()
Parse command line arguments and create and run a ComponentProducerSet.
- async classmethod create_producer_subprocess(*, kafka_config, component, index, topic_names, log_level, started_queue, queue_len=100)
Create and run a producer for a subset of one SAL component’s topics.
- Parameters:
- kafka_config
KafkaConfig
Kafka configuration. The partitions field is ignored, in favor of the value in topic_names.
- component
str
Name of a SAL component for which to handle a subset of topics.
- index
int
Index of topic_names in TopicNamesSet; identifies this sub-producer.
- topic_names
TopicNames
Topic names.
- log_level
int, optional
Log level, e.g. logging.INFO.
- started_queue
multiprocessing.Queue
A queue to which to publish the index when this producer has started running.
- queue_len
int, optional
Length of the DDS read queue. Must be greater than or equal to salobj.domain.DDS_READ_QUEUE_LEN, which is the default.
- static make_argument_parser()
Make a command-line argument parser.
- async run_distributed_producer(topic_names_set)
Produce messages for one SAL component, distributing the topics among multiple subprocesses.
- Parameters:
- topic_names_set
TopicNamesSet
Component name and topic names list.
- async run_producer_subprocess(component, index, topic_names, started_queue, queue_len=100)
Run a producer subprocess created by create_producer_subprocess.
This is a separate method so it can be interrupted with the signal handler (which otherwise could not easily interrupt the creation of salobj.Domain and KafkaProducerFactory).
- Parameters:
- component
str
Name of a SAL component for which to handle a subset of topics.
- index
int
Index of topic_names in TopicNamesSet; identifies this sub-producer.
- topic_names
TopicNames
Topic names.
- started_queue
multiprocessing.Queue
A queue to which to publish the index when this producer has started running.
- queue_len
int, optional
Length of the DDS read queue. Must be greater than or equal to salobj.domain.DDS_READ_QUEUE_LEN, which is the default.
- Raises:
- ValueError
If topic_names.partitions != self.kafka_config.partitions.
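The note above says this method is kept separate so that a signal handler can interrupt it. A minimal sketch of that general pattern, using only the standard library, follows. The class `InterruptibleRunner` and its method `slow_setup_and_run` are hypothetical stand-ins, not part of salkafka, and the actual implementation may differ.

```python
import asyncio
import signal


class InterruptibleRunner:
    """Hypothetical stand-in for a producer whose startup can be interrupted."""

    def __init__(self):
        self.run_task = None
        self.interrupted = False

    async def slow_setup_and_run(self):
        # Placeholder for slow work, such as creating a domain and producers.
        await asyncio.sleep(3600)

    def signal_handler(self):
        # Record the interruption and cancel the task if it has been created.
        self.interrupted = True
        if self.run_task is not None:
            self.run_task.cancel()

    async def run(self):
        loop = asyncio.get_running_loop()
        # Wrap the slow work in a task so the handler has something to cancel.
        self.run_task = asyncio.create_task(self.slow_setup_and_run())
        installed = []
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, self.signal_handler)
                installed.append(sig)
            except (NotImplementedError, RuntimeError):
                # Unsupported on this platform or outside the main thread.
                pass
        try:
            await self.run_task
        except asyncio.CancelledError:
            pass  # The signal handler cancelled the task.
        finally:
            for sig in installed:
                loop.remove_signal_handler(sig)
```

Because the slow work runs inside a task, the handler only needs to cancel that task; the cancellation takes effect the next time the task awaits something.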
- async run_producers(*, components, queue_len=100)
Create and run component producers for one or more SAL components.
This version produces all topics for each component.
- Parameters:
- Raises:
- ValueError
If components contains duplicate names.
- RuntimeError
If there is no IDL file for one of the components.
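The duplicate-name check behind the ValueError above could look roughly like the following sketch. The helper names `find_duplicates` and `check_components` are hypothetical and the error message wording is an assumption; this is not taken from the salkafka source.

```python
from collections import Counter


def find_duplicates(components):
    """Return the sorted names that appear more than once in components."""
    counts = Counter(components)
    return sorted(name for name, count in counts.items() if count > 1)


def check_components(components):
    """Raise ValueError if components contains duplicate names."""
    duplicates = find_duplicates(components)
    if duplicates:
        raise ValueError(f"components contains duplicate names: {duplicates}")
```

Validating before any producer is created means a bad argument list fails fast, without leaving partially started producers behind.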
- signal_handler()
- async wait_partial_producers_started(num_producers, started_queue)
Wait for all partial producers to report that they have started.
- Parameters:
- num_producers
int
The number of partial producers to wait for.
- started_queue
multiprocessing.Queue
Queue that receives started notifications, as the index of each partial producer that has started.
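One way to wait for started notifications without blocking the event loop is to poll the queue, as in the sketch below. The function `wait_all_started` and its `poll_interval` parameter are hypothetical, and the sketch assumes the indices run from 0 to num_producers - 1; the actual method may be implemented differently. It works with any queue whose `get_nowait` raises `queue.Empty`, which includes `multiprocessing.Queue`.

```python
import asyncio
import queue


async def wait_all_started(num_producers, started_queue, poll_interval=0.05):
    """Wait until every index in 0..num_producers-1 has been reported.

    Returns the indices in the order in which they were reported.
    """
    # Assumption: each partial producer reports a unique index in this range.
    remaining = set(range(num_producers))
    started = []
    while remaining:
        try:
            index = started_queue.get_nowait()
        except queue.Empty:
            # Nothing reported yet; yield to the event loop before retrying.
            await asyncio.sleep(poll_interval)
            continue
        if index in remaining:
            remaining.remove(index)
            started.append(index)
    return started
```

Polling with `get_nowait` keeps the coroutine cooperative; a blocking `get` would stall every other task on the loop until a producer reported in.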