diff --git a/packages/adapters/mqclient/src/index.ts b/packages/adapters/mqclient/src/index.ts index bde1b8ff..453efe84 100644 --- a/packages/adapters/mqclient/src/index.ts +++ b/packages/adapters/mqclient/src/index.ts @@ -2,15 +2,7 @@ import { Queue, Worker, Job, ConnectionOptions, QueueOptions, WorkerOptions } fr import { jsonifyError, Logger } from '@chimera-monorepo/utils'; export { Queue, Worker, Job } from 'bullmq'; - -export const LIGHTHOUSE_QUEUES = { - INTENT: 'lighthouse-intent', - FILL: 'lighthouse-fill', - SETTLEMENT: 'lighthouse-settlement', - SOLANA: 'lighthouse-solana', - EXPIRED: 'lighthouse-expired', - INVOICE: 'lighthouse-invoice', -} as const; +export { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; export const parseRedisUrl = (redisUrl: string): ConnectionOptions => { const url = new URL(redisUrl); diff --git a/packages/agents/cartographer/core/src/operations/intents.ts b/packages/agents/cartographer/core/src/operations/intents.ts index 2d2c7cdf..d95ef2b6 100644 --- a/packages/agents/cartographer/core/src/operations/intents.ts +++ b/packages/agents/cartographer/core/src/operations/intents.ts @@ -1,4 +1,4 @@ -import { createLoggingContext, getMaxTxNonce, jsonifyError } from '@chimera-monorepo/utils'; +import { createLoggingContext, getMaxTxNonce, jsonifyError, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; import { SubgraphQueryMetaParams } from '@chimera-monorepo/adapters-subgraph'; import { AppContext } from '../context'; @@ -6,7 +6,7 @@ import { DEFAULT_SAFE_CONFIRMATIONS } from '../config'; import { getSubgraphSupportedDomains } from './helper'; import { computeIsSwap } from '../lib/intentHelpers'; -export const updateOriginIntents = async (context: AppContext) => { +export const updateOriginIntents = async (context: AppContext): Promise> => { const { adapters: { subgraph, database }, config, @@ -48,12 +48,13 @@ export const updateOriginIntents = async (context: AppContext) => { if (queryMetaParams.size === 0) { logger.debug('No 
domains to update', requestContext, methodContext, { domains }); - return; + return new Set(); } // Get origin intents for all domains in the mapping. const intents = await subgraph.getOriginIntentsByNonce(queryMetaParams); logger.info('Retrieved origin intents', requestContext, methodContext, { intents: intents.length }); + if (intents.length === 0) return new Set(); // Compute is_swap for each intent by comparing ticker hashes const intentsWithSwapFlag = intents.map((intent) => { @@ -88,9 +89,10 @@ export const updateOriginIntents = async (context: AppContext) => { } // Log the successful update logger.debug('Updated OriginIntents in database', requestContext, methodContext, { intents }); + return new Set([LIGHTHOUSE_QUEUES.INTENT]); }; -export const updateDestinationIntents = async (context: AppContext) => { +export const updateDestinationIntents = async (context: AppContext): Promise> => { const { adapters: { subgraph, database }, config, @@ -129,37 +131,40 @@ export const updateDestinationIntents = async (context: AppContext) => { }), ); - if (queryMetaParams.size > 0) { - // Get destination intents for all domains in the mapping. - const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams); - intents.forEach((intent) => { - const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext( - updateDestinationIntents.name, - ); - logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent }); - }); - const checkpoints = domains - .map((domain) => { - const domainIntents = intents.filter((intent) => intent.destination === domain); - const max = getMaxTxNonce(domainIntents); - const latest = queryMetaParams.get(domain)?.latestNonce ?? 
0; - if (domainIntents.length > 0 && max > latest) { - return { domain, checkpoint: max }; - } - return undefined; - }) - .filter((x) => !!x) as { domain: string; checkpoint: number }[]; - - await database.saveDestinationIntents(intents); - for (const checkpoint of checkpoints) { - await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint); - } - // Log the successful update - logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents }); + if (queryMetaParams.size === 0) return new Set(); + + // Get destination intents for all domains in the mapping. + const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams); + if (intents.length === 0) return new Set(); + + intents.forEach((intent) => { + const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext( + updateDestinationIntents.name, + ); + logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent }); + }); + const checkpoints = domains + .map((domain) => { + const domainIntents = intents.filter((intent) => intent.destination === domain); + const max = getMaxTxNonce(domainIntents); + const latest = queryMetaParams.get(domain)?.latestNonce ?? 
0; + if (domainIntents.length > 0 && max > latest) { + return { domain, checkpoint: max }; + } + return undefined; + }) + .filter((x) => !!x) as { domain: string; checkpoint: number }[]; + + await database.saveDestinationIntents(intents); + for (const checkpoint of checkpoints) { + await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint); } + // Log the successful update + logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents }); + return new Set([LIGHTHOUSE_QUEUES.FILL]); }; -export const updateHubIntents = async (context: AppContext) => { +export const updateHubIntents = async (context: AppContext): Promise> => { const { adapters: { subgraph, database }, config, @@ -180,7 +185,7 @@ export const updateHubIntents = async (context: AppContext) => { latestBlockMap: Object.fromEntries(latestBlockMap.entries()), }, ); - return; + return new Set(); } // Get the latest checkpoint for the hub domain @@ -216,7 +221,7 @@ export const updateHubIntents = async (context: AppContext) => { if (addedIntents.length === 0 && filledIntents.length === 0 && enqueuedIntents.length === 0) { // Save latest checkpoint logger.debug('No new intents found', requestContext, methodContext); - return; + return new Set(); } // Save intents to the database @@ -249,6 +254,8 @@ export const updateHubIntents = async (context: AppContext) => { const latest = getMaxTxNonce(enqueuedIntents.map((i) => ({ txNonce: i.settlementEnqueuedTxNonce! 
}))); await database.saveCheckPoint('hub_intent_enqueued_' + config.hub.domain, latest); } + if (enqueuedIntents.length > 0) return new Set([LIGHTHOUSE_QUEUES.SETTLEMENT]); + return new Set(); }; export const updateSettlementIntents = async (context: AppContext) => { diff --git a/packages/agents/cartographer/core/src/operations/invoices.ts b/packages/agents/cartographer/core/src/operations/invoices.ts index c5179a18..9f4eec42 100644 --- a/packages/agents/cartographer/core/src/operations/invoices.ts +++ b/packages/agents/cartographer/core/src/operations/invoices.ts @@ -2,6 +2,7 @@ import { HubDeposit, HubInvoice, TIntentStatus, + LIGHTHOUSE_QUEUES, createLoggingContext, getMaxTxNonce, jsonifyError, @@ -9,7 +10,7 @@ import { import { AppContext } from '../context'; -export const updateHubInvoices = async (context: AppContext) => { +export const updateHubInvoices = async (context: AppContext): Promise<Set<string>> => { const { adapters: { subgraph, database }, config, @@ -30,7 +31,7 @@ export const updateHubInvoices = async (context: AppContext) => { latestBlockMap: Object.fromEntries(latestBlockMap.entries()), }, ); - return; + return new Set(); } // Get the latest checkpoint for the hub domain @@ -56,7 +57,7 @@ export const updateHubInvoices = async (context: AppContext) => { // Exit early if no new invoices are found if (hubInvoices.length === 0) { logger.info('No new hub invoices found', requestContext, methodContext); - return; + return new Set(); } // Deduplicate enqueued invoices @@ -78,13 +79,13 @@ export const updateHubInvoices = async (context: AppContext) => { // Save latest checkpoint const latest = getMaxTxNonce(hubInvoices.map((i) => ({ txNonce: i.enqueuedTxNonce! }))); await database.saveCheckPoint('hub_invoice_' + config.hub.domain, latest); + return new Set([LIGHTHOUSE_QUEUES.INVOICE]); }; /** * @notice Updates processed and enqueued deposits from the hub subgraph. 
- * @returns Promise */ -export const updateHubDeposits = async (context: AppContext) => { +export const updateHubDeposits = async (context: AppContext): Promise> => { const { adapters: { subgraph, database }, config, @@ -105,7 +106,7 @@ export const updateHubDeposits = async (context: AppContext) => { latestBlockMap: Object.fromEntries(latestBlockMap.entries()), }, ); - return; + return new Set(); } // Get the latest checkpoint for the hub domain @@ -134,7 +135,7 @@ export const updateHubDeposits = async (context: AppContext) => { // Exit early if no new deposits are found if (enqueuedDeposits.length === 0 && processedDeposits.length === 0) { logger.info('No new hub deposits found', requestContext, methodContext); - return; + return new Set(); } // Only save latest entry (processed or enqueued) for each deposit @@ -161,4 +162,5 @@ export const updateHubDeposits = async (context: AppContext) => { const latestProcessed = getMaxTxNonce(processedDeposits.map((i) => ({ txNonce: i.processedTxNonce ?? 
0 }))); await database.saveCheckPoint('hub_deposit_enqueued_' + config.hub.domain, latestEnqueued); await database.saveCheckPoint('hub_deposit_processed_' + config.hub.domain, latestProcessed); + return new Set([LIGHTHOUSE_QUEUES.INVOICE]); }; diff --git a/packages/agents/cartographer/handler/package.json b/packages/agents/cartographer/handler/package.json index 59d599d1..77f34bb1 100644 --- a/packages/agents/cartographer/handler/package.json +++ b/packages/agents/cartographer/handler/package.json @@ -14,7 +14,7 @@ "lint": "eslint ./src --ext .ts --env node", "start": "npx ts-node src/index.ts", "test": "yarn test:unit", - "test:unit": "nyc ts-mocha --require 'test/globalTestHook.ts' --check-leaks --exit --timeout 60000 'test/**/*.spec.ts'", + "test:unit": "nyc ts-mocha --check-leaks --exit --timeout 60000 'test/**/*.spec.ts'", "clean": "rimraf ./dist ./tsconfig.tsBuildInfo", "build": "tsc --build ./tsconfig.build.json", "verify": "yarn test && yarn clean && yarn build && yarn lint --max-warnings 0" diff --git a/packages/agents/cartographer/handler/src/maintenance/backfill.ts b/packages/agents/cartographer/handler/src/maintenance/backfill.ts index e5622bfe..1d006cef 100644 --- a/packages/agents/cartographer/handler/src/maintenance/backfill.ts +++ b/packages/agents/cartographer/handler/src/maintenance/backfill.ts @@ -1,6 +1,7 @@ import { createLoggingContext, jsonifyError, sendHeartbeat } from '@chimera-monorepo/utils'; +import { AppContext } from '@chimera-monorepo/cartographer-core'; +import { notifyLighthouse } from '../notify'; import { - AppContext, updateHubInvoices, updateHubDeposits, updateOriginIntents, @@ -15,12 +16,15 @@ import { updateMessageStatus, updateAssets, updateDepositors, -} from '@chimera-monorepo/cartographer-core'; +} from '../mockable'; /** * Run all backfill operations using core operations with the handler's context. 
* This catches any events missed by webhooks and also runs periodic-only operations * like updateMessageStatus (which checks on-chain Hyperlane delivery status). + * + * When an operation pulls new data from subgraphs, the corresponding lighthouse + * queue is notified so the lighthouse can process it. */ export const runBackfill = async (context: AppContext): Promise => { const { logger } = context; @@ -28,14 +32,20 @@ export const runBackfill = async (context: AppContext): Promise => { logger.debug('Starting backfill maintenance cycle', requestContext, methodContext); - const operations = [ - { name: 'updateOrders', fn: updateOrders }, + // Operations that notify lighthouse when new data is found. + // Each returns a Set of lighthouse queue names to notify. + const notifyOperations: { name: string; fn: (ctx: AppContext) => Promise> }[] = [ { name: 'updateOriginIntents', fn: updateOriginIntents }, { name: 'updateDestinationIntents', fn: updateDestinationIntents }, { name: 'updateHubIntents', fn: updateHubIntents }, - { name: 'updateSettlementIntents', fn: updateSettlementIntents }, { name: 'updateHubInvoices', fn: updateHubInvoices }, { name: 'updateHubDeposits', fn: updateHubDeposits }, + ]; + + // Operations that don't notify lighthouse + const plainOperations = [ + { name: 'updateSettlementIntents', fn: updateSettlementIntents }, + { name: 'updateOrders', fn: updateOrders }, { name: 'updateAssets', fn: updateAssets }, { name: 'updateDepositors', fn: updateDepositors }, { name: 'updateMessages', fn: updateMessages }, @@ -45,7 +55,19 @@ export const runBackfill = async (context: AppContext): Promise => { { name: 'updateMessageStatus', fn: updateMessageStatus }, ]; - for (const op of operations) { + // Collect which queues need notification (deduplicated) + const queuesToNotify = new Set(); + + for (const op of notifyOperations) { + try { + const queues = await op.fn(context); + for (const q of queues) queuesToNotify.add(q); + } catch (error) { + 
logger.error(`Backfill error in ${op.name}`, requestContext, methodContext, jsonifyError(error as Error)); + } + } + + for (const op of plainOperations) { try { await op.fn(context); } catch (error) { @@ -53,6 +75,11 @@ export const runBackfill = async (context: AppContext): Promise => { } } + // Notify lighthouse for all queues that had new data + for (const queueName of queuesToNotify) { + await notifyLighthouse(queueName); + } + // Full materialized view refresh (non-debounced) try { await context.adapters.database.refreshIntentsView(); diff --git a/packages/agents/cartographer/handler/src/mockable.ts b/packages/agents/cartographer/handler/src/mockable.ts new file mode 100644 index 00000000..8aa34daa --- /dev/null +++ b/packages/agents/cartographer/handler/src/mockable.ts @@ -0,0 +1,31 @@ +import { + updateHubInvoices as _updateHubInvoices, + updateHubDeposits as _updateHubDeposits, + updateOriginIntents as _updateOriginIntents, + updateDestinationIntents as _updateDestinationIntents, + updateHubIntents as _updateHubIntents, + updateSettlementIntents as _updateSettlementIntents, + updateOrders as _updateOrders, + updateMessages as _updateMessages, + updateQueues as _updateQueues, + updateHubSpokeMeta as _updateHubSpokeMeta, + updateProtocolUpdateLogs as _updateProtocolUpdateLogs, + updateMessageStatus as _updateMessageStatus, + updateAssets as _updateAssets, + updateDepositors as _updateDepositors, +} from '@chimera-monorepo/cartographer-core'; + +export const updateHubInvoices = _updateHubInvoices; +export const updateHubDeposits = _updateHubDeposits; +export const updateOriginIntents = _updateOriginIntents; +export const updateDestinationIntents = _updateDestinationIntents; +export const updateHubIntents = _updateHubIntents; +export const updateSettlementIntents = _updateSettlementIntents; +export const updateOrders = _updateOrders; +export const updateMessages = _updateMessages; +export const updateQueues = _updateQueues; +export const updateHubSpokeMeta = 
_updateHubSpokeMeta; +export const updateProtocolUpdateLogs = _updateProtocolUpdateLogs; +export const updateMessageStatus = _updateMessageStatus; +export const updateAssets = _updateAssets; +export const updateDepositors = _updateDepositors; diff --git a/packages/agents/cartographer/handler/src/notify.ts b/packages/agents/cartographer/handler/src/notify.ts index ddc624fd..e0f27e13 100644 --- a/packages/agents/cartographer/handler/src/notify.ts +++ b/packages/agents/cartographer/handler/src/notify.ts @@ -1,5 +1,5 @@ -import { createProducer, pingRedis, LIGHTHOUSE_QUEUES, Queue } from '@chimera-monorepo/mqclient'; -import { jsonifyError, Logger } from '@chimera-monorepo/utils'; +import { createProducer, pingRedis, Queue } from '@chimera-monorepo/mqclient'; +import { jsonifyError, Logger, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; let queues: Map<string, Queue> | null = null; let notifyLogger: Logger | undefined; diff --git a/packages/agents/cartographer/handler/src/processors/intentProcessor.ts b/packages/agents/cartographer/handler/src/processors/intentProcessor.ts index ec3b825b..c8a4b49d 100644 --- a/packages/agents/cartographer/handler/src/processors/intentProcessor.ts +++ b/packages/agents/cartographer/handler/src/processors/intentProcessor.ts @@ -9,7 +9,7 @@ import { } from '../webhooks/parsers'; import { base64ToHex } from '../webhooks/webhookHandler'; import { notifyLighthouse } from '../notify'; -import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient'; +import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; /** * Safely extract an intent ID from the webhook payload. 
diff --git a/packages/agents/cartographer/handler/src/processors/invoiceProcessor.ts b/packages/agents/cartographer/handler/src/processors/invoiceProcessor.ts index 58f9f6a4..4139e314 100644 --- a/packages/agents/cartographer/handler/src/processors/invoiceProcessor.ts +++ b/packages/agents/cartographer/handler/src/processors/invoiceProcessor.ts @@ -1,9 +1,8 @@ -import { HubIntent, TIntentStatus } from '@chimera-monorepo/utils'; +import { HubIntent, TIntentStatus, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; import { AppContext } from '@chimera-monorepo/cartographer-core'; import { parseHubInvoice, parseHubDeposit } from '../webhooks/parsers'; import { base64ToHex } from '../webhooks/webhookHandler'; import { notifyLighthouse } from '../notify'; -import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient'; /** * Create a minimal HubIntent for status-only upserts. diff --git a/packages/agents/cartographer/handler/src/processors/monitorProcessor.ts b/packages/agents/cartographer/handler/src/processors/monitorProcessor.ts index b765aa3e..8ccb0318 100644 --- a/packages/agents/cartographer/handler/src/processors/monitorProcessor.ts +++ b/packages/agents/cartographer/handler/src/processors/monitorProcessor.ts @@ -11,8 +11,6 @@ import { HubMessage, } from '@chimera-monorepo/utils'; import { AppContext, CartographerConfig } from '@chimera-monorepo/cartographer-core'; -import { notifyLighthouse } from '../notify'; -import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient'; import { parseMessage, parseQueue, @@ -87,15 +85,6 @@ export const processHubMessage = async (payload: Record, contex : []; await database.saveMessages([message], [], [], hubIntentUpdates); - - // Notify lighthouse for settlement messages - if (msg.type === TMessageType.Settlement) { - if (destDomain === SOLANA_CHAINID) { - await notifyLighthouse(LIGHTHOUSE_QUEUES.SOLANA); - } else { - await notifyLighthouse(LIGHTHOUSE_QUEUES.SETTLEMENT); - } - } }; export const processSpokeMessage = async 
(payload: Record, context: AppContext): Promise => { @@ -129,11 +118,6 @@ export const processSpokeMessage = async (payload: Record, cont : []; await database.saveMessages([message], originIntentUpdates, destinationIntentUpdates, []); - - // Notify lighthouse for fill messages - if (msg.type === TMessageType.Fill) { - await notifyLighthouse(LIGHTHOUSE_QUEUES.FILL); - } }; export const processQueue = async ( diff --git a/packages/agents/cartographer/handler/src/processors/solanaInstructionProcessor.ts b/packages/agents/cartographer/handler/src/processors/solanaInstructionProcessor.ts index 3f234a85..9eed47cd 100644 --- a/packages/agents/cartographer/handler/src/processors/solanaInstructionProcessor.ts +++ b/packages/agents/cartographer/handler/src/processors/solanaInstructionProcessor.ts @@ -1,8 +1,7 @@ // eslint-disable-next-line @typescript-eslint/no-var-requires const bs58 = require('bs58') as { decode: (input: string) => Uint8Array }; import { AppContext } from '@chimera-monorepo/cartographer-core'; -import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient'; -import { SOLANA_CHAINID } from '@chimera-monorepo/utils'; +import { LIGHTHOUSE_QUEUES, SOLANA_CHAINID } from '@chimera-monorepo/utils'; import { notifyLighthouse } from '../notify'; /** CPI discriminator that must match bytes 0–7 of the decoded instruction data. 
*/ diff --git a/packages/agents/cartographer/handler/src/processors/tronLogProcessor.ts b/packages/agents/cartographer/handler/src/processors/tronLogProcessor.ts index 2b835d99..cbfe9155 100644 --- a/packages/agents/cartographer/handler/src/processors/tronLogProcessor.ts +++ b/packages/agents/cartographer/handler/src/processors/tronLogProcessor.ts @@ -1,6 +1,5 @@ import { AppContext } from '@chimera-monorepo/cartographer-core'; -import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient'; -import { TRON_CHAINID } from '@chimera-monorepo/utils'; +import { LIGHTHOUSE_QUEUES, TRON_CHAINID } from '@chimera-monorepo/utils'; import { notifyLighthouse } from '../notify'; /** Map of topic[0] hash → lighthouse queue name. */ diff --git a/packages/agents/cartographer/handler/test/globalTestHook.ts b/packages/agents/cartographer/handler/test/globalTestHook.ts deleted file mode 100644 index e75b48d4..00000000 --- a/packages/agents/cartographer/handler/test/globalTestHook.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { reset, restore } from 'sinon'; - -export const mochaHooks = { - afterEach() { - restore(); - reset(); - }, -}; diff --git a/packages/agents/cartographer/handler/test/maintenance/backfill.spec.ts b/packages/agents/cartographer/handler/test/maintenance/backfill.spec.ts new file mode 100644 index 00000000..1b0c3a28 --- /dev/null +++ b/packages/agents/cartographer/handler/test/maintenance/backfill.spec.ts @@ -0,0 +1,171 @@ +import { stub, restore, SinonStub } from 'sinon'; +import { expect, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; +import { AppContext } from '@chimera-monorepo/cartographer-core'; + +import * as mockable from '../../src/mockable'; +import { runBackfill } from '../../src/maintenance/backfill'; +import * as notify from '../../src/notify'; +import { createAppContext } from '../mock'; + +describe('backfill', () => { + let context: AppContext; + let notifyStub: SinonStub; + + // Core operations that should trigger lighthouse notifications + let 
updateOriginIntentsStub: SinonStub; + let updateDestinationIntentsStub: SinonStub; + let updateHubIntentsStub: SinonStub; + let updateHubInvoicesStub: SinonStub; + let updateHubDepositsStub: SinonStub; + + // Core operations that should NOT trigger lighthouse notifications + let updateSettlementIntentsStub: SinonStub; + let updateOrdersStub: SinonStub; + let updateAssetsStub: SinonStub; + let updateDepositorsStub: SinonStub; + let updateMessagesStub: SinonStub; + let updateQueuesStub: SinonStub; + let updateHubSpokeMetaStub: SinonStub; + let updateProtocolUpdateLogsStub: SinonStub; + let updateMessageStatusStub: SinonStub; + + beforeEach(() => { + context = createAppContext(); + notifyStub = stub(notify, 'notifyLighthouse').resolves(); + + // Notify operations — default to returning empty set (no new data) + updateOriginIntentsStub = stub(mockable, 'updateOriginIntents').resolves(new Set()); + updateDestinationIntentsStub = stub(mockable, 'updateDestinationIntents').resolves(new Set()); + updateHubIntentsStub = stub(mockable, 'updateHubIntents').resolves(new Set()); + updateHubInvoicesStub = stub(mockable, 'updateHubInvoices').resolves(new Set()); + updateHubDepositsStub = stub(mockable, 'updateHubDeposits').resolves(new Set()); + + // Plain operations + updateSettlementIntentsStub = stub(mockable, 'updateSettlementIntents').resolves(); + updateMessagesStub = stub(mockable, 'updateMessages').resolves(); + updateOrdersStub = stub(mockable, 'updateOrders').resolves(); + updateAssetsStub = stub(mockable, 'updateAssets').resolves(); + updateDepositorsStub = stub(mockable, 'updateDepositors').resolves(); + updateQueuesStub = stub(mockable, 'updateQueues').resolves(); + updateHubSpokeMetaStub = stub(mockable, 'updateHubSpokeMeta').resolves(); + updateProtocolUpdateLogsStub = stub(mockable, 'updateProtocolUpdateLogs').resolves(); + updateMessageStatusStub = stub(mockable, 'updateMessageStatus').resolves(); + }); + + afterEach(() => { + restore(); + }); + + it('should run 
all operations', async () => { + await runBackfill(context); + + expect(updateOriginIntentsStub.callCount).to.equal(1); + expect(updateDestinationIntentsStub.callCount).to.equal(1); + expect(updateHubIntentsStub.callCount).to.equal(1); + expect(updateSettlementIntentsStub.callCount).to.equal(1); + expect(updateHubInvoicesStub.callCount).to.equal(1); + expect(updateHubDepositsStub.callCount).to.equal(1); + expect(updateMessagesStub.callCount).to.equal(1); + expect(updateOrdersStub.callCount).to.equal(1); + expect(updateAssetsStub.callCount).to.equal(1); + expect(updateDepositorsStub.callCount).to.equal(1); + expect(updateQueuesStub.callCount).to.equal(1); + expect(updateHubSpokeMetaStub.callCount).to.equal(1); + expect(updateProtocolUpdateLogsStub.callCount).to.equal(1); + expect(updateMessageStatusStub.callCount).to.equal(1); + }); + + it('should not notify lighthouse when no operations return new data', async () => { + await runBackfill(context); + + expect(notifyStub.callCount).to.equal(0); + }); + + it('should notify INTENT queue when updateOriginIntents finds new data', async () => { + updateOriginIntentsStub.resolves(new Set([LIGHTHOUSE_QUEUES.INTENT])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.INTENT)).to.be.true; + }); + + it('should notify FILL queue when updateDestinationIntents finds new data', async () => { + updateDestinationIntentsStub.resolves(new Set([LIGHTHOUSE_QUEUES.FILL])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.FILL)).to.be.true; + }); + + it('should notify SETTLEMENT queue when updateHubIntents finds enqueued intents', async () => { + updateHubIntentsStub.resolves(new Set([LIGHTHOUSE_QUEUES.SETTLEMENT])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.SETTLEMENT)).to.be.true; + }); + + it('should notify INVOICE queue when updateHubInvoices finds new data', async () => { + updateHubInvoicesStub.resolves(new 
Set([LIGHTHOUSE_QUEUES.INVOICE])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.INVOICE)).to.be.true; + }); + + it('should notify INVOICE queue when updateHubDeposits finds new data', async () => { + updateHubDepositsStub.resolves(new Set([LIGHTHOUSE_QUEUES.INVOICE])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.INVOICE)).to.be.true; + }); + + it('should deduplicate queue notifications across operations', async () => { + // Both updateHubInvoices and updateHubDeposits return INVOICE + updateHubInvoicesStub.resolves(new Set([LIGHTHOUSE_QUEUES.INVOICE])); + updateHubDepositsStub.resolves(new Set([LIGHTHOUSE_QUEUES.INVOICE])); + + await runBackfill(context); + + // INVOICE should be notified only once despite two operations returning it + const invoiceCalls = notifyStub.getCalls().filter((c: { args: string[] }) => c.args[0] === LIGHTHOUSE_QUEUES.INVOICE); + expect(invoiceCalls.length).to.equal(1); + }); + + it('should continue running remaining operations when one fails', async () => { + updateOriginIntentsStub.rejects(new Error('subgraph timeout')); + + await runBackfill(context); + + expect(updateDestinationIntentsStub.callCount).to.equal(1); + expect(updateHubIntentsStub.callCount).to.equal(1); + expect(updateOrdersStub.callCount).to.equal(1); + }); + + it('should not notify for a queue whose operation failed', async () => { + updateOriginIntentsStub.rejects(new Error('subgraph timeout')); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.INTENT)).to.be.false; + }); + + it('should still notify other queues when one operation fails', async () => { + updateOriginIntentsStub.rejects(new Error('subgraph timeout')); + updateHubIntentsStub.resolves(new Set([LIGHTHOUSE_QUEUES.SETTLEMENT])); + + await runBackfill(context); + + expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.INTENT)).to.be.false; + 
expect(notifyStub.calledWith(LIGHTHOUSE_QUEUES.SETTLEMENT)).to.be.true; + }); + + it('should refresh materialized views', async () => { + await runBackfill(context); + + const db = context.adapters.database; + expect((db.refreshIntentsView as SinonStub).callCount).to.equal(1); + expect((db.refreshInvoicesView as SinonStub).callCount).to.equal(1); + }); +}); diff --git a/packages/agents/cartographer/handler/test/processors/intentProcessor.spec.ts b/packages/agents/cartographer/handler/test/processors/intentProcessor.spec.ts index 19cc155d..7da80d4f 100644 --- a/packages/agents/cartographer/handler/test/processors/intentProcessor.spec.ts +++ b/packages/agents/cartographer/handler/test/processors/intentProcessor.spec.ts @@ -1,7 +1,7 @@ import { expect, mkAddress, mkBytes32 } from '@chimera-monorepo/utils'; import { SinonStubbedInstance } from 'sinon'; import { Database } from '@chimera-monorepo/database'; -import { AppContext, CartographerConfig } from '@chimera-monorepo/cartographer-core'; +import { AppContext } from '@chimera-monorepo/cartographer-core'; import { processOriginIntent, diff --git a/packages/agents/cartographer/handler/test/processors/solanaInstructionProcessor.spec.ts b/packages/agents/cartographer/handler/test/processors/solanaInstructionProcessor.spec.ts index db067ec3..c22dd30f 100644 --- a/packages/agents/cartographer/handler/test/processors/solanaInstructionProcessor.spec.ts +++ b/packages/agents/cartographer/handler/test/processors/solanaInstructionProcessor.spec.ts @@ -1,5 +1,5 @@ import { expect } from '@chimera-monorepo/utils'; -import { stub, restore } from 'sinon'; +import { stub, restore, SinonStub } from 'sinon'; import { AppContext } from '@chimera-monorepo/cartographer-core'; import bs58 from 'bs58'; @@ -9,7 +9,7 @@ import { createAppContext } from '../mock'; describe('solanaInstructionProcessor', () => { let context: AppContext; - let notifyStub: sinon.SinonStub; + let notifyStub: SinonStub; // CPI discriminator: e445a52e51cb9a1d const 
CPI_DISC = Buffer.from('e445a52e51cb9a1d', 'hex'); diff --git a/packages/agents/cartographer/handler/test/processors/tronLogProcessor.spec.ts b/packages/agents/cartographer/handler/test/processors/tronLogProcessor.spec.ts index 00c1a5b8..77d39795 100644 --- a/packages/agents/cartographer/handler/test/processors/tronLogProcessor.spec.ts +++ b/packages/agents/cartographer/handler/test/processors/tronLogProcessor.spec.ts @@ -1,5 +1,5 @@ import { expect } from '@chimera-monorepo/utils'; -import { stub, restore } from 'sinon'; +import { stub, restore, SinonStub } from 'sinon'; import { AppContext } from '@chimera-monorepo/cartographer-core'; import { processTronLog } from '../../src/processors/tronLogProcessor'; @@ -8,7 +8,7 @@ import { createAppContext } from '../mock'; describe('tronLogProcessor', () => { let context: AppContext; - let notifyStub: sinon.SinonStub; + let notifyStub: SinonStub; beforeEach(() => { context = createAppContext(); diff --git a/packages/agents/lighthouse/src/server.ts b/packages/agents/lighthouse/src/server.ts index 61d4c166..0481d27f 100644 --- a/packages/agents/lighthouse/src/server.ts +++ b/packages/agents/lighthouse/src/server.ts @@ -1,14 +1,6 @@ import { timingSafeEqual } from 'crypto'; -import { Logger, jsonifyError, createLoggingContext, QueueType } from '@chimera-monorepo/utils'; -import { - createWorker, - createProducer, - pingRedis, - LIGHTHOUSE_QUEUES, - Worker, - Queue, - Job, -} from '@chimera-monorepo/mqclient'; +import { Logger, jsonifyError, createLoggingContext, QueueType, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils'; +import { createWorker, createProducer, pingRedis, Worker, Queue, Job } from '@chimera-monorepo/mqclient'; import fastify, { FastifyInstance } from 'fastify'; import { getConfig } from './config'; diff --git a/packages/utils/src/constants/index.ts b/packages/utils/src/constants/index.ts index 7504e756..8bf3cb5a 100644 --- a/packages/utils/src/constants/index.ts +++ b/packages/utils/src/constants/index.ts 
@@ -1,2 +1,3 @@ export * from './abi'; export * from './domains'; +export * from './queue'; diff --git a/packages/utils/src/constants/queue.ts b/packages/utils/src/constants/queue.ts new file mode 100644 index 00000000..5659dded --- /dev/null +++ b/packages/utils/src/constants/queue.ts @@ -0,0 +1,8 @@ +export const LIGHTHOUSE_QUEUES = { + INTENT: 'lighthouse-intent', + FILL: 'lighthouse-fill', + SETTLEMENT: 'lighthouse-settlement', + SOLANA: 'lighthouse-solana', + EXPIRED: 'lighthouse-expired', + INVOICE: 'lighthouse-invoice', +} as const;