Skip to content

Development

Adding new consumers

The PACER provides a base class, PACERConsumer, from which anyone can derive their own consumer, as in the following example:

class DatasetsConsumer(PACERConsumer):
    """Example consumer for ``dataset-ingestion`` messages.

    The business logic is delegated to ``DatasetsTasks`` so that the flow
    stays decoupled from the messaging layer.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Positional arguments must be forwarded before the keyword argument.
        # Writing ``dashboard_message_type="dataset-ingestion" *args`` (no
        # comma) would evaluate ``"dataset-ingestion" * args`` instead.
        super().__init__(
            *args,
            dashboard_message_type="dataset-ingestion",  # (1)
            **kwargs,
        )
        self.reject_msg_at_first_callback_error = True  # (2)

        self.tasks = DatasetsTasks(self.logger)

    def get_message_object_identifiers(self, message: Message,
                                       shared_obj_identifiers: "dict | None" = None) -> dict:  # (3)
        """Build the identifiers stored in the dashboard's searchable JSON field.

        Args:
            message: the incoming message; ``instrument`` and ``dataset`` are
                read from it by key.
            shared_obj_identifiers: optional extra identifiers merged into the
                result; on key collisions they override ``instrument`` /
                ``dataset``.

        Returns:
            A new dict with the object identifiers.
        """
        # ``None`` instead of a ``{}`` default avoids a shared mutable default.
        return {
            "instrument": message["instrument"],
            "dataset": message["dataset"],
            **(shared_obj_identifiers or {}),
        }

    @callback_order(1)  # (4)
    def callback_func_main_dataset_creation(self, _body,
                                            message: Message, *args, **kwargs) -> None:
        """Main dataset-creation callback, placed at position 1 by ``callback_order``."""
        do_something()  # placeholder: a real consumer would call into self.tasks
  1. The dashboard_message_type identifies the type of message being logged in the dashboard.
  2. If reject_msg_at_first_callback_error is set to True, the consumer will reject the message if any callback function raises an error. This ensures that the message is not processed further if there is an issue with any of the callback functions.
  3. Messages logged to the dashboard have a JSONField that is used for searching in the database without using the message's payload. The get_message_object_identifiers function returns a dict which is used for populating this field.
  4. Callbacks are executed in an arbitrary order. If execution order is important, use the callback_order decorator to explicitly define the order in which callbacks are executed.

By design, each flow is implemented in a separate module called tasks. The consumer callbacks invoke functions from this module, and those functions contain the actual business logic of the flow.

This separation ensures that flow implementations remain decoupled from the messaging library and can be reused independently in the future.

Payload parsing and validation are handled with Pydantic¹ in a dedicated module (helpers.models), which likewise keeps the components reusable and decoupled from each other. For legacy XML payloads, validation is performed after the payload has been converted to JSON.

Running the unit tests

The PACER uses the icat-testbox project to run its unit tests. This project provides a REST API for provisioning ICAT test instances on demand, against which the tests are executed.

This is the recommended approach for running the unit tests, although they can also be executed against a standard ICAT instance. All configuration options are managed through environment variables.

Variable Default Info
PACER_TEST_BACKEND testbox Value can be testbox or server.
ICAT_TESTBOX_SERVER_PROTOCOL http
ICAT_TESTBOX_SERVER_HOST -
ICAT_TESTBOX_SERVER_PORT 5000
ICAT_TESTBOX_AUTHN_DB_VERSION 3.0.0 The version of the authn.db plugin to be used in the ICAT unit test instance.
ICAT_TESTBOX_ICAT_SERVER_VERSION 6.2.0 The version of the icat.server plugin to be used in the ICAT unit test instance.
ICAT_TESTBOX_AUTHN_DB_VERSION 3.0.0 The version of the authn.db plugin to be used in the ICAT unit test instance.
ICAT_TESTBOX_DB_FIXTURE_LOAD True Whether the provisioned unit test instance should load some initial fixtures.
ICAT_TESTBOX_DELETE_AFTER_TESTS_RUN False Delete ICAT test instance after the tests execution (useful for CI/CD pipelines).
ICAT_AUTH_PLUGIN db
ICAT_SERVER_URL - This is automatically set when using the testbox test backend.
ICAT_AUTH_USERNAME -
ICAT_AUTH_PASSWORD -

Producer example

...
from typing import Any, Optional, Type

from pika import PlainCredentials, ConnectionParameters, BlockingConnection, BasicProperties
from pika.exceptions import NackError, UnroutableError


class UserProducer:
    """Publish user-profile updates to the PACER RabbitMQ exchange.

    Broker settings are read from ``settings.PACER_RMQ_*``. A new blocking
    connection is opened for each publish call and closed when it finishes.
    """

    @classmethod
    def __get_broker_connection(cls) -> Optional[BlockingConnection]:
        """Open a blocking broker connection, or return None if no host is configured."""
        host: str = settings.PACER_RMQ_HOST
        port: int = settings.PACER_RMQ_PORT
        username: str = settings.PACER_RMQ_USERNAME
        password: str = settings.PACER_RMQ_PASSWORD
        vhost: str = settings.PACER_RMQ_VIRTUAL_HOST

        if not host:
            return None

        conn_kwargs: dict = {
            "host": host,
            # Fall back to the root virtual host when none is configured.
            "virtual_host": vhost if vhost is not None else "/",
        }
        # Explicit credentials only when both username and password are set;
        # otherwise ConnectionParameters' own defaults apply.
        if username and password:
            conn_kwargs["credentials"] = PlainCredentials(username, password)
        if port is not None:
            conn_kwargs["port"] = port

        return BlockingConnection(ConnectionParameters(**conn_kwargs))

    @classmethod
    def __send_messages_to_broker(cls, messages: list, routing_key: str = None, headers: dict = None):
        """Publish each message to ``settings.PACER_RMQ_EXCHANGE`` with publisher confirms.

        Does nothing when no broker host is configured. A message that the
        broker cannot route or refuses (NACK) is logged and skipped; the
        remaining messages are still published.
        """
        broker_conn: Optional[BlockingConnection] = cls.__get_broker_connection()
        if broker_conn is None:
            return

        properties: Optional[BasicProperties] = BasicProperties(headers=headers) if headers else None

        # Both context managers close their resource even if publishing
        # raises, so the connection can no longer leak.
        with broker_conn, broker_conn.channel() as channel:
            channel.confirm_delivery()
            for message in messages:
                logger.debug("Sending message to broker: %s", message)
                try:
                    channel.basic_publish(
                        exchange=settings.PACER_RMQ_EXCHANGE,
                        routing_key=routing_key,
                        body=message,
                        properties=properties,
                        # Without mandatory=True the broker silently drops
                        # unroutable messages and UnroutableError never fires.
                        mandatory=True)
                    logger.debug("Message confirmed by broker")
                except (UnroutableError, NackError) as e:
                    logger.error("Message NOT sent to broker: %s", e)

    @classmethod
    def __publish_generic_object_update(cls, serializer_cls: Type[ModelSerializer], routing_key: str, headers: dict,
                                        updated_object: Any = None,
                                        objects_queryset: QuerySet = None):
        """Serialize one object or a queryset to JSON and publish it.

        ``updated_object`` takes precedence. ``objects_queryset`` is used only
        when no (truthy) single object is given; an empty queryset publishes
        nothing. Each serialized object becomes its own message.
        """
        if updated_object:
            logger.info(f"Publishing update for {updated_object} and routing_key={routing_key}")
            serializer = serializer_cls(updated_object)
            json_msg: str = json.dumps(serializer.data, ensure_ascii=False)
            cls.__send_messages_to_broker([json_msg], routing_key, headers=headers)
        elif objects_queryset:
            logger.info(f"Publishing update for {objects_queryset.count()} objects and routing_key={routing_key}")
            serializer = serializer_cls(objects_queryset, many=True)
            json_msgs: list = [json.dumps(item, ensure_ascii=False) for item in serializer.data]
            cls.__send_messages_to_broker(json_msgs, routing_key, headers=headers)

    @classmethod
    def publish_user_update(cls, updated_object: Any = None, objects_queryset: QuerySet = None):
        """Publish user updates, unless ``settings.TESTING_MODE`` is set.

        Args:
            updated_object: a single object to serialize with
                ``PACERUserProfileSerializer`` and publish.
            objects_queryset: objects to publish one message each; used only
                when ``updated_object`` is not given.
        """
        if settings.TESTING_MODE:
            return

        routing_key: str = settings.PACER_RMQ_USERS_ROUTING_KEY
        # Resolved by dotted path at call time (presumably to avoid an import
        # cycle with the user app — TODO confirm).
        serializer_cls: Type[Serializer] = import_string('user.serializers.profile.PACERUserProfileSerializer')

        headers: dict = {"source": "uos", "content-type": "application/json"}
        cls.__publish_generic_object_update(serializer_cls, routing_key, headers, updated_object, objects_queryset)