Skip to content

Commit b7fde58

Browse files
author
Paul Bellamy
authored
services/horizon: Allow captive core to run with sqlite database (stellar#4092)
1 parent 2cce28d commit b7fde58

File tree

20 files changed

+262
-53
lines changed

20 files changed

+262
-53
lines changed

.circleci/config.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ jobs:
430430
enable-captive-core:
431431
type: boolean
432432
default: false
433+
enable-captive-core-remote-storage:
434+
type: boolean
435+
default: false
433436
working_directory: ~/go/src/github.com/stellar/go
434437
machine:
435438
image: ubuntu-2004:202010-01
@@ -452,6 +455,12 @@ jobs:
452455
- run:
453456
name: Setting Captive Core env variables
454457
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=true" >> $BASH_ENV
458+
- when:
459+
condition: <<parameters.enable-captive-core-remote-storage>>
460+
steps:
461+
- run:
462+
name: Setting Captive Core Remote Storage env variable
463+
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB=true" >> $BASH_ENV
455464
- run:
456465
name: Run Horizon integration tests <<#parameters.enable-captive-core>>(With captive core)<</parameters.enable-captive-core>>
457466
# Currently all integration tests are in a single directory.
@@ -480,6 +489,10 @@ workflows:
480489
- test_horizon_integration:
481490
name: test_horizon_integration_with_captive_core
482491
enable-captive-core: true
492+
- test_horizon_integration:
493+
name: test_horizon_integration_with_captive_core_remote_storage
494+
enable-captive-core: true
495+
enable-captive-core-remote-storage: true
483496
- test_verify_range_docker_image:
484497
filters:
485498
# we use test_verify_range_docker_image with publish in master

ingest/ledgerbackend/captive_core_backend.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ type CaptiveCoreConfig struct {
124124
// stored. We always append /captive-core to this directory, since we clean
125125
// it up entirely on shutdown.
126126
StoragePath string
127+
128+
// UseDB, when true, instructs the core invocation to use an external db url
129+
// for ledger state rather than holding it in memory (RAM). The external db url is determined by the presence
130+
// of the DATABASE parameter in the captive-core-config-path or, if absent, the db will default to sqlite
131+
// and the db file will be stored at a location derived from the StoragePath parameter.
132+
UseDB bool
127133
}
128134

129135
// NewCaptive returns a new CaptiveStellarCore instance.
@@ -142,6 +148,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
142148
if parentCtx == nil {
143149
parentCtx = context.Background()
144150
}
151+
145152
var cancel context.CancelFunc
146153
config.Context, cancel = context.WithCancel(parentCtx)
147154

@@ -250,11 +257,8 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
250257
var runner stellarCoreRunnerInterface
251258
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil {
252259
return errors.Wrap(err, "error creating stellar-core runner")
253-
} else {
254-
// only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check
255-
// see https://golang.org/doc/faq#nil_error
256-
c.stellarCoreRunner = runner
257260
}
261+
c.stellarCoreRunner = runner
258262

259263
runFrom, ledgerHash, err := c.runFromParams(ctx, from)
260264
if err != nil {
@@ -279,14 +283,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
279283
}
280284

281285
// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
282-
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
286+
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (uint32, string, error) {
287+
283288
if from == 1 {
284289
// Trying to start-from 1 results in an error from Stellar-Core:
285290
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
286291
// TODO maybe we can fix it by generating 1st ledger meta
287292
// like GenesisLedgerStateReader?
288-
err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
289-
return
293+
err := errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
294+
return 0, "", err
290295
}
291296

292297
if from <= 63 {
@@ -298,26 +303,25 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
298303
from = 3
299304
}
300305

301-
runFrom = from - 1
306+
runFrom := from - 1
302307
if c.ledgerHashStore != nil {
303308
var exists bool
304-
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
309+
ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
305310
if err != nil {
306311
err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom)
307-
return
312+
return 0, "", err
308313
}
309314
if exists {
310-
return
315+
return runFrom, ledgerHash, nil
311316
}
312317
}
313318

314-
ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
315-
if err2 != nil {
316-
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
317-
return
319+
ledgerHeader, err := c.archive.GetLedgerHeader(from)
320+
if err != nil {
321+
return 0, "", errors.Wrapf(err, "error trying to read ledger header %d from HAS", from)
318322
}
319-
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
320-
return
323+
ledgerHash := hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
324+
return runFrom, ledgerHash, nil
321325
}
322326

323327
// nextExpectedSequence returns nextLedger (if currently set) or start of
@@ -406,6 +410,10 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
406410
return false
407411
}
408412

