Skip to content

Commit a9a9acb

Browse files
rodrigozhoudnr
authored andcommitted
Add support for ES Scroll for Scan API (#4614)
<!-- Describe what has changed in this PR --> **What changed?** Add support for ES Scroll for Scan API. This is a revert of #4223 and #4249. <!-- Tell your future self why have you made these changes --> **Why?** ES 7.10 hosted in AWS is the OSS flavor, not the default one which includes support for PIT. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Add unit tests. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** No. <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?**
1 parent 67de6df commit a9a9acb

File tree

5 files changed

+374
-49
lines changed

5 files changed

+374
-49
lines changed

common/persistence/visibility/store/elasticsearch/client/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ type (
5252
WaitForYellowStatus(ctx context.Context, index string) (string, error)
5353
GetMapping(ctx context.Context, index string) (map[string]string, error)
5454

55+
OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*elastic.SearchResult, error)
56+
Scroll(ctx context.Context, id string, keepAliveInterval string) (*elastic.SearchResult, error)
57+
CloseScroll(ctx context.Context, id string) error
58+
59+
IsPointInTimeSupported(ctx context.Context) bool
5560
OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error)
5661
ClosePointInTime(ctx context.Context, id string) (bool, error)
5762
}
@@ -79,6 +84,7 @@ type (
7984
Sorter []elastic.Sorter
8085

8186
SearchAfter []interface{}
87+
ScrollID string
8288
PointInTime *elastic.PointInTime
8389
}
8490
)

common/persistence/visibility/store/elasticsearch/client/client_mock.go

Lines changed: 116 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/visibility/store/elasticsearch/client/client_v7.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ import (
3030
"net/http"
3131
"net/url"
3232
"strings"
33+
"sync"
3334
"time"
3435

36+
"github.com/blang/semver/v4"
3537
"github.com/olivere/elastic/v7"
3638
"github.com/olivere/elastic/v7/uritemplates"
3739
enumspb "go.temporal.io/api/enums/v1"
@@ -44,9 +46,20 @@ type (
4446
clientImpl struct {
4547
esClient *elastic.Client
4648
url url.URL
49+
50+
initIsPointInTimeSupported sync.Once
51+
isPointInTimeSupported bool
4752
}
4853
)
4954

55+
const (
56+
pointInTimeSupportedFlavor = "default" // the other flavor is "oss"
57+
)
58+
59+
var (
60+
pointInTimeSupportedIn = semver.MustParseRange(">=7.10.0")
61+
)
62+
5063
var _ Client = (*clientImpl)(nil)
5164

5265
// newClient create a ES client
@@ -138,6 +151,56 @@ func (c *clientImpl) Search(ctx context.Context, p *SearchParameters) (*elastic.
138151
return searchService.Do(ctx)
139152
}
140153

154+
func (c *clientImpl) OpenScroll(
155+
ctx context.Context,
156+
p *SearchParameters,
157+
keepAliveInterval string,
158+
) (*elastic.SearchResult, error) {
159+
scrollService := elastic.NewScrollService(c.esClient).
160+
Index(p.Index).
161+
Query(p.Query).
162+
SortBy(p.Sorter...).
163+
KeepAlive(keepAliveInterval)
164+
if p.PageSize != 0 {
165+
scrollService.Size(p.PageSize)
166+
}
167+
return scrollService.Do(ctx)
168+
}
169+
170+
func (c *clientImpl) Scroll(
171+
ctx context.Context,
172+
id string,
173+
keepAliveInterval string,
174+
) (*elastic.SearchResult, error) {
175+
return elastic.NewScrollService(c.esClient).ScrollId(id).KeepAlive(keepAliveInterval).Do(ctx)
176+
}
177+
178+
func (c *clientImpl) CloseScroll(ctx context.Context, id string) error {
179+
return elastic.NewScrollService(c.esClient).ScrollId(id).Clear(ctx)
180+
}
181+
182+
func (c *clientImpl) IsPointInTimeSupported(ctx context.Context) bool {
183+
c.initIsPointInTimeSupported.Do(func() {
184+
c.isPointInTimeSupported = c.queryPointInTimeSupported(ctx)
185+
})
186+
return c.isPointInTimeSupported
187+
}
188+
189+
func (c *clientImpl) queryPointInTimeSupported(ctx context.Context) bool {
190+
result, _, err := c.esClient.Ping(c.url.String()).Do(ctx)
191+
if err != nil {
192+
return false
193+
}
194+
if result == nil || result.Version.BuildFlavor != pointInTimeSupportedFlavor {
195+
return false
196+
}
197+
esVersion, err := semver.ParseTolerant(result.Version.Number)
198+
if err != nil {
199+
return false
200+
}
201+
return pointInTimeSupportedIn(esVersion)
202+
}
203+
141204
func (c *clientImpl) OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error) {
142205
resp, err := c.esClient.OpenPointInTime(index).KeepAlive(keepAliveInterval).Do(ctx)
143206
if err != nil {

0 commit comments

Comments
 (0)