Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 42 additions & 35 deletions packages/agents/cartographer/core/src/operations/intents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { DEFAULT_SAFE_CONFIRMATIONS } from '../config';
import { getSubgraphSupportedDomains } from './helper';
import { computeIsSwap } from '../lib/intentHelpers';

export const updateOriginIntents = async (context: AppContext) => {
export const updateOriginIntents = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand Down Expand Up @@ -48,12 +48,13 @@ export const updateOriginIntents = async (context: AppContext) => {

if (queryMetaParams.size === 0) {
logger.debug('No domains to update', requestContext, methodContext, { domains });
return;
return 0;
}

// Get origin intents for all domains in the mapping.
const intents = await subgraph.getOriginIntentsByNonce(queryMetaParams);
logger.info('Retrieved origin intents', requestContext, methodContext, { intents: intents.length });
if (intents.length === 0) return 0;

// Compute is_swap for each intent by comparing ticker hashes
const intentsWithSwapFlag = intents.map((intent) => {
Expand Down Expand Up @@ -88,9 +89,10 @@ export const updateOriginIntents = async (context: AppContext) => {
}
// Log the successful update
logger.debug('Updated OriginIntents in database', requestContext, methodContext, { intents });
return intentsWithSwapFlag.length;
};

export const updateDestinationIntents = async (context: AppContext) => {
export const updateDestinationIntents = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand Down Expand Up @@ -129,37 +131,40 @@ export const updateDestinationIntents = async (context: AppContext) => {
}),
);

if (queryMetaParams.size > 0) {
// Get destination intents for all domains in the mapping.
const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams);
intents.forEach((intent) => {
const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext(
updateDestinationIntents.name,
);
logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent });
});
const checkpoints = domains
.map((domain) => {
const domainIntents = intents.filter((intent) => intent.destination === domain);
const max = getMaxTxNonce(domainIntents);
const latest = queryMetaParams.get(domain)?.latestNonce ?? 0;
if (domainIntents.length > 0 && max > latest) {
return { domain, checkpoint: max };
}
return undefined;
})
.filter((x) => !!x) as { domain: string; checkpoint: number }[];

await database.saveDestinationIntents(intents);
for (const checkpoint of checkpoints) {
await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint);
}
// Log the successful update
logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents });
if (queryMetaParams.size === 0) return 0;

// Get destination intents for all domains in the mapping.
const intents = await subgraph.getDestinationIntentsByNonce(queryMetaParams);
if (intents.length === 0) return 0;

intents.forEach((intent) => {
const { requestContext: _requestContext, methodContext: _methodContext } = createLoggingContext(
updateDestinationIntents.name,
);
logger.debug('Retrieved destination intent', _requestContext, _methodContext, { intent });
});
const checkpoints = domains
.map((domain) => {
const domainIntents = intents.filter((intent) => intent.destination === domain);
const max = getMaxTxNonce(domainIntents);
const latest = queryMetaParams.get(domain)?.latestNonce ?? 0;
if (domainIntents.length > 0 && max > latest) {
return { domain, checkpoint: max };
}
return undefined;
})
.filter((x) => !!x) as { domain: string; checkpoint: number }[];

await database.saveDestinationIntents(intents);
for (const checkpoint of checkpoints) {
await database.saveCheckPoint('destination_intent_' + checkpoint.domain, checkpoint.checkpoint);
}
// Log the successful update
logger.debug('Updated DestinationIntents in database', requestContext, methodContext, { intents });
return intents.length;
};

export const updateHubIntents = async (context: AppContext) => {
export const updateHubIntents = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -180,7 +185,7 @@ export const updateHubIntents = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return 0;
}

