PACER

Consumers

By default, a dedicated exchange is created for each bounded context (e.g., synchronization, dataset ingestion). Within each exchange, multiple queues provide finer-grained routing and enable specific event types to be processed by dedicated consumers.

The PACER defines two categories of consumers:

  • General consumers: User-defined consumers responsible for implementing ICAT-related workflows and business logic.
  • Internal consumers: System-managed consumers responsible for message re-routing, retry handling, and dashboard logging.

Consumer types

Dashboard logging consumer

When the PACER Dashboard is deployed, consumers publish a message to the dashboard-logging-exchange after processing each message. These logging messages contain processing metadata and metrics, including:

  • Error information.
  • Execution timings.
  • Message payload.
  • Routing metadata.

A dedicated worker in the PACER Dashboard consumes messages from this exchange and stores them in the dashboard database, enabling monitoring, auditing, and troubleshooting.

Dead-letter consumer

If a message cannot be processed successfully, general consumers route it to a dedicated dead-letter exchange for retry handling. Messages are automatically requeued and retried after a delay, allowing transient failures to be resolved without manual intervention. By default, a message can be retried up to 15 times, although this limit is configurable.

The retry delay increases linearly according to the retry count:

delay = 60 × retry_number (seconds)

For example:

| Retry | Delay |
| --- | --- |
| 1st attempt | 60s |
| 2nd attempt | 120s |
| 3rd attempt | 180s |

Messages that exceed the maximum retry limit are considered permanently failed.
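The linear back-off above can be sketched in a few lines. This is an illustration of the formula only, not the PACER's actual implementation; the function and constant names are hypothetical.

```python
BASE_DELAY_SECONDS = 60  # from the formula above: delay = 60 x retry_number


def retry_delay(retry_number: int) -> int:
    """Return the delay in seconds before the given retry (1-based)."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    return BASE_DELAY_SECONDS * retry_number


def retry_schedule(max_retries: int) -> list[int]:
    """Delays for retries 1..max_retries; after these, the message is failed."""
    return [retry_delay(n) for n in range(1, max_retries + 1)]
```

With 15 retries, the delays add up to 7200 seconds (two hours) of waiting before a message is considered permanently failed, not counting processing time.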

General consumer

General consumers are bound to a specific queue and implement one or more callback functions that are executed sequentially when a message is received.

Depending on the use case, consumers can be configured with integrations that enable communication with external systems such as ICAT, VISA, DataCite, and others.

Each callback is executed independently. If a callback fails, the remaining callbacks in the same consumer will continue processing the message unless explicitly configured otherwise.
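A minimal sketch of this callback behaviour, assuming each callback is a plain function that receives the message as a dict. The names `run_callbacks` and `stop_on_failure` are hypothetical and stand in for whatever the PACER uses internally; the flag represents the "unless explicitly configured otherwise" case.

```python
import logging
from typing import Callable

logger = logging.getLogger("pacer.sketch")

Callback = Callable[[dict], None]


def run_callbacks(message: dict, callbacks: list[Callback],
                  stop_on_failure: bool = False) -> list[str]:
    """Run callbacks in order; return the names of those that failed."""
    failed = []
    for callback in callbacks:
        try:
            callback(message)
        except Exception:
            # A failing callback is logged and does not block the others.
            logger.exception("callback %s failed", callback.__name__)
            failed.append(callback.__name__)
            if stop_on_failure:
                break
    return failed
```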

Default consumers

The PACER comes with several built-in consumers that support multiple ICAT data ingestion workflows. Refer to the Operation page for more information.

graph TD

    P[Producer] --> ex-uos-sync[User / investigation sync exchange]
    P[Producer] --> ex-inv-ops[Investigation ops exchange]
    P[Producer] --> ex-dataset-ingest[Dataset ingest exchange]

    ex-logging[dashboard-logging-exchange] 
    ex-dead-letters[dead-letters-exchange]-->dead-letter-queue[Dead letters queue]
    dead-letter-queue-->dead-letters-con[Dead letters consumer]

    ex-uos-sync--> q-users[User sync  queue]
    ex-uos-sync--> q-investigations[Investigation sync  queue]

    ex-inv-ops-->q-ops[Inv. ops queue]

    ex-dataset-ingest-->q-ingest[Dataset ingest queue]

    q-users-->user-con[Users consumer]
    q-investigations-->inv-con[Investigations consumer]
    q-ops-->inv-ops-con[Inv. ops consumer]
    q-ingest-->dataset-con[Dataset consumer]

    user-con-->ex-logging
    inv-con-->ex-logging
    inv-ops-con-->ex-logging
    dataset-con-->ex-logging

    dataset-con-->ex-internal-ingest[Internal Dataset ingest exchange]

    ex-internal-ingest-->q-dataset-int[Internal dataset queue]
    ex-internal-ingest-->q-dataset-stats[Dataset statistics queue]
    ex-internal-ingest-->q-dataset-links[Dataset links queue]

    q-dataset-int-->dataset-int-con[Internal dataset consumer]
    q-dataset-stats-->dataset-stats-con[Dataset stats consumer]
    q-dataset-links-->dataset-links-con[Dataset links consumer]

    dataset-int-con-->ex-logging
    dataset-stats-con-->ex-logging
    dataset-links-con-->ex-logging

    ex-logging-->logging-q[Dashboard logging queue]
    logging-q-->dashboard-con[Dashboard consumer]

Configuration

The configuration of the PACER is managed through a YAML file. An example file can be found here.

Multiprocessing configuration

Warning

If future versions adopt Python sub-interpreters instead of multiprocessing, this configuration may be removed.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| multiprocessStartMethod | str | spawn | Can be spawn, fork or forkserver. |
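For reference, the three values correspond to the start methods of Python's standard `multiprocessing` module. The sketch below shows how such a setting could be validated and turned into a context; the helper name is hypothetical and this is not necessarily how the PACER applies the option.

```python
import multiprocessing

ALLOWED_START_METHODS = ("spawn", "fork", "forkserver")


def get_mp_context(start_method: str = "spawn"):
    """Return a multiprocessing context for the configured start method."""
    if start_method not in ALLOWED_START_METHODS:
        raise ValueError(f"unsupported start method: {start_method!r}")
    # get_context raises ValueError if the platform lacks the method
    # (for example, fork is unavailable on Windows).
    return multiprocessing.get_context(start_method)
```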

Logging configuration

Configuration path: logging

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| logLevel | str | INFO | Logging level. Allowed values: DEBUG, INFO, WARNING, ERROR, CRITICAL. |
| printFormat | str | - | Custom log message format used by log handlers. |
| console.enabled | bool | true | Enable or disable console logging. |
| file.enabled | bool | false | Enable or disable file logging. |
| file.path | str | - | Path to the log file when file logging is enabled. |
| file.rotate | bool | - | Enable log file rotation. |
| file.maxMBytes | int | - | Maximum file size before rotation (MB). |
| file.backupCount | int | - | Number of rotated log files to retain. |
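These options map naturally onto Python's standard `logging` handlers. The following is a sketch under that assumption, taking a dict shaped like the `logging` section; `build_handlers` is a hypothetical helper, not part of the PACER.

```python
import logging
from logging.handlers import RotatingFileHandler


def build_handlers(cfg: dict) -> list[logging.Handler]:
    """Create log handlers from a dict shaped like the `logging` section."""
    handlers: list[logging.Handler] = []
    if cfg.get("console", {}).get("enabled", True):
        handlers.append(logging.StreamHandler())
    file_cfg = cfg.get("file", {})
    if file_cfg.get("enabled", False):
        if file_cfg.get("rotate"):
            handlers.append(RotatingFileHandler(
                file_cfg["path"],
                maxBytes=file_cfg["maxMBytes"] * 1024 * 1024,  # MB -> bytes
                backupCount=file_cfg["backupCount"],
                delay=True,  # open the file only on first write
            ))
        else:
            handlers.append(logging.FileHandler(file_cfg["path"], delay=True))
    fmt = cfg.get("printFormat")
    if fmt:
        for handler in handlers:
            handler.setFormatter(logging.Formatter(fmt))
    return handlers
```

The resulting list could then be passed to `logging.basicConfig(level=cfg["logLevel"], handlers=handlers)`.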

Logging to Elasticsearch

Configuration path: logging.elastic

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| elastic.enabled | bool | false | Enable logging to Elasticsearch. |
| elastic.serverUrl | str | - | Elasticsearch server URL. |
| elastic.serviceName | str | - | Service name attached to log records. |
| elastic.serviceEnvironment | str | - | Environment identifier (e.g. prod, test). |
| elastic.indexName | str | - | Elasticsearch index name. |

Exchanges configuration

Configuration path: exchanges[]

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| name | str | - | Exchange name. |
| type | str | - | Exchange type. Allowed values: direct, fanout, headers, topic, x-local-random. |

Queues configuration

Configuration path: queues[]

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| name | str | - | Queue name. |
| exchange | str | - | Exchange to bind the queue to. |
| routingKey | str | - | Routing key used for message routing. |
| priorityEnabled | bool | false | Enable priority in queues [1]. |
| maxPriorityLevel | int | 10 | Maximum message priority when priorities are enabled. |

Warning

On startup, the PACER declares and creates exchanges and queues according to the configured settings. If you enable priority queue support after the PACER has already been run, you must delete the existing queues and restart the PACER for the changes to take effect.
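The reason existing queues must be deleted is that, on RabbitMQ, the maximum priority is a declaration-time queue argument (`x-max-priority`), and redeclaring an existing queue with different arguments is rejected by the broker. The sketch below assumes a RabbitMQ broker and assumes the PACER maps `priorityEnabled` and `maxPriorityLevel` onto that argument; the helper name is hypothetical.

```python
def queue_arguments(queue_cfg: dict) -> dict:
    """Build declaration arguments for one entry of `queues[]`."""
    if not queue_cfg.get("priorityEnabled", False):
        return {}
    # x-max-priority is the RabbitMQ queue argument that turns a classic
    # queue into a priority queue; it can only be set at declaration time.
    return {"x-max-priority": queue_cfg.get("maxPriorityLevel", 10)}
```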

Consumers

Configuration path: consumers[]

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| className | str | - | Class name of the consumer. |
| module | str | - | Module that contains the consumer's class. |
| enabled | bool | - | If not enabled, the consumer's process won't be spawned. |
| queues[] | list[str] | - | List of queues from which the consumer will retrieve messages. |
| workers | int | - | Number of workers in the consumer's context. |
| integrations | list[str] | - | List of integrations enabled for the consumer. |
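A `module` and `className` pair is the usual way to reference a Python class from configuration. As an illustration only, such an entry could be resolved with `importlib` as follows; `load_consumer_class` is a hypothetical helper and the PACER's own loader may differ.

```python
import importlib


def load_consumer_class(consumer_cfg: dict):
    """Resolve `module` + `className` to a class, or None if disabled."""
    if not consumer_cfg.get("enabled", False):
        return None
    module = importlib.import_module(consumer_cfg["module"])
    return getattr(module, consumer_cfg["className"])
```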

Brokers configuration

Main broker

Configuration path: brokers.main

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| protocol | str | - | Broker protocol. Allowed values: amqp, amqps, redis, rediss, sqs, memory, filesystem. |
| host | str | - | Broker hostname or IP address. |
| port | int | - | Broker port. |
| username | str | - | Authentication username. |
| password | str | - | Authentication password. |
| vHost | str | - | Virtual host used by the broker. |
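These fields together describe a connection URL of the form `protocol://username:password@host:port/vHost`. The sketch below assembles one for an AMQP-style broker, assuming URL-based connection strings are used; `broker_url` is a hypothetical helper, and protocols such as memory or filesystem would not need host or credentials.

```python
from urllib.parse import quote


def broker_url(cfg: dict) -> str:
    """Assemble a connection URL from a dict shaped like `brokers.main`."""
    # Percent-encode credentials and vhost so characters such as '@' or '/'
    # cannot be mistaken for URL delimiters.
    user = quote(cfg["username"], safe="")
    password = quote(cfg["password"], safe="")
    vhost = quote(cfg.get("vHost", "/"), safe="")
    return (f"{cfg['protocol']}://{user}:{password}"
            f"@{cfg['host']}:{cfg['port']}/{vhost}")
```

Note that the default virtual host `/` is encoded as `%2F` in an AMQP URL.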

Recipient brokers

Configuration path: brokers.recipients[]

The PACER can forward the messages it processes to other brokers. This is useful for user and investigation synchronization, for example: to keep users and investigations synchronized with a staging environment, forward the relevant messages to that environment's broker instead of running a separate producer for it.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| name | str | - | Unique recipient broker name. |
| protocol | str | - | Recipient broker protocol. |
| host | str | - | Recipient broker hostname. |
| port | int | - | Recipient broker port. |
| username | str | - | Authentication username. |
| password | str | - | Authentication password. |
| vHost | str | - | Virtual host. |
| forwardingRules[].fromExchange | str | - | Source exchange to monitor. |
| forwardingRules[].withRoutingKey | str | - | Routing key pattern to match. |
| forwardingRules[].toBroker | str | - | Recipient broker name that receives matched messages. |
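To illustrate how forwarding rules could be evaluated, the sketch below assumes that `withRoutingKey` follows AMQP topic-pattern semantics (`*` matches exactly one word, `#` matches zero or more words). That assumption is not confirmed by this page, and the function names, exchange name, and broker name are hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic-style match: `*` is one word, `#` is zero or more words."""
    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        if p[0] == "#":
            # '#' may absorb zero words, or one word and stay active.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))


def recipients_for(rules: list[dict], exchange: str, routing_key: str) -> list[str]:
    """Names of recipient brokers whose rules match the message."""
    return [
        rule["toBroker"] for rule in rules
        if rule["fromExchange"] == exchange
        and topic_matches(rule["withRoutingKey"], routing_key)
    ]
```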

Ingestion settings configuration

Global

Configuration path: ingestionSettings

| Parameter | Scope | Type | Default | Info |
| --- | --- | --- | --- | --- |
| messageProcessingRetries | Global | int | 5 | Number of retries before a message is considered failed. |

Dataset

Configuration path: ingestionSettings.dataset

| Parameter | Scope | Type | Default | Info |
| --- | --- | --- | --- | --- |
| acceptXMLPayloads | Dataset | bool | False | Accept XML payloads during ingestion. |
| mandatoryPathsExistence | Dataset | bool | True | Validate that referenced dataset paths exist in the filesystem. |
| mandatorySampleType | Dataset | bool | True | Require datasets to define an existing ICAT sample type. |
| checkAllowedLocationPaths | Dataset | bool | True | Restrict dataset locations to configured root paths. |
| allowedRootLocationPaths | Dataset | list[str] | - | Allowed dataset root locations. For example, if set to /data, any dataset whose location is outside this path is rejected. |
| internalDatasetExchangeName | Dataset | str | dataset-internal-ingest-exchange | Internal exchange used for dataset processing. |
| internalDatasetRoutingKey | Dataset | str | dataset.internal_ingest | Routing key for dataset messages. |
| internalStatisticsRoutingKey | Dataset | str | statistics.internal_ingest | Routing key for statistics messages. |
| internalDatasetLinksRoutingKey | Dataset | str | dataset.internal_links | Routing key for dataset link processing. |
| automaticDatasetLocationIndex | Dataset | bool | False | If enabled, all files in the dataset's root location are automatically indexed as datafiles. |
| maxDatafilesPerDataset | Dataset | int | 30000 | Maximum number of datafiles allowed per dataset. Once the limit is reached, no more datafiles are created in the dataset. |
| galleryFolderName | Dataset | str | gallery | Folder containing gallery images. |
| xmlNamespacesTransform[].schema | Dataset | str | - | XML namespace transformation origin. |
| xmlNamespacesTransform[].to | Dataset | str | - | XML namespace transformation destination. |
| galleryAcceptedUploadTypes | Dataset | list[str] | Image extensions list | Accepted image formats for gallery uploads. |
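The effect of `checkAllowedLocationPaths` together with `allowedRootLocationPaths` can be illustrated with a purely lexical path check. This is a sketch, not the PACER's validation code: the function name is hypothetical, and a real implementation might also resolve symbolic links.

```python
from pathlib import PurePosixPath


def location_allowed(location: str, allowed_roots: list[str]) -> bool:
    """True if `location` is one of the allowed roots or lies below one."""
    path = PurePosixPath(location)
    # Reject '..' outright so that /data/../etc cannot slip through.
    if not path.is_absolute() or ".." in path.parts:
        return False
    return any(
        path == root or root in path.parents
        for root in map(PurePosixPath, allowed_roots)
    )
```

Comparing whole path components rather than string prefixes means that `/database/x` is not mistaken for a location under `/data`.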

Investigation

Configuration path: ingestionSettings.investigation

| Parameter | Scope | Type | Default | Info |
| --- | --- | --- | --- | --- |
| defaultEmbargoYears | Investigation | int | 3 | Default embargo period in years, added to the end date of an investigation to calculate its release date. |
| defaultFacilityName | Investigation | str | - | ICAT facility name. |
| defaultIndustrialInvestigationTypeName | Investigation | str | INDUSTRIAL | Default investigation type for industrial proposals. A release date is not set for industrial proposals. |
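The release date rule can be sketched as follows. The function name and the handling of 29 February are assumptions made for illustration; the PACER may treat leap days differently.

```python
from datetime import date
from typing import Optional


def release_date(end_date: date, embargo_years: int = 3,
                 industrial: bool = False) -> Optional[date]:
    """End date plus the embargo period; None for industrial proposals."""
    if industrial:
        return None
    try:
        return end_date.replace(year=end_date.year + embargo_years)
    except ValueError:
        # 29 February in a non-leap target year: fall back to 28 February.
        return end_date.replace(year=end_date.year + embargo_years, day=28)
```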

Parameters

Configuration path: ingestionSettings.parameters

| Parameter | Scope | Type | Default | Info |
| --- | --- | --- | --- | --- |
| storeParametersValuesAlsoAsstr | Parameters | bool | false | Store parameter values as strings in addition to their native type. |

Integration configuration

Configuration path: integrations

Message forwarding

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| messageForwarding | obj | - | Enable message forwarding integration. |

ICAT

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable ICAT integration. |
| server.url | str | - | ICAT server URL. |
| server.authPlugin | str | - | ICAT authentication plugin. |
| server.username | str | - | ICAT account username. |
| server.password | str | - | ICAT account password. |

VISA

The VISA integration is used strictly for user and investigation synchronization. The PACER connects directly to the VISA database.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable VISA integration. |
| database.host | str | - | VISA database host. |
| database.port | int | - | VISA database port. |
| database.database | str | - | Database name. |
| database.username | str | - | Database username. |
| database.password | str | - | Database password. |

Dashboard

If the PACER Dashboard extension is deployed, it can be configured here.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable dashboard integration. |
| exchangeName | str | dashboard-logging-exchange | Exchange used to publish monitoring events. |
| routingKey | str | message.logging | Routing key used for dashboard messages. |
| celeryTask | str | dashboard.tasks.log_pacer_message | Celery task invoked for dashboard processing. |

DataCite

Through a dedicated consumer and client, the PACER can automatically mint DOIs and assign them to investigations in ICAT. This is done via DataCite's REST API [2]. Refer to the DataCite documentation for more details.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable DataCite integration. |
| dataCatalogueDoiBaseUrl | str | - | Base URL used in DOI landing pages. |
| publisher | str | - | Publisher name registered with DataCite. |
| prefix | str | - | DOI prefix assigned by DataCite. |
| sessionSuffix | str | - | Optional DOI suffix component. |
| username | str | - | DataCite API username. |
| password | str | - | DataCite API password. |
| apiUrl | str | - | DataCite API endpoint URL. |
| language | str | - | Metadata language. |
| rightsName | str | - | Rights statement name. |
| rightsSchemeUri | str | - | Rights scheme URI. |
| rightsUri | str | - | Rights information URI. |
| rightsIdentifierScheme | str | - | Rights identifier scheme. |
| rightsIdentifier | str | - | Rights identifier value. |
| funderName | str | - | Funding organisation name. |
| funderIdentifier | str | - | Funding organisation identifier. |
| funderIdentifierType | str | - | Funding identifier type. |

PaNOSC

Warning

This integration might be deprecated in future versions.

The PACER can integrate with PaNOSC's Search Scoring service [3] to automatically index investigations into the PaNOSC data portal [4].

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable PaNOSC integration. |
| apiUrl | str | - | PaNOSC ingestion API URL. |
| username | str | - | API username. |
| password | str | - | API password. |
| searchApiUrl | str | - | Search service API URL. |

ICAT+

The PACER uses the ICAT+ API to upload images to the ICAT+ logbook.

| Parameter | Type | Default | Info |
| --- | --- | --- | --- |
| enabled | bool | - | Enable ICAT+ integration. |
| server.url | str | - | ICAT+ server URL. |
| server.apiKey | str | - | API key used for authentication. |