1
+ package main
2
+
3
+ import (
4
+ "context"
5
+ "fmt"
6
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
7
+ "github.com/jitsucom/bulker/jitsubase/appbase"
8
+ "github.com/jitsucom/bulker/jitsubase/utils"
9
+ "github.com/jitsucom/bulker/kafkabase"
10
+ "net/http"
11
+ "time"
12
+ )
13
+
14
+ type Context struct {
15
+ config * Config
16
+ kafkaConfig * kafka.ConfigMap
17
+ repository appbase.Repository [Streams ]
18
+ producer * kafkabase.Producer
19
+ server * http.Server
20
+ reprocessingManager * ReprocessingJobManager
21
+ }
22
+
23
+ func (a * Context ) InitContext (settings * appbase.AppSettings ) error {
24
+ var err error
25
+ a .config = & Config {}
26
+ err = appbase .InitAppConfig (a .config , settings )
27
+ if err != nil {
28
+ return err
29
+ }
30
+
31
+ // Initialize repository
32
+ a .repository = NewStreamsRepository (a .config .RepositoryURL , a .config .RepositoryAuthToken , a .config .RepositoryRefreshPeriodSec , a .config .CacheDir )
33
+
34
+ // Initialize Kafka config
35
+ a .kafkaConfig = a .config .GetKafkaConfig ()
36
+
37
+ // Initialize producer
38
+ producerConfig := kafka .ConfigMap (utils .MapPutAll (kafka.ConfigMap {
39
+ "queue.buffering.max.messages" : a .config .ProducerQueueSize ,
40
+ "batch.size" : a .config .ProducerBatchSize ,
41
+ "linger.ms" : a .config .ProducerLingerMs ,
42
+ "compression.type" : a .config .KafkaTopicCompression ,
43
+ }, * a .kafkaConfig ))
44
+
45
+ a .producer , err = kafkabase .NewProducer (& a .config .KafkaConfig , & producerConfig , true , nil )
46
+ if err != nil {
47
+ return err
48
+ }
49
+ a .producer .Start ()
50
+
51
+ // Initialize reprocessing manager
52
+ a .reprocessingManager , err = NewReprocessingJobManager (a .producer , a .repository , a .config )
53
+ if err != nil {
54
+ return fmt .Errorf ("failed to initialize reprocessing manager: %w" , err )
55
+ }
56
+
57
+ // Initialize HTTP server with router
58
+ router := NewRouter (a )
59
+ a .server = & http.Server {
60
+ Addr : fmt .Sprintf ("0.0.0.0:%d" , a .config .HTTPPort ),
61
+ Handler : router .Engine (),
62
+ ReadTimeout : time .Second * 5 ,
63
+ ReadHeaderTimeout : time .Second * 5 ,
64
+ IdleTimeout : time .Second * 65 ,
65
+ }
66
+
67
+ return nil
68
+ }
69
+
70
+ func (a * Context ) Cleanup () error {
71
+ // Close reprocessing manager first to cancel running jobs and log final status
72
+ if a .reprocessingManager != nil {
73
+ _ = a .reprocessingManager .Close ()
74
+ }
75
+
76
+ // Close producer
77
+ if a .producer != nil {
78
+ _ = a .producer .Close ()
79
+ }
80
+
81
+ // Shutdown HTTP server
82
+ if a .server != nil {
83
+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
84
+ defer cancel ()
85
+ _ = a .server .Shutdown (ctx )
86
+ }
87
+
88
+ return nil
89
+ }
90
+
91
+ func (a * Context ) ShutdownSignal () error {
92
+ a .server .SetKeepAlivesEnabled (false )
93
+ _ = a .server .Shutdown (context .Background ())
94
+ return nil
95
+ }
96
+
97
+ func (a * Context ) Server () * http.Server {
98
+ return a .server
99
+ }
100
+
101
+ func (a * Context ) Config () * Config {
102
+ return a .config
103
+ }
0 commit comments