Skip to content

Commit 6a5050f

Browse files
feat: added yahoo monitor base without session management and currency conversion
1 parent 932dfc5 commit 6a5050f

File tree

16 files changed

+1786
-4
lines changed

16 files changed

+1786
-4
lines changed

internal/monitor/coinbase/monitor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -757,10 +757,10 @@ var _ = Describe("Monitor Coinbase", func() {
757757
),
758758
)
759759

760-
// Set up a channel to detect if onUpdate is called
760+
// Detect if the onUpdate function is called
761761
outputCalled := false
762762

763-
// Create a monitor with a short refresh interval for testing
763+
// Create a monitor with a short refresh interval
764764
monitor := monitorCoinbase.NewMonitorCoinbase(monitorCoinbase.Config{
765765
UnaryURL: server.URL(),
766766
}, monitorCoinbase.WithRefreshInterval(100*time.Millisecond))

internal/monitor/monitor.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55

66
c "github.com/achannarasappa/ticker/v4/internal/common"
77
monitorCoinbase "github.com/achannarasappa/ticker/v4/internal/monitor/coinbase"
8+
monitorYahoo "github.com/achannarasappa/ticker/v4/internal/monitor/yahoo"
89
"github.com/go-resty/resty/v2"
910
"github.com/gorilla/websocket"
1011
)
1112

1213
// Monitor represents a Coinbase market data monitor
1314
type Monitor struct {
14-
monitors map[c.QuoteSource]c.Monitor
15+
monitors map[c.QuoteSource]c.Monitor
16+
chanError chan error
1517
}
1618

