-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
74 lines (68 loc) · 4.23 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Transport.PostgreSQL.Basic;
using SampleShared;
using Serilog;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
namespace PostGreSQLConsumer
{
/// <summary>
/// Sample console application that consumes messages from a PostgreSQL backed DotNetWorkQueue queue.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Configures Serilog, checks that the configured queue exists, then starts a
    /// consumer and keeps processing messages until a key is pressed.
    /// </summary>
    /// <param name="args">Command line arguments; not used.</param>
    static void Main(string[] args)
    {
        //we are using serilog for sample purposes
        var log = new LoggerConfiguration()
            .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] [{SourceContext}] {Message:lj}{NewLine}{Exception}")
            .MinimumLevel.Debug()
            .CreateLogger();
        Log.Logger = log;
        log.Information("Startup");

        //log the settings as a property value, not as the message template itself, so that any
        //braces inside the settings text are not parsed as template placeholders
        log.Information("{Settings:l}", SharedConfiguration.AllSettings);

        //verify that the queue exists
        var queueName = ConfigurationManager.AppSettings.ReadSetting("QueueName");
        var connectionString = ConfigurationManager.AppSettings.ReadSetting("Database");
        var queueConnection = new QueueConnection(queueName, connectionString);

        using (var createQueueContainer = new QueueCreationContainer<PostgreSqlMessageQueueInit>(serviceRegister =>
            Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "PostgreSqlConsumer", serviceRegister)
            , options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
        {
            using (var createQueue =
                createQueueContainer.GetQueueCreation<PostgreSqlMessageQueueCreation>(queueConnection))
            {
                if (!createQueue.QueueExists)
                {
                    //the consumer can't do anything if the queue hasn't been created.
                    //report the queue name; the connection string is deliberately NOT logged,
                    //since it is not what is missing and it should not end up in log output
                    log.Error("Could not find queue {QueueName}. Verify that you have run the producer, which will create the queue", queueName);
                    return;
                }
            }
        }

        using (var queueContainer = new QueueContainer<PostgreSqlMessageQueueInit>(serviceRegister =>
            Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "PostgreSqlConsumer", serviceRegister)
            , options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
        {
            using (var queue = queueContainer.CreateConsumer(queueConnection))
            {
                //set some processing options and start looking for work
                queue.Configuration.Worker.WorkerCount = 4; //lets run 4 worker threads
                queue.Configuration.HeartBeat.UpdateTime = "sec(*%10)"; //set a heartbeat every 10 seconds
                queue.Configuration.HeartBeat.MonitorTime = TimeSpan.FromSeconds(15); //check for dead records every 15 seconds
                queue.Configuration.HeartBeat.Time = TimeSpan.FromSeconds(35); //records with no heartbeat after 35 seconds are considered dead

                //an invalid data exception will be re-tried 3 times, with delays of 3, 6 and then finally 9 seconds
                queue.Configuration.TransportConfiguration.RetryDelayBehavior.Add(typeof(InvalidDataException), new List<TimeSpan> { TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(6), TimeSpan.FromSeconds(9) });

                queue.Configuration.MessageExpiration.Enabled = true;
                queue.Configuration.MessageExpiration.MonitorTime = TimeSpan.FromSeconds(20); //check for expired messages every 20 seconds

                queue.Start<SimpleMessage>(MessageProcessing.HandleMessages, CreateNotifications.Create(log));

                //block the main thread until the user presses a key; leaving the using blocks
                //afterwards disposes the consumer and its container
                Console.WriteLine("Processing messages - press any key to stop");
                Console.ReadKey(true);
            }
        }

        //if jaeger is using udp, sometimes the messages get lost; there doesn't seem to be a flush() call ?
        //so give any pending trace data a moment to be sent before the process exits
        if (SharedConfiguration.EnableTrace)
        {
            System.Threading.Thread.Sleep(2000);
        }
    }
}
}