-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
131 lines (121 loc) · 6.03 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
using System;
using System.Configuration;
using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Messages;
using DotNetWorkQueue.Transport.SqlServer;
using DotNetWorkQueue.Transport.SqlServer.Basic;
using DotNetWorkQueue.Transport.SqlServer.Schema;
using SampleShared;
using Serilog;
namespace SQLServerProducer
{
class Program
{
private static bool _userData;
static void Main(string[] args)
{
//we are using serilog for sample purposes
var log = new LoggerConfiguration()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] [{SourceContext}] {Message:lj}{NewLine}{Exception}")
.MinimumLevel.Debug()
.CreateLogger();
Log.Logger = log;
log.Information("Startup");
log.Information(SharedConfiguration.AllSettings);
var queueName = ConfigurationManager.AppSettings.ReadSetting("QueueName");
var connectionString = ConfigurationManager.AppSettings.ReadSetting("Database");
var queueConnection = new QueueConnection(queueName, connectionString);
//create the container for creating a new queue
using (var createQueueContainer = new QueueCreationContainer<SqlServerMessageQueueInit>(serviceRegister =>
Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "SQLServerProducer", serviceRegister),
options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
{
using (var createQueue =
createQueueContainer.GetQueueCreation<SqlServerMessageQueueCreation>(queueConnection))
{
var enabledUserColumns = ConfigurationManager.AppSettings.ReadSetting("UseUserDequeue");
if (bool.Parse(enabledUserColumns))
{
_userData = true;
}
//Create the queue if it doesn't exist
if (!createQueue.QueueExists)
{
//queue options
createQueue.Options.EnableDelayedProcessing = true;
createQueue.Options.EnableHeartBeat = true;
createQueue.Options.EnableMessageExpiration = true;
createQueue.Options.EnableStatus = true;
createQueue.Options.EnableStatusTable = true;
if (!string.IsNullOrEmpty(enabledUserColumns) && bool.Parse(enabledUserColumns))
{
createQueue.Options.AdditionalColumnsOnMetaData = true;
createQueue.Options.AdditionalColumns.Add(new Column("DayOfWeek", ColumnTypes.Int, true, null));
}
var result = createQueue.CreateQueue();
log.Information(result.Status.ToString());
}
else log.Warning("Queue already exists; not creating; note that any setting changes won't be applied");
}
}
//create the producer
using (var queueContainer = new QueueContainer<SqlServerMessageQueueInit>(serviceRegister =>
Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, "SQLServerProducer", serviceRegister)
, options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
{
using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection))
{
using (var admin = queueContainer.CreateAdminApi())
{
admin.AddQueueConnection(queueContainer, queueConnection);
RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin);
}
}
}
//if jaeger is using udp, sometimes the messages get lost; there doesn't seem to be a flush() call ?
if (SharedConfiguration.EnableTrace)
System.Threading.Thread.Sleep(2000);
}
/// <summary>
/// Creates an expired message by having it expire 1 second in the future and delaying processing for 5 seconds
/// </summary>
/// <returns></returns>
private static IAdditionalMessageData ExpiredData()
{
var data = new AdditionalMessageData();
data.SetExpiration(TimeSpan.FromSeconds(1));
data.SetDelay(TimeSpan.FromSeconds(5));
if (_userData)
{
data.AdditionalMetaData.Add(new AdditionalMetaData<int>("DayOfWeek", Convert.ToInt32(DateTime.Today.DayOfWeek)));
}
return data;
}
/// <summary>
/// Creates an expired message 24 hours from now
/// </summary>
/// <returns></returns>
private static IAdditionalMessageData ExpiredDataFuture()
{
var data = new AdditionalMessageData();
data.SetExpiration(TimeSpan.FromDays(1));
if (_userData)
{
data.AdditionalMetaData.Add(new AdditionalMetaData<int>("DayOfWeek", Convert.ToInt32(DateTime.Today.DayOfWeek)));
}
return data;
}
private static IAdditionalMessageData DelayedProcessing(int seconds)
{
var data = new AdditionalMessageData();
data.SetDelay(TimeSpan.FromSeconds(seconds));
data.SetExpiration(TimeSpan.FromDays(1));
if (_userData)
{
data.AdditionalMetaData.Add(new AdditionalMetaData<int>("DayOfWeek", Convert.ToInt32(DateTime.Today.DayOfWeek)));
}
return data;
}
}
}