PACER¶
Consumers¶
By default, a dedicated exchange is created for each bounded context (e.g., synchronization, dataset ingestion). Within each exchange, multiple queues provide finer-grained routing and enable specific event types to be processed by dedicated consumers.
The PACER defines two categories of consumers:
- General consumers: User-defined consumers responsible for implementing ICAT-related workflows and business logic.
- Internal consumers: System-managed consumers responsible for message re-routing, retry handling, and dashboard logging.
Consumer types¶
Dashboard logging consumer¶
When the PACER Dashboard is deployed, consumers publish a message to the dashboard-logging-exchange after processing each message. These logging messages contain processing metadata and metrics, including:
- Error information.
- Execution timings.
- Message payload.
- Routing metadata.
A dedicated worker in the PACER Dashboard consumes messages from this exchange and stores them in the dashboard database, enabling monitoring, auditing, and troubleshooting.
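As a rough illustration of such a logging message, the sketch below runs a callback, times it, and collects the four kinds of metadata listed above. The helper `build_log_record` and the field names (`error`, `elapsed_seconds`, `payload`, `routing`) are assumptions made for this example; they are not the PACER's actual message schema.

```python
import time
import traceback
from typing import Any, Callable


def build_log_record(
    callback: Callable[[dict], Any],
    payload: dict,
    exchange: str,
    routing_key: str,
) -> dict:
    """Run one callback and describe the outcome as a plain dict (hypothetical schema)."""
    started = time.perf_counter()
    error = None
    try:
        callback(payload)
    except Exception:
        # Keep the traceback so the failure can be inspected later.
        error = traceback.format_exc()
    elapsed = time.perf_counter() - started
    return {
        "error": error,                      # error information
        "elapsed_seconds": elapsed,          # execution timing
        "payload": payload,                  # message payload
        "routing": {"exchange": exchange, "routing_key": routing_key},
    }


def failing(_payload: dict) -> None:
    raise ValueError("sample failure")


ok_record = build_log_record(lambda p: None, {"id": 1}, "ex", "rk")
bad_record = build_log_record(failing, {"id": 2}, "ex", "rk")
```

In a real deployment a record like this would be published to the dashboard-logging-exchange rather than returned to the caller.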
Dead-letter consumer¶
If a message cannot be processed successfully, general consumers route it to a dedicated dead-letter exchange for retry handling. Messages are automatically requeued and retried after a delay, allowing transient failures to be resolved without manual intervention. By default, a message can be retried up to 15 times, although this limit is configurable.
The retry delay increases linearly according to the retry count:
delay = 60 × retry_number (seconds)
For example:
| Retry | Delay |
|---|---|
| 1st attempt | 60s |
| 2nd attempt | 120s |
| 3rd attempt | 180s |
Messages that exceed the maximum retry limit are considered permanently failed.
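The linear schedule above can be sketched in a few lines of Python. The function names are hypothetical and the retry limit is passed in explicitly, since it is configurable; only the 60-second step comes from the formula above.

```python
DELAY_STEP_SECONDS = 60  # step taken from the formula: delay = 60 x retry_number


def retry_delay_seconds(retry_number: int) -> int:
    """Delay before the given retry (1-based), growing linearly."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    return DELAY_STEP_SECONDS * retry_number


def is_permanently_failed(retry_number: int, max_retries: int) -> bool:
    """A message is permanently failed once it exceeds the retry limit."""
    return retry_number > max_retries


def total_wait_seconds(max_retries: int) -> int:
    """Cumulative delay if every retry up to the limit is used."""
    return sum(retry_delay_seconds(n) for n in range(1, max_retries + 1))
```

With a limit of 15 retries, the cumulative delay before a message is declared permanently failed is 7200 seconds (2 hours).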
General consumer¶
General consumers are bound to a specific queue and implement one or more callback functions that are executed sequentially when a message is received.
Depending on the use case, consumers can be configured with integrations that enable communication with external systems such as ICAT, VISA, DataCite, and others.
Each callback is executed independently. If a callback fails, the remaining callbacks in the same consumer will continue processing the message unless explicitly configured otherwise.
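The callback behaviour described above could look roughly like the following sketch. `SketchConsumer` and its `stop_on_failure` flag are hypothetical names used for illustration; the PACER's real base class and option names may differ.

```python
import logging
from typing import Callable

logger = logging.getLogger("pacer.example")


class SketchConsumer:
    """Hypothetical consumer: runs callbacks in order and isolates failures."""

    def __init__(self, callbacks: list[Callable[[dict], None]], stop_on_failure: bool = False):
        self.callbacks = callbacks
        self.stop_on_failure = stop_on_failure

    def on_message(self, message: dict) -> list[str]:
        """Run every callback sequentially; return the names of those that failed."""
        failed = []
        for callback in self.callbacks:
            try:
                callback(message)
            except Exception:
                logger.exception("callback %s failed", callback.__name__)
                failed.append(callback.__name__)
                if self.stop_on_failure:
                    break  # explicit opt-out of the continue-on-failure default
        return failed


seen = []


def store(message: dict) -> None:
    seen.append("store")


def broken(message: dict) -> None:
    raise RuntimeError("external system unavailable")


def notify(message: dict) -> None:
    seen.append("notify")


failed_names = SketchConsumer([store, broken, notify]).on_message({"id": 1})
```

Here `notify` still runs after `broken` raises, which mirrors the default described above.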
Default consumers¶
The PACER comes with several built-in consumers that support multiple ICAT data ingestion workflows. Refer to the Operation page for more information.
graph TD
P[Producer] --> ex-uos-sync[User / investigation sync exchange]
P[Producer] --> ex-inv-ops[Investigation ops exchange]
P[Producer] --> ex-dataset-ingest[Dataset ingest exchange]
ex-logging[dashboard-logging-exchange]
ex-dead-letters[dead-letters-exchange]-->dead-letter-queue[Dead letters queue]
dead-letter-queue-->dead-letters-con[Dead letters consumer]
ex-uos-sync--> q-users[User sync queue]
ex-uos-sync--> q-investigations[Investigation sync queue]
ex-inv-ops-->q-ops[Inv. ops queue]
ex-dataset-ingest-->q-ingest[Dataset ingest queue]
q-users-->user-con[Users consumer]
q-investigations-->inv-con[Investigations consumer]
q-ops-->inv-ops-con[Inv. ops consumer]
q-ingest-->dataset-con[Dataset consumer]
user-con-->ex-logging
inv-con-->ex-logging
inv-ops-con-->ex-logging
dataset-con-->ex-logging
dataset-con-->ex-internal-ingest[Internal Dataset ingest exchange]
ex-internal-ingest-->q-dataset-int[Internal dataset queue]
ex-internal-ingest-->q-dataset-stats[Dataset statistics queue]
ex-internal-ingest-->q-dataset-links[Dataset links queue]
q-dataset-int-->dataset-int-con[Internal dataset consumer]
q-dataset-stats-->dataset-stats-con[Dataset stats consumer]
q-dataset-links-->dataset-links-con[Dataset links consumer]
dataset-int-con-->ex-logging
dataset-stats-con-->ex-logging
dataset-links-con-->ex-logging
ex-logging-->logging-q[Dashboard logging queue]
logging-q-->dashboard-con[Dashboard consumer]
Configuration¶
The configuration of the PACER is managed through a YAML file. An example file can be found here.
Multiprocessing configuration¶
Warning
If future versions adopt Python sub-interpreters instead of multiprocessing, this configuration may be removed.
| Parameter | Type | Default | Info |
|---|---|---|---|
| multiprocessStartMethod | str | spawn | Can be spawn, fork or forkserver. |
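The three allowed values are the start methods of Python's standard `multiprocessing` module. A minimal sketch of how a configured value might be validated and turned into a context follows; the helper `get_mp_context` and the plain-dict configuration are assumptions for this example.

```python
import multiprocessing

ALLOWED_START_METHODS = ("spawn", "fork", "forkserver")


def get_mp_context(config: dict):
    """Return a multiprocessing context for the configured start method."""
    method = config.get("multiprocessStartMethod", "spawn")  # documented default
    if method not in ALLOWED_START_METHODS:
        raise ValueError(f"unsupported start method: {method!r}")
    # Raises ValueError if the platform lacks the method (e.g. fork on Windows).
    return multiprocessing.get_context(method)


ctx = get_mp_context({})
```

Using a context object rather than `multiprocessing.set_start_method` keeps the choice local to the code that spawns workers.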
Logging configuration¶
Configuration path: logging
| Parameter | Type | Default | Info |
|---|---|---|---|
| logLevel | str | INFO | Logging level. Allowed values: DEBUG, INFO, WARNING, ERROR, CRITICAL. |
| printFormat | str | - | Custom log message format used by log handlers. |
| console.enabled | bool | true | Enable or disable console logging. |
| file.enabled | bool | false | Enable or disable file logging. |
| file.path | str | - | Path to the log file when file logging is enabled. |
| file.rotate | bool | - | Enable log file rotation. |
| file.maxMBytes | int | - | Maximum file size before rotation (MB). |
| file.backupCount | int | - | Number of rotated log files to retain. |
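To show how these options relate to Python's standard `logging` module, here is a sketch that builds handlers from a dict shaped like the logging section. The function `build_handlers` is hypothetical, and it assumes printFormat is a %-style format string; the PACER's own wiring may differ.

```python
import logging
import logging.handlers


def build_handlers(cfg: dict) -> list[logging.Handler]:
    """Translate a logging config dict into stdlib handlers (illustrative only)."""
    handlers: list[logging.Handler] = []
    if cfg.get("console", {}).get("enabled", True):
        handlers.append(logging.StreamHandler())
    file_cfg = cfg.get("file", {})
    if file_cfg.get("enabled", False):
        if file_cfg.get("rotate"):
            handlers.append(
                logging.handlers.RotatingFileHandler(
                    file_cfg["path"],
                    maxBytes=file_cfg["maxMBytes"] * 1024 * 1024,  # MB to bytes
                    backupCount=file_cfg["backupCount"],
                )
            )
        else:
            handlers.append(logging.FileHandler(file_cfg["path"]))
    formatter = logging.Formatter(cfg.get("printFormat"))  # None means default format
    for handler in handlers:
        handler.setFormatter(formatter)
        handler.setLevel(cfg.get("logLevel", "INFO"))
    return handlers


console_only = build_handlers({"logLevel": "DEBUG"})
```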
Logging to Elasticsearch¶
Configuration path: logging.elastic
| Parameter | Type | Default | Info |
|---|---|---|---|
| elastic.enabled | bool | false | Enable logging to Elasticsearch. |
| elastic.serverUrl | str | - | Elasticsearch server URL. |
| elastic.serviceName | str | - | Service name attached to log records. |
| elastic.serviceEnvironment | str | - | Environment identifier (e.g. prod, test). |
| elastic.indexName | str | - | Elasticsearch index name. |
Exchanges configuration¶
Configuration path: exchanges[]
| Parameter | Type | Default | Info |
|---|---|---|---|
| name | str | - | Exchange name. |
| type | str | - | Exchange type. Allowed values: direct, fanout, headers, topic, x-local-random. |
Queues configuration¶
Configuration path: queues[]
| Parameter | Type | Default | Info |
|---|---|---|---|
| name | str | - | Queue name. |
| exchange | str | - | Exchange to bind the queue to. |
| routingKey | str | - | Routing key used for message routing. |
| priorityEnabled | bool | false | Enable priority in queues [1]. |
| maxPriorityLevel | int | 10 | Maximum message priority when priorities are enabled. |
Warning
On startup, the PACER declares and creates exchanges and queues according to the configured settings. If you enable priority queue support after the PACER has already been run, you must delete the existing queues and restart the PACER for the changes to take effect.
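The need to delete existing queues follows from how priority is declared. Assuming the broker is RabbitMQ, priority support is a declaration-time queue argument (`x-max-priority`), and RabbitMQ rejects the redeclaration of an existing queue with different arguments. The sketch below shows the mapping; the helper name `queue_declare_arguments` is hypothetical.

```python
def queue_declare_arguments(queue_cfg: dict) -> dict:
    """Map one queues[] entry to queue declaration arguments (RabbitMQ assumed)."""
    if not queue_cfg.get("priorityEnabled", False):
        return {}
    # x-max-priority is fixed when the queue is first declared.
    return {"x-max-priority": queue_cfg.get("maxPriorityLevel", 10)}


plain = queue_declare_arguments({"name": "q-users"})
prioritised = queue_declare_arguments({"name": "q-users", "priorityEnabled": True})
```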
Consumers¶
Configuration path: consumers[]
| Parameter | Type | Default | Info |
|---|---|---|---|
| className | str | - | Class name of the consumer. |
| module | str | - | Module that contains the consumer's class. |
| enabled | bool | - | If not enabled, the consumer's process won't be spawned. |
| queues[] | list[str] | - | List of queues from which the consumer will retrieve messages. |
| workers | int | - | Number of workers in the consumer's context. |
| integrations | list[str] | - | List of integrations enabled for the consumer. |
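The module and className pair suggests that consumer classes are resolved dynamically. One way this could work is sketched below with `importlib`; the function `load_enabled_consumers` is hypothetical, and the example uses standard-library classes as stand-ins for real consumer classes.

```python
import importlib


def load_enabled_consumers(consumers_cfg: list[dict]) -> dict[str, type]:
    """Import the class of every enabled consumers[] entry, keyed by class name."""
    loaded = {}
    for entry in consumers_cfg:
        if not entry.get("enabled", False):
            continue  # disabled consumers are never loaded or spawned
        module = importlib.import_module(entry["module"])
        loaded[entry["className"]] = getattr(module, entry["className"])
    return loaded


# Stand-in entries: stdlib classes replace real consumer classes here.
example_cfg = [
    {"module": "collections", "className": "OrderedDict", "enabled": True},
    {"module": "decimal", "className": "Decimal", "enabled": False},
]
classes = load_enabled_consumers(example_cfg)
```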
Brokers configuration¶
Main broker¶
Configuration path: brokers.main
| Parameter | Type | Default | Info |
|---|---|---|---|
| protocol | str | - | Broker protocol. Allowed values: amqp, amqps, redis, rediss, sqs, memory, filesystem. |
| host | str | - | Broker hostname or IP address. |
| port | int | - | Broker port. |
| username | str | - | Authentication username. |
| password | str | - | Authentication password. |
| vHost | str | - | Virtual host used by the broker. |
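For the amqp and amqps protocols, these fields correspond to the parts of a standard connection URL. The sketch below assembles one; the helper `broker_url` and the sample values are hypothetical, and other protocols such as sqs, memory or filesystem use different URL shapes that are not covered here.

```python
from urllib.parse import quote


def broker_url(cfg: dict) -> str:
    """Build an AMQP-style URL from a brokers.main-like dict (illustrative only)."""
    user = quote(cfg["username"], safe="")
    password = quote(cfg["password"], safe="")  # escape characters such as @ and /
    vhost = quote(cfg.get("vHost", ""), safe="")
    return f"{cfg['protocol']}://{user}:{password}@{cfg['host']}:{cfg['port']}/{vhost}"


url = broker_url(
    {
        "protocol": "amqp",
        "host": "rabbit.example.org",
        "port": 5672,
        "username": "pacer",
        "password": "p@ss/word",
        "vHost": "/",
    }
)
```

Percent-encoding matters for the default RabbitMQ virtual host `/`, which has to appear as `%2F` in the URL path.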
Recipient brokers¶
Configuration path: brokers.recipients[]
The PACER can forward the messages it processes to other brokers. This is useful for user and investigation synchronization, for example: to keep users and investigations synchronized with a staging environment, forward the relevant messages to that environment's broker instead of running a separate producer for it.
| Parameter | Type | Default | Info |
|---|---|---|---|
| name | str | - | Unique recipient broker name. |
| protocol | str | - | Recipient broker protocol. |
| host | str | - | Recipient broker hostname. |
| port | int | - | Recipient broker port. |
| username | str | - | Authentication username. |
| password | str | - | Authentication password. |
| vHost | str | - | Virtual host. |
| forwardingRules[].fromExchange | str | - | Source exchange to monitor. |
| forwardingRules[].withRoutingKey | str | - | Routing key pattern to match. |
| forwardingRules[].toBroker | str | - | Recipient broker name that receives matched messages. |
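A forwarding rule selects messages by source exchange and routing key pattern. The sketch below shows one possible evaluation, assuming the pattern follows AMQP topic conventions (`*` matches exactly one word, `#` matches zero or more words); the actual matching semantics used by the PACER are not stated here. The function names and sample rules are hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic-style match: '*' is one word, '#' is zero or more words."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        if p[0] == "#":
            # Try letting '#' absorb 0, 1, ... len(k) words.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])

    return match(pattern.split("."), routing_key.split("."))


def brokers_for(rules: list[dict], exchange: str, routing_key: str) -> list[str]:
    """Names of recipient brokers whose rules match the given message."""
    return [
        rule["toBroker"]
        for rule in rules
        if rule["fromExchange"] == exchange
        and topic_matches(rule["withRoutingKey"], routing_key)
    ]


rules = [
    {"fromExchange": "sync-exchange", "withRoutingKey": "user.#", "toBroker": "staging"},
    {"fromExchange": "sync-exchange", "withRoutingKey": "investigation.*", "toBroker": "staging"},
    {"fromExchange": "ingest-exchange", "withRoutingKey": "#", "toBroker": "archive"},
]
targets = brokers_for(rules, "sync-exchange", "user.sync.created")
```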
Ingestion settings configuration¶
Global¶
Configuration path: ingestionSettings
| Parameter | Scope | Type | Default | Info |
|---|---|---|---|---|
| messageProcessingRetries | Global | int | 5 | Number of retries before a message is considered failed. |
Dataset¶
Configuration path: ingestionSettings.dataset
| Parameter | Scope | Type | Default | Info |
|---|---|---|---|---|
| acceptXMLPayloads | Dataset | bool | False | Accept XML payloads during ingestion. |
| mandatoryPathsExistence | Dataset | bool | True | Validate that referenced dataset paths exist in the filesystem. |
| mandatorySampleType | Dataset | bool | True | Require datasets to define an existing ICAT sample type. |
| checkAllowedLocationPaths | Dataset | bool | True | Restrict dataset locations to configured root paths. |
| allowedRootLocationPaths | Dataset | list[str] | - | Allowed dataset root locations. E.g. if set to /data, it won't allow any dataset whose location is outside of this path. |
| internalDatasetExchangeName | Dataset | str | dataset-internal-ingest-exchange | Internal exchange used for dataset processing. |
| internalDatasetRoutingKey | Dataset | str | dataset.internal_ingest | Routing key for dataset messages. |
| internalStatisticsRoutingKey | Dataset | str | statistics.internal_ingest | Routing key for statistics messages. |
| internalDatasetLinksRoutingKey | Dataset | str | dataset.internal_links | Routing key for dataset link processing. |
| automaticDatasetLocationIndex | Dataset | bool | False | If enabled, all files in the dataset's root location are automatically indexed as datafiles. |
| maxDatafilesPerDataset | Dataset | int | 30000 | Maximum number of datafiles allowed per dataset. Upon reaching the limit, no more datafiles will be created in a dataset. |
| galleryFolderName | Dataset | str | gallery | Folder containing gallery images. |
| xmlNamespacesTransform[].schema | Dataset | str | - | XML namespace transformation origin. |
| xmlNamespacesTransform[].to | Dataset | str | - | XML namespace transformation destination. |
| galleryAcceptedUploadTypes | Dataset | list[str] | Image extensions list | Accepted image formats for gallery uploads. |
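The interaction between checkAllowedLocationPaths and allowedRootLocationPaths can be illustrated with a small path check. This is a sketch under the assumption that locations are POSIX paths and that containment is decided after normalising the path; the function `is_location_allowed` is hypothetical and requires Python 3.9 or later.

```python
import posixpath
from pathlib import PurePosixPath


def is_location_allowed(location: str, settings: dict) -> bool:
    """True if the dataset location lies under one of the allowed roots."""
    if not settings.get("checkAllowedLocationPaths", True):
        return True  # check disabled: every location is accepted
    # Normalise first so that '/data/../etc' is not mistaken for a child of '/data'.
    normalized = PurePosixPath(posixpath.normpath(location))
    return any(
        normalized.is_relative_to(PurePosixPath(root))
        for root in settings.get("allowedRootLocationPaths", [])
    )


settings = {"checkAllowedLocationPaths": True, "allowedRootLocationPaths": ["/data"]}
```

Comparing path components instead of string prefixes means that `/database/exp1` is not accepted just because it starts with the characters `/data`.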
Investigation¶
Configuration path: ingestionSettings.investigation
| Parameter | Scope | Type | Default | Info |
|---|---|---|---|---|
| defaultEmbargoYears | Investigation | int | 3 | Default embargo period in years, added to the end date of an investigation to calculate its release date. |
| defaultFacilityName | Investigation | str | - | ICAT facility name. |
| defaultIndustrialInvestigationTypeName | Investigation | str | INDUSTRIAL | Default investigation type for industrial proposals. A release date is not set for industrial proposals. |
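The release date rule in the table can be expressed as a short calculation. The function below is a sketch with hypothetical names; in particular, falling back to 28 February when the end date is 29 February and the target year is not a leap year is an assumption, not documented PACER behaviour.

```python
from datetime import date
from typing import Optional


def release_date(end_date: date, investigation_type: str, settings: dict) -> Optional[date]:
    """End date plus the embargo period, or None for industrial investigations."""
    industrial = settings.get("defaultIndustrialInvestigationTypeName", "INDUSTRIAL")
    if investigation_type == industrial:
        return None  # no release date is set for industrial proposals
    years = settings.get("defaultEmbargoYears", 3)
    try:
        return end_date.replace(year=end_date.year + years)
    except ValueError:
        # 29 February in a non-leap target year: use 28 February (assumption).
        return end_date.replace(year=end_date.year + years, day=28)


public = release_date(date(2024, 5, 10), "PROPOSAL", {})
industrial = release_date(date(2024, 5, 10), "INDUSTRIAL", {})
```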
Parameters¶
Configuration path: ingestionSettings.parameters
| Parameter | Scope | Type | Default | Info |
|---|---|---|---|---|
| storeParametersValuesAlsoAsstr | Parameters | bool | false | Store parameter values as strings in addition to their native type. |
Integration configuration¶
Configuration path: integrations
Message forwarding¶
| Parameter | Type | Default | Info |
|---|---|---|---|
| messageForwarding | obj | - | Enable message forwarding integration. |
ICAT¶
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable ICAT integration. |
| server.url | str | - | ICAT server URL. |
| server.authPlugin | str | - | ICAT authentication plugin. |
| server.username | str | - | ICAT account username. |
| server.password | str | - | ICAT account password. |
VISA¶
The VISA integration is used strictly for user and investigation synchronization. The PACER connects directly to the VISA database.
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable VISA integration. |
| database.host | str | - | VISA database host. |
| database.port | int | - | VISA database port. |
| database.database | str | - | Database name. |
| database.username | str | - | Database username. |
| database.password | str | - | Database password. |
Dashboard¶
If the PACER dashboard extension is deployed, it can be configured here.
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable dashboard integration. |
| exchangeName | str | dashboard-logging-exchange | Exchange used to publish monitoring events. |
| routingKey | str | message.logging | Routing key used for dashboard messages. |
| celeryTask | str | dashboard.tasks.log_pacer_message | Celery task invoked for dashboard processing. |
DataCite¶
Through a specific consumer and client, the PACER can automatically mint and assign DOIs to the investigations in ICAT. This is done via DataCite's REST API [2]. Refer to the DataCite documentation for more details.
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable DataCite integration. |
| dataCatalogueDoiBaseUrl | str | - | Base URL used in DOI landing pages. |
| publisher | str | - | Publisher name registered with DataCite. |
| prefix | str | - | DOI prefix assigned by DataCite. |
| sessionSuffix | str | - | Optional DOI suffix component. |
| username | str | - | DataCite API username. |
| password | str | - | DataCite API password. |
| apiUrl | str | - | DataCite API endpoint URL. |
| language | str | - | Metadata language. |
| rightsName | str | - | Rights statement name. |
| rightsSchemeUri | str | - | Rights scheme URI. |
| rightsUri | str | - | Rights information URI. |
| rightsIdentifierScheme | str | - | Rights identifier scheme. |
| rightsIdentifier | str | - | Rights identifier value. |
| funderName | str | - | Funding organisation name. |
| funderIdentifier | str | - | Funding organisation identifier. |
| funderIdentifierType | str | - | Funding identifier type. |
PaNOSC¶
Warning
This integration might be deprecated in future versions.
The PACER can integrate with PaNOSC's Search Scoring service [3] to automatically index investigations into the PaNOSC data portal [4].
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable PaNOSC integration. |
| apiUrl | str | - | PaNOSC ingestion API URL. |
| username | str | - | API username. |
| password | str | - | API password. |
| searchApiUrl | str | - | Search service API URL. |
ICAT+¶
The PACER uses the ICAT+ API for uploading images into ICAT+'s logbook.
| Parameter | Type | Default | Info |
|---|---|---|---|
| enabled | bool | - | Enable ICAT+ integration. |
| server.url | str | - | ICAT+ server URL. |
| server.apiKey | str | - | API key used for authentication. |