11import asyncio
22import json
3- from asyncio import Queue
4- from typing import List
3+ from typing import List , Any
54
65import psycopg2
76import psycopg2 .errors
87from psycopg2 ._psycopg import ReplicationMessage
98from psycopg2 .extras import LogicalReplicationConnection
109
10+ from loguru import logger
1111from meilisync .enums import EventType , SourceType
1212from meilisync .schemas import Event , ProgressEvent
1313from meilisync .settings import Sync
@@ -49,6 +49,7 @@ def __init__(
4949 self .conn = psycopg2 .connect (** self .kwargs , connection_factory = LogicalReplicationConnection )
5050 self .cursor = self .conn .cursor ()
5151 self .queue = None
52+ self ._loop = None
5253 if self .progress :
5354 self .start_lsn = self .progress ["start_lsn" ]
5455 else :
@@ -92,47 +93,55 @@ def _():
9293
9394 def _consumer (self , msg : ReplicationMessage ):
9495 payload = json .loads (msg .payload )
95- changes = payload . get ( "change" )
96- if not changes :
97- return
96+ next_lsn = payload [ "nextlsn" ]
97+
98+ changes = payload . get ( "change" , [])
9899 for change in changes :
99- kind = change .get ("kind" )
100- table = change .get ("table" )
101- if table not in self .tables :
102- return
103- columnnames = change .get ("columnnames" , [])
104- columnvalues = change .get ("columnvalues" , [])
105- columntypes = change .get ("columntypes" , [])
106-
107- for i in range (len (columntypes )):
108- if columntypes [i ] == "json" :
109- columnvalues [i ] = json .loads (columnvalues [i ])
110-
111- if kind == "update" :
112- values = dict (zip (columnnames , columnvalues ))
113- event_type = EventType .update
114- elif kind == "delete" :
115- values = (
116- dict (zip (columnnames , columnvalues ))
117- if columnvalues
118- else {change ["oldkeys" ]["keynames" ][0 ]: change ["oldkeys" ]["keyvalues" ][0 ]}
119- )
120- event_type = EventType .delete
121- elif kind == "insert" :
122- values = dict (zip (columnnames , columnvalues ))
123- event_type = EventType .create
124- else :
125- return
126- asyncio .new_event_loop ().run_until_complete (
127- self .queue .put ( # type: ignore
128- Event (
129- type = event_type ,
130- table = table ,
131- data = values ,
132- progress = {"start_lsn" : payload .get ("nextlsn" )},
133- )
134- )
100+ self .__handle_change (change , next_lsn )
101+
102+ # Always report success to the server to avoid a “disk full” condition.
103+ # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
104+ msg .cursor .send_feedback (flush_lsn = msg .data_start )
105+
106+ def __handle_change (self , change : dict [str , Any ], next_lsn : str ):
107+ table = change .get ("table" )
108+ if table not in self .tables :
109+ return
110+
111+ columnnames = change .get ("columnnames" , [])
112+ columnvalues = change .get ("columnvalues" , [])
113+ columntypes = change .get ("columntypes" , [])
114+
115+ for i in range (len (columntypes )):
116+ if columntypes [i ] == "json" :
117+ columnvalues [i ] = json .loads (columnvalues [i ])
118+
119+ kind = change .get ("kind" )
120+ if kind == "update" :
121+ values = dict (zip (columnnames , columnvalues ))
122+ event_type = EventType .update
123+ elif kind == "delete" :
124+ values = (
125+ dict (zip (columnnames , columnvalues ))
126+ if columnvalues
127+ else {change ["oldkeys" ]["keynames" ][0 ]: change ["oldkeys" ]["keyvalues" ][0 ]}
135128 )
129+ event_type = EventType .delete
130+ elif kind == "insert" :
131+ values = dict (zip (columnnames , columnvalues ))
132+ event_type = EventType .create
133+ else :
134+ return
135+
136+ logger .debug (f'Creating event { event_type = } { values = } ' )
137+ event = Event (
138+ type = event_type ,
139+ table = table ,
140+ data = values ,
141+ progress = {"start_lsn" : next_lsn },
142+ )
143+ # schedule the task on the main event loop
144+ self ._loop .call_soon_threadsafe (self .queue .put_nowait , event )
136145
137146 async def get_count (self , sync : Sync ):
138147 with self .conn_dict .cursor () as cur :
@@ -141,7 +150,8 @@ async def get_count(self, sync: Sync):
141150 return ret [0 ]
142151
143152 async def __aiter__ (self ):
144- self .queue = Queue ()
153+ self .queue = asyncio .Queue ()
154+ self ._loop = asyncio .get_running_loop () # Store the running loop
145155 try :
146156 self .cursor .create_replication_slot (self .slot , output_plugin = "wal2json" )
147157 except psycopg2 .errors .DuplicateObject : # type: ignore
@@ -164,7 +174,9 @@ async def __aiter__(self):
164174 progress = {"start_lsn" : self .start_lsn },
165175 )
166176 while True :
167- yield await self .queue .get ()
177+ item = await self .queue .get ()
178+ logger .debug (f'Got item from queue { item = } ' )
179+ yield item
168180
169181 def _ping (self ):
170182 with self .conn_dict .cursor () as cur :
0 commit comments