Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions servers/mu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"express": "^4.18.2",
"http-message-signatures": "^1.0.4",
"hyper-async": "^1.1.2",
"keccak": "^3.0.4",
"lru-cache": "^10.2.0",
"node-cron": "^3.0.3",
"p-map": "^7.0.1",
Expand Down
12 changes: 6 additions & 6 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ const CONFIG_ENVS = {
MAX_WORKERS: process.env.MAX_WORKERS || Math.max(cpus().length - 1, 1),
DB_URL: process.env.DB_URL || 'ao-cache',
TRACE_DB_URL: process.env.TRACE_DB_URL || 'trace',
TASK_QUEUE_MAX_RETRIES: process.env.TASK_QUEUE_MAX_RETRIES || 5,
TASK_QUEUE_MAX_RETRIES: 10,
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000,
ENABLE_MESSAGE_RECOVERY: process.env.ENABLE_MESSAGE_RECOVERY === 'true',
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_MAX_RETRIES: 10,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_MAP: process.env.RELAY_MAP || '',
ENABLE_PUSH: process.env.ENABLE_PUSH === 'true',
Expand All @@ -151,7 +151,7 @@ const CONFIG_ENVS = {
HB_ROUTER_URL: process.env.HB_ROUTER_URL || 'https://forward.computer',
ENABLE_HB_WALLET_CHECK: process.env.ENABLE_HB_WALLET_CHECK !== 'false',
HB_GRAPHQL_URL: process.env.HB_GRAPHQL_URL || 'https://cache.forward.computer',
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || ""
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || ''
},
production: {
MODE,
Expand All @@ -171,15 +171,15 @@ const CONFIG_ENVS = {
MAX_WORKERS: process.env.MAX_WORKERS || Math.max(cpus().length - 1, 1),
DB_URL: process.env.DB_URL || 'ao-cache',
TRACE_DB_URL: process.env.TRACE_DB_URL || 'trace',
TASK_QUEUE_MAX_RETRIES: process.env.TASK_QUEUE_MAX_RETRIES || 5,
TASK_QUEUE_MAX_RETRIES: 10,
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1580000,
ENABLE_MESSAGE_RECOVERY: process.env.ENABLE_MESSAGE_RECOVERY === 'true',
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_MAX_RETRIES: 10,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_MAP: process.env.RELAY_MAP || '',
ENABLE_PUSH: process.env.ENABLE_PUSH === 'true',
Expand All @@ -192,7 +192,7 @@ const CONFIG_ENVS = {
HB_ROUTER_URL: process.env.HB_ROUTER_URL || 'https://forward.computer',
ENABLE_HB_WALLET_CHECK: process.env.ENABLE_HB_WALLET_CHECK !== 'false',
HB_GRAPHQL_URL: process.env.HB_GRAPHQL_URL || 'https://cache.forward.computer',
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || ""
RATE_LIMIT_FILE_URL: process.env.RATE_LIMIT_FILE_URL || ''
}
}

Expand Down
41 changes: 39 additions & 2 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { of, Rejected, fromPromise, Resolved } from 'hyper-async'
import { compose, head, identity, isEmpty, prop, propOr } from 'ramda'
import createKeccakHash from 'keccak'

import { getCuAddressWith } from '../lib/get-cu-address.js'
import { writeMessageTxWith } from '../lib/write-message-tx.js'
Expand Down Expand Up @@ -50,6 +51,38 @@ export function sendDataItemWith ({
const insertMessage = insertMessageWith({ db })

const locateProcessLocal = fromPromise(locateProcessSchema.implement(locateProcess))
function keyToEthereumAddress (key) {
/**
* We need to decode, then remove the first byte denoting compression in secp256k1
*/
const noCompressionByte = Buffer.from(key, 'base64url').subarray(1)

/**
* the un-prefixed address is the last 20 bytes of the hashed
* public key
*/
const noPrefix = createKeccakHash('keccak256')
.update(noCompressionByte)
.digest('hex')
.slice(-40)

/**
* Apply the checksum see https://eips.ethereum.org/EIPS/eip-55
*/
const hash = createKeccakHash('keccak256')
.update(noPrefix)
.digest('hex')

let checksumAddress = '0x'
for (let i = 0; i < noPrefix.length; i++) {
checksumAddress += parseInt(hash[i], 16) >= 8
? noPrefix[i].toUpperCase()
: noPrefix[i]
}

return checksumAddress
}

/**
* Check if the rate limit has been exceeded using rate limit injected
*/
Expand All @@ -68,7 +101,10 @@ export function sendDataItemWith ({
if (isWhitelisted) return Resolved(ctx)
const intervalStart = new Date().getTime() - IP_WALLET_RATE_LIMIT_INTERVAL
const wallet = ctx.dataItem.owner
const address = await toAddress(wallet) || null
let address = await toAddress(wallet) || null
if (ctx.dataItem.signature.length === 87) {
address = keyToEthereumAddress(ctx.dataItem.owner)
}
const rateLimitAllowance = calculateRateLimit(address, ctx.dataItem.target ?? 'SPAWN', rateLimits)
const recentTraces = await getRecentTraces({ wallet, timestamp: intervalStart, processId: ctx.dataItem.target })
const walletTracesCount = recentTraces.wallet.length
Expand Down Expand Up @@ -121,7 +157,8 @@ export function sendDataItemWith ({
assigns,
initialTxId,
parentId,
ip: ctx.ip
ip: ctx.ip,
parentOwner: ctx.dataItem?.owner
})
})
.bimap(
Expand Down
40 changes: 39 additions & 1 deletion servers/mu/src/domain/clients/taskQueue.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import createKeccakHash from 'keccak'

import { randomBytes } from 'crypto'
import { TASKS_TABLE } from './sqlite.js'

Expand Down Expand Up @@ -52,6 +54,38 @@ export async function createTaskQueue ({ queueId, logger, db }) {
* be removed from the database when dequeuing.
*/
export function enqueueWith ({ queue, queueId, logger, db, getRecentTraces, toAddress, getRateLimits, IP_WALLET_RATE_LIMIT, IP_WALLET_RATE_LIMIT_INTERVAL, rateLimits }) {
function keyToEthereumAddress (key) {
/**
* We need to decode, then remove the first byte denoting compression in secp256k1
*/
const noCompressionByte = Buffer.from(key, 'base64url').subarray(1)

/**
* the un-prefixed address is the last 20 bytes of the hashed
* public key
*/
const noPrefix = createKeccakHash('keccak256')
.update(noCompressionByte)
.digest('hex')
.slice(-40)

/**
* Apply the checksum see https://eips.ethereum.org/EIPS/eip-55
*/
const hash = createKeccakHash('keccak256')
.update(noPrefix)
.digest('hex')

let checksumAddress = '0x'
for (let i = 0; i < noPrefix.length; i++) {
checksumAddress += parseInt(hash[i], 16) >= 8
? noPrefix[i].toUpperCase()
: noPrefix[i]
}

return checksumAddress
}

async function checkRateLimitExceeded (task) {
function calculateRateLimit (walletID, procID, limits) {
if (!walletID) return 100
Expand All @@ -65,7 +99,11 @@ export function enqueueWith ({ queue, queueId, logger, db, getRecentTraces, toAd
const intervalStart = new Date().getTime() - IP_WALLET_RATE_LIMIT_INTERVAL
const wallet = task?.wallet || task?.cachedMsg?.wallet || null
const processId = task?.cachedMsg?.msg?.Target || task?.processId || null
const address = task?.cachedMsg?.cron ? wallet : await toAddress(wallet)
let address = task?.cachedMsg?.cron ? wallet : await toAddress(wallet)
const owner = (task.parentOwner ?? task?.wallet ?? task.cachedMsg?.wallet)
if (owner?.length === 87) {
address = keyToEthereumAddress(owner)
}
const rateLimits = getRateLimits()
const isWhitelisted = (rateLimits?.ips?.[task?.ip] ?? 0) > 1
if (isWhitelisted) return false
Expand Down
7 changes: 6 additions & 1 deletion servers/mu/src/domain/clients/worker-fn.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ export function processResultWith ({
parentId: propOr(undefined, 'parentId', res),
processId: propOr(undefined, 'processId', res),
wallet: propOr(undefined, 'wallet', res),
ip: propOr(undefined, 'ip', res)
ip: propOr(undefined, 'ip', res),
parentOwner: propOr(undefined, 'parentOwner', res)
})
return of(res)
})
Expand All @@ -60,6 +61,7 @@ export function processResultWith ({
*/
export function enqueueResultsWith ({ enqueue }) {
return ({ msgs, spawns, assigns, initialTxId, parentId, processId, ...rest }) => {
console.dir({ m: 'ENQUEUING RESULTS' }, { depth: null })
const results = [
...msgs.map(msg => ({
type: 'MESSAGE',
Expand All @@ -70,6 +72,7 @@ export function enqueueResultsWith ({ enqueue }) {
parentId: msg.parentId,
logId: randomBytes(8).toString('hex'),
ip: rest.ip,
parentOwner: rest.parentOwner,
wallet: msg.wallet
})),
...spawns.map(spawn => ({
Expand All @@ -81,6 +84,7 @@ export function enqueueResultsWith ({ enqueue }) {
parentId: spawn.parentId,
logId: randomBytes(8).toString('hex'),
ip: rest.ip,
parentOwner: rest.parentOwner,
wallet: spawn.wallet
})),
...assigns.flatMap(assign => assign.Processes.map(
Expand All @@ -97,6 +101,7 @@ export function enqueueResultsWith ({ enqueue }) {
parentId,
logId: randomBytes(8).toString('hex'),
ip: rest.ip,
parentOwner: rest.parentOwner,
wallet: rest.wallet ?? null
})
))
Expand Down
Loading