Skip to content

Commit 9ad8671

Browse files
committed
[aggregator] Add ActivePlacement method to TCP client
1 parent 0342a92 commit 9ad8671

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

src/aggregator/client/tcp_client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,23 @@ func (c *TCPClient) WriteForwarded(
222222
return c.write(metric.ID, metric.TimeNanos, payload)
223223
}
224224

225+
// ActivePlacement returns a copy of the currently active placement and its version.
226+
func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) {
227+
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
228+
if err != nil {
229+
return nil, 0, err
230+
}
231+
defer onStagedPlacementDoneFn()
232+
233+
placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
234+
if err != nil {
235+
return nil, 0, err
236+
}
237+
defer onPlacementDoneFn()
238+
239+
return placement.Clone(), stagedPlacement.Version(), nil
240+
}
241+
225242
// Flush flushes any remaining data buffered by the client.
226243
func (c *TCPClient) Flush() error {
227244
c.metrics.flush.Inc(1)

src/aggregator/client/tcp_client_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/golang/mock/gomock"
32+
"github.com/stretchr/testify/assert"
3233
"github.com/stretchr/testify/require"
3334

3435
"github.com/m3db/m3/src/cluster/kv/mem"
@@ -786,6 +787,30 @@ func TestTCPClientWriteTimeRangeFor(t *testing.T) {
786787
}
787788
}
788789

790+
func TestTCPClientActivePlacement(t *testing.T) {
791+
var (
792+
c = mustNewTestTCPClient(t, testOptions())
793+
emptyPl = placement.NewPlacement()
794+
ctrl = gomock.NewController(t)
795+
mockPl = placement.NewMockPlacement(ctrl)
796+
stagedPlacement = placement.NewMockActiveStagedPlacement(ctrl)
797+
watcher = placement.NewMockStagedPlacementWatcher(ctrl)
798+
doneCalls int
799+
)
800+
801+
c.placementWatcher = watcher
802+
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil)
803+
stagedPlacement.EXPECT().Version().Return(42)
804+
stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil)
805+
mockPl.EXPECT().Clone().Return(emptyPl)
806+
807+
pl, v, err := c.ActivePlacement()
808+
assert.NoError(t, err)
809+
assert.Equal(t, 42, v)
810+
assert.Equal(t, 2, doneCalls)
811+
assert.Equal(t, emptyPl, pl)
812+
}
813+
789814
func TestTCPClientInitAndClose(t *testing.T) {
790815
c := mustNewTestTCPClient(t, testOptions())
791816
require.NoError(t, c.Init())

0 commit comments

Comments
 (0)