-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
105 lines (97 loc) · 4.76 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
using System;
using System.Configuration;
using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Messages;
using DotNetWorkQueue.Transport.PostgreSQL;
using DotNetWorkQueue.Transport.PostgreSQL.Basic;
using SampleShared;
using Serilog;
namespace PostgreSQLProducer
{
class Program
{
static void Main(string[] args)
{
//we are using serilog for sample purposes.
var log = new LoggerConfiguration()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] [{SourceContext}] {Message:lj}{NewLine}{Exception}")
.MinimumLevel.Debug()
.CreateLogger();
Log.Logger = log;
log.Information("Startup");
log.Information(SharedConfiguration.AllSettings);
var queueName = ConfigurationManager.AppSettings.ReadSetting("QueueName");
var connectionString = ConfigurationManager.AppSettings.ReadSetting("Database");
var queueConnection = new QueueConnection(queueName, connectionString);
//create the container for creating a new queue
using (var createQueueContainer = new QueueCreationContainer<PostgreSqlMessageQueueInit>(serviceRegister =>
Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "PostgreSqlProducer", serviceRegister)
, options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
{
using (var createQueue =
createQueueContainer.GetQueueCreation<PostgreSqlMessageQueueCreation>(queueConnection))
{
//Create the queue if it doesn't exist
if (!createQueue.QueueExists)
{
//queue options
createQueue.Options.EnableDelayedProcessing = true;
createQueue.Options.EnableHeartBeat = true;
createQueue.Options.EnableMessageExpiration = true;
createQueue.Options.EnableStatus = true;
createQueue.Options.EnableStatusTable = true;
var result = createQueue.CreateQueue();
log.Information(result.Status.ToString());
}
else log.Information("Queue already exists; not creating");
}
}
//create the producer
using (var queueContainer = new QueueContainer<PostgreSqlMessageQueueInit>(serviceRegister =>
Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "PostgreSqlProducer", serviceRegister)
, options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
{
using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection))
{
using (var admin = queueContainer.CreateAdminApi())
{
admin.AddQueueConnection(queueContainer, queueConnection);
RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin);
}
}
}
//if jaeger is using udp, sometimes the messages get lost; there doesn't seem to be a flush() call ?
if (SharedConfiguration.EnableTrace)
System.Threading.Thread.Sleep(2000);
}
/// <summary>
/// Creates an expired message by having it expire 1 second in the future and delaying processing for 5 seconds
/// </summary>
/// <returns></returns>
private static IAdditionalMessageData ExpiredData()
{
var data = new AdditionalMessageData();
data.SetExpiration(TimeSpan.FromSeconds(1));
data.SetDelay(TimeSpan.FromSeconds(5));
return data;
}
/// <summary>
/// Creates an expired message 24 hours from now
/// </summary>
/// <returns></returns>
private static IAdditionalMessageData ExpiredDataFuture()
{
var data = new AdditionalMessageData();
data.SetExpiration(TimeSpan.FromDays(1));
return data;
}
private static IAdditionalMessageData DelayedProcessing(int seconds)
{
var data = new AdditionalMessageData();
data.SetDelay(TimeSpan.FromSeconds(seconds));
data.SetExpiration(TimeSpan.FromDays(1));
return data;
}
}
}