
Commit 01fae92

gouthamve and codesome authored
Use TSDB's WAL for writes. (#1103)
* Update vendored libraries: add github.com/prometheus/tsdb/wal, and update github.com/prometheus/client_golang for the WrapRegistererWith function. Signed-off-by: Tom Wilkie <[email protected]>
* Use TSDB's WAL for writes. Signed-off-by: Tom Wilkie <[email protected]>
* Fix merge conflicts. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix creation of user states in WAL recovery. Signed-off-by: Ganesh Vernekar <[email protected]>
* Remove WAL recovery on start-up and allow nil Record. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix types. Signed-off-by: Ganesh Vernekar <[email protected]>
* WAL compression always enabled. Signed-off-by: Ganesh Vernekar <[email protected]>
* Change checkpoint logic to be more like Prometheus's TSDB. Signed-off-by: Ganesh Vernekar <[email protected]>
* Add metrics for checkpoint and name changes. Signed-off-by: Ganesh Vernekar <[email protected]>
* Initial attempt at flushing chunks from WAL [WIP]. Signed-off-by: Ganesh Vernekar <[email protected]>
* Combine checkpoint and WAL chunks before flushing. Signed-off-by: Ganesh Vernekar <[email protected]>
* Bring back recovery and tests. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix race in the test. Signed-off-by: Ganesh Vernekar <[email protected]>
* Recover on startup. Signed-off-by: Ganesh Vernekar <[email protected]>
* Don't remove the last segment in truncation. Signed-off-by: Ganesh Vernekar <[email protected]>
* Always read WAL and remove recover-only mode. Signed-off-by: Ganesh Vernekar <[email protected]>
* Timer for WAL recovery time. Signed-off-by: Ganesh Vernekar <[email protected]>
* More rigorous test for WAL. Signed-off-by: Ganesh Vernekar <[email protected]>
* More profiling in debug mode. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix race in test. Signed-off-by: Ganesh Vernekar <[email protected]>
* No limits on number of series during recovery. Signed-off-by: Ganesh Vernekar <[email protected]>
* No rate limiting of checkpoint. Signed-off-by: Ganesh Vernekar <[email protected]>
* Change segment deletion logic. Signed-off-by: Ganesh Vernekar <[email protected]>
* Process WAL records in parallel; also create a checkpoint during graceful shutdown. Signed-off-by: Ganesh Vernekar <[email protected]>
* Added comments and some refactoring, and not returning on no-series for samples. Signed-off-by: Ganesh Vernekar <[email protected]>
* Process checkpoint series in parallel. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix race in processing WAL: adapted Prometheus-style concurrency handling for processing the WAL segments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Small enhancements. Signed-off-by: Ganesh Vernekar <[email protected]>
* Cache the user states and series when processing samples. Signed-off-by: Ganesh Vernekar <[email protected]>
* Enhancement in the user state cache and fix in samples buffer. Signed-off-by: Ganesh Vernekar <[email protected]>
* Cache user states and series right from the checkpoint. Signed-off-by: Ganesh Vernekar <[email protected]>
* Small enhancements. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix the flag. Signed-off-by: Ganesh Vernekar <[email protected]>
* Remove test files. Signed-off-by: Ganesh Vernekar <[email protected]>
* Use tsdb from prometheus/prometheus repo. Signed-off-by: Ganesh Vernekar <[email protected]>
* Avoid flushing on shutdown. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix after rebase. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix bug of resetting userStates. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix review comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Small enhancements. Signed-off-by: Ganesh Vernekar <[email protected]>
* Update comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Remove ingester<->WAL circular dependency. Signed-off-by: Ganesh Vernekar <[email protected]>
* Change segment size of the WAL. Signed-off-by: Ganesh Vernekar <[email protected]>
* Use same directory for temporary tokens file. Signed-off-by: Ganesh Vernekar <[email protected]>
* Disable transfer out when WAL is enabled. Signed-off-by: Ganesh Vernekar <[email protected]>
* Flush on shutdown endpoint irrespective of WAL. Signed-off-by: Ganesh Vernekar <[email protected]>
* Use sync.Pool for the records. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix Goutham's comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix Goutham's comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix possible data corruption, goroutine deadlock and memory leak. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix review comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* memoryChunks counter fix, metrics updated, small cleanup. Signed-off-by: Ganesh Vernekar <[email protected]>
* Update config file and argument doc. Signed-off-by: Ganesh Vernekar <[email protected]>
* Add guide to run/migrate-to WAL in ingesters. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix review comments. Signed-off-by: Ganesh Vernekar <[email protected]>
* Fix review comments. Signed-off-by: Ganesh Vernekar <[email protected]>

Co-authored-by: Ganesh Vernekar <[email protected]>
2 parents c56d00a + fe3cc08 commit 01fae92

19 files changed: +2709, -71 lines

Makefile

Lines changed: 2 additions & 0 deletions
@@ -49,12 +49,14 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))
 
 # Manually declared dependencies And what goes into each exe
 pkg/ingester/client/cortex.pb.go: pkg/ingester/client/cortex.proto
+pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
 pkg/ring/ring.pb.go: pkg/ring/ring.proto
 pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
 pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
 pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
 pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
 pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
+
 all: $(UPTODATE_FILES)
 test: protos
 mod-check: protos

docs/configuration/arguments.md

Lines changed: 18 additions & 0 deletions
@@ -305,6 +305,24 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
 Where you don't want to cache every chunk written by ingesters, but you do want to take advantage of chunk write deduplication, this option will make ingesters write a placeholder to the cache for each chunk.
 Make sure you configure ingesters with a different cache to queriers, which need the whole value.
 
+#### WAL
+
+- `--ingester.wal-dir`
+   Directory where the WAL data should be stored and/or recovered from.
+
+- `--ingester.wal-enabled`
+   Setting this to `true` enables writing to the WAL during ingestion.
+
+- `--ingester.checkpoint-enabled`
+   Set this to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.
+
+- `--ingester.checkpoint-duration`
+   The interval at which checkpoints should be created.
+
+- `--ingester.recover-from-wal`
+   Set this to `true` to recover data from an existing WAL. The data is recovered even if the WAL is disabled and this is set to `true`. The WAL dir needs to be set for this.
+
 ## Runtime Configuration file
 
 Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=<filename>` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=<duration>` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=<filename>` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=<filename>` is not specified.
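Taken together, the WAL flags added above might be passed to an ingester along these lines (a minimal sketch; the directory and interval values are illustrative, not defaults):

```sh
cortex -target=ingester \
  -ingester.wal-enabled=true \
  -ingester.wal-dir=/data/wal \
  -ingester.checkpoint-enabled=true \
  -ingester.checkpoint-duration=15m \
  -ingester.recover-from-wal=true
```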

docs/configuration/config-file-reference.md

Lines changed: 21 additions & 0 deletions
@@ -323,6 +323,27 @@ ring:
 The `ingester_config` configures the Cortex ingester.
 
 ```yaml
+walconfig:
+  # Enable writing of ingested data into WAL.
+  # CLI flag: -ingester.wal-enabled
+  [wal_enabled: <boolean> | default = false]
+
+  # Enable checkpointing of in-memory chunks.
+  # CLI flag: -ingester.checkpoint-enabled
+  [checkpoint_enabled: <boolean> | default = false]
+
+  # Recover data from existing WAL irrespective of WAL enabled/disabled.
+  # CLI flag: -ingester.recover-from-wal
+  [recover_from_wal: <boolean> | default = false]
+
+  # Directory to store the WAL and/or recover from WAL.
+  # CLI flag: -ingester.wal-dir
+  [wal_dir: <string> | default = "wal"]
+
+  # Interval at which checkpoints should be created.
+  # CLI flag: -ingester.checkpoint-duration
+  [checkpoint_duration: <duration> | default = 30m0s]
+
 lifecycler:
   ring:
     kvstore:
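For illustration, an ingester with the WAL switched on might set the new `walconfig` block along these lines (a sketch; the directory and checkpoint interval are example values, not the defaults):

```yaml
walconfig:
  wal_enabled: true
  checkpoint_enabled: true
  recover_from_wal: true
  wal_dir: /data/wal
  checkpoint_duration: 15m
```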

docs/guides/ingesters-with-wal.md

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
---
title: "Ingesters with WAL"
linkTitle: "Ingesters with WAL"
weight: 5
slug: ingesters-with-wal
---

Currently, ingesters running in the chunks storage mode store all their data in memory. If an ingester crashes, that data can be lost; the WAL helps fill this gap in reliability.

To use the WAL, some changes need to be made to the deployment.

## Changes to deployment

1. Since ingesters need to keep the same persistent volume across restarts/rollouts, all the ingesters should be run as a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes.

2. The following flags need to be set (a sketch of a matching StatefulSet follows this list):
   * `--ingester.wal-dir` to the directory where the WAL data should be stored and/or recovered from. Note that this should be on the mounted volume.
   * `--ingester.wal-enabled` to `true`, which enables writing to the WAL during ingestion.
   * `--ingester.checkpoint-enabled` to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.
   * `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. Default is `30m`; depending on the number of series, it can be brought down to `15m` if there are fewer series per ingester (say 1M).
   * `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if the WAL is disabled and this is set to `true`. The WAL dir needs to be set for this.
     * If you are going to enable the WAL, it is advisable to always set this to `true`.
   * `--ingester.tokens-file-path` should be set to the filepath where the tokens should be stored. Note that this should be on the mounted volume. Why this is required is described below.
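For reference, a trimmed sketch of what such a StatefulSet might look like. The names, image tag, storage size, and paths below are illustrative assumptions, not part of this change:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ingester
spec:
  replicas: 3
  serviceName: ingester
  selector:
    matchLabels:
      name: ingester
  template:
    metadata:
      labels:
        name: ingester
    spec:
      containers:
        - name: ingester
          image: cortexproject/cortex:<version>   # illustrative image tag
          args:
            - -target=ingester
            - -ingester.wal-enabled=true
            - -ingester.checkpoint-enabled=true
            - -ingester.checkpoint-duration=15m
            - -ingester.recover-from-wal=true
            - -ingester.wal-dir=/data/wal
            - -ingester.tokens-file-path=/data/tokens
          volumeMounts:
            # WAL dir and tokens file both live on the persistent volume.
            - name: ingester-data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: ingester-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
```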
## Changes in lifecycle when WAL is enabled

1. Flushing of data to the chunk store during rollouts or scale-down is disabled. This is because during a rollout of a StatefulSet there are no ingesters simultaneously leaving and joining; rather, the same ingester is shut down and brought back again with an updated config. Hence flushing is skipped and the data is recovered from the WAL.

2. As there are no transfers between ingesters, the tokens are stored in and recovered from disk across rollouts/restarts. This is [not a new thing](https://github.com/cortexproject/cortex/pull/1750) but it is effective when using StatefulSets.
## Migrating from stateless deployments

The ingester _Deployment without WAL_ and the _StatefulSet with WAL_ should be scaled down and up respectively, in sync and without transfer of data between them, to ensure that any ingestion after migration is reliable immediately.

Let's take an example of 4 ingesters. The migration would look something like this (a kubectl sketch follows the list):

1. Bring up one stateful ingester `ingester-0` and wait till it's ready (accepting read and write requests).
2. Scale down the old ingester Deployment to 3 and wait till the leaving ingester flushes all of its data to the chunk store.
3. Once that ingester has disappeared from `kubectl get pods ...`, add another stateful ingester and wait till it's ready. This ensures no transfer takes place. Now you have `ingester-0 ingester-1`.
4. Repeat step 2 to remove another ingester from the old Deployment.
5. Repeat step 3 to add another stateful ingester. Now you have `ingester-0 ingester-1 ingester-2`.
6. Repeat steps 4 and 5, and you will finally have `ingester-0 ingester-1 ingester-2 ingester-3`.
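One round of that loop might look like the following, assuming the old Deployment is named `ingester` and the new StatefulSet `ingester-wal` (both names are illustrative):

```sh
# Add one WAL-enabled ingester and wait for it to become ready.
kubectl scale statefulset ingester-wal --replicas=1
kubectl rollout status statefulset ingester-wal

# Remove one old ingester; it flushes its data to the chunk store on the way out.
kubectl scale deployment ingester --replicas=3

# Watch until the leaving pod has flushed and disappeared before starting the next round.
kubectl get pods -w
```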
## How to scale up/down

### Scale up

Scaling up is the same as what you would do without the WAL or StatefulSets. Nothing changes here.

### Scale down

Since Kubernetes doesn't differentiate between rollout and scale-down when sending a signal, the flushing of chunks is disabled by default. Hence the only thing to take care of during scale-down is flushing of chunks.

There are 2 ways to do it, with the latter being a fallback option.
**First option**

Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters; the ingesters which will be shut down according to StatefulSet rules are `ingester-3` and then `ingester-2`.

Hence, before actually scaling down in Kubernetes, port-forward to those ingesters and hit the [`/shutdown`](https://github.com/cortexproject/cortex/pull/1746) endpoint. This will flush the chunks and shut down the ingesters (while also removing them from the ring). A sketch of this sequence is shown below.

After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2.

PS: Given you have to scale down one ingester at a time, you can pipeline the shutdown and scale-down process instead of hitting the shutdown endpoint for all to-be-scaled-down ingesters at once.
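A sketch of that sequence, assuming the StatefulSet is named `ingester` and the ingester's HTTP server listens on the default port 80 (adjust the port-forward target to your `-server.http-listen-port`):

```sh
kubectl port-forward pod/ingester-3 8080:80 &
sleep 2   # give the port-forward a moment to establish
curl http://localhost:8080/shutdown   # flushes chunks and removes ingester-3 from the ring
kill %1

kubectl port-forward pod/ingester-2 8080:80 &
sleep 2
curl http://localhost:8080/shutdown
kill %1

kubectl scale statefulset ingester --replicas=2
```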
**Fallback option**

There is a [flush mode ingester](https://github.com/cortexproject/cortex/pull/1747) in progress, and per recent discussions there will be a separate target called flusher in its place.

You can run it as a Kubernetes job which will:

* Attach to the volume of the scaled-down ingester
* Recover from the WAL
* And flush all the chunks.

This job should be run for every ingester for which you missed hitting the shutdown endpoint as a first option.

More info about the flusher target will be added once it's upstream.

pkg/distributor/distributor.go

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	if !canJoinDistributorsRing {
 		ingestionRateStrategy = newInfiniteIngestionRateStrategy()
 	} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
-		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
+		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
 		if err != nil {
 			return nil, err
 		}

pkg/ingester/ingester.go

Lines changed: 91 additions & 8 deletions
@@ -35,8 +35,14 @@ const (
 	queryStreamBatchSize = 128
 )
 
+var (
+	// This is initialised if the WAL is enabled and the records are fetched from this pool.
+	recordPool sync.Pool
+)
+
 // Config for an Ingester.
 type Config struct {
+	WALConfig WALConfig
 	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
 
 	// Config for transferring chunks. Zero or negative = no retries.
@@ -70,6 +76,7 @@ type Config struct {
 // RegisterFlags adds the flags required to config this to the given FlagSet
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlags(f)
+	cfg.WALConfig.RegisterFlags(f)
 
 	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
@@ -109,6 +116,9 @@ type Ingester struct {
 	flushQueues     []*util.PriorityQueue
 	flushQueuesDone sync.WaitGroup
 
+	// This should never be nil.
+	wal WAL
+
 	// Hook for injecting behaviour from tests.
 	preFlushUserSeries func()
 
@@ -131,6 +141,19 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		return NewV2(cfg, clientConfig, limits, registerer)
 	}
 
+	if cfg.WALConfig.walEnabled {
+		// If WAL is enabled, we don't transfer out the data to any ingester.
+		// Either the next ingester which takes it's place should recover from WAL
+		// or the data has to be flushed during scaledown.
+		cfg.MaxTransferRetries = 0
+
+		recordPool = sync.Pool{
+			New: func() interface{} {
+				return &Record{}
+			},
+		}
+	}
+
 	i := &Ingester{
 		cfg:          cfg,
 		clientConfig: clientConfig,
@@ -142,14 +165,36 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	}
 
 	var err error
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey)
+	// During WAL recovery, it will create new user states which requires the limiter.
+	// Hence initialise the limiter before creating the WAL.
+	// The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
 	if err != nil {
 		return nil, err
 	}
-
-	// Init the limter and instantiate the user states which depend on it
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
-	i.userStates = newUserStates(i.limiter, cfg, i.metrics)
+
+	if cfg.WALConfig.recover {
+		level.Info(util.Logger).Log("msg", "recovering from WAL")
+		start := time.Now()
+		if err := recoverFromWAL(i); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
+			return nil, err
+		}
+		elapsed := time.Since(start)
+		level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
+		i.metrics.walReplayDuration.Set(elapsed.Seconds())
+	}
+
+	// If the WAL recover happened, then the userStates would already be set.
+	if i.userStates == nil {
+		i.userStates = newUserStates(i.limiter, cfg, i.metrics)
+	}
+
+	i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
+	if err != nil {
+		return nil, err
+	}
 
 	// Now that user states have been created, we can start the lifecycler
 	i.lifecycler.Start()
@@ -200,6 +245,8 @@ func (i *Ingester) Shutdown() {
 	close(i.quit)
 	i.done.Wait()
 
+	i.wal.Stop()
+
 	// Next initiate our graceful exit from the ring.
 	i.lifecycler.Shutdown()
 }
@@ -209,7 +256,11 @@ func (i *Ingester) Shutdown() {
 // * Change the state of ring to stop accepting writes.
 // * Flush all the chunks.
 func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
+	originalState := i.lifecycler.FlushOnShutdown()
+	// We want to flush the chunks if transfer fails irrespective of original flag.
+	i.lifecycler.SetFlushOnShutdown(true)
 	i.Shutdown()
+	i.lifecycler.SetFlushOnShutdown(originalState)
 	w.WriteHeader(http.StatusNoContent)
 }
 
@@ -232,11 +283,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 	if err != nil {
 		return nil, fmt.Errorf("no user id")
 	}
+
 	var lastPartialErr *validationError
+	var record *Record
+	if i.cfg.WALConfig.walEnabled {
+		record = recordPool.Get().(*Record)
+		record.UserId = userID
+		// Assuming there is not much churn in most cases, there is no use
+		// keeping the record.Labels slice hanging around.
+		record.Labels = nil
+		if cap(record.Samples) < len(req.Timeseries) {
+			record.Samples = make([]Sample, 0, len(req.Timeseries))
+		} else {
+			record.Samples = record.Samples[:0]
+		}
+	}
 
 	for _, ts := range req.Timeseries {
 		for _, s := range ts.Samples {
-			err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
+			err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
 			if err == nil {
 				continue
 			}
@@ -254,10 +319,19 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 	if lastPartialErr != nil {
 		return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
 	}
+
+	if record != nil {
+		// Log the record only if there was no error in ingestion.
+		if err := i.wal.Log(record); err != nil {
+			return nil, err
+		}
+		recordPool.Put(record)
+	}
+
 	return &client.WriteResponse{}, nil
 }
 
-func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
+func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
 	labels.removeBlanks()
 
 	var (
@@ -274,7 +348,8 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
 	if i.stopped {
 		return fmt.Errorf("ingester stopping")
 	}
-	state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)
+
+	state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
 	if err != nil {
 		if ve, ok := err.(*validationError); ok {
 			state.discardedSamples.WithLabelValues(ve.errorType).Inc()
@@ -310,6 +385,14 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
 		return err
 	}
 
+	if record != nil {
+		record.Samples = append(record.Samples, Sample{
+			Fingerprint: uint64(fp),
+			Timestamp:   uint64(timestamp),
+			Value:       float64(value),
+		})
+	}
+
 	memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
 	i.metrics.ingestedSamples.Inc()
 	switch source {
@@ -430,7 +513,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 		}
 
 		numSeries++
-		wireChunks, err := toWireChunks(chunks)
+		wireChunks, err := toWireChunks(chunks, nil)
 		if err != nil {
 			return err
 		}
