From 9a42c71e59ee9faf07d2b166c7ff44f62bb986ff Mon Sep 17 00:00:00 2001 From: liu-zhipeng <57480598+liu-zhipeng@users.noreply.github.com> Date: Fri, 27 Mar 2026 16:40:45 +0800 Subject: [PATCH 1/3] feat: integrate unified inventory service into Mark Key changes: - Create reservation before earmark in on-demand rebalancing - Query inventory balance in getEarmarkedBalance() with DB fallback - Acquire nonce from inventory service before tx submission - Report nonce confirm/fail after tx result - Register pending inbound after successful bridge operations - Create MARK_PURCHASE reservations before intent submission - Create REBALANCE_THRESHOLD reservations before regular rebalancing - All inventory calls are non-blocking with graceful fallback - Enabled via config.unifiedInventoryEnabled (uses everclearApiUrl) --- package.json | 1 + packages/adapters/inventory/jest.config.js | 18 ++ packages/adapters/inventory/package.json | 40 +++ packages/adapters/inventory/src/client.ts | 259 ++++++++++++++++++ packages/adapters/inventory/src/index.ts | 11 + packages/adapters/inventory/src/types.ts | 117 ++++++++ packages/adapters/inventory/tsconfig.json | 16 ++ packages/agent/package.json | 1 + packages/agent/src/adapters.ts | 13 + packages/agent/tsconfig.json | 1 + packages/core/src/types/config.ts | 1 + packages/handler/package.json | 1 + packages/handler/src/helpers/intent.ts | 41 +++ packages/handler/tsconfig.json | 1 + packages/poller/package.json | 1 + packages/poller/src/helpers/intent.ts | 3 + packages/poller/src/helpers/transactions.ts | 63 ++++- packages/poller/src/init.ts | 2 + .../poller/src/invoice/processInvoices.ts | 50 ++++ .../poller/src/rebalance/bridgeExecution.ts | 5 +- packages/poller/src/rebalance/onDemand.ts | 232 +++++++++++++++- packages/poller/src/rebalance/rebalance.ts | 44 ++- packages/poller/tsconfig.json | 1 + yarn.lock | 21 ++ 24 files changed, 933 insertions(+), 10 deletions(-) create mode 100644 packages/adapters/inventory/jest.config.js create mode 100644 packages/adapters/inventory/package.json create mode 100644 packages/adapters/inventory/src/client.ts create mode 100644 packages/adapters/inventory/src/index.ts create mode 100644 packages/adapters/inventory/src/types.ts create mode 100644 packages/adapters/inventory/tsconfig.json diff --git a/package.json b/package.json index 6c6e81dc..e41f3d34 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "packages/adapters/chainservice", "packages/adapters/database", "packages/adapters/everclear", + "packages/adapters/inventory", "packages/adapters/web3signer", "packages/adapters/prometheus", "packages/adapters/rebalance", diff --git a/packages/adapters/inventory/jest.config.js b/packages/adapters/inventory/jest.config.js new file mode 100644 index 00000000..d0b65f6b --- /dev/null +++ b/packages/adapters/inventory/jest.config.js @@ -0,0 +1,18 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + setupFilesAfterEnv: ['/../../../jest.setup.shared.js'], + testMatch: ['**/test/**/*.spec.ts'], + transform: { + '^.+\\.ts$': [ + 'ts-jest', + { + tsconfig: '/tsconfig.json', + }, + ], + }, + moduleNameMapper: { + '^@mark/core$': '/../../core/src', + '^@mark/(.*)$': '/../$1/src', + }, +}; diff --git a/packages/adapters/inventory/package.json b/packages/adapters/inventory/package.json new file mode 100644 index 00000000..551d566c --- /dev/null +++ b/packages/adapters/inventory/package.json @@ -0,0 +1,40 @@ +{ + "name": "@mark/inventory", + "version": "0.0.1", + "private": true, + "description": "Unified inventory service client for Mark.", + "author": "Everclear", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "files": [ + "dist/**/*", + "src/**/*" + ], + "scripts": { + "build": "tsc --build ./tsconfig.json", + "clean": "rimraf ./dist ./tsconfig.tsbuildinfo", + "dev": "yarn dev", + "lint": "yarn lint:package && yarn lint:ts", + "lint:fix": "yarn lint --fix", + "lint:package": "sort-package-json", + "lint:ts": "eslint ./src", + "purge": "yarn clean && rimraf ./coverage ./node_modules", + "test": "yarn test:unit", + "test:unit": "jest" + }, + "dependencies": { + "@mark/core": "workspace:*", + "@mark/logger": "workspace:*", + "axios": "1.9.0" + }, + "devDependencies": { + "@types/jest": "29.5.0", + "@types/node": "20.17.12", + "eslint": "9.17.0", + "jest": "29.5.0", + "rimraf": "6.0.1", + "sort-package-json": "2.12.0", + "ts-jest": "29.1.0", + "typescript": "5.7.2" + } +} diff --git a/packages/adapters/inventory/src/client.ts b/packages/adapters/inventory/src/client.ts new file mode 100644 index 00000000..99a811bc --- /dev/null +++ b/packages/adapters/inventory/src/client.ts @@ -0,0 +1,259 @@ +import { jsonifyError, Logger } from '@mark/logger'; +import { axiosGet, axiosPost } from '@mark/core'; +import axios from 'axios'; +import { + Reservation, + CreateReservationParams, + ReservationStatus, + NonceAssignment, + InventoryBalance, + PendingInbound, + RegisterInboundParams, +} from './types'; + +/** + * HTTP client for the unified inventory service (connext-api). + * + * The inventory API is served from the same base URL as the everclear API. + * Uses the same axios helpers (axiosGet/axiosPost) as EverclearAdapter for + * connection pooling, retries, and consistent error handling. + * + * All methods are non-throwing: failures are logged and return undefined/void, + * so Mark degrades gracefully when the inventory service is unavailable. + */ +export class InventoryServiceClient { + private readonly apiUrl: string; + private readonly logger: Logger; + + constructor(apiUrl: string, logger: Logger) { + this.apiUrl = apiUrl.replace(/\/$/, ''); + this.logger = logger; + } + + /** + * Logs API errors with full context, matching EverclearAdapter.logApiError pattern. + */ + private logApiError(message: string, url: string, params: unknown, err: unknown): void { + const errorContext = (err as { context?: Record }).context; + this.logger.warn(message, { + url, + errorMessage: (err as Error).message, + apiResponseStatus: errorContext?.status, + apiResponseBody: errorContext?.data, + requestParams: params, + error: jsonifyError(err), + }); + } + + // ── Reservation Management ────────────────────────────────────────── + + /** + * Create a reservation for an operation (rebalance, purchase, etc.). + * POST /inventory/reserve + */ + async createReservation(params: CreateReservationParams): Promise { + const url = `${this.apiUrl}/inventory/reserve`; + try { + const { data } = await axiosPost(url, params, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to create reservation (non-blocking)', url, params, err); + return undefined; + } + } + + /** + * Update the status of an existing reservation. + * PUT /inventory/reserve/{reservationId}/status + */ + async updateReservationStatus( + reservationId: string, + status: ReservationStatus, + metadata?: Record, + ): Promise { + const url = `${this.apiUrl}/inventory/reserve/${reservationId}/status`; + const body = { status, metadata }; + try { + const { data } = await axios.put(url, body); + return data; + } catch (err) { + this.logApiError('Failed to update reservation status (non-blocking)', url, body, err); + return undefined; + } + } + + /** + * Delete (release) a reservation. + * DELETE /inventory/reserve/{reservationId} + */ + async deleteReservation(reservationId: string): Promise { + const url = `${this.apiUrl}/inventory/reserve/${reservationId}`; + try { + const { data } = await axios.delete<{ success: boolean }>(url); + return data?.success ?? false; + } catch (err) { + this.logApiError('Failed to delete reservation (non-blocking)', url, { reservationId }, err); + return false; + } + } + + /** + * Get full inventory balance for a (chain, asset) pair. + * GET /inventory/balance/{chainId}/{asset} + * + * Returns on-chain balance, available balance, reserved amounts by type, + * pending inbound, and pending intents. + */ + async getInventoryBalance(chainId: string, asset: string): Promise { + const url = `${this.apiUrl}/inventory/balance/${chainId}/${encodeURIComponent(asset)}`; + try { + const { data } = await axiosGet(url, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to get inventory balance (non-blocking)', url, { chainId, asset }, err); + return undefined; + } + } + + /** + * Get all reservations for an operation. + * GET /inventory/operations/{operationId} + */ + async getReservationsByOperation(operationId: string): Promise { + const url = `${this.apiUrl}/inventory/operations/${encodeURIComponent(operationId)}`; + try { + const { data } = await axiosGet<{ reservations: Reservation[] }>(url, undefined, 1, 0); + return data?.reservations ?? []; + } catch (err) { + this.logApiError('Failed to get reservations by operation (non-blocking)', url, { operationId }, err); + return []; + } + } + + // ── Nonce Management ──────────────────────────────────────────────── + + /** + * Assign the next sequential nonce for a (chainId, wallet) pair. + * POST /inventory/nonce/assign + */ + async assignNonce(chainId: string, wallet: string, operationId?: string): Promise { + const url = `${this.apiUrl}/inventory/nonce/assign`; + const body = { chainId, wallet, operationId }; + try { + const { data } = await axiosPost(url, body, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to assign nonce from inventory service (non-blocking)', url, body, err); + return undefined; + } + } + + /** + * Confirm that a previously assigned nonce was successfully included on-chain. + * POST /inventory/nonce/confirm + */ + async confirmNonce(chainId: string, wallet: string, nonce: number, txHash?: string): Promise { + const url = `${this.apiUrl}/inventory/nonce/confirm`; + const body = { chainId, wallet, nonce, txHash }; + try { + await axiosPost(url, body, undefined, 1, 0); + } catch (err) { + this.logApiError('Failed to confirm nonce (non-blocking)', url, body, err); + } + } + + /** + * Report that a previously assigned nonce failed (transaction not mined / reverted). + * POST /inventory/nonce/fail + */ + async failNonce(chainId: string, wallet: string, nonce: number): Promise { + const url = `${this.apiUrl}/inventory/nonce/fail`; + const body = { chainId, wallet, nonce }; + try { + await axiosPost(url, body, undefined, 1, 0); + } catch (err) { + this.logApiError('Failed to report nonce failure (non-blocking)', url, body, err); + } + } + + // ── Pending Inbound Tracking ──────────────────────────────────────── + + /** + * Register expected inbound funds from a cross-chain rebalance. + * POST /inventory/inbound + */ + async registerInbound(params: RegisterInboundParams): Promise { + const url = `${this.apiUrl}/inventory/inbound`; + try { + const { data } = await axiosPost(url, params, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to register pending inbound (non-blocking)', url, params, err); + return undefined; + } + } + + /** + * Confirm that pending inbound funds have arrived. + * POST /inventory/inbound/{inboundId}/confirm + */ + async confirmInbound(inboundId: string, txHash?: string): Promise { + const url = `${this.apiUrl}/inventory/inbound/${inboundId}/confirm`; + const body = { txHash }; + try { + const { data } = await axiosPost(url, body, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to confirm inbound (non-blocking)', url, body, err); + return undefined; + } + } + + /** + * Cancel a pending inbound that won't arrive. + * POST /inventory/inbound/{inboundId}/cancel + */ + async cancelInbound(inboundId: string, reason?: string): Promise { + const url = `${this.apiUrl}/inventory/inbound/${inboundId}/cancel`; + const body = { reason }; + try { + const { data } = await axiosPost(url, body, undefined, 1, 0); + return data; + } catch (err) { + this.logApiError('Failed to cancel inbound (non-blocking)', url, body, err); + return undefined; + } + } + + // ── Transaction Result Reporting (convenience wrappers) ───────────── + + /** + * Report a successful transaction result for a reservation. + */ + async reportTransactionSuccess( + reservationId: string, + txHash: string, + chainId: string, + metadata?: Record, + ): Promise { + await this.updateReservationStatus(reservationId, 'COMPLETED', { + txHash, + chainId, + ...metadata, + }); + } + + /** + * Report a failed transaction result for a reservation. + */ + async reportTransactionFailure( + reservationId: string, + reason: string, + metadata?: Record, + ): Promise { + await this.updateReservationStatus(reservationId, 'FAILED', { + failureReason: reason, + ...metadata, + }); + } +} diff --git a/packages/adapters/inventory/src/index.ts b/packages/adapters/inventory/src/index.ts new file mode 100644 index 00000000..e0dbdb18 --- /dev/null +++ b/packages/adapters/inventory/src/index.ts @@ -0,0 +1,11 @@ +export { InventoryServiceClient } from './client'; +export type { + OperationType, + ReservationStatus, + Reservation, + CreateReservationParams, + NonceAssignment, + InventoryBalance, + PendingInbound, + RegisterInboundParams, +} from './types'; diff --git a/packages/adapters/inventory/src/types.ts b/packages/adapters/inventory/src/types.ts new file mode 100644 index 00000000..35946e06 --- /dev/null +++ b/packages/adapters/inventory/src/types.ts @@ -0,0 +1,117 @@ +/** + * Operation types supported by the unified inventory service. + */ +export type OperationType = + | 'FAST_PATH_FILL' + | 'REBALANCE_LEG2' + | 'REBALANCE_ONDEMAND' + | 'MARK_PURCHASE' + | 'REBALANCE_THRESHOLD'; + +/** + * Reservation lifecycle statuses. + */ +export type ReservationStatus = + | 'PENDING' + | 'ACTIVE' + | 'EXECUTING' + | 'COMPLETED' + | 'FAILED' + | 'EXPIRED' + | 'PREEMPTED'; + +/** + * A reservation returned by the inventory service. + */ +export interface Reservation { + id: string; + chainId: string; + asset: string; + amount: string; + operationType: OperationType; + operationId: string; + priority: number; + status: ReservationStatus; + ttlSeconds: number; + expiresAt: number; + requestedBy: string; + parentReservationId?: string; + createdAt: number; + updatedAt: number; + metadata?: Record; +} + +/** + * Parameters for creating a new reservation. + */ +export interface CreateReservationParams { + chainId: string; + asset: string; + amount: string; + operationType: OperationType; + operationId: string; + requestedBy: string; + ttlSeconds?: number; + metadata?: Record; +} + +/** + * A nonce assignment returned by the inventory service. + */ +export interface NonceAssignment { + nonce: number; + nonceId: string; + chainId: string; + wallet: string; + assignedAt: number; +} + +/** + * Inventory balance response from GET /inventory/balance/{chainId}/{asset}. + * This is the full balance view including on-chain balance, reservations, and pending state. + */ +export interface InventoryBalance { + chainId: string; + asset: string; + totalBalance: string; + availableBalance: string; + reservedByType: Partial>; + pendingInbound: string; + pendingIntents: string; + reservationCount: number; + timestamp: number; +} + +/** + * Pending inbound entry for cross-chain rebalance tracking. + */ +export interface PendingInbound { + id: string; + chainId: string; + asset: string; + amount: string; + sourceChain: string; + operationType: string; // API accepts any string (not restricted to OperationType enum) + operationId: string; + status: 'PENDING' | 'CONFIRMED' | 'EXPIRED' | 'CANCELLED'; + expectedArrivalAt?: number; + txHash?: string; + createdAt: number; + updatedAt: number; + metadata?: Record; +} + +/** + * Parameters for registering a pending inbound. + */ +export interface RegisterInboundParams { + chainId: string; + asset: string; + amount: string; + sourceChain: string; + operationType: string; // API accepts any string (not restricted to OperationType enum) + operationId: string; + expectedArrivalSeconds?: number; + txHash?: string; + metadata?: Record; +} diff --git a/packages/adapters/inventory/tsconfig.json b/packages/adapters/inventory/tsconfig.json new file mode 100644 index 00000000..ee112e1f --- /dev/null +++ b/packages/adapters/inventory/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist", + "baseUrl": ".", + "composite": true, + "types": ["node", "jest"] + }, + "include": ["src/**/*"], + "exclude": ["dist", "node_modules", "**/*.spec.ts", "test"], + "references": [ + { "path": "../../core" }, + { "path": "../logger" } + ] +} diff --git a/packages/agent/package.json b/packages/agent/package.json index 57b9a1bb..9860ff33 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -25,6 +25,7 @@ "@mark/core": "workspace:*", "@mark/database": "workspace:*", "@mark/everclear": "workspace:*", + "@mark/inventory": "workspace:*", "@mark/logger": "workspace:*", "@mark/prometheus": "workspace:*", "@mark/rebalance": "workspace:*", diff --git a/packages/agent/src/adapters.ts b/packages/agent/src/adapters.ts index d525f709..654e1d10 100644 --- a/packages/agent/src/adapters.ts +++ b/packages/agent/src/adapters.ts @@ -6,6 +6,7 @@ import { EverclearAdapter } from '@mark/everclear'; import { PurchaseCache } from '@mark/cache'; import { PrometheusAdapter } from '@mark/prometheus'; import { RebalanceAdapter } from '@mark/rebalance'; +import { InventoryServiceClient } from '@mark/inventory'; import * as database from '@mark/database'; /** @@ -20,6 +21,7 @@ export interface BaseAdapters { rebalance: RebalanceAdapter; database: typeof database; web3Signer: Web3Signer; + inventory?: InventoryServiceClient; } /** @@ -107,6 +109,16 @@ export function initializeBaseAdapters( // Initialize database database.initializeDatabase(config.database); + // Initialize inventory service client if unified inventory is enabled. + // The inventory API is served from the same connext-api as everclearApiUrl. + let inventory: InventoryServiceClient | undefined; + if (config.unifiedInventoryEnabled) { + inventory = new InventoryServiceClient(config.everclearApiUrl, logger); + logger.info('Unified inventory service client initialized', { + url: config.everclearApiUrl, + }); + } + const baseAdapters: BaseAdapters & { solanaSigner?: SolanaSigner } = { chainService, fillServiceChainService, @@ -116,6 +128,7 @@ export function initializeBaseAdapters( rebalance, database, web3Signer: web3Signer as Web3Signer, + inventory, }; // Initialize Solana signer if requested and configured diff --git a/packages/agent/tsconfig.json b/packages/agent/tsconfig.json index e86165cb..e52323ab 100644 --- a/packages/agent/tsconfig.json +++ b/packages/agent/tsconfig.json @@ -22,6 +22,7 @@ { "path": "../adapters/chainservice" }, { "path": "../adapters/database" }, { "path": "../adapters/everclear" }, + { "path": "../adapters/inventory" }, { "path": "../adapters/logger" }, { "path": "../adapters/prometheus" }, { "path": "../adapters/rebalance" }, diff --git a/packages/core/src/types/config.ts b/packages/core/src/types/config.ts index f171db9f..34d82911 100644 --- a/packages/core/src/types/config.ts +++ b/packages/core/src/types/config.ts @@ -275,6 +275,7 @@ export interface MarkConfiguration extends RebalanceConfig { bridgeContractAddress?: string; // Override Mantle bridge contract }; quoteServiceUrl?: string; // Quote service URL for DEX swap quotes (default: https://quotes.api.everclear.org) + unifiedInventoryEnabled?: boolean; // Enable unified inventory service (reservations/nonce via everclearApiUrl) redis: RedisConfig; database: DatabaseConfig; ownAddress: string; diff --git a/packages/handler/package.json b/packages/handler/package.json index b742f344..73d3d8ff 100644 --- a/packages/handler/package.json +++ b/packages/handler/package.json @@ -27,6 +27,7 @@ "@mark/core": "workspace:*", "@mark/database": "workspace:*", "@mark/everclear": "workspace:*", + "@mark/inventory": "workspace:*", "@mark/logger": "workspace:*", "@mark/poller": "workspace:*", "@mark/prometheus": "workspace:*", diff --git a/packages/handler/src/helpers/intent.ts b/packages/handler/src/helpers/intent.ts index 9528ddb6..053739a0 100644 --- a/packages/handler/src/helpers/intent.ts +++ b/packages/handler/src/helpers/intent.ts @@ -114,9 +114,45 @@ export async function splitAndSendIntents( // Send all intents in one batch let purchases: PurchaseAction[] = []; + + // Create MARK_PURCHASE reservation via inventory service + const { inventory } = processingContext; + let purchaseReservationId: string | undefined; + if (inventory && originDomain) { + const totalAmount = intents.reduce((sum, i) => sum + BigInt(i.amount), 0n); + const tickerAddress = getTokenAddressFromConfig(ticker, originDomain, config); + const reservation = await inventory.createReservation({ + chainId: originDomain, + asset: tickerAddress || ticker, + amount: totalAmount.toString(), + operationType: 'MARK_PURCHASE', + operationId: invoiceId, + requestedBy: 'mark-handler', + ttlSeconds: 120, // spec: MARK_PURCHASE default TTL is 120s + metadata: { + invoiceId, + tickerHash: ticker, + intentCount: intents.length.toString(), + }, + }); + if (reservation) { + purchaseReservationId = reservation.id; + await inventory.updateReservationStatus(reservation.id, 'EXECUTING'); + } + } + try { const intentResults = await sendIntents(invoice.intent_id, intents, processingContext, config, requestId); + // Report successful purchase to inventory service + if (inventory && purchaseReservationId && intentResults.length > 0) { + await inventory.reportTransactionSuccess( + purchaseReservationId, + intentResults[0].transactionHash, + intentResults[0].chainId, + ); + } + // Create purchases maintaining the invoice-intent relationship purchases = intentResults.map((result, index) => ({ target: invoice, @@ -188,6 +224,11 @@ export async function splitAndSendIntents( duration: getTimeSeconds() - start, }); } catch (error) { + // Release purchase reservation on failure + if (inventory && purchaseReservationId) { + await inventory.reportTransactionFailure(purchaseReservationId, 'intent_submission_failed'); + } + prometheus.recordInvalidPurchase(InvalidPurchaseReasons.TransactionFailed, labels); logger.error('Failed to send intents for invoice', { diff --git a/packages/handler/tsconfig.json b/packages/handler/tsconfig.json index 6ab44563..436f2481 100644 --- a/packages/handler/tsconfig.json +++ b/packages/handler/tsconfig.json @@ -22,6 +22,7 @@ { "path": "../adapters/database" }, { "path": "../adapters/chainservice" }, { "path": "../adapters/everclear" }, + { "path": "../adapters/inventory" }, { "path": "../adapters/prometheus" }, { "path": "../adapters/rebalance" }, { "path": "../adapters/web3signer" }, diff --git a/packages/poller/package.json b/packages/poller/package.json index 43b5d902..ac27e118 100644 --- a/packages/poller/package.json +++ b/packages/poller/package.json @@ -30,6 +30,7 @@ "@mark/core": "workspace:*", "@mark/database": "workspace:*", "@mark/everclear": "workspace:*", + "@mark/inventory": "workspace:*", "@mark/logger": "workspace:*", "@mark/prometheus": "workspace:*", "@mark/rebalance": "workspace:*", diff --git a/packages/poller/src/helpers/intent.ts b/packages/poller/src/helpers/intent.ts index 9c15c240..d7e0b9c1 100644 --- a/packages/poller/src/helpers/intent.ts +++ b/packages/poller/src/helpers/intent.ts @@ -359,6 +359,9 @@ export const sendEvmIntents = async ( }, zodiacConfig: originWalletConfig, context: { requestId, invoiceId, transactionType: 'batch-create-intent' }, + inventory: adapters.inventory, + walletAddress: config.ownAddress, + operationId: invoiceId, }); const purchaseTx: TransactionReceipt = purchaseResult.receipt!; diff --git a/packages/poller/src/helpers/transactions.ts b/packages/poller/src/helpers/transactions.ts index 7e9ef75c..29404859 100644 --- a/packages/poller/src/helpers/transactions.ts +++ b/packages/poller/src/helpers/transactions.ts @@ -2,6 +2,8 @@ import { ChainService, TransactionReceipt } from '@mark/chainservice'; import { LoggingContext, TransactionSubmissionType, TransactionRequest, WalletConfig, isEvmChain } from '@mark/core'; import { wrapTransactionWithZodiac } from './zodiac'; import { Logger } from '@mark/logger'; +import type { InventoryServiceClient, NonceAssignment } from '@mark/inventory'; + export interface TransactionSubmissionParams { chainService: ChainService; logger: Logger; @@ -9,27 +11,69 @@ export interface TransactionSubmissionParams { txRequest: TransactionRequest; zodiacConfig: WalletConfig; context?: LoggingContext; // For logging context + inventory?: InventoryServiceClient; // Optional: unified inventory service for nonce management + walletAddress?: string; // Wallet address for nonce management (required if inventory is provided) + operationId?: string; // Operation ID for nonce tracking } export interface TransactionSubmissionResult { submissionType: TransactionSubmissionType; hash: string; // unique identifier for the transaction, could be safe hash or transaction hash receipt?: TransactionReceipt; // The actual receipt type from chainService + nonceAssignment?: NonceAssignment; // Nonce assigned by inventory service (if used) } /** - * Submits a transaction with consistent logging and error handling + * Submits a transaction with consistent logging, nonce management, and result reporting. + * + * When an inventory service client is provided: + * 1. Acquires a nonce from the inventory service before submission + * 2. Reports nonce confirmation on success (with tx hash) + * 3. Reports nonce failure on error + * + * Falls back to chain service nonce management if inventory service is unavailable. */ export async function submitTransactionWithLogging( params: TransactionSubmissionParams, ): Promise { - const { chainService, logger, chainId, txRequest, zodiacConfig, context = {} } = params; + const { + chainService, + logger, + chainId, + txRequest, + zodiacConfig, + context = {}, + inventory, + walletAddress, + operationId, + } = params; // Prepare the transaction (wrap with Zodiac if needed) const preparedTx = isEvmChain(chainId) ? await wrapTransactionWithZodiac({ ...txRequest, chainId: +params.chainId }, zodiacConfig) : txRequest; + // Acquire nonce from inventory service if available + let nonceAssignment: NonceAssignment | undefined; + if (inventory && walletAddress && isEvmChain(chainId)) { + nonceAssignment = await inventory.assignNonce(chainId, walletAddress, operationId); + if (nonceAssignment) { + logger.info('Acquired nonce from inventory service', { + ...context, + chainId, + nonce: nonceAssignment.nonce, + nonceId: nonceAssignment.nonceId, + }); + // Attach nonce to the transaction request (TransactionRequest.nonce is a string) + preparedTx.nonce = nonceAssignment.nonce.toString(); + } else { + logger.debug('Inventory service nonce unavailable, using chain service nonce', { + ...context, + chainId, + }); + } + } + logger.info('Submitting transaction', { ...context, chainId, @@ -38,6 +82,7 @@ export async function submitTransactionWithLogging( originalTo: txRequest.to, value: preparedTx.value?.toString() || '0', funcSig: preparedTx.funcSig, + nonce: nonceAssignment?.nonce, }); try { @@ -48,12 +93,19 @@ export async function submitTransactionWithLogging( chainId, transactionHash: receipt.transactionHash, walletType: zodiacConfig.walletType, + nonce: nonceAssignment?.nonce, }); + // Report nonce confirmation to inventory service + if (nonceAssignment && inventory && walletAddress) { + await inventory.confirmNonce(chainId, walletAddress, nonceAssignment.nonce, receipt.transactionHash); + } + return { submissionType: TransactionSubmissionType.Onchain, hash: receipt.transactionHash, receipt, + nonceAssignment, }; } catch (error) { logger.error('Transaction submission failed', { @@ -62,7 +114,14 @@ export async function submitTransactionWithLogging( error, txRequest: preparedTx, walletType: zodiacConfig.walletType, + nonce: nonceAssignment?.nonce, }); + + // Report nonce failure to inventory service + if (nonceAssignment && inventory && walletAddress) { + await inventory.failNonce(chainId, walletAddress, nonceAssignment.nonce); + } + throw error; } } diff --git a/packages/poller/src/init.ts b/packages/poller/src/init.ts index f9be6e05..4f966c6d 100644 --- a/packages/poller/src/init.ts +++ b/packages/poller/src/init.ts @@ -15,6 +15,7 @@ import { PurchaseCache } from '@mark/cache'; import { PrometheusAdapter } from '@mark/prometheus'; import { rebalanceInventory, cleanupExpiredEarmarks, cleanupExpiredRegularRebalanceOps } from './rebalance'; import { RebalanceAdapter } from '@mark/rebalance'; +import { InventoryServiceClient } from '@mark/inventory'; import { cleanupViemClients } from './helpers/contracts'; import * as database from '@mark/database'; import { bytesToHex, WalletClient } from 'viem'; @@ -33,6 +34,7 @@ export interface MarkAdapters { prometheus: PrometheusAdapter; rebalance: RebalanceAdapter; database: typeof database; + inventory?: InventoryServiceClient; // Optional: unified inventory service client } export interface ProcessingContext extends MarkAdapters { config: MarkConfiguration; diff --git a/packages/poller/src/invoice/processInvoices.ts b/packages/poller/src/invoice/processInvoices.ts index 7421c150..81b1bbbe 100644 --- a/packages/poller/src/invoice/processInvoices.ts +++ b/packages/poller/src/invoice/processInvoices.ts @@ -503,11 +503,43 @@ export async function processTickerGroup( // Send all intents in one batch let purchases: PurchaseAction[] = []; + const { inventory } = context; + const purchaseReservationIds: string[] = []; try { if (allIntents.length === 0) { throw new Error('No intents to send'); } + // Create MARK_PURCHASE reservations via inventory service for each invoice in the batch + if (inventory) { + for (const { invoice, intents } of batchedGroup.invoicesWithIntents) { + const totalAmount = intents.reduce((sum, i) => sum + BigInt(i.amount), 0n); + const tickerAddress = getTokenAddressFromConfig( + invoice.ticker_hash, + batchedGroup.origin, + config, + ); + const reservation = await inventory.createReservation({ + chainId: batchedGroup.origin, + asset: tickerAddress || invoice.ticker_hash, + amount: totalAmount.toString(), + operationType: 'MARK_PURCHASE', + operationId: invoice.intent_id, + requestedBy: 'mark', + ttlSeconds: 120, // spec: MARK_PURCHASE default TTL is 120s + metadata: { + invoiceId: invoice.intent_id, + tickerHash: invoice.ticker_hash, + intentCount: intents.length.toString(), + }, + }); + if (reservation) { + purchaseReservationIds.push(reservation.id); + await inventory.updateReservationStatus(reservation.id, 'EXECUTING'); + } + } + } + const intentResults = await sendIntents( allIntents[0].invoice.intent_id, allIntents.map((i) => i.params), @@ -516,6 +548,17 @@ export async function processTickerGroup( requestId, ); + // Report successful purchase to inventory service + if (inventory && intentResults.length > 0) { + for (const reservationId of purchaseReservationIds) { + await inventory.reportTransactionSuccess( + reservationId, + intentResults[0].transactionHash, + intentResults[0].chainId, + ); + } + } + // Create purchases maintaining the invoice-intent relationship purchases = intentResults.map((result, index) => ({ target: allIntents[index].invoice, @@ -592,6 +635,13 @@ export async function processTickerGroup( duration: getTimeSeconds() - start, }); } catch (error) { + // Release purchase reservations on failure + if (inventory && purchaseReservationIds.length > 0) { + for (const reservationId of purchaseReservationIds) { + await inventory.reportTransactionFailure(reservationId, 'intent_submission_failed'); + } + } + // Record invalid purchase for each invoice in the batch for (const { invoice } of batchedGroup.invoicesWithIntents) { prometheus.recordInvalidPurchase(InvalidPurchaseReasons.TransactionFailed, { diff --git a/packages/poller/src/rebalance/bridgeExecution.ts b/packages/poller/src/rebalance/bridgeExecution.ts index ee143f59..fba6d741 100644 --- a/packages/poller/src/rebalance/bridgeExecution.ts +++ b/packages/poller/src/rebalance/bridgeExecution.ts @@ -20,7 +20,7 @@ import { SenderConfig } from './types'; // --------------------------------------------------------------------------- export interface SubmitBridgeTxsParams { - context: Pick; + context: Pick; chainService: ChainService; route: { origin: number; destination: number; asset: string }; bridgeType: SupportedBridge; @@ -87,6 +87,9 @@ export const submitBridgeTransactions = async ({ }, zodiacConfig, context: { requestId, route, bridgeType, transactionType: memo, sender: senderLabel }, + inventory: context.inventory, + walletAddress: senderAddress, + operationId: `rebalance-${bridgeType}-${route.origin}-${route.destination}`, }); logger.info('Successfully submitted bridge transaction', { diff --git a/packages/poller/src/rebalance/onDemand.ts b/packages/poller/src/rebalance/onDemand.ts index 107b48c1..3a23f9ed 100644 --- a/packages/poller/src/rebalance/onDemand.ts +++ b/packages/poller/src/rebalance/onDemand.ts @@ -22,6 +22,7 @@ import { jsonifyError } from '@mark/logger'; import { RebalanceTransactionMemo } from '@mark/rebalance'; import { getValidatedZodiacConfig, getActualAddress } from '../helpers/zodiac'; import { submitTransactionWithLogging } from '../helpers/transactions'; +import type { Reservation } from '@mark/inventory'; const MIN_REBALANCE_AMOUNT_FACTOR = 2n; @@ -1026,7 +1027,7 @@ export async function executeOnDemandRebalancing( evaluationResult: OnDemandRebalanceResult, context: ProcessingContext, ): Promise { - const { logger, requestId, config } = context; + const { logger, requestId, config, inventory } = context; if (!evaluationResult.canRebalance) { return null; @@ -1048,6 +1049,45 @@ export async function executeOnDemandRebalancing( return existingActive.status === EarmarkStatus.PENDING ? existingActive.id : null; } + // Create a reservation via the inventory service BEFORE creating the earmark. + // This ensures the unified inventory service tracks the reserved funds across all services. + let reservation: Reservation | undefined; + if (inventory) { + const tickerAddress = getTokenAddressFromConfig( + invoice.ticker_hash.toLowerCase(), + destinationChain!.toString(), + config, + ); + reservation = await inventory.createReservation({ + chainId: destinationChain!.toString(), + asset: tickerAddress || invoice.ticker_hash, + amount: minAmount!, + operationType: 'REBALANCE_ONDEMAND', + operationId: invoice.intent_id, + requestedBy: 'mark', + ttlSeconds: 3600, // spec: REBALANCE_ONDEMAND default TTL is 3600s + metadata: { + invoiceId: invoice.intent_id, + tickerHash: invoice.ticker_hash, + }, + }); + + if (reservation) { + logger.info('Created inventory reservation for on-demand rebalance', { + requestId, + reservationId: reservation.id, + invoiceId: invoice.intent_id, + chainId: destinationChain, + amount: minAmount, + }); + } else { + logger.warn('Failed to create inventory reservation, proceeding with earmark only', { + requestId, + invoiceId: invoice.intent_id, + }); + } + } + // Create an earmark with INITIATING status BEFORE executing any bridge transactions. // The unique partial index on (invoice_id) WHERE status IN ('initiating', 'pending', 'ready') // prevents two concurrent processes from both creating earmarks. This ensures the constraint @@ -1065,13 +1105,24 @@ export async function executeOnDemandRebalancing( const dbError = error as { code?: string; constraint?: string }; if (dbError.code === '23505' && dbError.constraint === 'unique_active_earmark_per_invoice') { // Another process won the race and created the earmark first. - // Return null — the caller will retry and pick up the existing earmark. + // Release the reservation since we won't proceed. + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'race_condition_earmark_exists', + }); + } logger.warn('Race condition: another process created earmark first, skipping', { requestId, invoiceId: invoice.intent_id, }); return null; } + // Release reservation on unexpected error + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'earmark_creation_failed', + }); + } throw error; } @@ -1079,8 +1130,16 @@ export async function executeOnDemandRebalancing( requestId, earmarkId: earmark.id, invoiceId: invoice.intent_id, + reservationId: reservation?.id, }); + // Update reservation to ACTIVE now that earmark is created + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'ACTIVE', { + earmarkId: earmark.id, + }); + } + // Track successful operations to create database records later const successfulOperations: Array<{ originChainId: number; @@ -1095,6 +1154,34 @@ export async function executeOnDemandRebalancing( try { for (const operation of rebalanceOperations!) { + // Create a per-operation reservation for the origin chain funds being bridged + let originReservation: Reservation | undefined; + if (inventory && !operation.isSameChainSwap) { + const originTickerAddress = getTokenAddressFromConfig( + invoice.ticker_hash.toLowerCase(), + operation.originChain.toString(), + config, + ); + originReservation = await inventory.createReservation({ + chainId: operation.originChain.toString(), + asset: originTickerAddress || invoice.ticker_hash, + amount: operation.amount, + operationType: 'REBALANCE_ONDEMAND', + operationId: `${invoice.intent_id}-bridge-${operation.originChain}-${operation.destinationChain}`, + requestedBy: 'mark', + ttlSeconds: 3600, // 1 hour for bridge operations + metadata: { + invoiceId: invoice.intent_id, + bridge: operation.bridge, + parentReservationId: reservation?.id || '', + }, + }); + + if (originReservation) { + await inventory.updateReservationStatus(originReservation.id, 'EXECUTING'); + } + } + const execResult = await executeSingleOperation( operation, invoice.intent_id, @@ -1104,10 +1191,20 @@ export async function executeOnDemandRebalancing( ); if (!execResult) { - // Error already logged in executeSingleOperation + // Release origin reservation on failure + if (originReservation && inventory) { + await inventory.updateReservationStatus(originReservation.id, 'FAILED', { + reason: 'bridge_operation_failed', + }); + } // For swaps, fail fast; for bridges, continue to next operation if (operation.isSameChainSwap) { await database.updateEarmarkStatus(earmark.id, EarmarkStatus.FAILED); + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'swap_operation_failed', + }); + } return null; } continue; @@ -1129,6 +1226,40 @@ export async function executeOnDemandRebalancing( receipt: execResult.result.receipt, recipient: execResult.recipient, }); + + // Report successful bridge tx to inventory service and release origin reservation + if (originReservation && inventory) { + await inventory.reportTransactionSuccess( + originReservation.id, + execResult.result.receipt.transactionHash, + operation.originChain.toString(), + { bridge: operation.bridge }, + ); + } + + // Register pending inbound on the destination chain so the inventory + // service knows funds are in transit and can include them in balance calcs + if (inventory) { + const destTickerAddress = getTokenAddressFromConfig( + invoice.ticker_hash.toLowerCase(), + destinationChain!.toString(), + config, + ); + await inventory.registerInbound({ + chainId: destinationChain!.toString(), + asset: destTickerAddress || invoice.ticker_hash, + amount: execResult.result.effectiveAmount || operation.amount, + sourceChain: operation.originChain.toString(), + operationType: 'REBALANCE_ONDEMAND', + operationId: invoice.intent_id, + expectedArrivalSeconds: 1800, // 30 min default for bridge operations + txHash: execResult.result.receipt.transactionHash, + metadata: { + bridge: operation.bridge, + earmarkId: earmark.id, + }, + }); + } } } @@ -1145,6 +1276,11 @@ export async function executeOnDemandRebalancing( }); } await database.updateEarmarkStatus(earmark.id, EarmarkStatus.FAILED); + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'no_bridge_operations', + }); + } return null; } @@ -1155,6 +1291,11 @@ export async function executeOnDemandRebalancing( totalBridgeOperations: bridgeOperationCount, }); await database.updateEarmarkStatus(earmark.id, EarmarkStatus.FAILED); + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'all_bridge_operations_failed', + }); + } return null; } @@ -1164,11 +1305,26 @@ export async function executeOnDemandRebalancing( const newStatus = allSucceeded ? EarmarkStatus.PENDING : EarmarkStatus.FAILED; await database.updateEarmarkStatus(earmark.id, newStatus); + // Update reservation status to match earmark + if (reservation && inventory) { + if (allSucceeded) { + await inventory.updateReservationStatus(reservation.id, 'EXECUTING', { + earmarkStatus: 'pending', + bridgeOperations: successfulOperations.length.toString(), + }); + } else { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'partial_bridge_failure', + }); + } + } + if (allSucceeded) { logger.info('All bridge operations succeeded, earmark now PENDING', { requestId, earmarkId: earmark.id, invoiceId: invoice.intent_id, + reservationId: reservation?.id, successfulOperations: successfulOperations.length, totalBridgeOperations: bridgeOperationCount, }); @@ -1232,6 +1388,12 @@ export async function executeOnDemandRebalancing( error: jsonifyError(updateError), }); } + // Release reservation on error + if (reservation && inventory) { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'execution_error', + }); + } return null; } } @@ -1736,6 +1898,9 @@ async function executeRebalanceTransactionWithBridge( }, zodiacConfig, context: { requestId, invoiceId, bridgeType, transactionType: memo }, + inventory: context.inventory, + walletAddress: context.config.ownAddress, + operationId: `${invoiceId}-${bridgeType}-${route.origin}`, }); logger.info('Successfully submitted on-demand rebalance transaction', { @@ -1881,7 +2046,7 @@ export async function cleanupCompletedEarmarks( purchasedInvoiceIds: string[], context: ProcessingContext, ): Promise { - const { logger, requestId } = context; + const { logger, requestId, inventory } = context; for (const invoiceId of purchasedInvoiceIds) { try { @@ -1890,6 +2055,18 @@ export async function cleanupCompletedEarmarks( if (earmark && earmark.status === EarmarkStatus.READY) { await database.updateEarmarkStatus(earmark.id, EarmarkStatus.COMPLETED); + // Release any associated inventory reservations + if (inventory) { + const reservations = await inventory.getReservationsByOperation(invoiceId); + for (const reservation of reservations) { + if (reservation.status !== 'COMPLETED' && reservation.status !== 'FAILED' && reservation.status !== 'EXPIRED') { + await inventory.updateReservationStatus(reservation.id, 'COMPLETED', { + reason: 'earmark_completed', + }); + } + } + } + logger.info('Marked earmark as completed', { requestId, earmarkId: earmark.id, @@ -1907,7 +2084,7 @@ export async function cleanupCompletedEarmarks( } export async function cleanupStaleEarmarks(invoiceIds: string[], context: ProcessingContext): Promise { - const { logger, requestId } = context; + const { logger, requestId, inventory } = context; for (const invoiceId of invoiceIds) { try { @@ -1917,6 +2094,18 @@ export async function cleanupStaleEarmarks(invoiceIds: string[], context: Proces // Mark earmark as cancelled since the invoice is no longer available await database.updateEarmarkStatus(earmark.id, EarmarkStatus.CANCELLED); + // Release any associated inventory reservations + if (inventory) { + const reservations = await inventory.getReservationsByOperation(invoiceId); + for (const reservation of reservations) { + if (reservation.status !== 'COMPLETED' && reservation.status !== 'FAILED' && reservation.status !== 'EXPIRED') { + await inventory.updateReservationStatus(reservation.id, 'FAILED', { + reason: 'earmark_cancelled_stale', + }); + } + } + } + logger.info('Marked stale earmark as cancelled', { requestId, earmarkId: earmark.id, @@ -1939,10 +2128,41 @@ export async function getEarmarkedBalance( tickerHash: string, context: ProcessingContext, ): Promise { - const { config } = context; + const { config, inventory, logger, requestId } = context; const ticker = tickerHash.toLowerCase(); + // If inventory service is available, query it for the authoritative reserved balance. + // GET /inventory/balance/{chainId}/{asset} returns the full balance view including + // reservations across all services (Mark, fast-path filler, etc.). + if (inventory) { + const tickerAddress = getTokenAddressFromConfig(ticker, chainId.toString(), config); + if (tickerAddress) { + const balance = await inventory.getInventoryBalance(chainId.toString(), tickerAddress); + if (balance) { + // The reserved amount is totalBalance - availableBalance + const totalReserved = BigInt(balance.totalBalance) - BigInt(balance.availableBalance); + logger.debug('Got reserved balance from inventory service', { + requestId, + chainId, + ticker, + totalBalance: balance.totalBalance, + availableBalance: balance.availableBalance, + totalReserved: totalReserved.toString(), + reservedByType: balance.reservedByType, + }); + return totalReserved > 0n ? totalReserved : 0n; + } + // Fall through to database-based calculation if inventory service call fails + logger.debug('Inventory service unavailable, falling back to database earmarks', { + requestId, + chainId, + ticker, + }); + } + } + + // Fallback: database-based earmark calculation (original behavior) // Get earmarked amounts (initiating, pending, and ready) const earmarks = await database.getEarmarks({ designatedPurchaseChain: chainId, diff --git a/packages/poller/src/rebalance/rebalance.ts b/packages/poller/src/rebalance/rebalance.ts index 105fb9ec..924db70c 100644 --- a/packages/poller/src/rebalance/rebalance.ts +++ b/packages/poller/src/rebalance/rebalance.ts @@ -2,6 +2,7 @@ import { getMarkBalances, getTickerForAsset, convertToNativeUnits } from '../hel import { jsonifyMap, jsonifyError } from '@mark/logger'; import { getDecimalsFromConfig, + getTokenAddressFromConfig, WalletType, RebalanceOperationStatus, DBPS_MULTIPLIER, @@ -149,7 +150,33 @@ export async function rebalanceInventory(context: ProcessingContext): Promise 0) { rebalanceOperations.push(...result.actions); rebalanceSuccessful = true; + // Mark reservation as completed + if (inventory && thresholdReservationId) { + await inventory.updateReservationStatus(thresholdReservationId, 'COMPLETED'); + } break; // Exit the bridge preference loop for this route } - // Empty actions means quote/slippage failure — try next preference + // Empty actions means quote/slippage failure — release reservation and try next preference + if (inventory && thresholdReservationId) { + await inventory.updateReservationStatus(thresholdReservationId, 'FAILED', { + reason: 'quote_slippage_failure', + }); + } continue; } catch (error) { + // Release reservation on bridge execution failure + if (inventory && thresholdReservationId) { + await inventory.updateReservationStatus(thresholdReservationId, 'FAILED', { + reason: 'bridge_execution_error', + }); + } logger.error('Failed to execute bridge, trying next preference', { requestId, route, diff --git a/packages/poller/tsconfig.json b/packages/poller/tsconfig.json index acece3bf..6d4db9c0 100644 --- a/packages/poller/tsconfig.json +++ b/packages/poller/tsconfig.json @@ -25,6 +25,7 @@ { "path": "../adapters/database" }, { "path": "../adapters/chainservice" }, { "path": "../adapters/everclear" }, + { "path": "../adapters/inventory" }, { "path": "../adapters/prometheus" }, { "path": "../adapters/rebalance" }, { "path": "../adapters/web3signer" } diff --git a/yarn.lock b/yarn.lock index f04d2738..e0133a31 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5431,6 +5431,7 @@ __metadata: "@mark/core": "workspace:*" "@mark/database": "workspace:*" "@mark/everclear": "workspace:*" + "@mark/inventory": "workspace:*" "@mark/logger": "workspace:*" "@mark/prometheus": "workspace:*" "@mark/rebalance": "workspace:*" @@ -5563,6 +5564,7 @@ __metadata: "@mark/core": "workspace:*" "@mark/database": "workspace:*" "@mark/everclear": "workspace:*" + "@mark/inventory": "workspace:*" "@mark/logger": "workspace:*" "@mark/poller": "workspace:*" "@mark/prometheus": "workspace:*" @@ -5593,6 +5595,24 @@ __metadata: languageName: unknown linkType: soft +"@mark/inventory@workspace:*, @mark/inventory@workspace:packages/adapters/inventory": + version: 0.0.0-use.local + resolution: "@mark/inventory@workspace:packages/adapters/inventory" + dependencies: + "@mark/core": "workspace:*" + "@mark/logger": "workspace:*" + "@types/jest": 29.5.0 + "@types/node": 20.17.12 + axios: 1.9.0 + eslint: 9.17.0 + jest: 29.5.0 + rimraf: 6.0.1 + sort-package-json: 2.12.0 + ts-jest: 29.1.0 + typescript: 5.7.2 + languageName: unknown + linkType: soft + "@mark/logger@workspace:*, @mark/logger@workspace:packages/adapters/logger": version: 0.0.0-use.local resolution: "@mark/logger@workspace:packages/adapters/logger" @@ -5618,6 +5638,7 @@ __metadata: "@mark/core": "workspace:*" "@mark/database": "workspace:*" "@mark/everclear": "workspace:*" + "@mark/inventory": "workspace:*" "@mark/logger": "workspace:*" "@mark/prometheus": "workspace:*" "@mark/rebalance": "workspace:*" From 4dbb21d101c0a079eea06872bae732ae6084d02f Mon Sep 17 00:00:00 2001 From: liu-zhipeng <57480598+liu-zhipeng@users.noreply.github.com> Date: Fri, 27 Mar 2026 19:42:20 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20single=20EOA=20support=20=E2=80=94?= =?UTF-8?q?=20remove=20fillServiceSignerUrl,=20fix=20double-counting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/agent/src/adapters.ts | 52 +++---------------- packages/core/src/config.ts | 2 - packages/core/src/types/config.ts | 2 - packages/handler/src/init.ts | 2 +- .../handler/src/processor/eventProcessor.ts | 4 +- packages/poller/src/helpers/balance.ts | 46 +++++++++++++++- packages/poller/src/helpers/erc20.ts | 11 ++++ packages/poller/src/helpers/intent.ts | 4 ++ packages/poller/src/init.ts | 4 +- .../poller/src/invoice/processInvoices.ts | 2 +- packages/poller/src/rebalance/mantleEth.ts | 4 +- packages/poller/src/rebalance/onDemand.ts | 41 +++++++++------ packages/poller/src/rebalance/rebalance.ts | 22 ++++++-- 13 files changed, 117 insertions(+), 79 deletions(-) diff --git a/packages/agent/src/adapters.ts b/packages/agent/src/adapters.ts index 654e1d10..8b9846ea 100644 --- a/packages/agent/src/adapters.ts +++ b/packages/agent/src/adapters.ts @@ -14,14 +14,13 @@ import * as database from '@mark/database'; */ export interface BaseAdapters { chainService: ChainService; - fillServiceChainService?: ChainService; everclear: EverclearAdapter; purchaseCache: PurchaseCache; prometheus: PrometheusAdapter; rebalance: RebalanceAdapter; database: typeof database; web3Signer: Web3Signer; - inventory?: InventoryServiceClient; + inventory: InventoryServiceClient; } /** @@ -66,39 +65,11 @@ export function initializeBaseAdapters( logger, ); - // Initialize fill service chain service if configured - // Check all rebalance configs that may use a separate fill service sender - let fillServiceChainService: ChainService | undefined; - const fsSenderAddress = - config.tacRebalance?.fillService?.senderAddress ?? - config.tacRebalance?.fillService?.address ?? - config.methRebalance?.fillService?.senderAddress ?? - config.methRebalance?.fillService?.address ?? - config.aManUsdeRebalance?.fillService?.senderAddress ?? - config.aManUsdeRebalance?.fillService?.address ?? - config.aMansyrupUsdtRebalance?.fillService?.senderAddress ?? - config.aMansyrupUsdtRebalance?.fillService?.address; - if (config.fillServiceSignerUrl && fsSenderAddress) { - logger.info('Initializing Fill Service chain service', { - signerUrl: config.fillServiceSignerUrl, - senderAddress: fsSenderAddress, - }); - - const fillServiceSigner = config.fillServiceSignerUrl.startsWith('http') - ? new Web3Signer(config.fillServiceSignerUrl) - : new EthWallet(config.fillServiceSignerUrl); - - fillServiceChainService = new ChainService( - { - chains: config.chains, - maxRetries: 3, - retryDelay: 15000, - logLevel: config.logLevel, - }, - fillServiceSigner as EthWallet, - logger, - ); - } + // Initialize inventory service client. + // The inventory API is served from the same connext-api as everclearApiUrl. + // Always initialized — all methods are non-blocking and degrade gracefully + // when the inventory service endpoints are not available. + const inventory = new InventoryServiceClient(config.everclearApiUrl, logger); // Initialize other adapters const everclear = new EverclearAdapter(config.everclearApiUrl, logger); @@ -109,19 +80,8 @@ export function initializeBaseAdapters( // Initialize database database.initializeDatabase(config.database); - // Initialize inventory service client if unified inventory is enabled. - // The inventory API is served from the same connext-api as everclearApiUrl. - let inventory: InventoryServiceClient | undefined; - if (config.unifiedInventoryEnabled) { - inventory = new InventoryServiceClient(config.everclearApiUrl, logger); - logger.info('Unified inventory service client initialized', { - url: config.everclearApiUrl, - }); - } - const baseAdapters: BaseAdapters & { solanaSigner?: SolanaSigner } = { chainService, - fillServiceChainService, everclear, purchaseCache, prometheus, diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index 94b6e14d..39175deb 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -467,8 +467,6 @@ export async function loadConfiguration(): Promise { const config: MarkConfiguration = { pushGatewayUrl: configJson.pushGatewayUrl ?? (await requireEnv('PUSH_GATEWAY_URL')), web3SignerUrl: configJson.web3SignerUrl ?? (await requireEnv('SIGNER_URL')), - fillServiceSignerUrl: - configJson.fillServiceSignerUrl ?? (await fromEnv('FILL_SERVICE_SIGNER_URL', true)) ?? undefined, everclearApiUrl: configJson.everclearApiUrl ?? (await fromEnv('EVERCLEAR_API_URL')) ?? apiUrl, relayer: { url: configJson?.relayer?.url ?? (await fromEnv('RELAYER_URL')) ?? undefined, diff --git a/packages/core/src/types/config.ts b/packages/core/src/types/config.ts index 34d82911..928f1504 100644 --- a/packages/core/src/types/config.ts +++ b/packages/core/src/types/config.ts @@ -222,7 +222,6 @@ export interface DatabaseConfig { export interface MarkConfiguration extends RebalanceConfig { pushGatewayUrl: string; web3SignerUrl: string; - fillServiceSignerUrl?: string; // Optional: separate web3signer for fill service sender everclearApiUrl: string; relayer: { url?: string; @@ -275,7 +274,6 @@ export interface MarkConfiguration extends RebalanceConfig { bridgeContractAddress?: string; // Override Mantle bridge contract }; quoteServiceUrl?: string; // Quote service URL for DEX swap quotes (default: https://quotes.api.everclear.org) - unifiedInventoryEnabled?: boolean; // Enable unified inventory service (reservations/nonce via everclearApiUrl) redis: RedisConfig; database: DatabaseConfig; ownAddress: string; diff --git a/packages/handler/src/init.ts b/packages/handler/src/init.ts index e82d9f74..2726e64a 100644 --- a/packages/handler/src/init.ts +++ b/packages/handler/src/init.ts @@ -40,13 +40,13 @@ export async function initializeAdapters(config: MarkConfiguration, logger: Logg const processingContext: ProcessingContext = { purchaseCache: baseAdapters.purchaseCache, chainService: baseAdapters.chainService, - fillServiceChainService: baseAdapters.fillServiceChainService, everclear: baseAdapters.everclear, web3Signer: baseAdapters.web3Signer, logger, prometheus: baseAdapters.prometheus, rebalance: baseAdapters.rebalance, database: baseAdapters.database, + inventory: baseAdapters.inventory, config, requestId: bytesToHex(randomBytes(32)), startTime: Math.floor(Date.now() / 1000), diff --git a/packages/handler/src/processor/eventProcessor.ts b/packages/handler/src/processor/eventProcessor.ts index 10f22de9..0ac72dbb 100644 --- a/packages/handler/src/processor/eventProcessor.ts +++ b/packages/handler/src/processor/eventProcessor.ts @@ -50,7 +50,7 @@ export class EventProcessor { */ async processInvoiceEnqueued(event: QueuedEvent): Promise { const startTime = Date.now(); - const { config, everclear, chainService, purchaseCache, logger, prometheus, requestId, database } = + const { config, everclear, chainService, purchaseCache, logger, prometheus, requestId, database, inventory } = this.processingContext; let start = getTimeSeconds(); @@ -250,7 +250,7 @@ export class EventProcessor { // Query all of Mark's balances across chains logger.info('Getting mark balances', { requestId, chains: Object.keys(config.chains) }); start = getTimeSeconds(); - const balances = await getMarkBalances(config, chainService, prometheus); + const balances = await getMarkBalances(config, chainService, prometheus, inventory); logger.debug('Retrieved balances', { requestId, invoiceId: event.id, diff --git a/packages/poller/src/helpers/balance.ts b/packages/poller/src/helpers/balance.ts index 7a891d4d..d8124833 100644 --- a/packages/poller/src/helpers/balance.ts +++ b/packages/poller/src/helpers/balance.ts @@ -16,6 +16,7 @@ import { PrometheusAdapter } from '@mark/prometheus'; import { getValidatedZodiacConfig, getActualOwner } from './zodiac'; import { ChainService } from '@mark/chainservice'; import { TronWeb } from 'tronweb'; +import type { InventoryServiceClient } from '@mark/inventory'; /** * Returns the gas balance of mark on all chains. @@ -92,13 +93,14 @@ export const getMarkBalances = async ( config: MarkConfiguration, chainService: ChainService, prometheus: PrometheusAdapter, + inventory?: InventoryServiceClient, ): Promise>> => { const tickers = getTickers(config); const addresses = await chainService.getAddress(); const results = await Promise.all( tickers.map(async (ticker) => { - const tickerBalances = await getMarkBalancesForTicker(ticker, config, chainService, prometheus, addresses); + const tickerBalances = await getMarkBalancesForTicker(ticker, config, chainService, prometheus, addresses, inventory); return { ticker, tickerBalances }; }), ); @@ -121,6 +123,7 @@ export const getMarkBalancesForTicker = async ( chainService: ChainService, prometheus: PrometheusAdapter, addresses?: Record, + inventory?: InventoryServiceClient, ): Promise> => { const { chains } = config; @@ -144,6 +147,18 @@ export const getMarkBalancesForTicker = async ( if (!tokenAddr || !decimals || tokenAddr === zeroAddress || isNative) { continue; } + + // When inventory service is available, use its availableBalance for EVM chains. + // This reflects reservations from all services (Mark + fill service), giving + // an accurate view of funds actually available for use. + if (inventory && !isSvm && !isTvm) { + balancePromises.push({ + domain, + promise: getInventoryAvailableBalance(inventory, domain, tokenAddr, decimals, prometheus), + }); + continue; + } + const address = isSvm ? config.ownSolAddress : isTvm ? resolvedAddresses[domain] : config.ownAddress; const balancePromise = isSvm ? getSvmBalance(config, chainService, domain, address, tokenAddr, decimals, prometheus) @@ -275,6 +290,35 @@ export const getEvmBalance = async ( } }; +/** + * Gets the available balance from the unified inventory service. + * The inventory service returns availableBalance = totalBalance - reservations - pendingIntents + pendingInbound, + * which accounts for spending by all services (Mark, fill service, etc.). + * Falls back to 0n on error (caller can retry with on-chain read). + */ +const getInventoryAvailableBalance = async ( + inventory: InventoryServiceClient, + domain: string, + tokenAddr: string, + decimals: number, + prometheus: PrometheusAdapter, +): Promise => { + const balance = await inventory.getInventoryBalance(domain, tokenAddr); + if (!balance) { + return 0n; + } + + let available = BigInt(balance.availableBalance); + + // Inventory service returns raw on-chain units; convert to 18 decimals + if (decimals !== 18) { + available = convertTo18Decimals(available, decimals); + } + + prometheus.updateChainBalance(domain, tokenAddr, available); + return available; +}; + /** * Returns all of the custodied amounts for supported assets across all chains * @returns Mapping of balances keyed on tickerhash - chain - amount diff --git a/packages/poller/src/helpers/erc20.ts b/packages/poller/src/helpers/erc20.ts index c432e539..09122b74 100644 --- a/packages/poller/src/helpers/erc20.ts +++ b/packages/poller/src/helpers/erc20.ts @@ -6,6 +6,7 @@ import { TransactionReason } from '@mark/prometheus'; import { PrometheusAdapter } from '@mark/prometheus'; import { TronWeb } from 'tronweb'; import { submitTransactionWithLogging } from './transactions'; +import type { InventoryServiceClient } from '@mark/inventory'; export interface ApprovalParams { config: MarkConfiguration; @@ -19,6 +20,8 @@ export interface ApprovalParams { owner: string; zodiacConfig: WalletConfig; context?: LoggingContext; // For logging context (requestId, invoiceId, etc.) + inventory?: InventoryServiceClient; // For nonce sequencing with shared EOA + walletAddress?: string; // Wallet address for nonce management } export interface ApprovalResult { @@ -86,6 +89,8 @@ export async function checkAndApproveERC20(params: ApprovalParams): Promise { - const { logger, requestId, config, rebalance, chainService, fillServiceChainService, database: db } = context; + const { logger, requestId, config, rebalance, chainService, fillServiceChainService, database: db, inventory } = context; const logContext = { requestId, operationId: operation.id, @@ -1009,7 +1009,7 @@ async function processMethOperation(operation: RebalanceOperation, context: Proc // Step 4: Submit the Mantle bridge transactions try { const { receipt: mantleReceipt, effectiveBridgedAmount } = await submitBridgeTransactions({ - context: { requestId, logger, config }, + context: { requestId, logger, config, inventory }, chainService: selectedChainService as ChainService, route, bridgeType: mantleBridgeType, diff --git a/packages/poller/src/rebalance/onDemand.ts b/packages/poller/src/rebalance/onDemand.ts index 3a23f9ed..e6d537a0 100644 --- a/packages/poller/src/rebalance/onDemand.ts +++ b/packages/poller/src/rebalance/onDemand.ts @@ -65,13 +65,18 @@ export async function evaluateOnDemandRebalancing( return { canRebalance: false }; } - const balances = await getMarkBalances(config, context.chainService, context.prometheus); - - // Get active earmarks to exclude from available balance - const activeEarmarks = await database.getEarmarks({ - status: [EarmarkStatus.INITIATING, EarmarkStatus.PENDING, EarmarkStatus.READY], - }); - const earmarkedFunds = calculateEarmarkedFunds(activeEarmarks); + const balances = await getMarkBalances(config, context.chainService, context.prometheus, context.inventory); + + // Get active earmarks to exclude from available balance. + // When inventory service provides the balance, it already accounts for all reservations, + // so we use empty earmarked funds to avoid double-counting. + let earmarkedFunds: EarmarkedFunds[] = []; + if (!context.inventory) { + const activeEarmarks = await database.getEarmarks({ + status: [EarmarkStatus.INITIATING, EarmarkStatus.PENDING, EarmarkStatus.READY], + }); + earmarkedFunds = calculateEarmarkedFunds(activeEarmarks); + } // For each potential destination chain, evaluate if we can aggregate enough funds const evaluationResults: Map = new Map(); @@ -1065,7 +1070,7 @@ export async function executeOnDemandRebalancing( operationType: 'REBALANCE_ONDEMAND', operationId: invoice.intent_id, requestedBy: 'mark', - ttlSeconds: 3600, // spec: REBALANCE_ONDEMAND default TTL is 3600s + ttlSeconds: 3600, metadata: { invoiceId: invoice.intent_id, tickerHash: invoice.ticker_hash, @@ -1458,16 +1463,20 @@ export async function handleMinAmountIncrease( difference: additionalAmount.toString(), }); - // Get current balances and earmarked funds - const balances = await getMarkBalances(config, context.chainService, context.prometheus); - const activeEarmarks = await database.getEarmarks({ - status: [EarmarkStatus.INITIATING, EarmarkStatus.PENDING, EarmarkStatus.READY], - }); - const earmarkedFunds = calculateEarmarkedFunds(activeEarmarks); + // Get current balances and earmarked funds. + // When inventory provides balance, it already accounts for reservations — skip earmark subtraction. + const balances = await getMarkBalances(config, context.chainService, context.prometheus, context.inventory); + let earmarkedFundsForMinAmount: EarmarkedFunds[] = []; + if (!context.inventory) { + const activeEarmarks = await database.getEarmarks({ + status: [EarmarkStatus.INITIATING, EarmarkStatus.PENDING, EarmarkStatus.READY], + }); + earmarkedFundsForMinAmount = calculateEarmarkedFunds(activeEarmarks); + } // Check if destination already has enough available balance const destinationBalance = balances.get(ticker)?.get(earmark.designatedPurchaseChain.toString()) || 0n; - const earmarkedOnDestination = earmarkedFunds + const earmarkedOnDestination = earmarkedFundsForMinAmount .filter((e) => e.chainId === earmark.designatedPurchaseChain && e.tickerHash.toLowerCase() === ticker) .reduce((sum, e) => sum + e.amount, 0n); const availableBalance = destinationBalance - earmarkedOnDestination; @@ -1498,7 +1507,7 @@ export async function handleMinAmountIncrease( additionalAmount, additionalRouteEntries, balances, - earmarkedFunds, + earmarkedFundsForMinAmount, invoice.ticker_hash.toLowerCase(), earmark.invoiceId, context, diff --git a/packages/poller/src/rebalance/rebalance.ts b/packages/poller/src/rebalance/rebalance.ts index 924db70c..14ee148b 100644 --- a/packages/poller/src/rebalance/rebalance.ts +++ b/packages/poller/src/rebalance/rebalance.ts @@ -30,7 +30,7 @@ export async function rebalanceInventory(context: ProcessingContext): Promise Date: Fri, 27 Mar 2026 22:09:18 +0800 Subject: [PATCH 3/3] test: add unit tests for inventory --- .../adapters/inventory/test/client.spec.ts | 203 ++++++++++++++++++ .../helpers/inventory-integration.spec.ts | 100 +++++++++ .../helpers/inventory-reservations.spec.ts | 173 +++++++++++++++ 3 files changed, 476 insertions(+) create mode 100644 packages/adapters/inventory/test/client.spec.ts create mode 100644 packages/poller/test/helpers/inventory-integration.spec.ts create mode 100644 packages/poller/test/helpers/inventory-reservations.spec.ts diff --git a/packages/adapters/inventory/test/client.spec.ts b/packages/adapters/inventory/test/client.spec.ts new file mode 100644 index 00000000..5ef500ae --- /dev/null +++ b/packages/adapters/inventory/test/client.spec.ts @@ -0,0 +1,203 @@ +import { describe, it, expect, beforeEach, jest } from '@jest/globals'; +import { InventoryServiceClient } from '../src'; +import { axiosGet, axiosPost } from '@mark/core'; +import { Logger } from '@mark/logger'; +import axios from 'axios'; + +jest.mock('@mark/core', () => { + const actual = jest.requireActual('@mark/core') as any; + return { ...actual, axiosGet: jest.fn(), axiosPost: jest.fn() }; +}); + +jest.mock('axios', () => ({ put: jest.fn(), delete: jest.fn() })); + +jest.mock('@mark/logger', () => ({ + jsonifyError: jest.fn((error: any) => ({ message: error?.message || 'Unknown error' })), + Logger: jest.fn().mockImplementation(() => ({ + info: jest.fn(), error: jest.fn(), warn: jest.fn(), debug: jest.fn(), logger: {}, + })), +})); + +describe('InventoryServiceClient', () => { + const apiUrl = 'https://api.everclear.org'; + let client: InventoryServiceClient; + let logger: Logger; + let mockAxiosGet: jest.MockedFunction; + let mockAxiosPost: jest.MockedFunction; + let mockAxiosPut: jest.MockedFunction; + let mockAxiosDelete: jest.MockedFunction; + + beforeEach(() => { + jest.clearAllMocks(); + logger = new Logger({ service: 'test' }); + client = new InventoryServiceClient(apiUrl, logger); + mockAxiosGet = axiosGet as jest.MockedFunction; + mockAxiosPost = axiosPost as jest.MockedFunction; + mockAxiosPut = axios.put as jest.MockedFunction; + mockAxiosDelete = axios.delete as jest.MockedFunction; + }); + + describe('createReservation', () => { + const params = { chainId: '1', asset: '0xUSDC', amount: '1000000', operationType: 'MARK_PURCHASE' as const, operationId: 'i-1', requestedBy: 'mark' }; + + it('should create reservation successfully', async () => { + const res = { id: 'res-1', ...params, priority: 4, status: 'PENDING' }; + mockAxiosPost.mockResolvedValue({ data: res } as any); + expect(await client.createReservation(params)).toEqual(res); + expect(mockAxiosPost).toHaveBeenCalledWith(`${apiUrl}/inventory/reserve`, params, undefined, 1, 0); + }); + + it('should return undefined on error', async () => { + mockAxiosPost.mockRejectedValue(new Error('fail')); + expect(await client.createReservation(params)).toBeUndefined(); + expect(logger.warn).toHaveBeenCalled(); + }); + }); + + describe('updateReservationStatus', () => { + it('should update successfully', async () => { + mockAxiosPut.mockResolvedValue({ data: { id: 'res-1', status: 'ACTIVE' } } as any); + expect(await client.updateReservationStatus('res-1', 'ACTIVE')).toEqual({ id: 'res-1', status: 'ACTIVE' }); + }); + + it('should pass metadata', async () => { + mockAxiosPut.mockResolvedValue({ data: {} } as any); + await client.updateReservationStatus('res-1', 'COMPLETED', { txHash: '0x1' }); + expect(mockAxiosPut).toHaveBeenCalledWith(`${apiUrl}/inventory/reserve/res-1/status`, { status: 'COMPLETED', metadata: { txHash: '0x1' } }); + }); + + it('should return undefined on error', async () => { + mockAxiosPut.mockRejectedValue(new Error('fail')); + expect(await client.updateReservationStatus('res-1', 'ACTIVE')).toBeUndefined(); + }); + }); + + describe('deleteReservation', () => { + it('should return true on success', async () => { + mockAxiosDelete.mockResolvedValue({ data: { success: true } } as any); + expect(await client.deleteReservation('res-1')).toBe(true); + }); + + it('should return false on error', async () => { + mockAxiosDelete.mockRejectedValue(new Error('fail')); + expect(await client.deleteReservation('res-1')).toBe(false); + }); + }); + + describe('getInventoryBalance', () => { + it('should return balance', async () => { + const balance = { chainId: '1', asset: '0xU', totalBalance: '5000000', availableBalance: '3000000', reservedByType: {}, pendingInbound: '0', pendingIntents: '0', reservationCount: 0, timestamp: Date.now() }; + mockAxiosGet.mockResolvedValue({ data: balance } as any); + expect(await client.getInventoryBalance('1', '0xU')).toEqual(balance); + }); + + it('should return undefined on error', async () => { + mockAxiosGet.mockRejectedValue(new Error('fail')); + expect(await client.getInventoryBalance('1', '0xU')).toBeUndefined(); + }); + }); + + describe('getReservationsByOperation', () => { + it('should return reservations', async () => { + mockAxiosGet.mockResolvedValue({ data: { reservations: [{ id: 'r1' }] } } as any); + expect(await client.getReservationsByOperation('op-1')).toEqual([{ id: 'r1' }]); + }); + + it('should return empty array on error', async () => { + mockAxiosGet.mockRejectedValue(new Error('fail')); + expect(await client.getReservationsByOperation('op-1')).toEqual([]); + }); + }); + + describe('assignNonce', () => { + it('should assign nonce', async () => { + const assignment = { nonce: 42, nonceId: '1:0xw:42', chainId: '1', wallet: '0xw', assignedAt: Date.now() }; + mockAxiosPost.mockResolvedValue({ data: assignment } as any); + expect(await client.assignNonce('1', '0xw', 'op-1')).toEqual(assignment); + }); + + it('should return undefined on error', async () => { + mockAxiosPost.mockRejectedValue(new Error('fail')); + expect(await client.assignNonce('1', '0xw')).toBeUndefined(); + }); + }); + + describe('confirmNonce', () => { + it('should confirm', async () => { + mockAxiosPost.mockResolvedValue({ data: { success: true } } as any); + await client.confirmNonce('1', '0xw', 42, '0xtx'); + expect(mockAxiosPost).toHaveBeenCalledWith(`${apiUrl}/inventory/nonce/confirm`, { chainId: '1', wallet: '0xw', nonce: 42, txHash: '0xtx' }, undefined, 1, 0); + }); + + it('should not throw on error', async () => { + mockAxiosPost.mockRejectedValue(new Error('fail')); + await expect(client.confirmNonce('1', '0xw', 42)).resolves.toBeUndefined(); + }); + }); + + describe('failNonce', () => { + it('should report failure', async () => { + mockAxiosPost.mockResolvedValue({ data: { success: true } } as any); + await client.failNonce('1', '0xw', 42); + expect(mockAxiosPost).toHaveBeenCalledWith(`${apiUrl}/inventory/nonce/fail`, { chainId: '1', wallet: '0xw', nonce: 42 }, undefined, 1, 0); + }); + + it('should not throw on error', async () => { + mockAxiosPost.mockRejectedValue(new Error('fail')); + await expect(client.failNonce('1', '0xw', 42)).resolves.toBeUndefined(); + }); + }); + + describe('registerInbound', () => { + it('should register', async () => { + const params = { chainId: '42161', asset: '0xU', amount: '1000000', sourceChain: '1', operationType: 'REBALANCE_ONDEMAND', operationId: 'op-1', expectedArrivalSeconds: 1800 }; + mockAxiosPost.mockResolvedValue({ data: { id: 'inb-1', ...params, status: 'PENDING' } } as any); + const result = await client.registerInbound(params); + expect(result?.id).toBe('inb-1'); + }); + + it('should return undefined on error', async () => { + mockAxiosPost.mockRejectedValue(new Error('fail')); + expect(await client.registerInbound({ chainId: '1', asset: '0x', amount: '0', sourceChain: '2', operationType: 'REBALANCE_ONDEMAND', operationId: 'x' })).toBeUndefined(); + }); + }); + + describe('confirmInbound', () => { + it('should confirm', async () => { + mockAxiosPost.mockResolvedValue({ data: { id: 'inb-1', status: 'CONFIRMED' } } as any); + expect((await client.confirmInbound('inb-1', '0xtx'))?.status).toBe('CONFIRMED'); + }); + }); + + describe('cancelInbound', () => { + it('should cancel', async () => { + mockAxiosPost.mockResolvedValue({ data: { id: 'inb-1', status: 'CANCELLED' } } as any); + expect((await client.cancelInbound('inb-1', 'reason'))?.status).toBe('CANCELLED'); + }); + }); + + describe('reportTransactionSuccess', () => { + it('should call updateReservationStatus with COMPLETED', async () => { + mockAxiosPut.mockResolvedValue({ data: {} } as any); + await client.reportTransactionSuccess('res-1', '0xtx', '1', { bridge: 'across' }); + expect(mockAxiosPut).toHaveBeenCalledWith(`${apiUrl}/inventory/reserve/res-1/status`, { status: 'COMPLETED', metadata: { txHash: '0xtx', chainId: '1', bridge: 'across' } }); + }); + }); + + describe('reportTransactionFailure', () => { + it('should call updateReservationStatus with FAILED', async () => { + mockAxiosPut.mockResolvedValue({ data: {} } as any); + await client.reportTransactionFailure('res-1', 'gas too low'); + expect(mockAxiosPut).toHaveBeenCalledWith(`${apiUrl}/inventory/reserve/res-1/status`, { status: 'FAILED', metadata: { failureReason: 'gas too low' } }); + }); + }); + + describe('URL handling', () => { + it('should strip trailing slash', () => { + const c = new InventoryServiceClient('https://api.everclear.org/', logger); + mockAxiosPost.mockResolvedValue({ data: {} } as any); + c.assignNonce('1', '0xw'); + expect(mockAxiosPost).toHaveBeenCalledWith('https://api.everclear.org/inventory/nonce/assign', expect.any(Object), undefined, 1, 0); + }); + }); +}); diff --git a/packages/poller/test/helpers/inventory-integration.spec.ts b/packages/poller/test/helpers/inventory-integration.spec.ts new file mode 100644 index 00000000..3235e8af --- /dev/null +++ b/packages/poller/test/helpers/inventory-integration.spec.ts @@ -0,0 +1,100 @@ +import { SinonStubbedInstance, stub, createStubInstance, restore } from 'sinon'; +import * as contractModule from '../../src/helpers/contracts'; +import * as assetModule from '../../src/helpers/asset'; +import * as zodiacModule from '../../src/helpers/zodiac'; +import { getMarkBalances } from '../../src/helpers/balance'; +import { submitTransactionWithLogging } from '../../src/helpers/transactions'; +import { AssetConfiguration, MarkConfiguration, WalletType } from '@mark/core'; +import { PrometheusAdapter } from '@mark/prometheus'; +import { ChainService } from '@mark/chainservice'; +import { InventoryServiceClient } from '@mark/inventory'; + +describe('Inventory Integration', () => { + const mockAssetConfig: AssetConfiguration = { symbol: 'USDC', address: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', decimals: 6, tickerHash: '0xusdc', isNative: false, balanceThreshold: '10000000000' }; + const mockConfig = { ownAddress: '0xOwnAddress', ownSolAddress: 'SolAddress', chains: { '1': { providers: ['https://rpc'], assets: [mockAssetConfig] } } } as unknown as MarkConfiguration; + + let prometheus: SinonStubbedInstance; + let chainService: SinonStubbedInstance; + let inventory: SinonStubbedInstance; + + beforeEach(() => { + prometheus = createStubInstance(PrometheusAdapter); + chainService = createStubInstance(ChainService); + inventory = createStubInstance(InventoryServiceClient); + chainService.getAddress.resolves({ '1': '0xOwnAddress' }); + stub(assetModule, 'getTickers').returns(['0xusdc']); + stub(assetModule, 'convertTo18Decimals').callsFake((val: bigint) => val * 10n ** 12n); + }); + + afterEach(() => { restore(); }); + + describe('getMarkBalances with inventory (double-counting fix)', () => { + it('should use inventory availableBalance for EVM chains', async () => { + inventory.getInventoryBalance.resolves({ chainId: '1', asset: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', totalBalance: '5000000', availableBalance: '3000000', reservedByType: {}, pendingInbound: '0', pendingIntents: '0', reservationCount: 2, timestamp: Date.now() }); + const balances = await getMarkBalances(mockConfig, chainService, prometheus, inventory); + expect(inventory.getInventoryBalance.calledOnce).toBe(true); + expect(balances.get('0xusdc')!.get('1')).toBe(3000000n * 10n ** 12n); + }); + + it('should return 0n when inventory returns undefined (conservative)', async () => { + inventory.getInventoryBalance.resolves(undefined); + const balances = await getMarkBalances(mockConfig, chainService, prometheus, inventory); + expect(balances.get('0xusdc')!.get('1')).toBe(0n); + }); + + it('should use on-chain balance when no inventory provided', async () => { + const mockContract = { read: { balanceOf: stub().resolves(5000000n) } }; + stub(contractModule, 'getERC20Contract').resolves(mockContract as any); + stub(zodiacModule, 'getValidatedZodiacConfig').returns({ walletType: WalletType.EOA }); + stub(zodiacModule, 'getActualOwner').returns('0xOwnAddress'); + const balances = await getMarkBalances(mockConfig, chainService, prometheus); + expect(balances.get('0xusdc')!.get('1')).toBe(5000000n * 10n ** 12n); + }); + }); + + describe('submitTransactionWithLogging nonce management', () => { + const baseTx = { to: '0xAddr', data: '0x1234', chainId: 1, from: '0xOwner', value: '0', funcSig: 'test()' }; + const mockReceipt = { transactionHash: '0xtxhash', from: '0xOwner', to: '0xAddr', cumulativeGasUsed: '21000', effectiveGasPrice: '1000000000', blockNumber: 100, status: 1, logs: [], confirmations: 1 }; + let mockLogger: any; + + beforeEach(() => { mockLogger = { info: stub(), warn: stub(), error: stub(), debug: stub() }; }); + + it('should assign nonce and confirm on success', async () => { + inventory.assignNonce.resolves({ nonce: 42, nonceId: '1:0xw:42', chainId: '1', wallet: '0xw', assignedAt: Date.now() }); + chainService.submitAndMonitor.resolves(mockReceipt); + const result = await submitTransactionWithLogging({ chainService, logger: mockLogger, chainId: '1', txRequest: baseTx, zodiacConfig: { walletType: WalletType.EOA }, inventory, walletAddress: '0xOwner', operationId: 'op-1' }); + expect(inventory.assignNonce.calledOnce).toBe(true); + expect(inventory.confirmNonce.calledOnce).toBe(true); + expect(inventory.confirmNonce.firstCall.args).toEqual(['1', '0xOwner', 42, '0xtxhash']); + expect(result.nonceAssignment?.nonce).toBe(42); + }); + + it('should report nonce failure on tx error', async () => { + inventory.assignNonce.resolves({ nonce: 43, nonceId: '1:0xw:43', chainId: '1', wallet: '0xw', assignedAt: Date.now() }); + chainService.submitAndMonitor.rejects(new Error('reverted')); + await expect(submitTransactionWithLogging({ chainService, logger: mockLogger, chainId: '1', txRequest: baseTx, zodiacConfig: { walletType: WalletType.EOA }, inventory, walletAddress: '0xOwner' })).rejects.toThrow('reverted'); + expect(inventory.failNonce.calledOnce).toBe(true); + expect(inventory.confirmNonce.called).toBe(false); + }); + + it('should fall back when inventory nonce unavailable', async () => { + inventory.assignNonce.resolves(undefined); + chainService.submitAndMonitor.resolves(mockReceipt); + const result = await submitTransactionWithLogging({ chainService, logger: mockLogger, chainId: '1', txRequest: baseTx, zodiacConfig: { walletType: WalletType.EOA }, inventory, walletAddress: '0xOwner' }); + expect(inventory.confirmNonce.called).toBe(false); + expect(result.nonceAssignment).toBeUndefined(); + }); + + it('should not assign nonce without walletAddress', async () => { + chainService.submitAndMonitor.resolves(mockReceipt); + await submitTransactionWithLogging({ chainService, logger: mockLogger, chainId: '1', txRequest: baseTx, zodiacConfig: { walletType: WalletType.EOA }, inventory }); + expect(inventory.assignNonce.called).toBe(false); + }); + + it('should not assign nonce without inventory', async () => { + chainService.submitAndMonitor.resolves(mockReceipt); + const result = await submitTransactionWithLogging({ chainService, logger: mockLogger, chainId: '1', txRequest: baseTx, zodiacConfig: { walletType: WalletType.EOA } }); + expect(result.hash).toBe('0xtxhash'); + }); + }); +}); diff --git a/packages/poller/test/helpers/inventory-reservations.spec.ts b/packages/poller/test/helpers/inventory-reservations.spec.ts new file mode 100644 index 00000000..02d0b2b8 --- /dev/null +++ b/packages/poller/test/helpers/inventory-reservations.spec.ts @@ -0,0 +1,173 @@ +import { stub, createStubInstance, restore, SinonStub, SinonStubbedInstance } from 'sinon'; + +jest.mock('@mark/core', () => ({ + ...jest.requireActual('@mark/core'), + getDecimalsFromConfig: jest.fn(() => 6), + getTokenAddressFromConfig: jest.fn(() => '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'), + getIsNativeFromConfig: jest.fn(() => false), +})); + +jest.mock('@mark/database', () => ({ + ...jest.requireActual('@mark/database'), + createEarmark: jest.fn(), + getEarmarks: jest.fn().mockResolvedValue([]), + updateEarmarkStatus: jest.fn(), + getActiveEarmarkForInvoice: jest.fn().mockResolvedValue(null), + createRebalanceOperation: jest.fn().mockResolvedValue({ id: 'op-1' }), + getRebalanceOperations: jest.fn().mockResolvedValue({ operations: [], total: 0 }), + getRebalanceOperationsByEarmark: jest.fn().mockResolvedValue([]), + initializeDatabase: jest.fn(), + getPool: jest.fn(), + isPaused: jest.fn().mockResolvedValue(false), +})); + +import * as database from '@mark/database'; +import { InventoryServiceClient } from '@mark/inventory'; +import { EarmarkStatus, Invoice } from '@mark/core'; +import { ChainService } from '@mark/chainservice'; +import { Logger } from '@mark/logger'; +import { PrometheusAdapter } from '@mark/prometheus'; +import { PurchaseCache } from '@mark/cache'; +import { RebalanceAdapter } from '@mark/rebalance'; +import { ProcessingContext } from '../../src/init'; +import { checkAndApproveERC20 } from '../../src/helpers/erc20'; +import * as balanceHelpers from '../../src/helpers/balance'; +import * as transactionHelper from '../../src/helpers/transactions'; +import * as callbacks from '../../src/rebalance/callbacks'; +import * as assetHelpers from '../../src/helpers/asset'; + +describe('Inventory Reservations', () => { + let inventory: SinonStubbedInstance; + let mockLogger: SinonStubbedInstance; + let chainService: SinonStubbedInstance; + let prometheus: SinonStubbedInstance; + let rebalance: SinonStubbedInstance; + let purchaseCache: SinonStubbedInstance; + + const OWN_ADDR = '0x1111111111111111111111111111111111111111'; + const TOKEN_ADDR = '0x2222222222222222222222222222222222222222'; + const SPENDER_ADDR = '0x3333333333333333333333333333333333333333'; + + const makeContext = (overrides?: Partial): ProcessingContext => + ({ + config: { ownAddress: OWN_ADDR, ownSolAddress: 'SolAddr', chains: { '1': { providers: ['http://rpc'], assets: [] }, '42161': { providers: ['http://rpc2'], assets: [] } }, onDemandRoutes: [], routes: [], earmarkTTLMinutes: 60 }, + requestId: 'test-req-1', startTime: Date.now(), logger: mockLogger, chainService, prometheus, rebalance, purchaseCache, inventory, everclear: undefined, web3Signer: undefined, database, + ...overrides, + }) as unknown as ProcessingContext; + + beforeEach(() => { + jest.clearAllMocks(); + inventory = createStubInstance(InventoryServiceClient); + mockLogger = createStubInstance(Logger); + chainService = createStubInstance(ChainService); + prometheus = createStubInstance(PrometheusAdapter); + rebalance = createStubInstance(RebalanceAdapter); + purchaseCache = createStubInstance(PurchaseCache); + chainService.getAddress.resolves({ '1': OWN_ADDR, '42161': OWN_ADDR }); + }); + + afterEach(() => { restore(); }); + + // ── 1.3 On-Demand Rebalance Reservations ──────────────────────── + + describe('on-demand rebalance reservations', () => { + let executeOnDemandRebalancing: typeof import('../../src/rebalance/onDemand').executeOnDemandRebalancing; + + beforeEach(async () => { + const mod = await import('../../src/rebalance/onDemand'); + executeOnDemandRebalancing = mod.executeOnDemandRebalancing; + }); + + const mockInvoice: Invoice = { intent_id: 'inv-001', amount: '1000000000000000000', owner: '0x4444444444444444444444444444444444444444', entry_epoch: 1, origin: '42161', destinations: ['1'], ticker_hash: '0xusdc', discountBps: 10, hub_status: 'INVOICED', hub_invoice_enqueued_timestamp: Math.floor(Date.now() / 1000) }; + + it('should create reservation BEFORE earmark', async () => { + const order: string[] = []; + inventory.createReservation.callsFake(async () => { order.push('reservation'); return { id: 'res-1', status: 'PENDING' } as any; }); + inventory.updateReservationStatus.resolves({ id: 'res-1', status: 'ACTIVE' } as any); + (database.createEarmark as jest.Mock).mockImplementation(async () => { order.push('earmark'); return { id: 'ear-1', invoiceId: 'inv-001', designatedPurchaseChain: 1, tickerHash: '0xusdc', minAmount: '500000', status: EarmarkStatus.INITIATING }; }); + + await executeOnDemandRebalancing(mockInvoice, { canRebalance: true, destinationChain: 1, rebalanceOperations: [], minAmount: '500000' }, makeContext()); + expect(order[0]).toBe('reservation'); + expect(order[1]).toBe('earmark'); + }); + + it('should return existing earmark on race condition', async () => { + inventory.createReservation.resolves({ id: 'res-1' } as any); + (database.getActiveEarmarkForInvoice as jest.Mock).mockResolvedValue({ id: 'existing-ear', status: EarmarkStatus.PENDING }); + const result = await executeOnDemandRebalancing(mockInvoice, { canRebalance: true, destinationChain: 1, rebalanceOperations: [], minAmount: '500000' }, makeContext()); + expect(result).toBe('existing-ear'); + }); + + it('should mark reservation FAILED when no bridge ops', async () => { + (database.getActiveEarmarkForInvoice as jest.Mock).mockResolvedValue(null); + inventory.createReservation.resolves({ id: 'res-main', status: 'PENDING' } as any); + inventory.updateReservationStatus.resolves({ id: 'res-main' } as any); + (database.createEarmark as jest.Mock).mockResolvedValue({ id: 'ear-1', invoiceId: 'inv-001', designatedPurchaseChain: 1, tickerHash: '0xusdc', minAmount: '500000', status: EarmarkStatus.INITIATING }); + + const result = await executeOnDemandRebalancing(mockInvoice, { canRebalance: true, destinationChain: 1, rebalanceOperations: [], minAmount: '500000' }, makeContext()); + expect(result).toBeNull(); + expect(inventory.updateReservationStatus.getCalls().some(c => c.args[1] === 'FAILED')).toBe(true); + }); + }); + + // ── 1.4 Threshold Rebalance Reservations ──────────────────────── + + describe('threshold rebalance reservations', () => { + let rebalanceInventory: typeof import('../../src/rebalance/rebalance').rebalanceInventory; + + beforeEach(async () => { + const mod = await import('../../src/rebalance/rebalance'); + rebalanceInventory = mod.rebalanceInventory; + stub(callbacks, 'executeDestinationCallbacks').resolves(); + stub(balanceHelpers, 'getMarkBalances').resolves(new Map([['0xusdc', new Map([['1', 20000000000000000000n]])]])); + stub(assetHelpers, 'getTickerForAsset').returns('0xusdc'); + const onDemandMod = await import('../../src/rebalance/onDemand'); + stub(onDemandMod, 'getEarmarkedBalance').resolves(0n); + rebalance.isPaused.resolves(false); + }); + + it('should create REBALANCE_THRESHOLD reservation', async () => { + inventory.createReservation.resolves({ id: 'res-t-1', status: 'PENDING' } as any); + inventory.updateReservationStatus.resolves({} as any); + rebalance.getAdapter.returns({ getReceivedAmount: stub().resolves('19000000000000000000'), send: stub().resolves([]), type: stub().returns('across') } as any); + const config = { ...makeContext().config, routes: [{ origin: 1, destination: 42161, asset: '0xUSDC', maximum: '10000000000000000000', slippagesDbps: [5000], preferences: ['across'] }] }; + await rebalanceInventory(makeContext({ config } as any)); + expect(inventory.createReservation.called).toBe(true); + expect(inventory.createReservation.firstCall.args[0].operationType).toBe('REBALANCE_THRESHOLD'); + }); + + it('should skip bridge when reservation rejected', async () => { + inventory.createReservation.resolves(undefined); + const mockAdapter = { getReceivedAmount: stub(), send: stub(), type: stub().returns('across') }; + rebalance.getAdapter.returns(mockAdapter as any); + const config = { ...makeContext().config, routes: [{ origin: 1, destination: 42161, asset: '0xUSDC', maximum: '10000000000000000000', slippagesDbps: [5000], preferences: ['across'] }] }; + await rebalanceInventory(makeContext({ config } as any)); + expect(mockAdapter.send.called).toBe(false); + }); + }); + + // ── 1.6 ERC20 Approval Nonce ──────────────────────────────────── + + describe('ERC20 approval with inventory nonce', () => { + let submitStub: SinonStub; + + beforeEach(() => { + submitStub = stub(transactionHelper, 'submitTransactionWithLogging').resolves({ submissionType: 1, hash: '0xapproval', receipt: { transactionHash: '0xapproval', from: OWN_ADDR, to: TOKEN_ADDR, cumulativeGasUsed: '50000', effectiveGasPrice: '1000000000', blockNumber: 100, status: 1, logs: [], confirmations: 1 } }); + }); + + it('should pass inventory to submitTransactionWithLogging for approvals', async () => { + chainService.readTx.resolves('0x0000000000000000000000000000000000000000000000000000000000000000' as any); + await checkAndApproveERC20({ config: makeContext().config as any, chainService, logger: mockLogger, chainId: '1', tokenAddress: TOKEN_ADDR, spenderAddress: SPENDER_ADDR, amount: 1000000n, owner: OWN_ADDR, zodiacConfig: { walletType: 'EOA' as any }, inventory, walletAddress: OWN_ADDR }); + expect(submitStub.called).toBe(true); + expect(submitStub.firstCall.args[0].inventory).toBe(inventory); + expect(submitStub.firstCall.args[0].walletAddress).toBe(OWN_ADDR); + }); + + it('should work without inventory', async () => { + chainService.readTx.resolves('0x0000000000000000000000000000000000000000000000000000000000000000' as any); + await checkAndApproveERC20({ config: makeContext().config as any, chainService, logger: mockLogger, chainId: '1', tokenAddress: TOKEN_ADDR, spenderAddress: SPENDER_ADDR, amount: 1000000n, owner: OWN_ADDR, zodiacConfig: { walletType: 'EOA' as any } }); + expect(submitStub.called).toBe(true); + expect(submitStub.firstCall.args[0].inventory).toBeUndefined(); + }); + }); +});