-
Notifications
You must be signed in to change notification settings - Fork 62
Enrichments
ACE uses the RabbitMQ messaging system to provide enrichment. For more information about RabbitMQ and a better understanding, they offer a great tutorial in numerous different languages (RabbitMQ Tutorials). ACE currently uses C# for RabbitMQ, so all code samples provided will be in C#.
In order to extend the current enrichment capability of ACE, there are several necessary steps:
- Creating a Queue
- Creating a Binding
- Creating an Enrichment Consumer
The following sections will cover each step in detail and provide a C# code sample
ACE uses queues to store the messages in transition. For each additional enrichment or new ACE output method, it will be necessary to create a new queue. Queues may be created with the following options:
- durable - Survives RabbitMQ Server stop/reboot
- exclusive - Used only by one connection, deleted on close
- autoDelete - Deletes after last consumer disconnects
- arguments - Optional for additional capability
Example code to create a new queue in C#
//Creating new queue
channel.QueueDeclare(queue: "queue_name",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
In order to properly route the messages from the exchange to the correct queue, a queue binding must be created. When creating the queue binding, the following are specified:
- queue - Name of the queue to route matching messages
- exchange - ACE is designed to always use the "ace_exchange"
- routingKey - Specifies the conditions for routing, for adding enrichments it would be the enrichment name followed by ".#" This matches any routing key starting with the enrichment and having any number of routing keys after
Example code to create a new binding in C#
//Creating new exchange binding for new enrichment
channel.QueueBind(queue: "queue_name",
exchange: "ace_exchange",
routingKey: "newEnrichment.#");
The final step is to create the consumer, which will actually perform the specified enrichment, The general process for creating the consumer:
- Obtain values from message
- Parse the message from JSON
- Using message information, perform enrichment actions (such as hash lookups)
- Reformat message back to JSON object
- Remove enrichment from RoutingKey
- Publish enriched message back to the "ace_exchange" for further processing
Example code to create a new enrichment consumer in C#
// Create new consumer object
EventingBasicConsumer newEnrichmentConsumer = new EventingBasicConsumer(channel);
newEnrichmentConsumer.Received += (model, ea) =>
{
// Values received in message
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
// New Enrichment Action
try
{
//Parse JSON
JObject originalMessage = JObject.Parse(message);
// Perform desired enrichment action
// Add new data to message
originalMessage.Add("Name1", "Value");
originalMessage.Add("Name2", "Value2");
// Recreate JSON for export
string enrichedMessage = originalMessage.ToString(Newtonsoft.Json.Formatting.None);
body = Encoding.UTF8.GetBytes(enrichedMessage);
}
catch (Exception e)
{
Console.WriteLine("Exception" + e);
}
// parse enrichment off front of routing key
string[] words = routingKey.Split('.');
words = words.Skip(1).ToArray();
routingKey = string.Join(".", words);
// Ack recieving the message from the queue
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// Publish new enriched message back to ACE exchange for routing
channel.BasicPublish(exchange: "ace_exchange",
routingKey: routingKey,
basicProperties: null,
body: body);
};