Comprehensive PPO implementation for crypto trading.
- Standard PPO: Clipped surrogate objective with GAE (see the loss sketch after this feature list)
- PPO2: Enhanced version with improvements
- Adaptive Clipping: Dynamic clipping parameters
- KL Penalty: Alternative to clipping for policy updates
- Natural Gradients: More principled policy updates
- Actor-Critic: Shared and separate network options
- Multi-Modal: Support for different input types
- CNN Integration: For price chart processing
- LSTM Support: For sequential data
- Attention Mechanisms: Multi-head attention for complex patterns
- Crypto-Specific: Specialized networks for trading
- GAE (Generalized Advantage Estimation): λ-return advantages
- TD(λ): Temporal difference learning with eligibility traces
- Multi-Step: N-step returns for robust estimation
- Adaptive: Dynamic λ parameter adjustment
- Risk-Adjusted: Risk-aware advantage computation
- Distributed Training: Multi-process data collection
- Asynchronous Rollouts: Parallel environment interaction
- Checkpointing: Automatic model saving and recovery
- Monitoring: Comprehensive performance tracking
- Fault Tolerance: Error recovery and worker management
- Multi-Asset: Support for multiple cryptocurrencies
- Risk Management: Position sizing and drawdown control
- Real-Time: Live trading decision making
- Backtesting: Historical data simulation
- Performance Analytics: Comprehensive trading metrics
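For reference, the clipped surrogate objective named above is the standard PPO loss from Schulman et al., 2017. The sketch below is a minimal, self-contained PyTorch illustration and is not taken from `src/core/ppo.py`:

```python
import torch

def clipped_surrogate_loss(log_probs, old_log_probs, advantages, clip_range=0.2):
    """Standard PPO clipped surrogate objective, returned as a loss to minimize."""
    ratio = torch.exp(log_probs - old_log_probs)  # pi_theta(a|s) / pi_theta_old(a|s)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1.0 - clip_range, 1.0 + clip_range) * advantages
    # Pessimistic (lower) bound of the two terms, negated for gradient descent
    return -torch.min(unclipped, clipped).mean()
```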
# Install core dependencies
pip install torch torchvision gymnasium numpy pandas
# Install additional dependencies
pip install tensorboard wandb ray[rllib] psutil
# For development
pip install pytest black flake8 mypy isort pre-commit
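After installing, an optional sanity check confirms the core dependencies import and whether a GPU is visible:

```python
import torch
import gymnasium
import numpy as np
import pandas as pd

print(f"torch {torch.__version__}, CUDA available: {torch.cuda.is_available()}")
print(f"gymnasium {gymnasium.__version__}, numpy {np.__version__}, pandas {pd.__version__}")
```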
ml-ppo/
├── src/
│   ├── core/                    # Core PPO algorithms
│   │   ├── ppo.py               # Standard PPO
│   │   └── ppo2.py              # Enhanced PPO2
│   ├── networks/                # Neural network architectures
│   │   ├── actor_critic.py      # Actor-critic networks
│   │   ├── policy_network.py    # Policy networks
│   │   └── value_network.py     # Value networks
│   ├── advantages/              # Advantage estimation
│   │   ├── gae.py               # GAE implementation
│   │   └── td_lambda.py         # TD(λ) methods
│   ├── optimization/            # Optimization techniques
│   │   ├── clipped_objective.py # Clipped objectives
│   │   └── kl_penalty.py        # KL penalty methods
│   ├── buffers/                 # Data storage
│   │   ├── rollout_buffer.py    # Rollout buffer
│   │   └── trajectory_buffer.py # Trajectory buffer
│   ├── training/                # Training infrastructure
│   │   ├── ppo_trainer.py       # Main trainer
│   │   └── distributed_ppo.py   # Distributed training
│   ├── agents/                  # Trading agents
│   │   └── ppo_trader.py        # Crypto trading agent
│   ├── environments/            # Trading environments
│   │   └── crypto_env.py        # Crypto trading env
│   └── utils/                   # Utilities
│       ├── normalization.py     # Data normalization
│       └── scheduling.py        # Parameter scheduling
├── tests/                       # Comprehensive tests
└── docs/                        # Documentation
import torch

from src.core.ppo import PPOAlgorithm, PPOConfig
from src.networks.actor_critic import ActorCriticNetwork, ActorCriticConfig
from src.environments.crypto_env import CryptoTradingEnvironment
# Configure environment
env = CryptoTradingEnvironment()
# Configure network
network_config = ActorCriticConfig(
    obs_dim=env.observation_space.shape[0],
    action_dim=env.action_space.shape[0],
    action_type="continuous"
)
actor_critic = ActorCriticNetwork(network_config)

# Configure PPO
ppo_config = PPOConfig(
    learning_rate=3e-4,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2
)
ppo = PPOAlgorithm(actor_critic, ppo_config)
# Training loop
for episode in range(1000):
    obs, _ = env.reset()
    done = False
    while not done:
        with torch.no_grad():
            action_dist, value = actor_critic(torch.tensor(obs, dtype=torch.float32).unsqueeze(0))
            action = action_dist.sample()
        # Gymnasium returns separate terminated/truncated flags
        obs, reward, terminated, truncated, info = env.step(action.squeeze(0).numpy())
        done = terminated or truncated
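The loop above only collects experience; transitions are normally stored and periodically used for a policy update, which the PPOTrainer below automates. As a rough illustration of what a single clipped update involves, the snippet continues from the quick-start objects but uses placeholder batch tensors, and it assumes a diagonal Gaussian policy whose `log_prob` returns per-dimension values; treat it as a sketch, not the library's internal update:

```python
import torch.nn.functional as F

optimizer = torch.optim.Adam(actor_critic.parameters(), lr=ppo_config.learning_rate)

# Placeholder mini-batch standing in for real rollout data
obs_b = torch.randn(64, network_config.obs_dim)
actions_b = torch.randn(64, network_config.action_dim)
old_log_probs = torch.randn(64)
advantages = torch.randn(64)
returns_b = torch.randn(64)

dist, values = actor_critic(obs_b)
log_probs = dist.log_prob(actions_b)
if log_probs.dim() > 1:                                  # sum per-dimension log-probs if needed
    log_probs = log_probs.sum(-1)

ratio = torch.exp(log_probs - old_log_probs)
policy_loss = -torch.min(
    ratio * advantages,
    torch.clamp(ratio, 0.8, 1.2) * advantages,           # clip_range = 0.2
).mean()
value_loss = F.mse_loss(values.squeeze(-1), returns_b)
entropy = dist.entropy().mean()

loss = policy_loss + 0.5 * value_loss - 0.01 * entropy   # vf_coef = 0.5, ent_coef = 0.01
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(actor_critic.parameters(), ppo_config.max_grad_norm)
optimizer.step()
```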
from src.training.ppo_trainer import PPOTrainer, PPOTrainerConfig

# Configure training
config = PPOTrainerConfig(
    total_timesteps=1_000_000,
    rollout_steps=2048,
    batch_size=64,
    num_envs=4,
    # Algorithm
    algorithm="ppo",
    learning_rate=3e-4,
    # Environment
    env_type="crypto_trading",
    # Monitoring
    use_wandb=True,
    wandb_project="ppo-crypto-trading",
    # Checkpointing
    save_interval=100,
    checkpoint_dir="./checkpoints"
)
# Create trainer
trainer = PPOTrainer(config)
# Start training
results = trainer.train()
print(f"Training completed!")
print(f"Final reward: {results['final_reward']:.2f}")
print(f"Training time: {results['total_time']:.1f} seconds")from src.training.distributed_ppo import DistributedPPOTrainer, DistributedPPOConfig
# Configure distributed training
config = DistributedPPOConfig(
total_timesteps=5_000_000,
world_size=4,
rollout_workers=8,
async_rollouts=True,
# Performance optimization
use_gpu=True,
gradient_compression=True,
# Fault tolerance
auto_restart_failed_workers=True,
max_worker_failures=3
)
# Create distributed trainer
trainer = DistributedPPOTrainer(config)
# Start distributed training
results = trainer.train()

from src.agents.ppo_trader import PPOTrader, PPOTraderConfig

# Configure trading agent
config = PPOTraderConfig(
    assets=["BTC", "ETH", "BNB"],
    max_position_size=0.3,
    stop_loss_threshold=0.05,
    take_profit_threshold=0.10,
    # Risk management
    max_drawdown=0.15,
    portfolio_heat=0.02,
    # Model
    model_path="./models/ppo_crypto_model.pt",
    # Real-time trading
    realtime_updates=True,
    update_frequency=60
)
# Create trader
trader = PPOTrader(config)
# Get trading decision
decision = trader.get_trading_decision("BTC", current_price=50000.0)
if decision["action"] != "hold":
    success = trader.execute_trading_decision("BTC", decision)
    if success:
        print(f"Executed {decision['action']} for BTC")
from src.advantages.gae import GAE, GAEConfig

# Configure GAE
gae_config = GAEConfig(
    gamma=0.99,
    gae_lambda=0.95,
    normalize_advantages=True,
    adaptive_lambda=True  # Dynamic λ adjustment
)
gae = GAE(gae_config)
# Compute advantages
advantages, returns = gae.compute_advantages_and_returns(
    rewards=torch.tensor(episode_rewards),
    values=torch.tensor(episode_values),
    dones=torch.tensor(episode_dones)
)
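For reference, the λ-return advantages GAE computes follow the standard backward recursion A_t = δ_t + γλ(1 − done_t) A_{t+1} with δ_t = r_t + γ(1 − done_t) V_{t+1} − V_t. A minimal sketch, independent of `src/advantages/gae.py`:

```python
import torch

def gae_advantages(rewards, values, dones, last_value=0.0, gamma=0.99, lam=0.95):
    """Backward GAE pass over one trajectory of length T.
    rewards, values, dones: 1-D tensors of length T; last_value bootstraps V(s_T)."""
    T = rewards.shape[0]
    advantages = torch.zeros(T)
    next_value, next_advantage = last_value, 0.0
    for t in reversed(range(T)):
        not_done = 1.0 - dones[t].float()
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        next_advantage = delta + gamma * lam * not_done * next_advantage
        advantages[t] = next_advantage
        next_value = values[t]
    returns = advantages + values
    return advantages, returns
```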
@dataclass
class PPOConfig:
    # Core parameters
    learning_rate: float = 3e-4
    gamma: float = 0.99
    gae_lambda: float = 0.95
    clip_range: float = 0.2
    # Training
    n_epochs: int = 10
    batch_size: int = 64
    max_grad_norm: float = 0.5
    # Regularization
    ent_coef: float = 0.01
    vf_coef: float = 0.5
    # Advanced
    target_kl: Optional[float] = 0.01
    normalize_advantage: bool = True
@dataclass
class ActorCriticConfig:
    # Architecture
    shared_backbone: bool = True
    hidden_dims: List[int] = field(default_factory=lambda: [256, 256])
    activation: str = "tanh"
    # Input/Output
    obs_dim: int = 64
    action_dim: int = 4
    action_type: str = "continuous"
    # CNN (for price charts)
    use_cnn: bool = False
    cnn_channels: List[int] = field(default_factory=lambda: [32, 64, 64])
    # LSTM (for sequences)
    use_lstm: bool = False
    lstm_hidden_size: int = 128
    # Attention
    use_attention: bool = False
    attention_heads: int = 8
    # Multi-asset
    multi_asset: bool = False
    num_assets: int = 1
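For example, a chart-driven setup might enable the CNN, LSTM, and attention options together; the values below are illustrative, not recommended defaults:

```python
chart_config = ActorCriticConfig(
    obs_dim=64,
    action_dim=4,
    action_type="continuous",
    use_cnn=True,
    cnn_channels=[32, 64, 64],
    use_lstm=True,
    lstm_hidden_size=128,
    use_attention=True,
    attention_heads=8,
)
```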
@dataclass
class CryptoEnvConfig:
    # Trading parameters
    initial_balance: float = 10000.0
    assets: List[str] = field(default_factory=lambda: ["BTC", "ETH"])
    transaction_cost: float = 0.001
    max_position_size: float = 1.0
    # Market dynamics
    volatility_factor: float = 1.0
    trend_strength: float = 0.1
    # Risk management
    liquidation_threshold: float = 0.8
    max_drawdown_limit: float = 0.5
    # Features
    include_technical_indicators: bool = True
config = PPOTrainerConfig(
    use_wandb=True,
    wandb_project="crypto-ppo-trading",
    wandb_entity="your-team",
    wandb_tags=["ppo", "crypto", "production"]
)

# Training metrics are logged automatically
# View with: tensorboard --logdir ./runs

The system tracks comprehensive metrics (see the TensorBoard snippet after this list):
- Training: Loss, KL divergence, clip fraction, entropy
- Environment: Episode reward, length, success rate
- Trading: Portfolio value, Sharpe ratio, max drawdown
- System: FPS, memory usage, GPU utilization
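Custom metrics can be logged alongside the automatic ones with the standard `torch.utils.tensorboard` API; the tag names and values below are purely illustrative:

```python
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir="./runs/ppo_crypto")
global_step = 1000  # e.g. the current timestep reported by the trainer

writer.add_scalar("trading/portfolio_value", 10_500.0, global_step)
writer.add_scalar("trading/sharpe_ratio", 1.3, global_step)
writer.add_scalar("train/kl_divergence", 0.008, global_step)
writer.close()
```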
# Run all tests
python -m pytest tests/ -v
# Run specific test categories
python -m pytest tests/test_ppo.py::TestPPOCore -v
# Run with coverage
python -m pytest tests/ --cov=src --cov-report=html
# Performance benchmarks
python -m pytest tests/test_ppo.py::TestPerformance -v
from src.core.ppo2 import PPO2Algorithm, PPO2Config
config = PPO2Config(
    adaptive_clipping=True,
    use_multi_step=True,
    dynamic_lambda=True,
    mixed_precision=True  # Faster training
)
ppo2 = PPO2Algorithm(actor_critic, config)

from src.advantages.gae import RiskAdjustedGAE

risk_gae = RiskAdjustedGAE(
    config=gae_config,
    risk_adjustment_factor=0.1,
    volatility_window=20
)
advantages, returns = risk_gae.compute_advantages_and_returns(
    rewards, values, dones,
    prices=price_data,
    positions=position_data
)

from src.optimization.kl_penalty import NaturalPolicyGradientKL

natural_ppo = NaturalPolicyGradientKL(kl_config)
natural_gradient = natural_ppo.compute_natural_policy_gradient(
    policy_gradient, action_dist
)
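Where the KL-penalty variant is used instead of clipping, the penalty coefficient β is commonly adapted with the rule from Schulman et al., 2017 (see References). The sketch below shows that rule in isolation and is not tied to this repo's kl_penalty module:

```python
def adapt_kl_coefficient(beta: float, observed_kl: float, target_kl: float = 0.01) -> float:
    """Adaptive KL penalty rule from the PPO paper: grow beta when the policy
    moved too far (KL above target), shrink it when updates are too timid."""
    if observed_kl > 1.5 * target_kl:
        beta *= 2.0
    elif observed_kl < target_kl / 1.5:
        beta /= 2.0
    return beta

# Penalized objective per batch: loss = policy_loss + beta * observed_kl
```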
device="cuda",
pin_memory=True,
non_blocking_transfer=True
)config = PPO2Config(
mixed_precision=True,
gradient_accumulation_steps=4
)config = DistributedPPOConfig(
world_size=8,
backend="nccl",
gradient_compression=True
)# Save trained model
trainer.save_model("./models/ppo_crypto.pt")
# Load for inference
trader = PPOTrader(config)
trader.load_model("./models/ppo_crypto.pt")
import asyncio

# Real-time trading
async def trading_loop():
    while True:
        for asset in assets:
            decision = trader.get_trading_decision(asset, get_current_price(asset))
            if decision["action"] != "hold":
                execute_trade(asset, decision)
        await asyncio.sleep(60)  # Update every minute

FROM pytorch/pytorch:2.0.1-cuda11.7-runtime
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
COPY models/ ./models/
CMD ["python", "-m", "src.agents.ppo_trader"]
config = PPOTraderConfig(
    monitoring_enabled=True,
    alert_thresholds={
        "max_loss": -0.10,
        "max_drawdown": -0.15,
        "min_sharpe": 0.5
    }
)

- Fork the repository
- Create a feature branch: git checkout -b feature/new-feature
- Make your changes and add tests
- Run tests: python -m pytest tests/
- Run linting: black src/ tests/ && flake8 src/ tests/
- Commit changes: git commit -am 'Add new feature'
- Push to branch: git push origin feature/new-feature
- Create a Pull Request
- Proximal Policy Optimization Algorithms - Schulman et al., 2017
- High-Dimensional Continuous Control Using Generalized Advantage Estimation - Schulman et al., 2016
- Implementation Matters in Deep RL - Engstrom et al., 2020
- Enterprise-grade error handling and recovery
- Production monitoring and alerting
- Scalable distributed training
- Performance optimization techniques
MIT License - see LICENSE file for details.
- v1.0.0 - Initial release with core PPO implementation
- v1.1.0 - Added PPO2 with advanced features
- v1.2.0 - Crypto trading integration
- v1.3.0 - Distributed training support
- v1.4.0 - Production deployment features
- Start with smaller networks and scale up
- Use learning rate scheduling for better convergence (see the scheduler sketch after these tips)
- Monitor KL divergence to detect training instability
- Adjust clip range based on policy update magnitude
- Use GAE λ parameter to balance bias/variance
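A common choice is a linear decay of the learning rate over training, sketched here with the standard `torch.optim.lr_scheduler` API; the optimizer and parameter below are stand-ins, not the trainer's internals:

```python
import torch

params = [torch.nn.Parameter(torch.zeros(1))]  # stand-in for actor_critic.parameters()
optimizer = torch.optim.Adam(params, lr=3e-4)

total_updates = 1_000
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer,
    lr_lambda=lambda update: max(1.0 - update / total_updates, 0.05),  # linear decay, 5% floor
)

for update in range(total_updates):
    # ... run one PPO update here ...
    optimizer.step()
    scheduler.step()
```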
- Start with paper trading to validate strategies
- Implement proper risk management from day one
- Monitor portfolio heat and correlation
- Use multiple timeframes for better decisions
- Backtest thoroughly before live deployment
- Use GPU acceleration for faster training
- Enable mixed precision for memory efficiency (see the AMP sketch after these tips)
- Use distributed training for large-scale experiments
- Profile your code to identify bottlenecks
- Cache frequently accessed data
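Mixed precision with `torch.cuda.amp` generally follows the pattern below; this is a generic sketch with a dummy model and loss, not the PPO2 implementation, and it falls back to full precision on CPU:

```python
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"
model = torch.nn.Linear(64, 4).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
scaler = torch.cuda.amp.GradScaler(enabled=(device == "cuda"))

obs = torch.randn(32, 64, device=device)
target = torch.randn(32, 4, device=device)

with torch.cuda.amp.autocast(enabled=(device == "cuda")):
    loss = torch.nn.functional.mse_loss(model(obs), target)

scaler.scale(loss).backward()   # scaled backward pass to avoid underflow
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
```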
Built for the crypto trading community.
For questions and support, please open an issue on GitHub.