@@ -112,9 +112,22 @@ async fn main() -> Result<()> {
112112 } ;
113113 state. pool . get ( ) . await ?. batch_execute ( SCHEMA ) . await ?;
114114
115- tokio:: spawn ( update_daily_user_queries ( state. pool . clone ( ) ) ) ;
116- tokio:: spawn ( update_wl_daily_user_queries ( state. pool . clone ( ) ) ) ;
117- tokio:: spawn ( cleanup_user_queries ( state. pool . clone ( ) ) ) ;
115+ let pool = state. pool . clone ( ) ;
116+ tokio:: spawn ( async move {
117+ loop {
118+ let pool = pool. clone ( ) ;
119+ let res = tokio:: spawn ( async move {
120+ cleanup_user_queries ( & pool) . await . unwrap ( ) ;
121+ update_daily_user_queries ( & pool) . await . unwrap ( ) ;
122+ update_wl_daily_user_queries ( & pool) . await . unwrap ( ) ;
123+ } )
124+ . await ;
125+ if let Err ( e) = res {
126+ tracing:: error!( "updating usage task error: {}" , e) ;
127+ }
128+ tokio:: time:: sleep ( Duration :: from_secs ( 30 ) ) . await ;
129+ }
130+ } ) ;
118131
119132 let listener = tokio:: net:: TcpListener :: bind ( "0.0.0.0:8001" ) . await ?;
120133 axum:: serve ( listener, service ( state) ) . await ?;
@@ -252,105 +265,108 @@ fn service(state: web::State) -> IntoMakeServiceWithConnectInfo<Router, SocketAd
252265 . into_make_service_with_connect_info :: < SocketAddr > ( )
253266}
254267
255- #[ tracing:: instrument( skip_all) ]
256- async fn update_daily_user_queries ( pool : deadpool_postgres:: Pool ) {
257- loop {
258- let pool = pool. clone ( ) ;
259- let result = tokio:: spawn ( async move {
260- pool. get ( )
261- . await
262- . expect ( "getting pg from pool" )
263- . query (
264- "
265- insert into daily_user_queries (owner_email, day, n, updated_at)
266- select
267- k.owner_email,
268- date_trunc('day', q.created_at)::date as day,
269- sum(qty)::int8,
270- now()
271- from user_queries q
272- join api_keys k on q.api_key = k.secret
273- where q.created_at >= date_trunc('day', now())
274- and q.created_at < date_trunc('day', now() + interval '1 day')
275- group by k.owner_email, date_trunc('day', q.created_at)::date
276- on conflict (owner_email, day)
277- do update set n = excluded.n, updated_at = excluded.updated_at;
278- " ,
279- & [ ] ,
280- )
281- . await
282- . expect ( "updating records" ) ;
283- } )
284- . await ;
285- match result {
286- Ok ( _) => tracing:: info!( "updated" ) ,
287- Err ( e) => tracing:: error!( "{}" , e) ,
288- }
289- tokio:: time:: sleep ( Duration :: from_secs ( 30 ) ) . await ;
290- }
268+ #[ tracing:: instrument( skip_all, fields( count, qty) ) ]
269+ async fn update_daily_user_queries (
270+ pool : & deadpool_postgres:: Pool ,
271+ ) -> Result < ( i64 , i64 ) , shared:: Error > {
272+ let row = pool
273+ . get ( )
274+ . await ?
275+ . query_one (
276+ "
277+ with one_day as (
278+ select
279+ q.api_key,
280+ k.owner_email,
281+ date_trunc('day', q.created_at)::date as day,
282+ q.qty
283+ from user_queries q
284+ join api_keys k on q.api_key = k.secret
285+ where q.created_at >= date_trunc('day', now())
286+ and q.created_at < date_trunc('day', now() + interval '1 day')
287+ ),
288+ aggregated as (
289+ select owner_email, day, sum(qty)::int8 as total_qty
290+ from one_day
291+ group by owner_email, day
292+ ),
293+ upserted as (
294+ insert into daily_user_queries (owner_email, day, n, updated_at)
295+ select owner_email, day, total_qty, now()
296+ from aggregated
297+ on conflict (owner_email, day)
298+ do update set n = excluded.n, updated_at = excluded.updated_at
299+ )
300+ select count(*)::int8, coalesce(sum(qty), 0)::int8
301+ from one_day
302+ " ,
303+ & [ ] ,
304+ )
305+ . await ?;
306+ let ( count, qty) = ( row. get ( 0 ) , row. get ( 1 ) ) ;
307+ tracing:: Span :: current ( )
308+ . record ( "count" , count)
309+ . record ( "qty" , qty) ;
310+ Ok ( ( count, qty) )
291311}
292312
293- #[ tracing:: instrument( skip_all) ]
294- async fn update_wl_daily_user_queries ( pool : deadpool_postgres:: Pool ) {
295- loop {
296- let pool = pool. clone ( ) ;
297- let result = tokio:: spawn ( async move {
298- pool. get ( )
299- . await
300- . expect ( "getting pg from pool" )
301- . query (
302- "
303- insert into wl_daily_user_queries (provision_key, org, day, n, updated_at)
304- select
305- k.provision_key,
306- k.org,
307- date_trunc('day', q.created_at)::date as day,
308- sum(qty)::int8,
309- now()
310- from user_queries q
311- join wl_api_keys k on q.api_key = k.secret
312- where q.created_at >= date_trunc('day', now())
313- and q.created_at < date_trunc('day', now() + interval '1 day')
314- group by k.provision_key, k.org, date_trunc('day', q.created_at)::date
315- on conflict (provision_key, org, day)
316- do update set n = excluded.n, updated_at = excluded.updated_at;
317- " ,
318- & [ ] ,
319- )
320- . await
321- . expect ( "updating records" ) ;
322- } )
323- . await ;
324- match result {
325- Ok ( _) => tracing:: info!( "updated" ) ,
326- Err ( e) => tracing:: error!( "{}" , e) ,
327- }
328- tokio:: time:: sleep ( Duration :: from_secs ( 30 ) ) . await ;
329- }
313+ #[ tracing:: instrument( skip_all, fields( count, qty) ) ]
314+ async fn update_wl_daily_user_queries (
315+ pool : & deadpool_postgres:: Pool ,
316+ ) -> Result < ( i64 , i64 ) , shared:: Error > {
317+ let row = pool
318+ . get ( )
319+ . await ?
320+ . query_one (
321+ "
322+ with one_day as (
323+ select
324+ k.provision_key,
325+ k.org,
326+ date_trunc('day', q.created_at)::date as day,
327+ q.qty
328+ from user_queries q
329+ join wl_api_keys k on q.api_key = k.secret
330+ where q.created_at >= date_trunc('day', now())
331+ and q.created_at < date_trunc('day', now() + interval '1 day')
332+ ),
333+ aggregated as (
334+ select provision_key, org, day, sum(qty)::int8 as total_qty
335+ from one_day
336+ group by provision_key, org, day
337+ ),
338+ upserted as (
339+ insert into wl_daily_user_queries (provision_key, org, day, n, updated_at)
340+ select provision_key, org, day, total_qty, now()
341+ from aggregated
342+ on conflict (provision_key, org, day)
343+ do update set n = excluded.n, updated_at = excluded.updated_at
344+ )
345+ select count(*)::int8, coalesce(sum(qty), 0)::int8
346+ from one_day;
347+ " ,
348+ & [ ] ,
349+ )
350+ . await ?;
351+ let ( count, qty) = ( row. get ( 0 ) , row. get ( 1 ) ) ;
352+ tracing:: Span :: current ( )
353+ . record ( "count" , count)
354+ . record ( "qty" , qty) ;
355+ Ok ( ( count, qty) )
330356}
331357
332- #[ tracing:: instrument( skip_all) ]
333- async fn cleanup_user_queries ( pool : deadpool_postgres:: Pool ) {
334- loop {
335- let pool = pool. clone ( ) ;
336- let task = tokio:: spawn ( async move {
337- pool. get ( )
338- . await
339- . expect ( "getting pg from pool" )
340- . execute (
341- "delete from user_queries where created_at < now() - '45 days'::interval" ,
342- & [ ] ,
343- )
344- . await
345- . expect ( "deleting records" ) ;
346- } )
347- . await ;
348- match task {
349- Ok ( _) => tracing:: info!( "updated" ) ,
350- Err ( e) => tracing:: error!( "{}" , e) ,
351- }
352- tokio:: time:: sleep ( Duration :: from_secs ( 30 ) ) . await ;
353- }
358+ #[ tracing:: instrument( skip_all, fields( count) ) ]
359+ async fn cleanup_user_queries ( pool : & deadpool_postgres:: Pool ) -> Result < u64 , shared:: Error > {
360+ let n = pool
361+ . get ( )
362+ . await ?
363+ . execute (
364+ "delete from user_queries where created_at < now() - '45 days'::interval" ,
365+ & [ ] ,
366+ )
367+ . await ?;
368+ tracing:: Span :: current ( ) . record ( "count" , n) ;
369+ Ok ( n)
354370}
355371
356372#[ cfg( test) ]
0 commit comments