-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
106 lines (96 loc) · 6.11 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using DotNetWorkQueue;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.Transport.SQLite.Basic;
using SampleShared;
using Serilog;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
namespace SQLiteConsumerLinq
{
    /// <summary>
    /// Sample console consumer for a SQLite-backed queue. It confirms the queue exists,
    /// starts a task scheduler, and then processes messages until a key is pressed.
    /// </summary>
    class Program
    {
        // Application name passed to Injectors.AddInjectors for every container created in this program.
        private const string AppName = "SQLiteConsumerLinq";

        // Console output layout used by the Serilog console sink.
        private const string ConsoleTemplate =
            "[{Timestamp:HH:mm:ss} {Level:u3}] [{SourceContext}] {Message:lj}{NewLine}{Exception}";

        /// <summary>
        /// Entry point: configures logging, builds the queue connection from app settings,
        /// and runs the consumer until the user presses a key.
        /// </summary>
        /// <param name="args">Command line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Serilog is used here for sample purposes only.
            var logger = new LoggerConfiguration()
                .WriteTo.Console(outputTemplate: ConsoleTemplate)
                .MinimumLevel.Debug()
                .CreateLogger();
            Log.Logger = logger;

            logger.Information("Startup");
            logger.Information(SharedConfiguration.AllSettings);

            // Build the connection details from the user's Documents folder and the app settings.
            // NOTE(review): the folder and the "Database" setting are joined with no separator in between;
            // presumably the setting supplies its own leading separator - confirm against the app config.
            var documentsFolder = Path.Combine(Environment.ExpandEnvironmentVariables("%userprofile%"), "Documents");
            var queueName = ConfigurationManager.AppSettings.ReadSetting("QueueName");
            var databaseSetting = ConfigurationManager.AppSettings.ReadSetting("Database");
            var connectionString = $"Data Source={documentsFolder}{databaseSetting};Version=3;";
            var queueConnection = new QueueConnection(queueName, connectionString);

            // Nothing to consume if the queue has not been created yet.
            if (!QueueIsAvailable(queueConnection, connectionString))
            {
                return;
            }

            using (var schedulerContainer = new SchedulerContainer(
                serviceRegister => Injectors.AddInjectors(
                    Helpers.CreateForSerilog(),
                    SharedConfiguration.EnableTrace,
                    SharedConfiguration.EnableMetrics,
                    SharedConfiguration.EnableCompression,
                    SharedConfiguration.EnableEncryption,
                    AppName,
                    serviceRegister),
                options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
            using (var scheduler = schedulerContainer.CreateTaskScheduler())
            {
                var factory = schedulerContainer.CreateTaskFactory(scheduler);

                // Allow 8 background threads.
                factory.Scheduler.Configuration.MaximumThreads = 8;

                // One factory may be shared by several queue instances; the scheduler container
                // must stay alive until every queue has finished.
                // The scheduler has to be started before it is handed to a queue.
                factory.Scheduler.Start();

                using (var queueContainer = new QueueContainer<SqLiteMessageQueueInit>(
                    serviceRegister => Injectors.AddInjectors(
                        Helpers.CreateForSerilog(),
                        SharedConfiguration.EnableTrace,
                        SharedConfiguration.EnableMetrics,
                        SharedConfiguration.EnableCompression,
                        SharedConfiguration.EnableEncryption,
                        AppName,
                        serviceRegister),
                    options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
                using (var queue = queueContainer.CreateConsumerMethodQueueScheduler(queueConnection, factory))
                {
                    // Processing options. In the async model the worker count is the number of threads
                    // polling the queue; the scheduler executes the work itself.
                    // A single thread querying the database is enough here.
                    queue.Configuration.Worker.WorkerCount = 1;

                    // Heartbeat: update every 10 seconds, look for dead records every 15 seconds,
                    // and treat records with no heartbeat for 35 seconds as dead.
                    queue.Configuration.HeartBeat.UpdateTime = "sec(*%10)";
                    queue.Configuration.HeartBeat.MonitorTime = TimeSpan.FromSeconds(15);
                    queue.Configuration.HeartBeat.Time = TimeSpan.FromSeconds(35);

                    // An InvalidDataException is retried 3 times, waiting 3, 6 and finally 9 seconds.
                    var invalidDataRetryDelays = new List<TimeSpan>
                    {
                        TimeSpan.FromSeconds(3),
                        TimeSpan.FromSeconds(6),
                        TimeSpan.FromSeconds(9)
                    };
                    queue.Configuration.TransportConfiguration.RetryDelayBehavior.Add(
                        typeof(InvalidDataException),
                        invalidDataRetryDelays);

                    // Expired messages are checked for every 20 seconds.
                    queue.Configuration.MessageExpiration.Enabled = true;
                    queue.Configuration.MessageExpiration.MonitorTime = TimeSpan.FromSeconds(20);

                    queue.Start(CreateNotifications.Create(logger));

                    Console.WriteLine("Processing messages - press any key to stop");
                    Console.ReadKey(true);

                    // When jaeger sends over udp, messages are sometimes lost and there does not appear
                    // to be a flush() call, so pause briefly before shutting down if tracing is on.
                    if (SharedConfiguration.EnableTrace)
                    {
                        System.Threading.Thread.Sleep(2000);
                    }
                }
            }
        }

        /// <summary>
        /// Checks whether the queue behind <paramref name="queueConnection"/> has been created,
        /// logging an error when it has not.
        /// </summary>
        /// <param name="queueConnection">Queue name and connection string to check.</param>
        /// <param name="connectionString">Connection string, included in the error message.</param>
        /// <returns><c>true</c> when the queue exists; otherwise <c>false</c>.</returns>
        private static bool QueueIsAvailable(QueueConnection queueConnection, string connectionString)
        {
            using (var creationContainer = new QueueCreationContainer<SqLiteMessageQueueInit>(
                serviceRegister => Injectors.AddInjectors(
                    Helpers.CreateForSerilog(),
                    SharedConfiguration.EnableTrace,
                    SharedConfiguration.EnableMetrics,
                    SharedConfiguration.EnableCompression,
                    SharedConfiguration.EnableEncryption,
                    AppName,
                    serviceRegister),
                options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
            using (var queueCreation =
                creationContainer.GetQueueCreation<SqLiteMessageQueueCreation>(queueConnection))
            {
                if (queueCreation.QueueExists)
                {
                    return true;
                }

                // The consumer cannot do anything until the producer has created the queue.
                Log.Error(
                    $"Could not find {connectionString}. Verify that you have run the producer, which will create the queue");
                return false;
            }
        }
    }
}