Skip to content
This repository was archived by the owner on Sep 14, 2023. It is now read-only.

feat: abstract over consumption of different RPC flavors #1177

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cli/resolveNets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ import { register } from "../deps/shims/register-ts-node.ts"
import * as path from "../deps/std/path.ts"
import { NetSpec } from "../nets/mod.ts"

const $nets = $.record($.instance(NetSpec as new() => NetSpec, $.tuple(), (_: NetSpec) => []))
const $nets = $.record(
$.instance(NetSpec as new() => NetSpec, $.tuple(), (_: NetSpec) => []),
)

export async function resolveNets(maybeNetsPath?: string): Promise<Record<string, NetSpec>> {
export async function resolveNets(
maybeNetsPath?: string,
): Promise<Record<string, NetSpec>> {
const resolvedNetsPath = await resolveNetsPath(maybeNetsPath)
if (resolvedNetsPath.endsWith(".ts")) {
await register()
Expand Down
6 changes: 4 additions & 2 deletions deps/smoldot.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export * from "https://deno.land/x/[email protected].6/index-deno.js"
export * from "https://deno.land/x/[email protected].11/index-deno.js"
export type {
AddChainOptions,
Chain,
Client,
ClientOptions,
} from "https://deno.land/x/[email protected].6/public-types.d.ts"
} from "https://deno.land/x/[email protected].11/public-types.d.ts"
25 changes: 25 additions & 0 deletions examples/raw_rpc/consumer.eg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { connect } from "@capi/polkadot"
import { ExperimentalConsumer } from "capi"

const controller = new AbortController()
const { signal } = controller
const consumer = new ExperimentalConsumer(connect, signal, connect)
const x = await consumer.extrinsics()
console.log(x)
controller.abort()

// const metadata = await consumer.metadata()
// console.log(metadata)

// const blockHash = await consumer.blockHash(0)
// console.log(blockHash)

// const block = await consumer.block()
// console.log(block)

// const systemAccountKey = "26aa394eea5630e07c48ae0c9558cef7b99d880ec681799c0cf30e8886371da9"
// const keys = await consumer.keys(hex.decode(systemAccountKey), 1)
// console.log(keys)

// const values = await consumer.values(keys)
// console.log(values)
11,712 changes: 11,712 additions & 0 deletions examples/smoldot/spec.json
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: get rid of this spec before merging

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions rpc/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { RpcCallMessage, RpcIngressMessage, RpcSubscriptionHandler } from "./rpc
const connectionMemos = new Map<new(discovery: any) => Connection, Map<unknown, Connection>>()

export abstract class Connection {
nextId = 0
nextId = 1
references = 0
#controller = new AbortController()
signal = this.#controller.signal
Expand Down Expand Up @@ -72,7 +72,7 @@ export abstract class Connection {
subscriptionPendingInits: Record<number, (subscriptionId: string) => void> = {}
async subscription(
subscribe: string,
unsubscribe: string,
unsubscribe: string | undefined,
params: unknown[],
handler: RpcSubscriptionHandler,
signal: AbortSignal,
Expand All @@ -83,7 +83,7 @@ export abstract class Connection {
if (signal.aborted) return
signal.addEventListener("abort", () => {
delete this.subscriptionHandlers[subscriptionId]
this.send(this.nextId++, unsubscribe, [subscriptionId])
if (unsubscribe) this.send(this.nextId++, unsubscribe, [subscriptionId])
})
this.subscriptionHandlers[subscriptionId] = handler
}
Expand Down
58 changes: 58 additions & 0 deletions rpc/Consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Connect } from "./Connection.ts"
import { ExtrinsicStatus } from "./ExtrinsicStatus.ts"
import { SignedBlock } from "./known/mod.ts"
import { ServerError } from "./rpc_messages.ts"

export abstract class Consumer {
connection

constructor(readonly connect: Connect, readonly signal: AbortSignal) {
this.connection = connect(signal)
}

abstract stateCall(method: string, args: Uint8Array, blockHash?: string): Promise<Uint8Array>

abstract metadata(blockHash?: string): Promise<Uint8Array>

abstract blockHash(blockNumber?: number): Promise<string>

abstract block(blockHash?: string): Promise<SignedBlock>

abstract keys(
key: Uint8Array,
limit: number,
start?: Uint8Array,
blockHash?: string,
): Promise<Uint8Array[]>

abstract values(keys: Uint8Array[], blockHash?: string): Promise<(Uint8Array | undefined)[]>

// TODO: can this be `Uint8Array` while keeping the decoded metadata external?
abstract nonce(ss58Address: string): Promise<number>

abstract submitExtrinsic(
extrinsic: Uint8Array,
cb: (status: ExtrinsicStatus) => void,
signal: AbortSignal,
): void

protected async call<R>(method: string, params: unknown[]): Promise<R> {
const message = await this.connection.call(method, params)
if (message.error) throw new ServerError(message)
return message.result as R
}

protected subscription<R>(
subscribe: string,
unsubscribe: string | undefined,
params: unknown[],
handler: (result: R, subscriptionId: string) => void,
signal: AbortSignal,
) {
this.connection.subscription(subscribe, unsubscribe, params, (message) => {
if (message.error) throw new ServerError(message)
const { result, subscription } = message.params
handler(result as R, subscription)
}, signal)
}
}
245 changes: 245 additions & 0 deletions rpc/ExperimentalConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import { hex } from "../crypto/mod.ts"
import { Deferred, deferred } from "../deps/std/async.ts"
import { Connect } from "./Connection.ts"
import { Consumer } from "./Consumer.ts"
import { ExtrinsicStatus } from "./ExtrinsicStatus.ts"
import * as known from "./known/mod.ts"
import { LegacyConsumer } from "./LegacyConsumer.ts"

export class ExperimentalConsumer extends Consumer {
archiveConsumer

constructor(
connection: Connect,
signal: AbortSignal,
readonly archiveConnect: Connect,
) {
super(connection, signal)
this.archiveConsumer = new LegacyConsumer(archiveConnect, signal)
this.follow()
}

follow() {
this.subscription<known.ChainHeadUnstableFollowEvent>(
"chainHead_unstable_follow",
"chainHead_unstable_unfollow",
[true],
(event, subscriptionId) => {
if (event.event === "initialized") {
const { finalizedBlockRuntime } = event
if (!finalizedBlockRuntime || finalizedBlockRuntime.type === "invalid") {
throw new FinalizedBlockRuntimeInvalidError()
}
const incompatibleRuntime = !finalizedBlockRuntime.spec.apis.find(
([k, v]) => k === "0xd2bc9897eed08f15" && v === 3,
)
if (incompatibleRuntime) throw new IncompatibleRuntimeError()
} else if (event.event === "finalized") {
const finalizedHashesLeading = event.finalizedBlockHashes.slice(0, -1)
;[...finalizedHashesLeading, ...event.prunedBlockHashes].map((blockHash) =>
this.connection.call("chainHead_unstable_unpin", [subscriptionId, blockHash])
)
const finalizedHash = event.finalizedBlockHashes.at(-1)!
this.stateCallFlush(subscriptionId, finalizedHash)
this.blockHashPendings.forEach((pending) => pending.resolve(finalizedHash))
this.blockFlush(subscriptionId, finalizedHash)
this.extrinsicsFlush(subscriptionId, finalizedHash)
this.valuesFlush(subscriptionId, finalizedHash)
} else if (event.event === "stop") this.follow()
},
this.signal,
)
}

stateCallPendings: Record<string, Record<string, Deferred<unknown>[]>> = {}
stateCall(method: string, args: Uint8Array, blockHash?: string) {
if (blockHash) return this.archiveConsumer.stateCall(method, args, blockHash)
const methodPendings = this.stateCallPendings[method] ??= {}
const pending = deferred<Uint8Array>()
const argsPendings = methodPendings[hex.encode(args)] ??= []
argsPendings.push(pending)
return pending
}
stateCallFlush(followId: string, blockHash: string) {
Object.entries(this.stateCallPendings).forEach(([method, argPendings]) => {
Object.entries(argPendings).forEach(([args, pendings]) => {
const controller = new AbortController()
this.subscription<{ event: "done"; output: string }>(
"chainHead_unstable_call",
undefined,
[followId, blockHash, method, args],
(event) => {
delete argPendings[args]
if (!Object.values(argPendings).length) delete this.stateCallPendings[method]
const metadata = hex.decode(event.output)
pendings.forEach((pending) => pending.resolve(metadata))
controller.abort()
},
controller.signal,
)
})
})
}

metadata(blockHash?: string) {
if (blockHash) return this.archiveConsumer.metadata(blockHash)
return this.stateCall("Metadata_metadata", new Uint8Array())
}

blockHashPendings: Deferred<string>[] = []
blockHash(blockNumber?: number) {
if (typeof blockNumber === "number") return this.archiveConsumer.blockHash(blockNumber)
const pending = deferred<string>()
this.blockHashPendings.push(pending)
return pending
}

blockPendings: Deferred<known.SignedBlock>[] = []
block(blockHash?: string) {
if (blockHash) return this.archiveConsumer.block(blockHash)
const pending = deferred<known.SignedBlock>()
this.blockPendings.push(pending)
return pending
}
blockFlush(followId: string, blockHash: string) {
if (this.blockPendings.length) {
const controller = new AbortController()
// TODO: why not emitting?
this.subscription("chainHead_unstable_body", undefined, [followId, blockHash], (_result) => {
while (this.blockPendings.length) {
const blockPending = this.blockPendings.shift()!
blockPending.resolve(null!)
}
}, controller.signal)
}
}

extrinsicsPendings: Deferred<Uint8Array>[] = []
extrinsics(blockHash?: string) {
if (blockHash) return this.archiveConsumer.block(blockHash)
const pending = deferred<Uint8Array>()
this.extrinsicsPendings.push(pending)
return pending
}
extrinsicsFlush(followId: string, blockHash: string) {
if (this.extrinsicsPendings.length) {
const controller = new AbortController()
this.subscription<{ event: "done"; result: string }>(
"chainHead_unstable_body",
undefined,
[followId, blockHash],
(result) => {
while (this.extrinsicsPendings.length) {
const blockPending = this.extrinsicsPendings.shift()!
blockPending.resolve(hex.decode(result.result))
}
},
controller.signal,
)
}
}

keys(key: Uint8Array, limit: number, start?: Uint8Array, blockHash?: string) {
return this.archiveConsumer.keys(key, limit, start, blockHash)
}

valuesKeys: Record<string, true> = {}
valuesPendings: [keys: string[], pending: Deferred<Uint8Array[]>][] = []
values(keys: Uint8Array[], blockHash?: string) {
if (blockHash) return this.archiveConsumer.values(keys, blockHash)
const keysEncoded = keys.map((key) => {
const encoded = hex.encodePrefixed(key)
this.valuesKeys[encoded] = true
return encoded
})
const pending = deferred<Uint8Array[]>()
this.valuesPendings.push([keysEncoded, pending])
return pending
}
valuesFlush(followId: string, blockHash: string) {
const keys = Object.entries(this.valuesKeys)
if (keys.length) {
const items = keys.map((key) => ({ key, type: "value" }))
const controller = new AbortController()
this.subscription<{ event: "items"; items: { key: string; value: string }[] }>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are other events that we need to think how to handle

  • waiting-for-continue, tricky because .valuesPending can be resolved only once
  • inaccessible, retry a few times and reject?
  • error, reject
  • disjoint, reject

"chainHead_unstable_storage",
"chainHead_unstable_stopStorage",
[followId, blockHash, items, null],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the docs are outdated or if the polkadot/substrate API implementation is not complete.

Passing a single hex encoded key works.

Something like

      this.subscription<{ event: "items"; items: { key: string; value: string }[] }>(
        "chainHead_unstable_storage",
        "chainHead_unstable_stopStorage",
        [followId, blockHash, items[0]?.key[0], null],
        (message) => {

And the response is something like

<<< {
  message: {
    jsonrpc: "2.0",
    method: "chainHead_unstable_storage",
    params: {
      subscription: "x1sMLqhwCWeg7QBJ",
      result: {
        event: "done",
        result: "0x000000000000000001000000000000004035323d3400000000000000000000000000000000000000000000000000000000"... 62 more characters
      }
    }
  }
}

It seems that the chainHead_unstable_storage is working for single key reads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated this API spec updates and the multiple item support was documented 3 weeks ago
See paritytech/json-rpc-interface-spec@bea1a60

Smoldot hasn't been updated as it supports a single item request
See https://github.com/paritytech/smoldot/blob/938055a638ec201c022f680c8e8cbd0349e70ed1/bin/light-base/src/json_rpc_service/chain_head.rs#L874

And I guess substrate is in the same state

(message) => {
const lookup = Object.fromEntries(message.items.map(({ key, value }) => [key, value]))
while (this.valuesPendings.length) {
const valuesPending = this.valuesPendings.shift()!
const [keys, pending] = valuesPending
pending.resolve(keys.map((key) => hex.decode(lookup[key]!)))
}
this.valuesKeys = {}
controller.abort()
},
controller.signal,
)
}
}

nonce(ss58Address: string) {
return this.archiveConsumer.nonce(ss58Address)
}
Comment on lines +183 to +185
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to get this using chainHead_storage and query for system.account(AccountId32)

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There a note in the docs for chainHead_unstable_call

Note: This can be used as a replacement for the legacy state_getMetadata, system_accountNextIndex, and payment_queryInfo functions.

https://paritytech.github.io/json-rpc-interface-spec/api/chainHead_unstable_call.html


submitExtrinsic(
extrinsic: Uint8Array,
handler: (status: ExtrinsicStatus) => void,
signal: AbortSignal,
) {
this.subscription<known.TransactionWatchEvent>(
"transaction_unstable_submitAndWatch",
"transaction_unstable_unwatch",
[extrinsic],
(event) => {
handler(((): ExtrinsicStatus => {
switch (event.event) {
case "validated":
return { type: "validated" }
case "invalid":
return {
type: "invalidated",
reason: event.error,
}
case "broadcasted":
return {
type: "broadcasted",
numPeers: event.numPeers,
}
case "bestChainBlockIncluded":
return {
type: "included",
block: event.block,
}
case "dropped":
return {
type: "dropped",
broadcasted: event.broadcasted,
reason: event.error,
}
case "finalized":
return {
type: "finalized",
block: event.block,
}
case "error":
return {
type: "errored",
message: event.error,
}
}
})())
},
signal,
)
}
}

class FinalizedBlockRuntimeInvalidError extends Error {
override readonly name = "FinalizedBlockRuntimeInvalidError"
}
class IncompatibleRuntimeError extends Error {
override readonly name = "IncompatibleRuntimeError"
}
Loading