Commit f95ae30

Internal: message processing is now event-based
Problem: the pending message fetcher and processor use a polling loop to look for messages to fetch/process. This leads to some latency when the pending_messages table is empty, as the task sleeps while waiting for new pending messages.

Solution: add an exchange + queue in RabbitMQ to signal the arrival of new messages. To avoid modifying the message processor too much and avoid depending on coherency between the DB and RabbitMQ, the fetcher and processor simply spawn a new task that looks for messages and sets an asyncio Event object. The main fetching/processing loop waits on this event (with a timeout).

Note that this system is not used for retries, as this would require another task that posts messages to the MQ on their next attempt. Retried messages simply wait for the next iteration of the loop (every second).

This solution has the following advantages and drawbacks:
+ No more arbitrary latency when processing new messages
+ No major modification of the pipeline; even if the MQ system fails for some reason, the pending message processor will still process messages every second
+ No dependency on the state of the message queue; if the RabbitMQ queue is deleted for any reason, the processor will keep on working
- RabbitMQ overhead (one more exchange + queue)
1 parent 0affbc9 commit f95ae30
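
The publishing side of the mechanism described above can be condensed into a short sketch (illustrative only, not code from this commit: the helper name is invented and the exchange name is a stand-in for the config.rabbitmq.pending_message_exchange setting). When a pending message is stored, its database id is published to a RabbitMQ topic exchange with a routing key of the form "fetch.<item_hash>" or "process.<item_hash>", so the fetcher can bind "fetch.*" and the processor "process.*":

import aio_pika.abc


async def signal_pending_message(
    channel: aio_pika.abc.AbstractChannel,
    pending_message_id: int,
    item_hash: str,
    fetched: bool,
) -> None:
    # Topic exchange: the fetcher binds "fetch.*", the processor "process.*".
    exchange = await channel.declare_exchange(
        name="pending-message",  # stand-in; the real name comes from the node config
        type=aio_pika.ExchangeType.TOPIC,
        auto_delete=False,
    )
    stage = "process" if fetched else "fetch"
    # The payload is only the pending message's database id; consumers reload the
    # row from the DB, so nothing depends on the MQ and the DB staying coherent.
    await exchange.publish(
        aio_pika.Message(body=str(pending_message_id).encode("utf-8")),
        routing_key=f"{stage}.{item_hash}",
    )

The diffs below split this between job_utils.py (declaring the exchange and queues) and message_handler.py (publishing once the pending message is committed).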

12 files changed: +193, -68 lines


src/aleph/commands.py

Lines changed: 7 additions & 1 deletion
@@ -45,6 +45,8 @@
 __copyright__ = "Moshe Malawach"
 __license__ = "mit"

+from aleph.toolkit.rabbitmq import make_mq_conn
+
 LOGGER = logging.getLogger(__name__)


@@ -129,6 +131,9 @@ async def main(args: List[str]) -> None:

     setup_logging(args.loglevel)

+    mq_conn = await make_mq_conn(config)
+    mq_channel = await mq_conn.channel()
+
     node_cache = await init_node_cache(config)
     ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
     storage_service = StorageService(
@@ -172,11 +177,12 @@ async def main(args: List[str]) -> None:
     LOGGER.debug("Initialized p2p")

     LOGGER.debug("Initializing listeners")
-    tasks += listener_tasks(
+    tasks += await listener_tasks(
         config=config,
         session_factory=session_factory,
         node_cache=node_cache,
         p2p_client=p2p_client,
+        mq_channel=mq_channel,
     )
     tasks.append(chain_connector.chain_event_loop(config))
     LOGGER.debug("Initialized listeners")

src/aleph/db/accessors/pending_messages.py

Lines changed: 7 additions & 0 deletions
@@ -76,6 +76,13 @@ def get_pending_messages(
     return session.execute(select_stmt).scalars()


+def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]:
+    select_stmt = select(PendingMessageDb).where(
+        PendingMessageDb.id == pending_message_id
+    )
+    return session.execute(select_stmt).scalar_one_or_none()
+
+
 def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int:
     """
     Counts pending messages.

src/aleph/handlers/message_handler.py

Lines changed: 13 additions & 2 deletions
@@ -2,6 +2,7 @@
 import logging
 from typing import Optional, Dict, Any, Mapping

+import aio_pika.abc
 import psycopg2
 import sqlalchemy.exc
 from aleph_message.models import MessageType, ItemType
@@ -175,12 +176,21 @@ def __init__(
         session_factory: DbSessionFactory,
         storage_service: StorageService,
         config: Config,
+        pending_message_exchange: aio_pika.abc.AbstractExchange,
     ):
         super().__init__(
             storage_service=storage_service,
             config=config,
         )
         self.session_factory = session_factory
+        self.pending_message_exchange = pending_message_exchange
+
+    async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
+        mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8"))
+        process_or_fetch = "process" if pending_message.fetched else "fetch"
+        await self.pending_message_exchange.publish(
+            mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}"
+        )

     async def add_pending_message(
         self,
@@ -241,7 +251,6 @@ async def add_pending_message(
                 session.execute(upsert_message_status_stmt)
                 session.execute(insert_pending_message_stmt)
                 session.commit()
-                return pending_message

             except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
                 LOGGER.warning(
@@ -259,6 +268,9 @@ async def add_pending_message(
                 session.commit()
                 return None

+        await self._publish_pending_message(pending_message)
+        return pending_message
+

 class MessageHandler(BaseMessageHandler):
     """
@@ -299,7 +311,6 @@ async def confirm_existing_message(
             )
         )

-
     async def insert_message(
         self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb
     ):

src/aleph/jobs/fetch_pending_messages.py

Lines changed: 28 additions & 15 deletions
@@ -13,17 +13,19 @@
     NewType,
 )

+import aio_pika.abc
 from configmanager import Config
 from setproctitle import setproctitle

-from ..chains.signature_verifier import SignatureVerifier
+from aleph.chains.signature_verifier import SignatureVerifier
 from aleph.db.accessors.pending_messages import (
     make_pending_message_fetched_statement,
     get_next_pending_messages,
 )
 from aleph.db.connection import make_engine, make_session_factory
-from aleph.db.models import PendingMessageDb, MessageDb
+from aleph.db.models import MessageDb, PendingMessageDb
 from aleph.handlers.message_handler import MessageHandler
+from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.services.ipfs.common import make_ipfs_client
 from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
@@ -32,8 +34,8 @@
 from aleph.toolkit.monitoring import setup_sentry
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.db_session import DbSessionFactory
-from .job_utils import prepare_loop, MessageJob
-from ..services.cache.node_cache import NodeCache
+from .job_utils import prepare_loop, MessageJob, make_pending_message_queue
+from ..toolkit.rabbitmq import make_mq_conn

 LOGGER = getLogger(__name__)

@@ -47,12 +49,15 @@ def __init__(
         session_factory: DbSessionFactory,
         message_handler: MessageHandler,
         max_retries: int,
+        pending_message_queue: aio_pika.abc.AbstractQueue,
     ):
         super().__init__(
             session_factory=session_factory,
             message_handler=message_handler,
             max_retries=max_retries,
+            pending_message_queue=pending_message_queue,
         )
+        self.pending_message_queue = pending_message_queue

     async def fetch_pending_message(self, pending_message: PendingMessageDb):
         with self.session_factory() as session:
@@ -76,6 +81,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb):
                     exception=e,
                 )
                 session.commit()
+                return None

     async def fetch_pending_messages(
         self, config: Config, node_cache: NodeCache, loop: bool = True
@@ -140,8 +146,11 @@ async def fetch_pending_messages(
                 break
             # If we are done, wait a few seconds until retrying
             if not fetch_tasks:
-                LOGGER.info("waiting 1 second(s) for new pending messages...")
-                await asyncio.sleep(1)
+                LOGGER.info("waiting for new pending messages...")
+                try:
+                    await asyncio.wait_for(self.ready(), 1)
+                except TimeoutError:
+                    pass

     def make_pipeline(
         self,
@@ -156,12 +165,16 @@ def make_pipeline(


 async def fetch_messages_task(config: Config):
-    # TODO: this sleep can probably be removed
-    await asyncio.sleep(4)
-
     engine = make_engine(config=config, application_name="aleph-fetch")
     session_factory = make_session_factory(engine)

+    mq_conn = await make_mq_conn(config=config)
+    mq_channel = await mq_conn.channel()
+
+    pending_message_queue = await make_pending_message_queue(
+        config=config, routing_key="fetch.*", channel=mq_channel
+    )
+
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
@@ -182,10 +195,11 @@ async def fetch_messages_task(config: Config):
         session_factory=session_factory,
         message_handler=message_handler,
         max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+        pending_message_queue=pending_message_queue,
     )

-    while True:
-        with session_factory() as session:
+    async with fetcher:
+        while True:
             try:
                 fetch_pipeline = fetcher.make_pipeline(
                     config=config, node_cache=node_cache
@@ -197,11 +211,10 @@
                         )

             except Exception:
-                LOGGER.exception("Error in pending messages job")
-                session.rollback()
+                LOGGER.exception("Unexpected error in pending messages fetch job")

-        LOGGER.debug("Waiting 1 second(s) for new pending messages...")
-        await asyncio.sleep(1)
+            LOGGER.debug("Waiting 1 second(s) for new pending messages...")
+            await asyncio.sleep(1)


 def fetch_pending_messages_subprocess(config_values: Dict):

src/aleph/jobs/job_utils.py

Lines changed: 63 additions & 6 deletions
@@ -1,10 +1,10 @@
 import asyncio
 import datetime as dt
 import logging
-from typing import Dict, Union
+from typing import Dict, Union, Optional
 from typing import Tuple

-import aio_pika.abc
+import aio_pika
 from configmanager import Config
 from sqlalchemy import update

@@ -29,6 +29,58 @@
 MAX_RETRY_INTERVAL: int = 300


+async def _make_pending_queue(
+    config: Config,
+    exchange_name: str,
+    queue_name: str,
+    routing_key: str,
+    channel: Optional[aio_pika.abc.AbstractChannel] = None,
+) -> aio_pika.abc.AbstractQueue:
+    if not channel:
+        mq_conn = await aio_pika.connect_robust(
+            host=config.p2p.mq_host.value,
+            port=config.rabbitmq.port.value,
+            login=config.rabbitmq.username.value,
+            password=config.rabbitmq.password.value,
+        )
+        channel = await mq_conn.channel()
+
+    exchange = await channel.declare_exchange(
+        name=exchange_name,
+        type=aio_pika.ExchangeType.TOPIC,
+        auto_delete=False,
+    )
+    queue = await channel.declare_queue(name=queue_name, auto_delete=False)
+    await queue.bind(exchange, routing_key=routing_key)
+    return queue
+
+
+async def make_pending_tx_queue(
+    config: Config, channel: aio_pika.abc.AbstractChannel
+) -> aio_pika.abc.AbstractQueue:
+    return await _make_pending_queue(
+        config=config,
+        exchange_name=config.rabbitmq.pending_tx_exchange.value,
+        queue_name="pending-tx-queue",
+        routing_key="#",
+        channel=channel,
+    )
+
+
+async def make_pending_message_queue(
+    config: Config,
+    routing_key: str,
+    channel: Optional[aio_pika.abc.AbstractChannel] = None,
+) -> aio_pika.abc.AbstractQueue:
+    return await _make_pending_queue(
+        config=config,
+        exchange_name=config.rabbitmq.pending_message_exchange.value,
+        queue_name="pending_message_queue",
+        routing_key=routing_key,
+        channel=channel,
+    )
+
+
 def compute_next_retry_interval(attempts: int) -> dt.timedelta:
     """
     Computes the time interval for the next attempt/retry of a message.
@@ -67,6 +119,8 @@ def schedule_next_attempt(
     set_next_retry(
         session=session, pending_message=pending_message, next_attempt=next_attempt
     )
+    pending_message.next_attempt = next_attempt
+    pending_message.retries += 1


 def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
@@ -125,19 +179,22 @@ async def ready(self):
         self._event.clear()


-class MessageJob:
+class MessageJob(MqWatcher):
     def __init__(
         self,
         session_factory: DbSessionFactory,
         message_handler: MessageHandler,
         max_retries: int,
+        pending_message_queue: aio_pika.abc.AbstractQueue,
     ):
+        super().__init__(mq_queue=pending_message_queue)
+
         self.session_factory = session_factory
         self.message_handler = message_handler
         self.max_retries = max_retries

+    @staticmethod
     def _handle_rejection(
-        self,
         session: DbSession,
         pending_message: PendingMessageDb,
         exception: BaseException,
@@ -158,7 +215,7 @@ def _handle_rejection(

         return RejectedMessage(pending_message=pending_message, error_code=error_code)

-    def _handle_retry(
+    async def _handle_retry(
         self,
         session: DbSession,
         pending_message: PendingMessageDb,
@@ -222,6 +279,6 @@ async def handle_processing_error(
                 session=session, pending_message=pending_message, exception=exception
             )
         else:
-            return self._handle_retry(
+            return await self._handle_retry(
                 session=session, pending_message=pending_message, exception=exception
             )
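
MessageJob now derives from MqWatcher, but that base class is not part of the hunks above; only its ready() contract shows up in the context lines (an asyncio.Event that is cleared after each wait). A minimal sketch of the watcher pattern the commit message describes, with an invented class name and details that are assumptions rather than the project's actual implementation: a background task consumes the bound queue and sets an asyncio Event whenever anything arrives, and the job loop waits on ready() with a one-second timeout so it falls back to the previous polling cadence when no MQ signal comes.

import asyncio
from typing import Optional

import aio_pika.abc


class PendingMessageWatcher:  # hypothetical name, for illustration only
    def __init__(self, mq_queue: aio_pika.abc.AbstractQueue):
        self.mq_queue = mq_queue
        self._event = asyncio.Event()
        self._task: Optional[asyncio.Task] = None

    async def _watch(self) -> None:
        # Each MQ message only means "something new landed in pending_messages";
        # its payload is not needed just to wake up the loop.
        async with self.mq_queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    self._event.set()

    async def __aenter__(self) -> "PendingMessageWatcher":
        self._task = asyncio.create_task(self._watch())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._task:
            self._task.cancel()

    async def ready(self) -> None:
        await self._event.wait()
        self._event.clear()

This matches how the fetcher uses the real class above: the job is wrapped in "async with fetcher:", and when there is nothing to do it waits with "await asyncio.wait_for(self.ready(), 1)", swallowing the timeout so retried messages are still picked up roughly every second.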