1719
type ConfigMonitor struct {
@@ -28,20 +30,34 @@ type ConfigUpdateFns struct {
2830

2931
// New creates a new instance of the Coinbase monitor
3032
func NewMonitor(configMonitor ConfigMonitor) (*Monitor, error) {
33+
34+
chanError := make(chan error, 5)
35+
3136
var coinbase *monitorCoinbase.MonitorCoinbase
3237
coinbase = monitorCoinbase.NewMonitorCoinbase(
3338
monitorCoinbase.Config{
3439
UnaryURL: "https://api.coinbase.com",
35-
ChanError: make(chan error, 5),
40+
ChanError: chanError,
3641
},
3742
monitorCoinbase.WithStreamingURL("wss://ws-feed.exchange.coinbase.com"),
3843
monitorCoinbase.WithRefreshInterval(time.Duration(configMonitor.Config.RefreshInterval)*time.Second),
3944
)
4045

46+
var yahoo *monitorYahoo.MonitorYahoo
47+
yahoo = monitorYahoo.NewMonitorYahoo(
48+
monitorYahoo.Config{
49+
UnaryURL: "https://query1.finance.yahoo.com",
50+
ChanError: chanError,
51+
},
52+
monitorYahoo.WithRefreshInterval(time.Duration(configMonitor.Config.RefreshInterval)*time.Second),
53+
)
54+
4155
m := &Monitor{
4256
monitors: map[c.QuoteSource]c.Monitor{
4357
c.QuoteSourceCoinbase: coinbase,
58+
c.QuoteSourceYahoo: yahoo,
4459
},
60+
chanError: chanError,
4561
}
4662

4763
return m, nil
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package monitorYahoo_test
2+
3+
import (
4+
"github.com/achannarasappa/ticker/v4/internal/monitor/yahoo/unary"
5+
)
6+
7+
var (
8+
responseQuote1Fixture = unary.Response{
9+
QuoteResponse: unary.ResponseQuoteResponse{
10+
Quotes: []unary.ResponseQuote{
11+
quoteCloudflareFixture,
12+
quoteGoogleFixture,
13+
},
14+
Error: nil,
15+
},
16+
}
17+
quoteCloudflareFixture = unary.ResponseQuote{
18+
MarketState: "REGULAR",
19+
ShortName: "Cloudflare, Inc.",
20+
PreMarketChange: unary.ResponseFieldFloat{Raw: 1.0399933, Fmt: "1.0399933"},
21+
PreMarketChangePercent: unary.ResponseFieldFloat{Raw: 1.2238094, Fmt: "1.2238094"},
22+
PreMarketPrice: unary.ResponseFieldFloat{Raw: 86.03, Fmt: "86.03"},
23+
RegularMarketChange: unary.ResponseFieldFloat{Raw: 3.0800018, Fmt: "3.0800018"},
24+
RegularMarketChangePercent: unary.ResponseFieldFloat{Raw: 3.7606857, Fmt: "3.7606857"},
25+
RegularMarketPrice: unary.ResponseFieldFloat{Raw: 84.98, Fmt: "84.98"},
26+
RegularMarketPreviousClose: unary.ResponseFieldFloat{Raw: 84.00, Fmt: "84.00"},
27+
RegularMarketOpen: unary.ResponseFieldFloat{Raw: 85.22, Fmt: "85.22"},
28+
RegularMarketDayHigh: unary.ResponseFieldFloat{Raw: 90.00, Fmt: "90.00"},
29+
RegularMarketDayLow: unary.ResponseFieldFloat{Raw: 80.00, Fmt: "80.00"},
30+
PostMarketChange: unary.ResponseFieldFloat{Raw: 1.37627, Fmt: "1.37627"},
31+
PostMarketChangePercent: unary.ResponseFieldFloat{Raw: 1.35735, Fmt: "1.35735"},
32+
PostMarketPrice: unary.ResponseFieldFloat{Raw: 86.56, Fmt: "86.56"},
33+
Symbol: "NET",
34+
}
35+
quoteGoogleFixture = unary.ResponseQuote{
36+
MarketState: "REGULAR",
37+
ShortName: "Google Inc.",
38+
PreMarketChange: unary.ResponseFieldFloat{Raw: 1.0399933, Fmt: "1.0399933"},
39+
PreMarketChangePercent: unary.ResponseFieldFloat{Raw: 1.2238094, Fmt: "1.2238094"},
40+
PreMarketPrice: unary.ResponseFieldFloat{Raw: 166.03, Fmt: "166.03"},
41+
RegularMarketChange: unary.ResponseFieldFloat{Raw: 3.0800018, Fmt: "3.0800018"},
42+
RegularMarketChangePercent: unary.ResponseFieldFloat{Raw: 3.7606857, Fmt: "3.7606857"},
43+
RegularMarketPrice: unary.ResponseFieldFloat{Raw: 166.25, Fmt: "166.25"},
44+
RegularMarketPreviousClose: unary.ResponseFieldFloat{Raw: 165.00, Fmt: "165.00"},
45+
RegularMarketOpen: unary.ResponseFieldFloat{Raw: 165.00, Fmt: "165.00"},
46+
RegularMarketDayHigh: unary.ResponseFieldFloat{Raw: 167.00, Fmt: "167.00"},
47+
RegularMarketDayLow: unary.ResponseFieldFloat{Raw: 164.00, Fmt: "164.00"},
48+
PostMarketChange: unary.ResponseFieldFloat{Raw: 1.37627, Fmt: "1.37627"},
49+
PostMarketChangePercent: unary.ResponseFieldFloat{Raw: 1.35735, Fmt: "1.35735"},
50+
PostMarketPrice: unary.ResponseFieldFloat{Raw: 167.62, Fmt: "167.62"},
51+
Symbol: "GOOG",
52+
}
53+
quoteMetaFixture = unary.ResponseQuote{
54+
MarketState: "REGULAR",
55+
ShortName: "Meta Platforms Inc.",
56+
PreMarketChange: unary.ResponseFieldFloat{Raw: 2, Fmt: "2"},
57+
PreMarketChangePercent: unary.ResponseFieldFloat{Raw: 0.6666667, Fmt: "0.6666667"},
58+
PreMarketPrice: unary.ResponseFieldFloat{Raw: 300.00, Fmt: "300.00"},
59+
RegularMarketChange: unary.ResponseFieldFloat{Raw: 3.00, Fmt: "3.00"},
60+
RegularMarketChangePercent: unary.ResponseFieldFloat{Raw: 1.00, Fmt: "1.00"},
61+
RegularMarketPrice: unary.ResponseFieldFloat{Raw: 303.00, Fmt: "303.00"},
62+
RegularMarketPreviousClose: unary.ResponseFieldFloat{Raw: 300.00, Fmt: "300.00"},
63+
RegularMarketOpen: unary.ResponseFieldFloat{Raw: 300.00, Fmt: "300.00"},
64+
RegularMarketDayHigh: unary.ResponseFieldFloat{Raw: 305.00, Fmt: "305.00"},
65+
RegularMarketDayLow: unary.ResponseFieldFloat{Raw: 295.00, Fmt: "295.00"},
66+
PostMarketChange: unary.ResponseFieldFloat{Raw: 1.00, Fmt: "1.00"},
67+
PostMarketChangePercent: unary.ResponseFieldFloat{Raw: 0.3333333, Fmt: "0.3333333"},
68+
PostMarketPrice: unary.ResponseFieldFloat{Raw: 304.37, Fmt: "304.37"},
69+
Symbol: "FB",
70+
}
71+
)

internal/monitor/yahoo/monitor.go

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
package monitorYahoo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"slices"
7+
"sync"
8+
"time"
9+
10+
c "github.com/achannarasappa/ticker/v4/internal/common"
11+
poller "github.com/achannarasappa/ticker/v4/internal/monitor/yahoo/poller"
12+
unary "github.com/achannarasappa/ticker/v4/internal/monitor/yahoo/unary"
13+
)
14+
15+
type MonitorYahoo struct {
16+
unaryAPI *unary.UnaryAPI
17+
poller *poller.Poller
18+
unary *unary.UnaryAPI
19+
input input
20+
productIds []string // Yahoo APIs refer to trading pairs as Product IDs which symbols ticker accepts with a -USD suffix
21+
productIdsPolling []string
22+
assetQuotesCache []c.AssetQuote // Asset quotes for all assets retrieved at start or on symbol change
23+
assetQuotesCacheLookup map[string]*c.AssetQuote // Asset quotes for all assets retrieved at least once (symbol change does not remove symbols)
24+
chanPollUpdateAssetQuote chan c.MessageUpdate[c.AssetQuote]
25+
chanError chan error
26+
mu sync.RWMutex
27+
ctx context.Context
28+
cancel context.CancelFunc
29+
isStarted bool
30+
onUpdateAssetQuote func(symbol string, assetQuote c.AssetQuote)
31+
onUpdateAssetQuotes func(assetQuotes []c.AssetQuote)
32+
}
33+
34+
type input struct {
35+
productIds []string
36+
productIdsLookup map[string]bool
37+
}
38+
39+
// Config contains the required configuration for the Yahoo monitor
40+
type Config struct {
41+
UnaryURL string
42+
ChanError chan error
43+
}
44+
45+
// Option defines an option for configuring the monitor
46+
type Option func(*MonitorYahoo)
47+
48+
func NewMonitorYahoo(config Config, opts ...Option) *MonitorYahoo {
49+
50+
ctx, cancel := context.WithCancel(context.Background())
51+
52+
unaryAPI := unary.NewUnaryAPI(config.UnaryURL)
53+
54+
monitor := &MonitorYahoo{
55+
assetQuotesCacheLookup: make(map[string]*c.AssetQuote),
56+
assetQuotesCache: make([]c.AssetQuote, 0),
57+
chanPollUpdateAssetQuote: make(chan c.MessageUpdate[c.AssetQuote]),
58+
chanError: config.ChanError,
59+
unaryAPI: unaryAPI,
60+
ctx: ctx,
61+
cancel: cancel,
62+
onUpdateAssetQuote: func(symbol string, assetQuote c.AssetQuote) {},
63+
onUpdateAssetQuotes: func(assetQuotes []c.AssetQuote) {},
64+
}
65+
66+
pollerConfig := poller.PollerConfig{
67+
ChanUpdateAssetQuote: monitor.chanPollUpdateAssetQuote,
68+
ChanError: monitor.chanError,
69+
UnaryAPI: unaryAPI,
70+
}
71+
monitor.poller = poller.NewPoller(ctx, pollerConfig)
72+
73+
for _, opt := range opts {
74+
opt(monitor)
75+
}
76+
77+
return monitor
78+
}
79+
80+
// WithRefreshInterval sets the refresh interval for the monitor
81+
func WithRefreshInterval(interval time.Duration) Option {
82+
return func(m *MonitorYahoo) {
83+
m.poller.SetRefreshInterval(interval)
84+
}
85+
}
86+
87+
// SetOnUpdate sets the onUpdate function for the monitor
88+
func (m *MonitorYahoo) SetOnUpdateAssetQuote(onUpdate func(symbol string, assetQuote c.AssetQuote)) {
89+
m.onUpdateAssetQuote = onUpdate
90+
}
91+
92+
func (m *MonitorYahoo) SetOnUpdateAssetQuotes(onUpdate func(assetQuotes []c.AssetQuote)) {
93+
m.onUpdateAssetQuotes = onUpdate
94+
}
95+
96+
func (m *MonitorYahoo) GetAssetQuotes(ignoreCache ...bool) ([]c.AssetQuote, error) {
97+
if len(ignoreCache) > 0 && ignoreCache[0] {
98+
assetQuotes, err := m.getAssetQuotesAndReplaceCache()
99+
if err != nil {
100+
return []c.AssetQuote{}, err
101+
}
102+
return assetQuotes, nil
103+
}
104+
105+
m.mu.RLock()
106+
defer m.mu.RUnlock()
107+
108+
return m.assetQuotesCache, nil
109+
}
110+
111+
func (m *MonitorYahoo) SetSymbols(productIds []string) error {
112+
113+
var err error
114+
115+
m.mu.Lock()
116+
117+
// Deduplicate productIds since input may have duplicates
118+
slices.Sort(productIds)
119+
m.productIds = slices.Compact(productIds)
120+
m.productIdsPolling = m.productIds
121+
m.input.productIds = productIds
122+
m.input.productIdsLookup = make(map[string]bool)
123+
for _, productId := range productIds {
124+
m.input.productIdsLookup[productId] = true
125+
}
126+
127+
m.mu.Unlock()
128+
129+
_, err = m.getAssetQuotesAndReplaceCache()
130+
if err != nil {
131+
return err
132+
}
133+
134+
m.poller.SetSymbols(m.productIdsPolling)
135+
136+
m.onUpdateAssetQuotes(m.assetQuotesCache)
137+
138+
return nil
139+
140+
}
141+
142+
// Start the monitor
143+
func (m *MonitorYahoo) Start() error {
144+
145+
var err error
146+
147+
if m.isStarted {
148+
return fmt.Errorf("monitor already started")
149+
}
150+
151+
// On start, get initial quotes from unary API
152+
_, err = m.getAssetQuotesAndReplaceCache()
153+
if err != nil {
154+
return err
155+
}
156+
157+
err = m.poller.Start()
158+
if err != nil {
159+
return err
160+
}
161+
162+
go m.handleUpdates()
163+
164+
m.isStarted = true
165+
166+
return nil
167+
}
168+
169+
func (m *MonitorYahoo) Stop() error {
170+
171+
if !m.isStarted {
172+
return fmt.Errorf("monitor not started")
173+
}
174+
175+
m.cancel()
176+
return nil
177+
}
178+
179+
func (m *MonitorYahoo) handleUpdates() {
180+
for {
181+
select {
182+
case <-m.ctx.Done():
183+
return
184+
185+
case updateMessage := <-m.chanPollUpdateAssetQuote:
186+
// Check if cache exists and values have changed before acquiring write lock
187+
m.mu.RLock()
188+
189+
assetQuote, exists := m.assetQuotesCacheLookup[updateMessage.ID]
190+
191+
if !exists {
192+
// If product id does not exist in cache, skip update
193+
// TODO: log product not found in cache - should not happen
194+
m.mu.RUnlock()
195+
continue
196+
}
197+
198+
// Skip update if nothing has changed
199+
if assetQuote.QuotePrice.Price == updateMessage.Data.QuotePrice.Price &&
200+
assetQuote.Exchange.IsActive == updateMessage.Data.Exchange.IsActive &&
201+
assetQuote.QuotePrice.PriceDayHigh == updateMessage.Data.QuotePrice.PriceDayHigh {
202+
203+
m.mu.RUnlock()
204+
continue
205+
}
206+
m.mu.RUnlock()
207+
208+
// Price is different so update cache
209+
m.mu.Lock()
210+
211+
assetQuote.QuotePrice.Price = updateMessage.Data.QuotePrice.Price
212+
assetQuote.QuotePrice.Change = updateMessage.Data.QuotePrice.Change
213+
assetQuote.QuotePrice.ChangePercent = updateMessage.Data.QuotePrice.ChangePercent
214+
assetQuote.QuotePrice.PriceDayHigh = updateMessage.Data.QuotePrice.PriceDayHigh
215+
assetQuote.QuotePrice.PriceDayLow = updateMessage.Data.QuotePrice.PriceDayLow
216+
assetQuote.QuotePrice.PriceOpen = updateMessage.Data.QuotePrice.PriceOpen
217+
assetQuote.QuotePrice.PricePrevClose = updateMessage.Data.QuotePrice.PricePrevClose
218+
assetQuote.QuoteExtended.FiftyTwoWeekHigh = updateMessage.Data.QuoteExtended.FiftyTwoWeekHigh
219+
assetQuote.QuoteExtended.FiftyTwoWeekLow = updateMessage.Data.QuoteExtended.FiftyTwoWeekLow
220+
assetQuote.QuoteExtended.MarketCap = updateMessage.Data.QuoteExtended.MarketCap
221+
assetQuote.QuoteExtended.Volume = updateMessage.Data.QuoteExtended.Volume
222+
assetQuote.Exchange.IsActive = updateMessage.Data.Exchange.IsActive
223+
assetQuote.Exchange.IsRegularTradingSession = updateMessage.Data.Exchange.IsRegularTradingSession
224+
225+
m.mu.Unlock()
226+
227+
m.onUpdateAssetQuote(assetQuote.Symbol, *assetQuote)
228+
229+
continue
230+
231+
default:
232+
}
233+
}
234+
}
235+
236+
// Get asset quotes from unary API, add futures quotes, filter out assets not explicitly requested, and replace the asset quotes cache
237+
func (m *MonitorYahoo) getAssetQuotesAndReplaceCache() ([]c.AssetQuote, error) {
238+
239+
assetQuotes, assetQuotesByProductId, err := m.unaryAPI.GetAssetQuotes(m.productIds)
240+
if err != nil {
241+
return []c.AssetQuote{}, err
242+
}
243+
244+
// Filter asset quotes to only include explicitly requested ones
245+
assetQuotesEnriched := make([]c.AssetQuote, 0, len(m.input.productIds))
246+
247+
for _, quote := range assetQuotes {
248+
assetQuotesEnriched = append(assetQuotesEnriched, quote)
249+
}
250+
251+
// Lock updates to asset quotes while symbols are changed and subscriptions updates. ensure data from unary call supercedes potentially oudated streaming data
252+
m.mu.Lock()
253+
defer m.mu.Unlock()
254+
255+
m.assetQuotesCache = assetQuotesEnriched
256+
m.assetQuotesCacheLookup = assetQuotesByProductId
257+
258+
return m.assetQuotesCache, nil
259+
}

0 commit comments

Comments
 (0)