413+
if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
414+
return false
415+
}
416+
409417
lastLedger := uint32(0)
410418
if c.lastLedger != nil {
411419
lastLedger = *c.lastLedger

ingest/ledgerbackend/captive_core_backend_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
316316
ctx := context.Background()
317317
mockRunner := &stellarCoreRunnerMock{}
318318
mockRunner.On("close").Return(fmt.Errorf("transient error"))
319+
mockRunner.On("getProcessExitError").Return(false, nil)
319320
mockRunner.On("context").Return(ctx)
320321

321322
captiveBackend := CaptiveStellarCore{
@@ -490,13 +491,15 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
490491
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
491492
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
492493
mockRunner.On("context").Return(ctx)
494+
mockRunner.On("getProcessExitError").Return(false, nil)
493495

494496
mockArchive := &historyarchive.MockArchive{}
495497
mockArchive.
496498
On("GetRootHAS").
497499
Return(historyarchive.HistoryArchiveState{
498500
CurrentLedger: uint32(129),
499501
}, nil)
502+
500503
mockArchive.
501504
On("GetLedgerHeader", uint32(65)).
502505
Return(xdr.LedgerHeaderHistoryEntry{}, nil)
@@ -585,6 +588,7 @@ func TestCaptiveGetLedger(t *testing.T) {
585588
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
586589
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
587590
mockRunner.On("context").Return(ctx)
591+
mockRunner.On("getProcessExitError").Return(false, nil)
588592

589593
mockArchive := &historyarchive.MockArchive{}
590594
mockArchive.
@@ -1288,6 +1292,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
12881292
func TestCaptiveIsPrepared(t *testing.T) {
12891293
mockRunner := &stellarCoreRunnerMock{}
12901294
mockRunner.On("context").Return(context.Background()).Maybe()
1295+
mockRunner.On("getProcessExitError").Return(false, nil)
12911296

12921297
// c.prepared == nil
12931298
captiveBackend := CaptiveStellarCore{
@@ -1351,6 +1356,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
13511356
mockRunner := &stellarCoreRunnerMock{}
13521357
ctx, cancel := context.WithCancel(context.Background())
13531358
mockRunner.On("context").Return(ctx).Maybe()
1359+
mockRunner.On("getProcessExitError").Return(false, nil)
13541360

13551361
rang := UnboundedRange(100)
13561362
captiveBackend := CaptiveStellarCore{
@@ -1447,5 +1453,4 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {
14471453

14481454
mockRunner.AssertExpectations(t)
14491455
mockArchive.AssertExpectations(t)
1450-
mockLedgerHashStore.AssertExpectations(t)
14511456
}

ingest/ledgerbackend/stellar_core_runner.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type stellarCoreRunner struct {
6666
processExitError error
6767

6868
storagePath string
69+
useDB bool
6970
nonce string
7071

7172
log *log.Entry
@@ -122,6 +123,7 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
122123
ctx: ctx,
123124
cancel: cancel,
124125
storagePath: fullStoragePath,
126+
useDB: config.UseDB,
125127
mode: mode,
126128
nonce: fmt.Sprintf(
127129
"captive-stellar-core-%x",
@@ -261,11 +263,22 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
261263
}
262264

263265
rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
264-
r.cmd = r.createCmd(
265-
"catchup", rangeArg,
266-
"--metadata-output-stream", r.getPipeName(),
267-
"--in-memory",
268-
)
266+
params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}
267+
268+
// horizon operator has specified to use external storage for captive core ledger state
269+
// instruct captive core invocation to not use memory, and in that case
270+
// captive core will look at the DATABASE property in the cfg toml for the external storage source to use.
271+
// when using external storage of ledgers, use new-db to first set the state of
272+
// remote db storage to genesis to purge any prior state and reset.
273+
if r.useDB {
274+
if err := r.createCmd("new-db").Run(); err != nil {
275+
return errors.Wrap(err, "error initializing core db")
276+
}
277+
} else {
278+
params = append(params, "--in-memory")
279+
}
280+
281+
r.cmd = r.createCmd(params...)
269282

270283
var err error
271284
r.pipe, err = r.start(r.cmd)
@@ -304,13 +317,34 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
304317
return errors.New("runner already started")
305318
}
306319

307-
r.cmd = r.createCmd(
308-
"run",
309-
"--in-memory",
310-
"--start-at-ledger", fmt.Sprintf("%d", from),
311-
"--start-at-hash", hash,
312-
"--metadata-output-stream", r.getPipeName(),
313-
)
320+
if r.useDB {
321+
if err := r.createCmd("new-db").Run(); err != nil {
322+
return errors.Wrap(err, "error initializing core db")
323+
}
324+
// Do a quick catch-up to set the LCL in core to be our expected starting
325+
// point.
326+
if from > 2 {
327+
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
328+
return errors.Wrap(err, "error runing stellar-core catchup")
329+
}
330+
} else if err := r.createCmd("catchup", "2/0").Run(); err != nil {
331+
return errors.Wrap(err, "error runing stellar-core catchup")
332+
}
333+
334+
r.cmd = r.createCmd(
335+
"run",
336+
"--metadata-output-stream",
337+
r.getPipeName(),
338+
)
339+
} else {
340+
r.cmd = r.createCmd(
341+
"run",
342+
"--in-memory",
343+
"--start-at-ledger", fmt.Sprintf("%d", from),
344+
"--start-at-hash", hash,
345+
"--metadata-output-stream", r.getPipeName(),
346+
)
347+
}
314348

315349
var err error
316350
r.pipe, err = r.start(r.cmd)

ingest/ledgerbackend/testdata/expected-offline-core.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Generated file, do not edit
2+
DATABASE = "sqlite3://stellar.db"
23
FAILURE_SAFETY = 0
34
HTTP_PORT = 0
45
LOG_FILE_PATH = ""
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# DATABASE is limited to the sqlite3:// protocol only
2+
DATABASE="postgres://mydb"
3+
4+
[[HOME_DOMAINS]]
5+
HOME_DOMAIN="testnet.stellar.org"
6+
QUALITY="MEDIUM"
7+
8+
[[VALIDATORS]]
9+
NAME="sdf_testnet_1"
10+
HOME_DOMAIN="testnet.stellar.org"
11+
PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y"
12+
ADDRESS="localhost:123"

ingest/ledgerbackend/toml.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type QuorumSet struct {
6161
}
6262

6363
type captiveCoreTomlValues struct {
64+
Database string `toml:"DATABASE,omitempty"`
6465
// we cannot omitempty because the empty string is a valid configuration for LOG_FILE_PATH
6566
// and the default is stellar-core.log
6667
LogFilePath string `toml:"LOG_FILE_PATH"`
@@ -312,6 +313,8 @@ type CaptiveCoreTomlParams struct {
312313
LogPath *string
313314
// Strict is a flag which, if enabled, rejects Stellar Core toml fields which are not supported by captive core.
314315
Strict bool
316+
// UseDB, if true, specifies that captive core should be invoked with the on-disk rather than the in-memory option for ledger state.
317+
UseDB bool
315318
}
316319

317320
// NewCaptiveCoreTomlFromFile constructs a new CaptiveCoreToml instance by merging configuration
@@ -405,6 +408,11 @@ func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) {
405408
}
406409

407410
func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) {
411+
412+
if params.UseDB && !c.tree.Has("DATABASE") {
413+
c.Database = "sqlite3://stellar.db"
414+
}
415+
408416
if !c.tree.Has("NETWORK_PASSPHRASE") {
409417
c.NetworkPassphrase = params.NetworkPassphrase
410418
}
@@ -549,5 +557,9 @@ func (c *CaptiveCoreToml) validate(params CaptiveCoreTomlParams) error {
549557
names[v.Name] = true
550558
}
551559

560+
if len(c.Database) > 0 && !strings.HasPrefix(c.Database, "sqlite3://") {
561+
return fmt.Errorf("invalid DATABASE parameter: %s, for captive core config, must be valid sqlite3 db url", c.Database)
562+
}
563+
552564
return nil
553565
}

0 commit comments

Comments
 (0)