# Development

## Adding new consumers

PACER provides a base class from which anyone can create a general consumer.
class DatasetsConsumer(PACERConsumer):
    """Example consumer that handles dataset-ingestion messages.

    The business logic lives in ``DatasetsTasks``; the callbacks below only
    delegate to it.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Positional args are forwarded unchanged; the message type is fixed
        # for this consumer.
        super().__init__(
            *args,
            dashboard_message_type="dataset-ingestion",  # (1)
            **kwargs,
        )
        self.reject_msg_at_first_callback_error = True  # (2)
        self.tasks = DatasetsTasks(self.logger)

    def get_message_object_identifiers(self, message: Message,
                                       shared_obj_identifiers: "dict | None" = None) -> dict:  # (3)
        """Return the identifiers stored in the dashboard's searchable field.

        Args:
            message: The incoming message; its ``instrument`` and ``dataset``
                entries are copied into the result.
            shared_obj_identifiers: Optional extra identifiers merged into the
                result (they override same-named keys).

        Returns:
            A new dict of identifiers.
        """
        # ``None`` instead of ``{}`` as the default avoids a shared mutable default.
        return {
            "instrument": message['instrument'],
            "dataset": message['dataset'],
            **(shared_obj_identifiers or {}),
        }

    @callback_order(1)  # (4)
    def callback_func_main_dataset_creation(self, _body,
                                            message: Message, *args, **kwargs) -> None:
        """First callback run for each message; placeholder for the real task call."""
        do_something()
1. `dashboard_message_type` identifies the type of message being logged in the dashboard.
2. If `reject_msg_at_first_callback_error` is set to `True`, the consumer rejects the message as soon as any callback function raises an error. This ensures that the message is not processed further if there is an issue with any of the callback functions.
3. Messages logged to the dashboard have a `JSONField` that is used for searching in the database without using the message's payload. The `get_message_object_identifiers` function returns a `dict` that is used to populate this field.
4. Callbacks are executed in an arbitrary order. If the execution order matters, use the `callback_order` decorator to explicitly define the order in which callbacks are executed.
By design, each flow is implemented in an external module called `tasks`. The consumer callbacks invoke functions from this module, and those functions contain the actual business logic of the flow.
This separation keeps flow implementations decoupled from the messaging library, so they can be reused independently in the future.
Payload parsing and validation are handled with [Pydantic](https://docs.pydantic.dev/) in a dedicated module (`helpers.models`), which also keeps these components reusable and decoupled from each other. For legacy XML payloads, validation is performed after the payload has been converted to JSON.
## Running the unit tests

PACER uses the icat-testbox project to run its unit tests. This project provides a REST API that provisions ICAT test instances on demand for running the tests.
This is the recommended approach for running the unit tests, although they can also be run against a standard ICAT instance. All configuration options are set through environment variables.
| Variable | Default | Info |
|---|---|---|
| `PACER_TEST_BACKEND` | `testbox` | Value can be `testbox` or `server`. |
| `ICAT_TESTBOX_SERVER_PROTOCOL` | `http` | |
| `ICAT_TESTBOX_SERVER_HOST` | - | |
| `ICAT_TESTBOX_SERVER_PORT` | `5000` | |
| `ICAT_TESTBOX_AUTHN_DB_VERSION` | `3.0.0` | The version of the authn.db plugin to be used in the ICAT unit test instance. |
| `ICAT_TESTBOX_ICAT_SERVER_VERSION` | `6.2.0` | The version of icat.server to be used in the ICAT unit test instance. |
| `ICAT_TESTBOX_DB_FIXTURE_LOAD` | `True` | Whether the provisioned unit test instance should load some initial fixtures. |
| `ICAT_TESTBOX_DELETE_AFTER_TESTS_RUN` | `False` | Delete the ICAT test instance after the tests have run (useful for CI/CD pipelines). |
| `ICAT_AUTH_PLUGIN` | `db` | |
| `ICAT_SERVER_URL` | - | Set automatically when using the `testbox` test backend. |
| `ICAT_AUTH_USERNAME` | - | |
| `ICAT_AUTH_PASSWORD` | - | |
## Producer example
...
from pika import PlainCredentials, ConnectionParameters, BlockingConnection, BasicProperties
from pika.exceptions import NackError, UnroutableError
class UserProducer:
    """Publish user-profile updates to the PACER RabbitMQ broker.

    Broker settings are read from ``settings.PACER_RMQ_*``. Nothing is sent
    when ``settings.PACER_RMQ_HOST`` is empty or ``settings.TESTING_MODE`` is
    enabled.
    """

    @classmethod
    def __get_broker_connection(cls) -> "BlockingConnection | None":
        """Open a new blocking connection to the broker.

        Returns:
            A ``BlockingConnection``, or ``None`` when no broker host is
            configured.
        """
        host: str = settings.PACER_RMQ_HOST
        port: int = settings.PACER_RMQ_PORT
        username: str = settings.PACER_RMQ_USERNAME
        password: str = settings.PACER_RMQ_PASSWORD
        vhost: str = settings.PACER_RMQ_VIRTUAL_HOST
        if not host:
            return None

        optional_params: dict = {}
        # Credentials are only passed when both parts are configured; otherwise
        # ConnectionParameters keeps its own default credentials.
        if username and password:
            optional_params["credentials"] = PlainCredentials(username, password)
        if port is not None:
            optional_params["port"] = port

        params = ConnectionParameters(
            host=host,
            # Fall back to the default vhost when none (or an empty one) is set.
            virtual_host=vhost or "/",
            **optional_params,
        )
        return BlockingConnection(params)

    @classmethod
    def __send_messages_to_broker(cls, messages: list, routing_key: str = None, headers: dict = None):
        """Publish each message to ``settings.PACER_RMQ_EXCHANGE`` with publisher confirms.

        Failed publishes (unroutable or nacked) are logged and skipped; the
        connection is always closed afterwards.

        Args:
            messages: Message bodies to publish, in order.
            routing_key: Routing key used for every message.
            headers: Optional AMQP headers attached to every message.
        """
        broker_conn = cls.__get_broker_connection()
        if not broker_conn:
            return
        properties = BasicProperties(headers=headers) if headers else None
        try:
            with broker_conn.channel() as channel:
                channel.confirm_delivery()
                for message in messages:
                    logger.debug(f"Sending message to broker: {message}")
                    try:
                        # mandatory=True makes the broker return unroutable
                        # messages, which pika surfaces as UnroutableError in
                        # confirm mode; without it they are silently dropped.
                        channel.basic_publish(
                            exchange=settings.PACER_RMQ_EXCHANGE,
                            routing_key=routing_key,
                            body=message,
                            properties=properties,
                            mandatory=True)
                    except (UnroutableError, NackError) as e:
                        logger.error(f"Message NOT sent to broker: {e}")
                    else:
                        logger.debug("Message confirmed by broker")
        finally:
            # Release the connection even if publishing raised unexpectedly.
            if broker_conn.is_open:
                broker_conn.close()

    @classmethod
    def __publish_generic_object_update(cls, serializer_cls: "type[ModelSerializer]", routing_key: str, headers: dict,
                                        updated_object: object = None,
                                        objects_queryset: QuerySet = None):
        """Serialize one object or a queryset to JSON and publish it.

        ``updated_object`` takes precedence; if neither argument is truthy,
        nothing is published.

        Args:
            serializer_cls: Serializer class used to render the object(s).
            routing_key: Routing key for the published messages.
            headers: AMQP headers attached to every message.
            updated_object: A single instance to publish.
            objects_queryset: A queryset whose items are published one message each.
        """
        if updated_object:
            logger.info(f"Publishing update for {updated_object} and routing_key={routing_key}")
            payload = serializer_cls(updated_object).data
            json_msgs: list = [json.dumps(payload, ensure_ascii=False)]
        elif objects_queryset:
            logger.info(f"Publishing update for {objects_queryset.count()} objects and routing_key={routing_key}")
            serializer = serializer_cls(objects_queryset, many=True)
            json_msgs = [json.dumps(item, ensure_ascii=False) for item in serializer.data]
        else:
            return
        cls.__send_messages_to_broker(json_msgs, routing_key, headers=headers)

    @classmethod
    def publish_user_update(cls, updated_object: object = None, objects_queryset: QuerySet = None):
        """Publish a user-profile update for one user or for a queryset of users.

        Does nothing when ``settings.TESTING_MODE`` is enabled.
        """
        if settings.TESTING_MODE:
            return
        routing_key: str = settings.PACER_RMQ_USERS_ROUTING_KEY
        # Resolved lazily by dotted path (import_string) at call time.
        serializer_cls = import_string('user.serializers.profile.PACERUserProfileSerializer')
        headers: dict = {"source": "uos", "content-type": "application/json"}
        cls.__publish_generic_object_update(serializer_cls, routing_key, headers, updated_object, objects_queryset)