Skip to content

Commit 27d9fe3

Browse files
committed
Isolate loggers to individual processing "sections"
1 parent ec779b0 commit 27d9fe3

File tree

1 file changed

+70
-56
lines changed
  • exp/lighthorizon/index/cmd/batch/reduce

1 file changed

+70
-56
lines changed

exp/lighthorizon/index/cmd/batch/reduce/main.go

Lines changed: 70 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -159,66 +159,70 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
159159
for j := uint32(0); j < config.Workers; j++ {
160160
go func(routineIndex uint32) {
161161
defer wg.Done()
162-
logger := jobLogger.
162+
accountLog := jobLogger.
163163
WithField("worker", routineIndex).
164-
WithField("total", len(accounts))
165-
logger.Info("Started worker")
164+
WithField("subservice", "accounts")
165+
accountLog.Info("Started worker")
166166

167167
var accountsProcessed, accountsSkipped uint64
168168
for account := range workQueues[routineIndex] {
169-
logger.Infof("Account: %s", account)
169+
accountLog.
170+
WithField("total", len(accounts)).
171+
WithField("indexed", accountsProcessed).
172+
WithField("skipped", accountsSkipped)
173+
174+
accountLog.Debugf("Account: %s", account)
170175
if (accountsProcessed+accountsSkipped)%97 == 0 {
171-
logger.
172-
WithField("indexed", accountsProcessed).
173-
WithField("skipped", accountsSkipped).
174-
Infof("Processed %d/%d accounts",
175-
accountsProcessed+accountsSkipped, len(accounts))
176+
accountLog.Infof("Processed %d/%d accounts",
177+
accountsProcessed+accountsSkipped, len(accounts))
176178
}
177179

178-
logger.Infof("Reading index for account: %s", account)
180+
accountLog.Debugf("Reading index for account: %s", account)
179181

180182
// First, open the "final merged indices" at the root level
181183
// for this account.
182-
mergedIndices, mergeErr := outerJobStore.Read(account)
184+
mergedIndices, err := outerJobStore.Read(account)
183185

184186
// TODO: in final version this should be critical error, now just skip it
185-
if os.IsNotExist(mergeErr) {
186-
logger.Errorf("Account %s is unavailable - TODO fix", account)
187+
if os.IsNotExist(err) {
188+
accountLog.Errorf("Account %s is unavailable - TODO fix", account)
187189
continue
188-
} else if mergeErr != nil {
189-
panic(mergeErr)
190+
} else if err != nil {
191+
panic(err)
190192
}
191193

192194
// Then, iterate through all of the job folders and merge
193195
// indices from all jobs that touched this account.
194196
for k := uint32(0); k < config.MapJobCount; k++ {
197+
var jobErr error
195198
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
196199

197200
// FIXME: This could probably come from a pool. Every
198201
// worker needs to have a connection to every index
199202
// store, so there's no reason to re-open these for each
200203
// inner loop.
201-
innerJobStore, indexErr := index.Connect(url)
202-
if indexErr != nil {
203-
logger.WithError(indexErr).
204+
innerJobStore, jobErr := index.Connect(url)
205+
if jobErr != nil {
206+
accountLog.WithError(jobErr).
204207
Errorf("Failed to open index at %s", url)
205-
panic(indexErr)
208+
panic(jobErr)
206209
}
207210

208-
jobIndices, innerJobErr := innerJobStore.Read(account)
211+
jobIndices, jobErr := innerJobStore.Read(account)
212+
209213
// This job never touched this account; skip.
210-
if os.IsNotExist(innerJobErr) {
214+
if os.IsNotExist(jobErr) {
211215
continue
212-
} else if innerJobErr != nil {
213-
logger.WithError(innerJobErr).
216+
} else if jobErr != nil {
217+
accountLog.WithError(jobErr).
214218
Errorf("Failed to read index for %s", account)
215-
panic(innerJobErr)
219+
panic(jobErr)
216220
}
217221

218-
if mergeIndexErr := mergeIndices(mergedIndices, jobIndices); mergeIndexErr != nil {
219-
logger.WithError(mergeIndexErr).
222+
if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
223+
accountLog.WithError(jobErr).
220224
Errorf("Merge failure for index at %s", url)
221-
panic(mergeIndexErr)
225+
panic(jobErr)
222226
}
223227
}
224228

@@ -228,72 +232,82 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
228232
// Mark this account for other workers to ignore.
229233
doneAccounts.Add(account)
230234
accountsProcessed++
231-
logger = logger.WithField("processed", accountsProcessed)
235+
accountLog = accountLog.WithField("processed", accountsProcessed)
232236

233237
// Periodically flush to disk to save memory.
234238
if accountsProcessed%ACCOUNT_FLUSH_FREQUENCY == 0 {
235-
logger.Infof("Flushing indexed accounts.")
239+
accountLog.Infof("Flushing indexed accounts.")
236240
if err = finalIndexStore.Flush(); err != nil {
237-
logger.WithError(err).Errorf("Flush error.")
241+
accountLog.WithError(err).Errorf("Flush error.")
238242
panic(err)
239243
}
240244
}
241245
}
242246

243-
jobLogger.Infof("Final account flush.")
247+
accountLog.Infof("Final account flush.")
244248
if err = finalIndexStore.Flush(); err != nil {
245-
logger.WithError(err).Errorf("Flush error.")
249+
accountLog.WithError(err).Errorf("Flush error.")
246250
panic(err)
247251
}
248252

249253
// Merge the transaction indexes
250254
// There's 256 files, (one for each first byte of the txn hash)
251-
var transactionsProcessed, transactionsSkipped uint64
252-
logger = jobLogger.
253-
WithField("indexed", transactionsProcessed).
254-
WithField("skipped", transactionsSkipped)
255+
txLog := jobLogger.
256+
WithField("worker", routineIndex).
257+
WithField("subservice", "transactions")
255258

259+
var prefixesProcessed, prefixesSkipped uint64
256260
for i := int(0x00); i <= 0xff; i++ {
257261
b := byte(i) // can't loop over range bc overflow
258-
if i%97 == 0 {
259-
logger.Infof("%d transactions processed (%d skipped)",
260-
transactionsProcessed, transactionsSkipped)
262+
if b%97 == 0 {
263+
txLog.Infof("Processed %d/%d prefixes (%d skipped)",
264+
prefixesProcessed, 0xff, prefixesSkipped)
261265
}
262266

263267
if !config.shouldProcessTx(b, routineIndex) {
264-
transactionsSkipped++
268+
prefixesSkipped++
265269
continue
266270
}
267-
transactionsProcessed++
268271

269-
prefix := hex.EncodeToString([]byte{b})
272+
txLog = txLog.
273+
WithField("indexed", prefixesProcessed).
274+
WithField("skipped", prefixesSkipped)
270275

276+
prefix := hex.EncodeToString([]byte{b})
271277
for k := uint32(0); k < config.MapJobCount; k++ {
272278
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
273-
innerJobStore, jobErr := index.Connect(url)
274-
if jobErr != nil {
275-
logger.WithError(jobErr).Errorf("Failed to open index at %s", url)
276-
panic(jobErr)
279+
var innerErr error
280+
281+
innerJobStore, innerErr := index.Connect(url)
282+
if innerErr != nil {
283+
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
284+
panic(innerErr)
277285
}
278286

279-
innerTxnIndexes, innerJobErr := innerJobStore.ReadTransactions(prefix)
280-
if os.IsNotExist(innerJobErr) {
287+
innerTxnIndexes, innerErr := innerJobStore.ReadTransactions(prefix)
288+
if os.IsNotExist(innerErr) {
281289
continue
282-
} else if innerJobErr != nil {
283-
logger.WithError(innerJobErr).Errorf("Error reading tx prefix %s", prefix)
284-
panic(innerJobErr)
290+
} else if innerErr != nil {
291+
txLog.WithError(innerErr).Errorf("Error reading tx prefix %s", prefix)
292+
panic(innerErr)
285293
}
286294

287-
if prefixErr := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil {
288-
logger.WithError(prefixErr).Errorf("Error merging txs at prefix %s", prefix)
289-
panic(prefixErr)
295+
if innerErr = finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); innerErr != nil {
296+
txLog.WithError(innerErr).Errorf("Error merging txs at prefix %s", prefix)
297+
panic(innerErr)
290298
}
291299
}
300+
301+
prefixesProcessed++
292302
}
293303

294-
jobLogger.Infof("Final transaction flush (%d processed)", transactionsProcessed)
304+
txLog = txLog.
305+
WithField("indexed", prefixesProcessed).
306+
WithField("skipped", prefixesSkipped)
307+
308+
txLog.Infof("Final transaction flush...")
295309
if err = finalIndexStore.Flush(); err != nil {
296-
logger.Errorf("Error flushing transactions: %v", err)
310+
txLog.Errorf("Error flushing transactions: %v", err)
297311
panic(err)
298312
}
299313
}(j)

0 commit comments

Comments
 (0)