Skip to content

Commit 67b7794

Browse files
feat: added poller and streamer components
1 parent 33cf519 commit 67b7794

File tree

12 files changed

+909
-6
lines changed

12 files changed

+909
-6
lines changed

internal/common/common.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ type Monitors struct {
8888
}
8989

9090
type Monitor interface {
91-
Connect(url string) error
91+
Start() error
9292
GetAssetQuotes(useCache ...bool) []AssetQuote
9393
SetSymbols(symbols []string)
94-
Close()
94+
Stop() error
9595
}
9696

9797
// Lot represents a cost basis lot
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package monitorCoinbase
2+
3+
import (
4+
"context"
5+
"slices"
6+
"sync"
7+
"time"
8+
9+
"github.com/achannarasappa/ticker/v4/internal/common"
10+
poller "github.com/achannarasappa/ticker/v4/internal/monitor/coinbase/poller"
11+
streamer "github.com/achannarasappa/ticker/v4/internal/monitor/coinbase/streamer"
12+
unary "github.com/achannarasappa/ticker/v4/internal/monitor/coinbase/unary"
13+
resty "github.com/go-resty/resty/v2"
14+
)
15+
16+
type MonitorCoinbase struct {
17+
unaryAPI *unary.UnaryAPI
18+
streamer *streamer.Streamer
19+
poller *poller.Poller
20+
unary *unary.UnaryAPI
21+
input input
22+
symbols []string
23+
assetQuotes []common.AssetQuote
24+
mu sync.Mutex
25+
ctx context.Context
26+
cancel context.CancelFunc
27+
}
28+
29+
type unaryAPI struct {
30+
symbols []string
31+
client resty.Client
32+
}
33+
34+
type input struct {
35+
symbols []string
36+
symbolsUnderlying []string
37+
}
38+
39+
// Config contains the required configuration for the Coinbase monitor
40+
type Config struct {
41+
Client resty.Client
42+
OnUpdate func()
43+
}
44+
45+
// Option defines an option for configuring the monitor
46+
type Option func(*MonitorCoinbase)
47+
48+
func NewMonitorCoinbase(config Config, opts ...Option) *MonitorCoinbase {
49+
50+
ctx, cancel := context.WithCancel(context.Background())
51+
52+
unaryAPI := unary.NewUnaryAPI(config.Client)
53+
54+
monitor := &MonitorCoinbase{
55+
streamer: streamer.NewStreamer(ctx),
56+
poller: poller.NewPoller(ctx, unaryAPI),
57+
unaryAPI: unaryAPI,
58+
ctx: ctx,
59+
cancel: cancel,
60+
}
61+
62+
for _, opt := range opts {
63+
opt(monitor)
64+
}
65+
66+
return monitor
67+
}
68+
69+
// WithSymbolsUnderlying sets the underlying symbols for the monitor
70+
func WithSymbolsUnderlying(symbols []string) Option {
71+
return func(m *MonitorCoinbase) {
72+
m.input.symbolsUnderlying = symbols
73+
}
74+
}
75+
76+
// WithStreamingURL sets the streaming URL for the monitor
77+
func WithStreamingURL(url string) Option {
78+
return func(m *MonitorCoinbase) {
79+
m.streamer.SetURL(url)
80+
}
81+
}
82+
83+
// WithRefreshInterval sets the refresh interval for the monitor
84+
func WithRefreshInterval(interval time.Duration) Option {
85+
return func(m *MonitorCoinbase) {
86+
m.poller.SetRefreshInterval(interval)
87+
}
88+
}
89+
90+
func (m *MonitorCoinbase) GetAssetQuotes(ignoreCache ...bool) []common.AssetQuote {
91+
92+
if len(ignoreCache) > 0 && ignoreCache[0] {
93+
return m.unaryAPI.GetAssetQuotes(m.symbols).AssetQuotes
94+
}
95+
96+
return m.assetQuotes
97+
}
98+
99+
func (m *MonitorCoinbase) SetSymbols(symbols []string) {
100+
101+
symbolsMerged := make([]string, 0)
102+
symbolsMerged = append(symbolsMerged, m.input.symbolsUnderlying...)
103+
symbolsMerged = append(symbolsMerged, symbols...)
104+
slices.Sort(symbolsMerged)
105+
symbolsMerged = slices.Compact(symbolsMerged)
106+
107+
m.input.symbols = symbols
108+
m.symbols = symbolsMerged
109+
110+
// Execute one unary API call to get data not sent by streaming API and set initial prices
111+
assetQuotesIndexed := m.unaryAPI.GetAssetQuotes(m.symbols) // TODO: update to return and handle error
112+
m.assetQuotes = assetQuotesIndexed.AssetQuotes // TODO: Add saving indexed quotes
113+
114+
// Coinbase steaming API for CBE (spot) only and not CDE (futures)
115+
m.streamer.SetSymbolsAndUpdateSubscriptions(symbols) // TODO: update to return and handle error
116+
117+
}
118+
119+
// Start the monitor
120+
func (m *MonitorCoinbase) Start() error {
121+
122+
var err error
123+
124+
err = m.streamer.Start()
125+
if err != nil {
126+
return err
127+
}
128+
129+
err = m.poller.Start()
130+
if err != nil {
131+
return err
132+
}
133+
134+
return nil
135+
}
136+
137+
func (m *MonitorCoinbase) Stop() error {
138+
139+
m.cancel()
140+
return nil
141+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package monitorCoinbase_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/go-resty/resty/v2"
7+
"github.com/jarcoal/httpmock"
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
)
11+
12+
var client = resty.New()
13+
14+
var _ = BeforeSuite(func() {
15+
httpmock.ActivateNonDefault(client.GetClient())
16+
})
17+
18+
var _ = BeforeEach(func() {
19+
httpmock.Reset()
20+
})
21+
22+
var _ = AfterSuite(func() {
23+
httpmock.DeactivateAndReset()
24+
})
25+
26+
func TestCoinbase(t *testing.T) {
27+
RegisterFailHandler(Fail)
28+
RunSpecs(t, "Coinbase Suite")
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package monitorCoinbase_test
2+
3+
// import (
4+
// "time"
5+
6+
// "github.com/go-resty/resty/v2"
7+
// . "github.com/onsi/ginkgo/v2"
8+
// . "github.com/onsi/gomega"
9+
10+
// monitorCoinbase "github.com/achannarasappa/ticker/v4/internal/monitor/coinbase"
11+
// )
12+
13+
// var _ = Describe("Monitor Coinbase", func() {
14+
15+
// Describe("NewMonitorCoinbase", func() {
16+
// PIt("should return a new MonitorCoinbase", func() {
17+
// monitor := monitorCoinbase.NewMonitorCoinbase(time.Second, *resty.New(), []string{"BTC-USD"}, func() {})
18+
19+
// Expect(monitor).NotTo(BeNil())
20+
// })
21+
// })
22+
23+
// Describe("GetAssetQuotes", func() {
24+
// PIt("should return the asset quotes", func() {
25+
// monitor := monitorCoinbase.NewMonitorCoinbase(time.Second, *resty.New(), []string{"BTC-USD"}, func() {})
26+
27+
// quotes := monitor.GetAssetQuotes()
28+
// Expect(quotes).NotTo(BeEmpty())
29+
// })
30+
// })
31+
// })
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package monitorCoinbase
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/achannarasappa/ticker/v4/internal/monitor/coinbase/unary"
9+
)
10+
11+
type Poller struct {
12+
refreshInterval time.Duration
13+
symbols []string
14+
isStarted bool
15+
ctx context.Context
16+
cancel context.CancelFunc
17+
unaryAPI *unary.UnaryAPI
18+
}
19+
20+
func NewPoller(ctx context.Context, unaryAPI *unary.UnaryAPI) *Poller {
21+
ctx, cancel := context.WithCancel(ctx)
22+
23+
return &Poller{
24+
refreshInterval: 0,
25+
isStarted: false,
26+
ctx: ctx,
27+
cancel: cancel,
28+
unaryAPI: unaryAPI,
29+
}
30+
}
31+
32+
func (p *Poller) SetSymbols(symbols []string) {
33+
p.symbols = symbols
34+
}
35+
36+
func (p *Poller) SetRefreshInterval(interval time.Duration) error {
37+
38+
if p.isStarted {
39+
return fmt.Errorf("cannot set refresh interval while poller is started")
40+
}
41+
42+
p.refreshInterval = interval
43+
return nil
44+
}
45+
46+
func (p *Poller) Start() error {
47+
if p.isStarted {
48+
return fmt.Errorf("poller already started")
49+
}
50+
51+
if p.refreshInterval <= 0 {
52+
return fmt.Errorf("refresh interval is not set")
53+
}
54+
55+
if len(p.symbols) == 0 {
56+
return fmt.Errorf("symbols are not set")
57+
}
58+
59+
p.isStarted = true
60+
61+
// Start polling goroutine
62+
go func() {
63+
ticker := time.NewTicker(p.refreshInterval)
64+
defer ticker.Stop()
65+
66+
// Initial poll
67+
p.unaryAPI.GetAssetQuotes(p.symbols)
68+
69+
for {
70+
select {
71+
case <-ticker.C:
72+
p.unaryAPI.GetAssetQuotes(p.symbols)
73+
case <-p.ctx.Done():
74+
return
75+
}
76+
}
77+
}()
78+
79+
return nil
80+
}

0 commit comments

Comments
 (0)