Skip to content
19 changes: 11 additions & 8 deletions crawlers/mooncrawl/mooncrawl/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,21 @@ def connect(
blockchain_type: AvailableBlockchainType,
web3_uri: Optional[str] = None,
access_id: Optional[UUID] = None,
request_timeout: Optional[int] = None,
) -> Web3:
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()

request_kwargs: Any = None
request_kwargs = {}
if access_id is not None:
request_kwargs = {
"headers": {
NB_ACCESS_ID_HEADER: str(access_id),
NB_DATA_SOURCE_HEADER: "blockchain",
"Content-Type": "application/json",
}
request_kwargs["headers"] = {
NB_ACCESS_ID_HEADER: str(access_id),
NB_DATA_SOURCE_HEADER: "blockchain",
"Content-Type": "application/json",
}

if request_timeout is not None:
request_kwargs["timeout"] = request_timeout

if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI
Expand All @@ -73,10 +75,11 @@ def connect(

if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs)
elif web3_uri.startswith("wss://"):
web3_provider = Web3.WebsocketProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)

# Inject --dev middleware if it is not Ethereum mainnet
# Docs: https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority
if blockchain_type != AvailableBlockchainType.ETHEREUM:
Expand Down
153 changes: 108 additions & 45 deletions crawlers/mooncrawl/mooncrawl/generic_crawler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Union

from typing import Any, Dict, List, Optional, Set, Union, Callable
from eth_abi.codec import ABICodec
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params
import web3
from eth_typing import ChecksumAddress
from hexbytes.main import HexBytes
Expand All @@ -19,7 +21,6 @@
ContractFunctionCall,
utfy_dict,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from tqdm import tqdm
from web3 import Web3
Expand All @@ -44,13 +45,21 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# TODO: ADD VALUE!!!

@dataclass
class ExtededFunctionCall(ContractFunctionCall):
class ExtededFunctionCall:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extended

block_number: int
block_timestamp: int
transaction_hash: str
contract_address: str
caller_address: str
function_name: str
function_args: Dict[str, Any]
gas_price: int
value: int = 0
max_fee_per_gas: Optional[int] = None
max_priority_fee_per_gas: Optional[int] = None
value: int = 0
status: Optional[str] = None


