Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions packages/adapters/mqclient/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,7 @@ import { Queue, Worker, Job, ConnectionOptions, QueueOptions, WorkerOptions } fr
import { jsonifyError, Logger } from '@chimera-monorepo/utils';

export { Queue, Worker, Job } from 'bullmq';

export const LIGHTHOUSE_QUEUES = {
INTENT: 'lighthouse-intent',
FILL: 'lighthouse-fill',
SETTLEMENT: 'lighthouse-settlement',
SOLANA: 'lighthouse-solana',
EXPIRED: 'lighthouse-expired',
INVOICE: 'lighthouse-invoice',
} as const;
export { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils';

export const parseRedisUrl = (redisUrl: string): ConnectionOptions => {
const url = new URL(redisUrl);
Expand Down
75 changes: 41 additions & 34 deletions packages/agents/cartographer/core/src/operations/intents.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { createLoggingContext, getMaxTxNonce, jsonifyError } from '@chimera-monorepo/utils';
import { createLoggingContext, getMaxTxNonce, jsonifyError, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils';
import { SubgraphQueryMetaParams } from '@chimera-monorepo/adapters-subgraph';

import { AppContext } from '../context';
import { DEFAULT_SAFE_CONFIRMATIONS } from '../config';
import { getSubgraphSupportedDomains } from './helper';
import { computeIsSwap } from '../lib/intentHelpers';

export const updateOriginIntents = async (context: AppContext) => {
export const updateOriginIntents = async (context: AppContext): Promise<Set<string>> => {
const {
adapters: { subgraph, database },
config,
Expand Down Expand Up @@ -48,12 +48,13 @@ export const updateOriginIntents = async (context: AppContext) => {

if (queryMetaParams.size === 0) {
logger.debug('No domains to update', requestContext, methodContext, { domains });
return;
return new Set();
}

// Get origin intents for all domains in the mapping.
const intents = await subgraph.getOriginIntentsByNonce(queryMetaParams);
logger.info('Retrieved origin intents', requestContext, methodContext, { intents: intents.length });
if (intents.length === 0) return new Set();

// Compute is_swap for each intent by comparing ticker hashes
const intentsWithSwapFlag = intents.map((intent) => {
Expand Down Expand Up @@ -88,9 +89,10 @@ export const updateOriginIntents = async (context: AppContext) => {
}
// Log the successful update
logger.debug('Updated OriginIntents in database', requestContext, methodContext, { intents });
return new Set([LIGHTHOUSE_QUEUES.INTENT]);
};

export const updateDestinationIntents = async (context: AppContext) => {
export const updateDestinationIntents = async (context: AppContext): Promise<Set<string>> => {
const {
adapters: { subgraph, database },
config,
Expand Down Expand Up @@ -129,37 +131,40 @@ export const updateDestinationIntents = async (context: AppContext) => {
}),
);

if (queryMetaParams.size > 0) {
// Get destination intents for all domains in the mapping.
const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams);
intents.forEach((intent) => {
const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext(
updateDestinationIntents.name,
);
logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent });
});
const checkpoints = domains
.map((domain) => {
const domainIntents = intents.filter((intent) => intent.destination === domain);
const max = getMaxTxNonce(domainIntents);
const latest = queryMetaParams.get(domain)?.latestNonce ?? 0;
if (domainIntents.length > 0 && max > latest) {
return { domain, checkpoint: max };
}
return undefined;
})
.filter((x) => !!x) as { domain: string; checkpoint: number }[];

await database.saveDestinationIntents(intents);
for (const checkpoint of checkpoints) {
await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint);
}
// Log the successful update
logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents });
if (queryMetaParams.size === 0) return new Set();

// Get destination intents for all domains in the mapping.
const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams);
if (intents.length === 0) return new Set();

intents.forEach((intent) => {
const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext(
updateDestinationIntents.name,
);
logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent });
});
const checkpoints = domains
.map((domain) => {
const domainIntents = intents.filter((intent) => intent.destination === domain);
const max = getMaxTxNonce(domainIntents);
const latest = queryMetaParams.get(domain)?.latestNonce ?? 0;
if (domainIntents.length > 0 && max > latest) {
return { domain, checkpoint: max };
}
return undefined;
})
.filter((x) => !!x) as { domain: string; checkpoint: number }[];

await database.saveDestinationIntents(intents);
for (const checkpoint of checkpoints) {
await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint);
}
// Log the successful update
logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents });
return new Set([LIGHTHOUSE_QUEUES.FILL]);
};

export const updateHubIntents = async (context: AppContext) => {
export const updateHubIntents = async (context: AppContext): Promise<Set<string>> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -180,7 +185,7 @@ export const updateHubIntents = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return new Set();
}

