-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
136 lines (119 loc) · 2.9 KB
/
main.go
File metadata and controls
136 lines (119 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
/* Idea :
// get all variables
// initialize all contexts
// consume message
// produce to topic
Impl :
// Consumer : Consumes message
// and calls Producer to produce consumed message
*/
/*
var (
chost = "localhost:9092"
phost = "localhost:9092"
ctopic = "consum"
ptopic = "produce"
)
*/
var (
chost string
phost string
ctopic string
ptopic string
)
func produce(phost string, producer sarama.SyncProducer, toproducemsg []byte) bool {
msg := &sarama.ProducerMessage{
Topic: ptopic,
Value: sarama.StringEncoder(toproducemsg),
}
_, _, err := producer.SendMessage(msg)
if err != nil {
log.Print(err)
return false
}
return true
}
func main() {
chos := flag.String("sh", "", "Source Host : List of source hosts and port: broker1:9092,broker2:9092")
ctop := flag.String("st", "", "Source Topic : Topic info of source: source-topic")
phos := flag.String("dh", "", "Desitnation Host : List of destination hosts and port: broker1:9092,broker2:9092")
ptop := flag.String("dt", "", "Desitnation Topic : Topic info of destination: destn-topic")
flag.Parse()
chost = *chos
phost = *phos
ctopic = *ctop
ptopic = *ptop
if phost == "" || chost == "" || ctopic == "" || ptopic == "" {
flag.Usage()
os.Exit(1)
}
// INITIATING PRODUCER CONFIGS
pconfig := sarama.NewConfig()
pconfig.Producer.Return.Successes = true
pconfig.Producer.Return.Errors = true
pconfig.Producer.RequiredAcks = sarama.WaitForAll
pconfig.Producer.Retry.Max = 5
pbrokers := []string{phost}
producer, err := sarama.NewSyncProducer(pbrokers, pconfig)
if err != nil {
log.Print(err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Print(err)
}
}()
// INITATING CONSUMER CONFIGS
cconfig := sarama.NewConfig()
cconfig.Consumer.Return.Errors = true
brokers := []string{chost}
master, err := sarama.NewConsumer(brokers, cconfig)
if err != nil {
log.Print(err)
}
defer func() {
if err := master.Close(); err != nil {
log.Print(err)
}
}()
consumer, err := master.ConsumePartition(ctopic, 0, sarama.OffsetOldest)
if err != nil {
log.Print(err)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
msgCount := 0
errorCount := 0
doneCh := make(chan struct{})
log.Println("Estb. connection between kafka end-points. Starting the kafka streams..")
go func() {
for {
select {
case err := <-consumer.Errors():
log.Print(err)
case msg := <-consumer.Messages():
if produce(phost, producer, msg.Value) {
msgCount++
} else {
errorCount++
}
case <-signals:
log.Print("Stopping the streams..")
log.Printf("Total messages processed successfully : %d \n", msgCount)
log.Printf("Total messages failed : %d \n", errorCount)
doneCh <- struct{}{}
}
}
}()
<-doneCh
fmt.Println("Total count of messages processed : ", msgCount, "messages")
}