-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
168 lines (157 loc) · 8.8 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
using SampleShared;
using Serilog;
using System;
using System.IO;
using DotNetWorkQueue;
using DotNetWorkQueue.Transport.SQLite.Basic;
using System.Configuration;
using DotNetWorkQueue.Configuration;
namespace SQliteScheduler
{
    /// <summary>
    /// Interactive console sample: creates the job scheduler queue in a SQLite database,
    /// starts a job scheduler, and lets the user schedule, list and remove three sample jobs
    /// from a keyboard menu.
    /// </summary>
    class Program
    {
        // Name handed to Injectors.AddInjectors for every container created by this sample.
        private const string ApplicationName = "SQLiteScheduler";

        /// <summary>
        /// Entry point. Configures logging, creates the scheduler queue, starts the scheduler
        /// and then runs the menu loop until the user presses 'q'.
        /// </summary>
        /// <param name="args">Command line arguments; not used.</param>
        static void Main(string[] args)
        {
            //we are using serilog for sample purposes
            var log = new LoggerConfiguration()
                .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] [{SourceContext}] {Message:lj}{NewLine}{Exception}")
                .MinimumLevel.Debug()
                .CreateLogger();
            Log.Logger = log;
            log.Information("Startup");

            // Runtime text is passed as a property value, not as the message template, so that
            // any braces it contains are not parsed as template tokens.
            log.Information("{Settings}", SharedConfiguration.AllSettings);

            // The database file lives in the current user's Documents folder.
            var fileLocation = Path.Combine(Environment.ExpandEnvironmentVariables("%userprofile%"), "Documents");
            var queueName = ConfigurationManager.AppSettings.ReadSetting("QueueName");

            // The "Database" setting may or may not begin with a directory separator. Strip any
            // leading separator and let Path.Combine insert exactly one, so both "\name.db" and
            // "name.db" resolve to a file inside the Documents folder. A missing setting falls
            // back to the folder path itself, matching the previous behavior.
            var databaseSetting = ConfigurationManager.AppSettings.ReadSetting("Database") ?? string.Empty;
            var databaseFile = Path.Combine(
                fileLocation,
                databaseSetting.TrimStart(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar));
            var connectionString = $"Data Source={databaseFile};Version=3;";
            var queueConnection = new QueueConnection(queueName, connectionString);

            // Create the job scheduler queue and log the resulting status.
            using (var jobQueueCreation =
                new JobQueueCreationContainer<SqLiteMessageQueueInit>(serviceRegister =>
                        Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, ApplicationName, serviceRegister),
                    options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
            {
                using (var createQueue =
                    jobQueueCreation.GetQueueCreation<SqliteJobQueueCreation>(queueConnection))
                {
                    //queue options
                    createQueue.Options.EnableDelayedProcessing = true;
                    createQueue.Options.EnableHeartBeat = true;
                    createQueue.Options.EnableMessageExpiration = true;
                    createQueue.Options.EnableStatus = true;
                    createQueue.Options.EnableStatusTable = true;

                    // NOTE(review): the meaning of the trailing 'false' argument is not visible
                    // from this file - confirm against the CreateJobSchedulerQueue signature.
                    var result = createQueue.CreateJobSchedulerQueue(serviceRegister =>
                            Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, ApplicationName, serviceRegister), queueConnection,
                        options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos), false);
                    log.Information("{Status}", result.Status);
                }
            }

            using (var jobContainer = new JobSchedulerContainer(serviceRegister =>
                    Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, ApplicationName, serviceRegister)
                , options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
            {
                using (var scheduler = jobContainer.CreateJobScheduler(serviceRegister =>
                        Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, ApplicationName, serviceRegister),
                    serviceRegister =>
                        Injectors.AddInjectors(Helpers.CreateForSerilog(), SharedConfiguration.EnableTrace, SharedConfiguration.EnableMetrics, SharedConfiguration.EnableCompression, SharedConfiguration.EnableEncryption, ApplicationName, serviceRegister)
                    , options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)
                    , options => Injectors.SetOptions(options, SharedConfiguration.EnableChaos)))
                {
                    //start may be called before or after adding jobs
                    scheduler.Start();

                    var keepRunning = true;

                    // Handles to the jobs scheduled from the menu; null while a job is not scheduled.
                    IScheduledJob job1 = null;
                    IScheduledJob job2 = null;
                    IScheduledJob job3 = null;

                    while (keepRunning)
                    {
                        Console.WriteLine(@"a) Schedule job1
b) Schedule job2
c) Schedule job3
d) View scheduled jobs
e) Remove job1
f) Remove job2
g) Remove job3
q) Quit");
                        // Invariant lower-casing keeps the menu independent of the current culture.
                        var key = char.ToLowerInvariant(Console.ReadKey(true).KeyChar);

                        // A failing menu action is logged and the loop continues.
                        try
                        {
                            switch (key)
                            {
                                case 'a':
                                    job1 = scheduler.AddUpdateJob<SqLiteMessageQueueInit, SqliteJobQueueCreation>("test job1",
                                        queueConnection,
                                        "sec(0,5,10,15,20,25,30,35,40,45,50,55)",
                                        (message, workerNotification) => Console.WriteLine("test job1 " + message.MessageId.Id.Value));
                                    log.Information("job scheduled");
                                    break;
                                case 'b':
                                    job2 = scheduler.AddUpdateJob<SqLiteMessageQueueInit, SqliteJobQueueCreation>("test job2",
                                        queueConnection,
                                        "min(*)",
                                        (message, workerNotification) => Console.WriteLine("test job2 " + message.MessageId.Id.Value));
                                    log.Information("job scheduled");
                                    break;
                                case 'c':
                                    job3 = scheduler.AddUpdateJob<SqLiteMessageQueueInit, SqliteJobQueueCreation>("test job3",
                                        queueConnection,
                                        "sec(30)",
                                        (message, workerNotification) => Console.WriteLine("test job3 " + message.MessageId.Id.Value));
                                    log.Information("job scheduled");
                                    break;
                                case 'd':
                                    var jobs = scheduler.GetAllJobs();
                                    foreach (var job in jobs)
                                    {
                                        log.Information("Job: {@job}", job);
                                    }
                                    break;
                                case 'e':
                                    if (job1 != null)
                                    {
                                        job1.StopSchedule();
                                        // Only forget the handle once the scheduler confirms removal.
                                        if (scheduler.RemoveJob(job1.Name))
                                        {
                                            job1 = null;
                                            log.Information("job removed");
                                        }
                                    }
                                    break;
                                case 'f':
                                    if (job2 != null)
                                    {
                                        job2.StopSchedule();
                                        if (scheduler.RemoveJob(job2.Name))
                                        {
                                            job2 = null;
                                            log.Information("job removed");
                                        }
                                    }
                                    break;
                                case 'g':
                                    if (job3 != null)
                                    {
                                        job3.StopSchedule();
                                        if (scheduler.RemoveJob(job3.Name))
                                        {
                                            job3 = null;
                                            log.Information("job removed");
                                        }
                                    }
                                    break;
                                case 'q':
                                    Console.WriteLine("Quitting");
                                    keepRunning = false;
                                    break;
                            }
                        }
                        catch (Exception e)
                        {
                            log.Error(e, "Failed");
                        }
                    }
                }
            }
        }
    }
}