-
Notifications
You must be signed in to change notification settings - Fork 521
exp/lighthorizon/index: More testing for batch indexing and off-by-one bugfix. #4442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2b62f14
e01d1c6
ec779b0
27d9fe3
7498161
9bf6b44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,66 +159,70 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { | |
for j := uint32(0); j < config.Workers; j++ { | ||
go func(routineIndex uint32) { | ||
defer wg.Done() | ||
logger := jobLogger. | ||
accountLog := jobLogger. | ||
WithField("worker", routineIndex). | ||
WithField("total", len(accounts)) | ||
logger.Info("Started worker") | ||
WithField("subservice", "accounts") | ||
accountLog.Info("Started worker") | ||
|
||
var accountsProcessed, accountsSkipped uint64 | ||
for account := range workQueues[routineIndex] { | ||
logger.Infof("Account: %s", account) | ||
accountLog. | ||
WithField("total", len(accounts)). | ||
WithField("indexed", accountsProcessed). | ||
WithField("skipped", accountsSkipped) | ||
|
||
accountLog.Debugf("Account: %s", account) | ||
if (accountsProcessed+accountsSkipped)%97 == 0 { | ||
logger. | ||
WithField("indexed", accountsProcessed). | ||
WithField("skipped", accountsSkipped). | ||
Infof("Processed %d/%d accounts", | ||
accountsProcessed+accountsSkipped, len(accounts)) | ||
accountLog.Infof("Processed %d/%d accounts", | ||
accountsProcessed+accountsSkipped, len(accounts)) | ||
} | ||
|
||
logger.Infof("Reading index for account: %s", account) | ||
accountLog.Debugf("Reading index for account: %s", account) | ||
|
||
// First, open the "final merged indices" at the root level | ||
// for this account. | ||
mergedIndices, mergeErr := outerJobStore.Read(account) | ||
mergedIndices, readErr := outerJobStore.Read(account) | ||
|
||
// TODO: in final version this should be critical error, now just skip it | ||
if os.IsNotExist(mergeErr) { | ||
logger.Errorf("Account %s is unavailable - TODO fix", account) | ||
if os.IsNotExist(readErr) { | ||
accountLog.Errorf("Account %s is unavailable - TODO fix", account) | ||
continue | ||
} else if mergeErr != nil { | ||
panic(mergeErr) | ||
} else if err != nil { | ||
panic(readErr) | ||
} | ||
|
||
// Then, iterate through all of the job folders and merge | ||
// indices from all jobs that touched this account. | ||
for k := uint32(0); k < config.MapJobCount; k++ { | ||
var jobErr error | ||
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) | ||
|
||
// FIXME: This could probably come from a pool. Every | ||
// worker needs to have a connection to every index | ||
// store, so there's no reason to re-open these for each | ||
// inner loop. | ||
innerJobStore, indexErr := index.Connect(url) | ||
if indexErr != nil { | ||
logger.WithError(indexErr). | ||
innerJobStore, jobErr := index.Connect(url) | ||
if jobErr != nil { | ||
accountLog.WithError(jobErr). | ||
Errorf("Failed to open index at %s", url) | ||
panic(indexErr) | ||
panic(jobErr) | ||
} | ||
|
||
jobIndices, innerJobErr := innerJobStore.Read(account) | ||
jobIndices, jobErr := innerJobStore.Read(account) | ||
|
||
// This job never touched this account; skip. | ||
if os.IsNotExist(innerJobErr) { | ||
if os.IsNotExist(jobErr) { | ||
continue | ||
} else if innerJobErr != nil { | ||
logger.WithError(innerJobErr). | ||
} else if jobErr != nil { | ||
accountLog.WithError(jobErr). | ||
Errorf("Failed to read index for %s", account) | ||
panic(innerJobErr) | ||
panic(jobErr) | ||
} | ||
|
||
if mergeIndexErr := mergeIndices(mergedIndices, jobIndices); mergeIndexErr != nil { | ||
logger.WithError(mergeIndexErr). | ||
if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil { | ||
accountLog.WithError(jobErr). | ||
Errorf("Merge failure for index at %s", url) | ||
panic(mergeIndexErr) | ||
panic(jobErr) | ||
} | ||
} | ||
|
||
|
@@ -228,71 +232,82 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { | |
// Mark this account for other workers to ignore. | ||
doneAccounts.Add(account) | ||
accountsProcessed++ | ||
logger = logger.WithField("processed", accountsProcessed) | ||
accountLog = accountLog.WithField("processed", accountsProcessed) | ||
|
||
// Periodically flush to disk to save memory. | ||
if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 { | ||
logger.Infof("Flushing indexed accounts.") | ||
if err = finalIndexStore.Flush(); err != nil { | ||
logger.WithError(err).Errorf("Flush error.") | ||
panic(err) | ||
accountLog.Infof("Flushing indexed accounts.") | ||
if flushErr := finalIndexStore.Flush(); flushErr != nil { | ||
accountLog.WithError(flushErr).Errorf("Flush error.") | ||
panic(flushErr) | ||
} | ||
} | ||
} | ||
|
||
jobLogger.Infof("Final account flush.") | ||
accountLog.Infof("Final account flush.") | ||
if err = finalIndexStore.Flush(); err != nil { | ||
logger.WithError(err).Errorf("Flush error.") | ||
accountLog.WithError(err).Errorf("Flush error.") | ||
panic(err) | ||
} | ||
|
||
// Merge the transaction indexes | ||
// There's 256 files, (one for each first byte of the txn hash) | ||
var transactionsProcessed, transactionsSkipped uint64 | ||
logger = jobLogger. | ||
WithField("indexed", transactionsProcessed). | ||
WithField("skipped", transactionsSkipped) | ||
|
||
for i := byte(0x00); i < 0xff; i++ { | ||
if i%97 == 0 { | ||
logger.Infof("%d transactions processed (%d skipped)", | ||
transactionsProcessed, transactionsSkipped) | ||
txLog := jobLogger. | ||
WithField("worker", routineIndex). | ||
WithField("subservice", "transactions") | ||
|
||
var prefixesProcessed, prefixesSkipped uint64 | ||
for i := int(0x00); i <= 0xff; i++ { | ||
b := byte(i) // can't loop over range bc overflow | ||
Comment on lines
+260
to
+261
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wow, I was staring at this for 10 minutes trying to understand what's going on here. Nice one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeahh.. took a lot more than 10 minutes to figure out why the indices were incorrect 😆 For anyone else curious:
|
||
if b%97 == 0 { | ||
txLog.Infof("Processed %d/%d prefixes (%d skipped)", | ||
prefixesProcessed, 0xff, prefixesSkipped) | ||
} | ||
|
||
if !config.shouldProcessTx(i, routineIndex) { | ||
transactionsSkipped++ | ||
if !config.shouldProcessTx(b, routineIndex) { | ||
prefixesSkipped++ | ||
continue | ||
} | ||
transactionsProcessed++ | ||
|
||
prefix := hex.EncodeToString([]byte{i}) | ||
txLog = txLog. | ||
WithField("indexed", prefixesProcessed). | ||
WithField("skipped", prefixesSkipped) | ||
|
||
prefix := hex.EncodeToString([]byte{b}) | ||
for k := uint32(0); k < config.MapJobCount; k++ { | ||
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) | ||
innerJobStore, jobErr := index.Connect(url) | ||
if jobErr != nil { | ||
logger.WithError(jobErr).Errorf("Failed to open index at %s", url) | ||
panic(jobErr) | ||
var innerErr error | ||
|
||
innerJobStore, innerErr := index.Connect(url) | ||
if innerErr != nil { | ||
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url) | ||
panic(innerErr) | ||
} | ||
|
||
innerTxnIndexes, innerJobErr := innerJobStore.ReadTransactions(prefix) | ||
if os.IsNotExist(innerJobErr) { | ||
innerTxnIndexes, innerErr := innerJobStore.ReadTransactions(prefix) | ||
if os.IsNotExist(innerErr) { | ||
continue | ||
} else if innerJobErr != nil { | ||
logger.WithError(innerJobErr).Errorf("Error reading tx prefix %s", prefix) | ||
panic(innerJobErr) | ||
} else if innerErr != nil { | ||
txLog.WithError(innerErr).Errorf("Error reading tx prefix %s", prefix) | ||
panic(innerErr) | ||
} | ||
|
||
if prefixErr := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil { | ||
logger.WithError(prefixErr).Errorf("Error merging txs at prefix %s", prefix) | ||
panic(prefixErr) | ||
if innerErr = finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); innerErr != nil { | ||
txLog.WithError(innerErr).Errorf("Error merging txs at prefix %s", prefix) | ||
panic(innerErr) | ||
} | ||
} | ||
|
||
prefixesProcessed++ | ||
} | ||
|
||
jobLogger.Infof("Final transaction flush (%d processed)", transactionsProcessed) | ||
txLog = txLog. | ||
WithField("indexed", prefixesProcessed). | ||
WithField("skipped", prefixesSkipped) | ||
|
||
txLog.Infof("Final transaction flush...") | ||
if err = finalIndexStore.Flush(); err != nil { | ||
logger.Errorf("Error flushing transactions: %v", err) | ||
txLog.Errorf("Error flushing transactions: %v", err) | ||
panic(err) | ||
} | ||
}(j) | ||
|
Uh oh!
There was an error while loading. Please reload this page.