// Get the latest checkpoint for the hub domain
Expand Down Expand Up @@ -216,7 +221,7 @@ export const updateHubIntents = async (context: AppContext) => {
if (addedIntents.length === 0 && filledIntents.length === 0 && enqueuedIntents.length === 0) {
// Save latest checkpoint
logger.debug('No new intents found', requestContext, methodContext);
return;
return 0;
}

// Save intents to the database
Expand Down Expand Up @@ -249,9 +254,10 @@ export const updateHubIntents = async (context: AppContext) => {
const latest = getMaxTxNonce(enqueuedIntents.map((i) => ({ txNonce: i.settlementEnqueuedTxNonce! })));
await database.saveCheckPoint('hub_intent_enqueued_' + config.hub.domain, latest);
}
return addedIntents.length + filledIntents.length + enqueuedIntents.length;
};

export const updateSettlementIntents = async (context: AppContext) => {
export const updateSettlementIntents = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand Down Expand Up @@ -293,7 +299,7 @@ export const updateSettlementIntents = async (context: AppContext) => {

if (queryMetaParams.size === 0) {
logger.debug('No domains to update', requestContext, methodContext, { domains });
return;
return 0;
}

// Get settlement intents for all domains in the mapping.
Expand Down Expand Up @@ -323,6 +329,7 @@ export const updateSettlementIntents = async (context: AppContext) => {
}
// Log the successful update
logger.debug('Updated SettlementIntents in database', requestContext, methodContext, { intents });
return intents.length;
};

export const updateOrders = async (context: AppContext) => {
Expand Down
16 changes: 9 additions & 7 deletions packages/agents/cartographer/core/src/operations/invoices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {

import { AppContext } from '../context';

export const updateHubInvoices = async (context: AppContext) => {
export const updateHubInvoices = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -30,7 +30,7 @@ export const updateHubInvoices = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return 0;
}

// Get the latest checkpoint for the hub domain
Expand All @@ -56,7 +56,7 @@ export const updateHubInvoices = async (context: AppContext) => {
// Exit early if no new invoices are found
if (hubInvoices.length === 0) {
logger.info('No new hub invoices found', requestContext, methodContext);
return;
return 0;
}

// Deduplicate enqueued invoices
Expand All @@ -78,13 +78,14 @@ export const updateHubInvoices = async (context: AppContext) => {
// Save latest checkpoint
const latest = getMaxTxNonce(hubInvoices.map((i) => ({ txNonce: i.enqueuedTxNonce! })));
await database.saveCheckPoint('hub_invoice_' + config.hub.domain, latest);
return deduplicatedInvoices.length;
};

/**
* @notice Updates processed and enqueued deposits from the hub subgraph.
* @returns Promise<void>
* @returns Count of new deposits saved.
*/
export const updateHubDeposits = async (context: AppContext) => {
export const updateHubDeposits = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
config,
Expand All @@ -105,7 +106,7 @@ export const updateHubDeposits = async (context: AppContext) => {
latestBlockMap: Object.fromEntries(latestBlockMap.entries()),
},
);
return;
return 0;
}

// Get the latest checkpoint for the hub domain
Expand Down Expand Up @@ -134,7 +135,7 @@ export const updateHubDeposits = async (context: AppContext) => {
// Exit early if no new deposits are found
if (enqueuedDeposits.length === 0 && processedDeposits.length === 0) {
logger.info('No new hub deposits found', requestContext, methodContext);
return;
return 0;
}

// Only save latest entry (processed or enqueued) for each deposit
Expand All @@ -161,4 +162,5 @@ export const updateHubDeposits = async (context: AppContext) => {
const latestProcessed = getMaxTxNonce(processedDeposits.map((i) => ({ txNonce: i.processedTxNonce ?? 0 })));
await database.saveCheckPoint('hub_deposit_enqueued_' + config.hub.domain, latestEnqueued);
await database.saveCheckPoint('hub_deposit_processed_' + config.hub.domain, latestProcessed);
return hubDeposits.length;
};
5 changes: 4 additions & 1 deletion packages/agents/cartographer/core/src/operations/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const getMessageStatus = async (
return status;
};

export const updateMessages = async (context: AppContext) => {
export const updateMessages = async (context: AppContext): Promise<number> => {
const {
adapters: { subgraph, database },
logger,
Expand All @@ -101,6 +101,7 @@ export const updateMessages = async (context: AppContext) => {
const evmDomains = Object.keys(config.chains)
.filter((d) => config.chains[d].network === 'evm')
.concat(config.hub.domain);
let totalMessages = 0;
for (const domain of evmDomains) {
// Retrieve the most recent timestamp
const latestNonce = await database.getCheckPoint('message_' + domain);
Expand Down Expand Up @@ -184,12 +185,14 @@ export const updateMessages = async (context: AppContext) => {

// If there are any new messages, update the checkpoint with the timestamp of the latest message
if (messages.length > 0) {
totalMessages += messages.length;
const maxNonce = getMaxTxNonce(messages);
await database.saveCheckPoint('message_' + domain, maxNonce);
}

logger.debug('Saved messages', requestContext, methodContext, { messages });
}
return totalMessages;
};

export const updateQueues = async (context: AppContext) => {
Expand Down
55 changes: 44 additions & 11 deletions packages/agents/cartographer/handler/src/maintenance/backfill.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { createLoggingContext, jsonifyError, sendHeartbeat } from '@chimera-monorepo/utils';
import { AppContext } from '@chimera-monorepo/cartographer-core';
import { LIGHTHOUSE_QUEUES } from '@chimera-monorepo/mqclient';
import { notifyLighthouse } from '../notify';
import {
AppContext,
updateHubInvoices,
updateHubDeposits,
updateOriginIntents,
Expand All @@ -15,44 +17,75 @@ import {
updateMessageStatus,
updateAssets,
updateDepositors,
} from '@chimera-monorepo/cartographer-core';
} from '../mockable';

/**
* Run all backfill operations using core operations with the handler's context.
* This catches any events missed by webhooks and also runs periodic-only operations
* like updateMessageStatus (which checks on-chain Hyperlane delivery status).
*
* When an operation pulls new data from subgraphs, the corresponding lighthouse
* queue is notified so the lighthouse can process it.
*/
export const runBackfill = async (context: AppContext): Promise<void> => {
const { logger } = context;
const { requestContext, methodContext } = createLoggingContext('runBackfill');

logger.debug('Starting backfill maintenance cycle', requestContext, methodContext);

const operations = [
// Operations that don't notify lighthouse (no return value needed)
const plainOperations = [
{ name: 'updateOrders', fn: updateOrders },
{ name: 'updateOriginIntents', fn: updateOriginIntents },
{ name: 'updateDestinationIntents', fn: updateDestinationIntents },
{ name: 'updateHubIntents', fn: updateHubIntents },
{ name: 'updateSettlementIntents', fn: updateSettlementIntents },
{ name: 'updateHubInvoices', fn: updateHubInvoices },
{ name: 'updateHubDeposits', fn: updateHubDeposits },
{ name: 'updateAssets', fn: updateAssets },
{ name: 'updateDepositors', fn: updateDepositors },
{ name: 'updateMessages', fn: updateMessages },
{ name: 'updateQueues', fn: updateQueues },
{ name: 'updateHubSpokeMeta', fn: updateHubSpokeMeta },
{ name: 'updateProtocolUpdateLogs', fn: updateProtocolUpdateLogs },
{ name: 'updateMessageStatus', fn: updateMessageStatus },
];

for (const op of operations) {
// Operations that notify lighthouse when new data is found
const notifyOperations: { name: string; fn: (ctx: AppContext) => Promise<number>; queues: string[] }[] = [
{ name: 'updateOriginIntents', fn: updateOriginIntents, queues: [LIGHTHOUSE_QUEUES.INTENT] },
{ name: 'updateDestinationIntents', fn: updateDestinationIntents, queues: [LIGHTHOUSE_QUEUES.FILL] },
{ name: 'updateHubIntents', fn: updateHubIntents, queues: [LIGHTHOUSE_QUEUES.INTENT, LIGHTHOUSE_QUEUES.FILL] },
{ name: 'updateSettlementIntents', fn: updateSettlementIntents, queues: [LIGHTHOUSE_QUEUES.SETTLEMENT] },
{ name: 'updateHubInvoices', fn: updateHubInvoices, queues: [LIGHTHOUSE_QUEUES.INVOICE] },
{ name: 'updateHubDeposits', fn: updateHubDeposits, queues: [LIGHTHOUSE_QUEUES.INVOICE] },
{
name: 'updateMessages',
fn: updateMessages,
queues: [LIGHTHOUSE_QUEUES.SETTLEMENT, LIGHTHOUSE_QUEUES.FILL, LIGHTHOUSE_QUEUES.SOLANA],
},
Comment on lines +55 to +59
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateMessages is treated as a single boolean signal, but here any non-zero count triggers SETTLEMENT + FILL + SOLANA notifications. Since updateMessages can return >0 for spoke Intent/Fill messages (and non-Solana hub messages), this can enqueue unnecessary settlement/solana work. Consider returning more granular info from updateMessages (e.g., counts per message type / per queue or a Set of queues to notify), or splitting hub vs spoke message updates so backfill only notifies the queues that actually had new relevant data (similar to processHubMessage / processSpokeMessage notification behavior).

Copilot uses AI. Check for mistakes.
];

// Collect which queues need notification (deduplicated)
const queuesToNotify = new Set<string>();

for (const op of notifyOperations) {
try {
const count = await op.fn(context);
if (count > 0) {
for (const q of op.queues) queuesToNotify.add(q);
}
} catch (error) {
logger.error(`Backfill error in ${op.name}`, requestContext, methodContext, jsonifyError(error as Error));
}
}

for (const op of plainOperations) {
try {
await op.fn(context);
} catch (error) {
logger.error(`Backfill error in ${op.name}`, requestContext, methodContext, jsonifyError(error as Error));
}
}

// Notify lighthouse for all queues that had new data
for (const queueName of queuesToNotify) {
await notifyLighthouse(queueName);
}

// Full materialized view refresh (non-debounced)
try {
await context.adapters.database.refreshIntentsView();
Expand Down
31 changes: 31 additions & 0 deletions packages/agents/cartographer/handler/src/mockable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {
updateHubInvoices as _updateHubInvoices,
updateHubDeposits as _updateHubDeposits,
updateOriginIntents as _updateOriginIntents,
updateDestinationIntents as _updateDestinationIntents,
updateHubIntents as _updateHubIntents,
updateSettlementIntents as _updateSettlementIntents,
updateOrders as _updateOrders,
updateMessages as _updateMessages,
updateQueues as _updateQueues,
updateHubSpokeMeta as _updateHubSpokeMeta,
updateProtocolUpdateLogs as _updateProtocolUpdateLogs,
updateMessageStatus as _updateMessageStatus,
updateAssets as _updateAssets,
updateDepositors as _updateDepositors,
} from '@chimera-monorepo/cartographer-core';

export const updateHubInvoices = _updateHubInvoices;
export const updateHubDeposits = _updateHubDeposits;
export const updateOriginIntents = _updateOriginIntents;
export const updateDestinationIntents = _updateDestinationIntents;
export const updateHubIntents = _updateHubIntents;
export const updateSettlementIntents = _updateSettlementIntents;
export const updateOrders = _updateOrders;
export const updateMessages = _updateMessages;
export const updateQueues = _updateQueues;
export const updateHubSpokeMeta = _updateHubSpokeMeta;
export const updateProtocolUpdateLogs = _updateProtocolUpdateLogs;
export const updateMessageStatus = _updateMessageStatus;
export const updateAssets = _updateAssets;
export const updateDepositors = _updateDepositors;
Loading
Loading