Skip to content

Commit 3145d34

Browse files
committed
feat(apis): circuit breaker and rate_limiter
1 parent fd7398c commit 3145d34

File tree

2 files changed

+89
-76
lines changed

2 files changed

+89
-76
lines changed
Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,69 @@
1-
"""Circuit breaker for resilient API calls."""
1+
"""Circuit breaker for resilient API calls.
2+
3+
This module adapts the resilience module's circuit breaker implementations
4+
for use in the APIs module, maintaining backward compatibility.
5+
"""
26

37
from __future__ import annotations
48

5-
import asyncio
6-
import time
79
from collections.abc import Callable
810
from typing import Any
911

1012
from agentle.agents.apis.circuit_breaker_error import CircuitBreakerError
11-
from agentle.agents.apis.circuit_breaker_state import CircuitBreakerState
1213
from agentle.agents.apis.request_config import RequestConfig
14+
from agentle.resilience.circuit_breaker.in_memory_circuit_breaker import (
15+
InMemoryCircuitBreaker,
16+
)
1317

1418

1519
class CircuitBreaker:
16-
"""Circuit breaker implementation for resilient API calls."""
20+
"""
21+
Circuit breaker implementation for resilient API calls.
22+
23+
This wraps the resilience module's InMemoryCircuitBreaker to provide
24+
a simpler call-based API for endpoint usage.
25+
"""
1726

1827
def __init__(self, config: RequestConfig):
1928
self.config = config
20-
self.state = CircuitBreakerState.CLOSED
21-
self.failure_count = 0
22-
self.success_count = 0
23-
self.last_failure_time: float | None = None
24-
self._lock = asyncio.Lock()
29+
self._circuit_id = "default" # Single circuit per endpoint
30+
# Initialize the underlying circuit breaker from resilience module
31+
self._impl = InMemoryCircuitBreaker(
32+
failure_threshold=config.circuit_breaker_failure_threshold,
33+
recovery_timeout=config.circuit_breaker_recovery_timeout,
34+
half_open_success_threshold=config.circuit_breaker_success_threshold,
35+
enable_metrics=config.enable_metrics,
36+
)
2537

2638
async def call(self, func: Callable[[], Any]) -> Any:
27-
"""Execute function with circuit breaker protection."""
28-
async with self._lock:
29-
# Check if circuit is open
30-
if self.state == CircuitBreakerState.OPEN:
31-
# Check if we should transition to half-open
32-
if self.last_failure_time:
33-
elapsed = time.time() - self.last_failure_time
34-
if elapsed >= self.config.circuit_breaker_recovery_timeout:
35-
self.state = CircuitBreakerState.HALF_OPEN
36-
self.success_count = 0
37-
else:
38-
raise CircuitBreakerError(
39-
f"Circuit breaker is OPEN. Retry after {self.config.circuit_breaker_recovery_timeout - elapsed:.1f}s"
40-
)
39+
"""
40+
Execute function with circuit breaker protection.
41+
42+
Args:
43+
func: Async function to execute
44+
45+
Returns:
46+
Result of func call
47+
48+
Raises:
49+
CircuitBreakerError: If circuit is open
50+
"""
51+
# Check if circuit is open
52+
if await self._impl.is_open(self._circuit_id):
53+
# Get circuit state for more details
54+
state = await self._impl.get_circuit_state(self._circuit_id)
55+
next_retry_seconds = state.get("next_recovery_attempt_in_seconds", 0)
56+
57+
if next_retry_seconds > 0:
58+
raise CircuitBreakerError(
59+
f"Circuit breaker is OPEN. Retry after {next_retry_seconds:.1f}s"
60+
)
4161