// Get the latest checkpoint for the hub domain
Expand Down Expand Up @@ -216,7 +221,7 @@ export const updateHubIntents = async (context: AppContext) => {
if (addedIntents.length === 0 && filledIntents.length === 0 && enqueuedIntents.length === 0) {
// Save latest checkpoint
logger.debug('No new intents found', requestContext, methodContext);
return;
return new Set();
}

// Save intents to the database
Expand Down Expand Up @@ -249,6 +254,8 @@ export const updateHubIntents = async (context: AppContext) => {
const latest = getMaxTxNonce(enqueuedIntents.map((i) => ({ txNonce: i.settlementEnqueuedTxNonce! })));
await database.saveCheckPoint('hub_intent_enqueued_' + config.hub.domain, latest);
}
if (enqueuedIntents.length > 0) return new Set([LIGHTHOUSE_QUEUES.SETTLEMENT]);
return new Set();
};

export const updateSettlementIntents = async (context: AppContext) => {
Expand Down
16 changes: 9 additions & 7 deletions packages/agents/cartographer/core/src/operations/invoices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import {
HubDeposit,
HubInvoice,
TIntentStatus,
LIGHTHOUSE_QUEUES,
createLoggingContext,
getMaxTxNonce,
jsonifyError,
} from '@chimera-monorepo/utils';

import { AppContext } from '../context';

export const updateHubInvoices = async (context: AppContext) => {
export const updateHubInvoices = async (context: AppContext): Promise<Set<string>> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -30,7 +31,7 @@ export const updateHubInvoices = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return new Set();
}

// Get the latest checkpoint for the hub domain
Expand All @@ -56,7 +57,7 @@ export const updateHubInvoices = async (context: AppContext) => {
// Exit early if no new invoices are found
if (hubInvoices.length === 0) {
logger.info('No new hub invoices found', requestContext, methodContext);
return;
return new Set();
}

// Deduplicate enqueued invoices
Expand All @@ -78,13 +79,13 @@ export const updateHubInvoices = async (context: AppContext) => {
// Save latest checkpoint
const latest = getMaxTxNonce(hubInvoices.map((i) => ({ txNonce: i.enqueuedTxNonce! })));
await database.saveCheckPoint('hub_invoice_' + config.hub.domain, latest);
return new Set([LIGHTHOUSE_QUEUES.INVOICE]);
};

/**
* @notice Updates processed and enqueued deposits from the hub subgraph.
* @returns Promise<void>
*/
export const updateHubDeposits = async (context: AppContext) => {
export const updateHubDeposits = async (context: AppContext): Promise<Set<string>> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -105,7 +106,7 @@ export const updateHubDeposits = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return new Set();
}

// Get the latest checkpoint for the hub domain
Expand Down Expand Up @@ -134,7 +135,7 @@ export const updateHubDeposits = async (context: AppContext) => {
// Exit early if no new deposits are found
if (enqueuedDeposits.length === 0 && processedDeposits.length === 0) {
logger.info('No new hub deposits found', requestContext, methodContext);
return;
return new Set();
}

// Only save latest entry (processed or enqueued) for each deposit
Expand All @@ -161,4 +162,5 @@ export const updateHubDeposits = async (context: AppContext) => {
const latestProcessed = getMaxTxNonce(processedDeposits.map((i) => ({ txNonce: i.processedTxNonce ?? 0 })));
await database.saveCheckPoint('hub_deposit_enqueued_' + config.hub.domain, latestEnqueued);
await database.saveCheckPoint('hub_deposit_processed_' + config.hub.domain, latestProcessed);
return new Set([LIGHTHOUSE_QUEUES.INVOICE]);
};
39 changes: 33 additions & 6 deletions packages/agents/cartographer/handler/src/maintenance/backfill.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createLoggingContext, jsonifyError, sendHeartbeat } from '@chimera-monorepo/utils';
import { AppContext } from '@chimera-monorepo/cartographer-core';
import { notifyLighthouse } from '../notify';
import {
AppContext,
updateHubInvoices,
updateHubDeposits,
updateOriginIntents,
Expand All @@ -15,27 +16,36 @@ import {
updateMessageStatus,
updateAssets,
updateDepositors,
} from '@chimera-monorepo/cartographer-core';
} from '../mockable';

/**
* Run all backfill operations using core operations with the handler's context.
* This catches any events missed by webhooks and also runs periodic-only operations
* like updateMessageStatus (which checks on-chain Hyperlane delivery status).
*
* When an operation pulls new data from subgraphs, the corresponding lighthouse
* queue is notified so the lighthouse can process it.
*/
export const runBackfill = async (context: AppContext): Promise<void> => {
const { logger } = context;
const { requestContext, methodContext } = createLoggingContext('runBackfill');

logger.debug('Starting backfill maintenance cycle', requestContext, methodContext);

const operations = [
{ name: 'updateOrders', fn: updateOrders },
// Operations that notify lighthouse when new data is found.
// Each returns a Set<string> of lighthouse queue names to notify.
const notifyOperations: { name: string; fn: (ctx: AppContext) => Promise<Set<string>> }[] = [
{ name: 'updateOriginIntents', fn: updateOriginIntents },
{ name: 'updateDestinationIntents', fn: updateDestinationIntents },
{ name: 'updateHubIntents', fn: updateHubIntents },
{ name: 'updateSettlementIntents', fn: updateSettlementIntents },
{ name: 'updateHubInvoices', fn: updateHubInvoices },
{ name: 'updateHubDeposits', fn: updateHubDeposits },
];

// Operations that don't notify lighthouse
const plainOperations = [
{ name: 'updateSettlementIntents', fn: updateSettlementIntents },
{ name: 'updateOrders', fn: updateOrders },
{ name: 'updateAssets', fn: updateAssets },
{ name: 'updateDepositors', fn: updateDepositors },
{ name: 'updateMessages', fn: updateMessages },
Expand All @@ -45,14 +55,31 @@ export const runBackfill = async (context: AppContext): Promise<void> => {
{ name: 'updateMessageStatus', fn: updateMessageStatus },
];

for (const op of operations) {
// Collect which queues need notification (deduplicated)
const queuesToNotify = new Set<string>();

for (const op of notifyOperations) {
try {
const queues = await op.fn(context);
for (const q of queues) queuesToNotify.add(q);
} catch (error) {
logger.error(`Backfill error in ${op.name}`, requestContext, methodContext, jsonifyError(error as Error));
}
}

for (const op of plainOperations) {
try {
await op.fn(context);
} catch (error) {
logger.error(`Backfill error in ${op.name}`, requestContext, methodContext, jsonifyError(error as Error));
}
}

// Notify lighthouse for all queues that had new data
for (const queueName of queuesToNotify) {
await notifyLighthouse(queueName);
}

// Full materialized view refresh (non-debounced)
try {
await context.adapters.database.refreshIntentsView();
Expand Down
31 changes: 31 additions & 0 deletions packages/agents/cartographer/handler/src/mockable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {
updateHubInvoices as _updateHubInvoices,
updateHubDeposits as _updateHubDeposits,
updateOriginIntents as _updateOriginIntents,
updateDestinationIntents as _updateDestinationIntents,
updateHubIntents as _updateHubIntents,
updateSettlementIntents as _updateSettlementIntents,
updateOrders as _updateOrders,
updateMessages as _updateMessages,
updateQueues as _updateQueues,
updateHubSpokeMeta as _updateHubSpokeMeta,
updateProtocolUpdateLogs as _updateProtocolUpdateLogs,
updateMessageStatus as _updateMessageStatus,
updateAssets as _updateAssets,
updateDepositors as _updateDepositors,
} from '@chimera-monorepo/cartographer-core';

export const updateHubInvoices = _updateHubInvoices;
export const updateHubDeposits = _updateHubDeposits;
export const updateOriginIntents = _updateOriginIntents;
export const updateDestinationIntents = _updateDestinationIntents;
export const updateHubIntents = _updateHubIntents;
export const updateSettlementIntents = _updateSettlementIntents;
export const updateOrders = _updateOrders;
export const updateMessages = _updateMessages;
export const updateQueues = _updateQueues;
export const updateHubSpokeMeta = _updateHubSpokeMeta;
export const updateProtocolUpdateLogs = _updateProtocolUpdateLogs;
export const updateMessageStatus = _updateMessageStatus;
export const updateAssets = _updateAssets;
export const updateDepositors = _updateDepositors;
4 changes: 2 additions & 2 deletions packages/agents/cartographer/handler/src/notify.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createProducer, pingRedis, LIGHTHOUSE_QUEUES, Queue } from '@chimera-monorepo/mqclient';
import { jsonifyError, Logger } from '@chimera-monorepo/utils';
import { createProducer, pingRedis, Queue } from '@chimera-monorepo/mqclient';
import { jsonifyError, Logger, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils';

let queues: Map<string, Queue> | null = null;
let notifyLogger: Logger | undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from '../webhooks/parsers';
import { base64ToHex } from '../webhooks/webhookHandler';
import { notifyLighthouse } from '../notify';
import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient';
import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils';

/**
* Safely extract an intent ID from the webhook payload.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { HubIntent, TIntentStatus } from '@chimera-monorepo/utils';
import { HubIntent, TIntentStatus, LIGHTHOUSE_QUEUES } from '@chimera-monorepo/utils';
import { AppContext } from '@chimera-monorepo/cartographer-core';
import { parseHubInvoice, parseHubDeposit } from '../webhooks/parsers';
import { base64ToHex } from '../webhooks/webhookHandler';
import { notifyLighthouse } from '../notify';
import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient';

/**
* Create a minimal HubIntent for status-only upserts.
Expand Down
Loading
Loading