-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
71 lines (61 loc) · 2.25 KB
/
Copy pathmain.py
File metadata and controls
71 lines (61 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
from src.pipeline.crypto_pipeline import (
run_coins_pipeline,
run_history_pipeline,
run_csv_pipeline
)
from src.scheduler import start_scheduler, stop_scheduler, get_jobs_status
from src.config import config
from src.logger import get_logger
# Module-level logger obtained from the project's logging helper under the name "main".
logger = get_logger("main")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Keep the scheduler running for the lifetime of the application.

    Calls ``start_scheduler()`` before the application starts serving and
    ``stop_scheduler()`` once the application is shutting down.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control is handed back to the framework while the app runs.
    """
    start_scheduler()
    try:
        yield
    finally:
        # Runs on normal shutdown and also when an exception is thrown into
        # the generator at the yield point, so the scheduler is always stopped.
        stop_scheduler()
# Application instance. The `lifespan` context manager is attached here so the
# scheduler start/stop calls it makes wrap the application's run.
app = FastAPI(
    lifespan=lifespan,
    title="Crypto ETL API",
    version="1.0.0",
    description="API para disparar y monitorear pipelines ETL de criptomonedas.",
)
@app.get("/health")
def health():
    """Report that the API process is up.

    Returns:
        A dict with the single key ``status`` set to ``"ok"``.
    """
    liveness = {"status": "ok"}
    return liveness
@app.get("/scheduler/status")
def scheduler_status():
    """Show the active jobs and their next scheduled run.

    Returns:
        A dict whose ``jobs`` key holds the value returned by
        ``get_jobs_status()``.
    """
    jobs = get_jobs_status()
    return {"jobs": jobs}
@app.post("/pipeline/coins")
def run_coins(limit: int = Query(default=20, ge=1, le=250)):
    """Run the current-prices pipeline.

    Args:
        limit: Query parameter forwarded to ``run_coins_pipeline``. Declared
            with bounds 1..250 and a default of 20.

    Returns:
        A ``JSONResponse`` whose content is the pipeline result's
        ``to_dict()`` output.
    """
    logger.info(f"Ejecutando pipeline de coins (limit={limit})")
    payload = run_coins_pipeline(limit).to_dict()
    return JSONResponse(content=payload)
@app.post("/pipeline/history")
def run_history(coins: str = Query(default="bitcoin,ethereum,solana")):
    """Run the historical pipeline for a comma-separated list of coins.

    Args:
        coins: Comma-separated coin identifiers. Whitespace around each entry
            is stripped and empty entries (for example from ``"a,,b"`` or a
            trailing comma) are dropped.

    Returns:
        A ``JSONResponse`` whose content is the pipeline result's
        ``to_dict()`` output.

    Raises:
        HTTPException: With status 422 when ``coins`` contains no non-empty
            entry, so the pipeline is never called with blank identifiers.
    """
    # Local import: the file's top-level fastapi import does not include it.
    from fastapi import HTTPException

    stripped = (entry.strip() for entry in coins.split(","))
    coin_list = [entry for entry in stripped if entry]
    if not coin_list:
        raise HTTPException(
            status_code=422,
            detail="Se requiere al menos un coin en 'coins'.",
        )

    logger.info(f"Ejecutando pipeline histórico para {coin_list}")
    result = run_history_pipeline(coin_list)
    return JSONResponse(content=result.to_dict())
@app.post("/pipeline/csv")
def run_csv(filepath: str = Query(...), table: str = Query(...)):
    """Run the pipeline that loads a CSV into PostgreSQL.

    Args:
        filepath: Required query parameter, forwarded unchanged to
            ``run_csv_pipeline`` as its first argument.
        table: Required query parameter, forwarded unchanged to
            ``run_csv_pipeline`` as its second argument.

    Returns:
        A ``JSONResponse`` whose content is the pipeline result's
        ``to_dict()`` output.
    """
    # NOTE(review): both values come straight from the query string and are
    # passed through without validation here; confirm that run_csv_pipeline
    # restricts readable paths and sanitizes the table name.
    logger.info(f"Ejecutando pipeline CSV: {filepath} → {table}")
    outcome = run_csv_pipeline(filepath, table)
    body = outcome.to_dict()
    return JSONResponse(content=body)
@app.get("/pipeline/outputs")
def list_outputs():
    """List the files generated in data/output.

    Returns:
        A dict with the key ``files``: the entry names of ``data/output`` in
        sorted order, or an empty list when the directory cannot be read.
    """
    import os

    try:
        # os.listdir returns entries in arbitrary order; sort for stable output.
        files = sorted(os.listdir("data/output"))
    except FileNotFoundError:
        # The directory does not exist: report no outputs.
        return {"files": []}
    except OSError as exc:
        # Still answer with an empty list, but leave a trace of failures such
        # as permission errors instead of hiding them.
        logger.warning(f"No se pudo listar data/output: {exc}")
        return {"files": []}
    return {"files": files}
if __name__ == "__main__":
    # uvicorn is imported only when this module is executed as a script.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": config.API_PORT,
        "reload": True,
    }
    uvicorn.run("main:app", **server_options)