4262
# Execute the function
4363
try:
4464
result = await func()
45-
await self._on_success()
65+
await self._impl.record_success(self._circuit_id)
4666
return result
4767
except Exception:
48-
await self._on_failure()
68+
await self._impl.record_failure(self._circuit_id)
4969
raise
50-
51-
async def _on_success(self) -> None:
52-
"""Handle successful call."""
53-
async with self._lock:
54-
if self.state == CircuitBreakerState.HALF_OPEN:
55-
self.success_count += 1
56-
if self.success_count >= self.config.circuit_breaker_success_threshold:
57-
self.state = CircuitBreakerState.CLOSED
58-
self.failure_count = 0
59-
elif self.state == CircuitBreakerState.CLOSED:
60-
self.failure_count = 0
61-
62-
async def _on_failure(self) -> None:
63-
"""Handle failed call."""
64-
async with self._lock:
65-
self.failure_count += 1
66-
self.last_failure_time = time.time()
67-
68-
if self.state == CircuitBreakerState.HALF_OPEN:
69-
# Failure in half-open state reopens circuit
70-
self.state = CircuitBreakerState.OPEN
71-
elif self.state == CircuitBreakerState.CLOSED:
72-
# Check if we've hit the failure threshold
73-
if self.failure_count >= self.config.circuit_breaker_failure_threshold:
74-
self.state = CircuitBreakerState.OPEN
Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,57 @@
1-
"""Rate limiter for API calls."""
1+
"""Rate limiter for API calls.
2+
3+
This module adapts the resilience module's rate limiter implementations
4+
for use in the APIs module, maintaining backward compatibility.
5+
"""
26

37
from __future__ import annotations
48

59
import asyncio
6-
import time
710

811
from agentle.agents.apis.request_config import RequestConfig
12+
from agentle.resilience.rate_limiting.rate_limit_config import RateLimitConfig
13+
from agentle.resilience.rate_limiting.in_memory_rate_limiter import (
14+
InMemoryRateLimiter as ResilienceRateLimiter,
15+
)
916

1017

1118
class RateLimiter:
12-
"""Rate limiter for API calls."""
19+
"""
20+
Rate limiter for API calls.
21+
22+
This wraps the resilience module's InMemoryRateLimiter to provide
23+
a simpler acquire-based API for endpoint usage.
24+
"""
1325

1426
def __init__(self, config: RequestConfig):
1527
self.config = config
16-
self.calls: list[float] = []
17-
self._lock = asyncio.Lock()
28+
self._identifier = "default" # Single rate limit per endpoint
29+
30+
# Convert rate limit config to resilience module format
31+
rate_limit_config: RateLimitConfig = {
32+
"max_requests_per_minute": int(
33+
config.rate_limit_calls * (60 / config.rate_limit_period)
34+
)
35+
if config.rate_limit_period <= 60
36+
else config.rate_limit_calls,
37+
}
38+
39+
# Initialize the underlying rate limiter from resilience module
40+
self._impl = ResilienceRateLimiter(
41+
default_config=rate_limit_config,
42+
enable_metrics=config.enable_metrics,
43+
)
1844

1945
async def acquire(self) -> None:
20-
"""Acquire rate limit slot, waiting if necessary."""
21-
async with self._lock:
22-
now = time.time()
23-
24-
# Remove old calls outside the window
25-
cutoff = now - self.config.rate_limit_period
26-
self.calls = [t for t in self.calls if t > cutoff]
27-
28-
# Check if we're at the limit
29-
if len(self.calls) >= self.config.rate_limit_calls:
30-
# Calculate wait time
31-
oldest_call = self.calls[0]
32-
wait_time = self.config.rate_limit_period - (now - oldest_call)
33-
if wait_time > 0:
34-
await asyncio.sleep(wait_time)
35-
# Recursively try again
36-
return await self.acquire()
37-
38-
# Record this call
39-
self.calls.append(now)
46+
"""
47+
Acquire rate limit slot, waiting if necessary.
48+
49+
This will block until a slot is available.
50+
"""
51+
# Wait until we can proceed
52+
while not await self._impl.can_proceed(self._identifier):
53+
# Wait a short time before checking again
54+
await asyncio.sleep(0.1)
55+
56+
# Record the request
57+
await self._impl.record_request(self._identifier)

0 commit comments

Comments
 (0)