def _function_call_with_gas_price_to_label(
Expand All @@ -69,8 +78,6 @@ def _function_call_with_gas_price_to_label(
"name": function_call.function_name,
"caller": function_call.caller_address,
"args": function_call.function_args,
"status": function_call.status,
"gasUsed": function_call.gas_used,
"gasPrice": function_call.gas_price,
"maxFeePerGas": function_call.max_fee_per_gas,
"maxPriorityFeePerGas": function_call.max_priority_fee_per_gas,
Expand All @@ -96,7 +103,7 @@ def add_function_calls_with_gas_price_to_session(
transactions_hashes_to_save = [
function_call.transaction_hash for function_call in function_calls
]

logger.info(f"Querrying existing labels (function call)")
existing_labels = (
db_session.query(label_model.transaction_hash)
.filter(
Expand All @@ -106,6 +113,7 @@ def add_function_calls_with_gas_price_to_session(
)
.all()
)
logger.info(f"Querry finished")

existing_labels_transactions = [label[0] for label in existing_labels]

Expand Down Expand Up @@ -152,6 +160,76 @@ def _transform_to_w3_tx(
return tx


def _fetch_events_chunk(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO(zomglings): Why update here and not moonworm?

web3,
event_abi,
from_block: int,
to_block: int,
addresses: Optional[List[ChecksumAddress]] = None,
on_decode_error: Optional[Callable[[Exception], None]] = None,
address_block_list: Optional[List[ChecksumAddress]] = None,
) -> List[Any]:
"""Get events using eth_getLogs API.

Event structure:
{
"event": Event name,
"args": dictionary of event arguments,
"address": contract address,
"blockNumber": block number,
"transactionHash": transaction hash,
"logIndex": log index
}

"""

if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")

# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec

_, event_filter_params = construct_event_filter_params(
event_abi,
codec,
fromBlock=from_block,
toBlock=to_block,
)
if addresses:
event_filter_params["address"] = addresses

logs = web3.eth.get_logs(event_filter_params)
logger.info(f"Fetched {len(logs)} raw logs")
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
if address_block_list and log["address"] in address_block_list:
continue
try:
raw_event = get_event_data(codec, event_abi, log)
event = {
"event": raw_event["event"],
"args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
"address": raw_event["address"],
"blockNumber": raw_event["blockNumber"],
"transactionHash": raw_event["transactionHash"].hex(),
"logIndex": raw_event["logIndex"],
}
all_events.append(event)
except Exception as e:
if address_block_list is not None:
address_block_list.append(log["address"])
if on_decode_error:
on_decode_error(e)
continue
logger.info(f"Decoded {len(all_events)} logs")
return all_events


def process_transaction(
db_session: Session,
web3: Web3,
Expand All @@ -160,19 +238,20 @@ def process_transaction(
secondary_abi: List[Dict[str, Any]],
transaction: Dict[str, Any],
blocks_cache: Dict[int, int],
skip_decoding: bool = False,
):
selector = transaction["input"][:10]
function_name = selector
function_args = "unknown"
if not skip_decoding:
try:
raw_function_call = contract.decode_function_input(transaction["input"])
function_name = raw_function_call[0].fn_name
function_args = utfy_dict(raw_function_call[1])
except Exception as e:
pass
# logger.error(f"Failed to decode transaction : {str(e)}")

try:
raw_function_call = contract.decode_function_input(transaction["input"])
function_name = raw_function_call[0].fn_name
function_args = utfy_dict(raw_function_call[1])
except Exception as e:
# logger.error(f"Failed to decode transaction : {str(e)}")
selector = transaction["input"][:10]
function_name = selector
function_args = "unknown"

transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"])
block_timestamp = get_block_timestamp(
db_session,
web3,
Expand All @@ -190,8 +269,6 @@ def process_transaction(
caller_address=transaction["from"],
function_name=function_name,
function_args=function_args,
status=transaction_reciept["status"],
gas_used=transaction_reciept["gasUsed"],
gas_price=transaction["gasPrice"],
max_fee_per_gas=transaction.get(
"maxFeePerGas",
Expand All @@ -200,28 +277,7 @@ def process_transaction(
value=transaction["value"],
)

secondary_logs = []
for log in transaction_reciept["logs"]:
for abi in secondary_abi:
try:
raw_event = get_event_data(web3.codec, abi, log)
event = {
"event": raw_event["event"],
"args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
"address": raw_event["address"],
"blockNumber": raw_event["blockNumber"],
"transactionHash": raw_event["transactionHash"].hex(),
"logIndex": raw_event["logIndex"],
"blockTimestamp": block_timestamp,
}
processed_event = _processEvent(event)
secondary_logs.append(processed_event)

break
except:
pass

return function_call, secondary_logs
return function_call, []


def _get_transactions(
Expand Down Expand Up @@ -350,6 +406,7 @@ def crawl(
crawl_transactions: bool = True,
addresses: Optional[List[ChecksumAddress]] = None,
batch_size: int = 100,
skip_decoding_transactions: bool = False,
) -> None:
current_block = from_block

Expand All @@ -371,13 +428,15 @@ def crawl(
logger.info(f"Crawling blocks {current_block}-{current_block + batch_size}")
events = []
logger.info("Fetching events")
block_list = []
for event_abi in events_abi:
raw_events = _fetch_events_chunk(
web3,
event_abi,
current_block,
batch_end,
addresses,
address_block_list=block_list,
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
Expand All @@ -386,7 +445,7 @@ def crawl(
blockchain_type,
raw_event["blockNumber"],
blocks_cache=db_blocks_cache,
max_blocks_batch=1000,
max_blocks_batch=100,
)
event = _processEvent(raw_event)
events.append(event)
Expand All @@ -401,6 +460,7 @@ def crawl(
)
logger.info(f"Fetched {len(transactions)} transactions")

logger.info(f"Processing transactions")
function_calls = []
for tx in transactions:
processed_tx, secondary_logs = process_transaction(
Expand All @@ -411,9 +471,12 @@ def crawl(
secondary_abi,
tx,
db_blocks_cache,
skip_decoding=skip_decoding_transactions,
)
function_calls.append(processed_tx)
events.extend(secondary_logs)
logger.info(f"Processed {len(function_calls)} transactions")

add_function_calls_with_gas_price_to_session(
db_session,
function_calls,
Expand Down
33 changes: 17 additions & 16 deletions crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from mooncrawl.data import AvailableBlockchainType # type: ignore

from ..blockchain import connect
from .base import crawl, get_checkpoint, populate_with_events
from ..settings import NB_CONTROLLER_ACCESS_ID
from .base import crawl, get_checkpoint, populate_with_events

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,12 +43,12 @@ def handle_nft_crawler(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),
web3 = connect(
blockchain_type,
access_id=args.access_id,
web3_uri=args.web3,
request_timeout=60,
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)
Expand All @@ -64,6 +64,7 @@ def handle_nft_crawler(args: argparse.Namespace) -> None:
from_block=last_crawled_block,
to_block=to_block,
batch_size=args.max_blocks_batch,
skip_decoding_transactions=True,
)


Expand Down Expand Up @@ -95,12 +96,12 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),
web3 = connect(
blockchain_type,
access_id=args.access_id,
web3_uri=args.web3,
request_timeout=60,
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)
Expand All @@ -120,6 +121,8 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None:


def handle_crawl(args: argparse.Namespace) -> None:
# TODO(yhtiyar): fix it
raise NotImplementedError("Deprecated for now, since blocklist is added")
logger.info(f"Starting generic crawler")

label = args.label_name
Expand All @@ -141,12 +144,10 @@ def handle_crawl(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),

web3 = connect(
blockchain_type, access_id=args.access_id, web3_uri=args.web3
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)
Expand Down
Loading