Skip to content

Sample Agent - IBM BeeAI Framework Integration (OpenAI & A2A Endpoints) #912

@crivetimihai

Description

@crivetimihai

Overview

Create a sample agent implementation using IBM BeeAI Framework with support for both OpenAI-compatible endpoints and A2A (Agent-to-Agent) integration, following the established pattern from CrewAI and LangChain agent implementations.

Agent Specifications

Agent Details

  • Name: beeai-agent
  • Framework: IBM BeeAI Framework
  • Location: agent_runtimes/beeai_agent/
  • Language: Python 3.11+ with TypeScript support
  • Purpose: Demonstrate enterprise-grade multi-agent systems via BeeAI Framework

BeeAI Framework Features

  • Multi-Language Support: Python and TypeScript implementations
  • Production-Grade: Enterprise-ready with Linux Foundation hosting
  • MCP Integration: Native MCP client capabilities
  • Multi-Agent Systems: Advanced agent collaboration patterns
  • Open Governance: Linux Foundation governance model

Integration Patterns

OpenAI-Compatible Endpoint Integration

# OpenAI API endpoint integration
POST /openai/v1/chat/completions
Content-Type: application/json
Authorization: Bearer ${OPENAI_API_KEY}

{
  "model": "gpt-4",
  "messages": [
    {"role": "user", "content": "Analyze this data using BeeAI multi-agent workflow"}
  ],
  "tools": [
    {
      "type": "function",
      "function": {
        "name": "beeai_analyze_data",
        "description": "Multi-agent data analysis workflow",
        "parameters": {
          "type": "object",
          "properties": {
            "data_source": {"type": "string"},
            "analysis_type": {"type": "string"},
            "collaboration_mode": {"type": "string"}
          }
        }
      }
    }
  ]
}

A2A (Agent-to-Agent) Integration

# A2A endpoint integration
POST /a2a/v1/agents/{agent_id}/invoke
Content-Type: application/json
Authorization: Bearer ${A2A_API_KEY}

{
  "agent_id": "beeai-multi-agent-system",
  "input": {
    "task": "Collaborative document analysis",
    "agents": ["analyst", "reviewer", "summarizer"],
    "workflow": "sequential_with_feedback"
  },
  "context": {
    "session_id": "session-123",
    "user_id": "user-456"
  }
}

Agent Capabilities

1. Multi-Agent Orchestration

  • Agent Coordination: Manage multiple specialized agents
  • Workflow Patterns: Sequential, parallel, hierarchical execution
  • Communication: Inter-agent message passing and state sharing
  • Conflict Resolution: Handle agent disagreements and conflicts

2. Production-Grade Features

  • Error Handling: Robust error recovery and retry mechanisms
  • Monitoring: Comprehensive agent performance tracking
  • Scaling: Horizontal scaling of agent systems
  • Security: Enterprise security and access controls

3. MCP Tool Integration

  • Native MCP Support: Direct integration with MCP servers
  • Tool Discovery: Automatic MCP tool discovery and registration
  • Context Management: Shared context across agent interactions
  • State Persistence: Agent state management and persistence

Implementation Requirements

Directory Structure

agent_runtimes/beeai_agent/
├── src/
│   ├── __init__.py
│   ├── main.py                    # Main agent runtime
│   ├── beeai_integration/
│   │   ├── __init__.py
│   │   ├── framework_client.py    # BeeAI Framework integration
│   │   ├── multi_agent_system.py  # Multi-agent orchestration
│   │   └── workflow_engine.py     # Agent workflow management
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base_agent.py         # Base BeeAI agent class
│   │   ├── analyst_agent.py      # Data analysis agent
│   │   ├── researcher_agent.py   # Research and information gathering
│   │   └── coordinator_agent.py  # Agent coordination and management
│   ├── endpoints/
│   │   ├── __init__.py
│   │   ├── openai_handler.py     # OpenAI-compatible endpoint
│   │   └── a2a_handler.py        # A2A endpoint integration
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── mcp_integration.py    # MCP tool integration
│   │   └── tool_registry.py      # Tool discovery and management
│   └── utils/
│       ├── __init__.py
│       ├── config.py            # Configuration management
│       └── monitoring.py        # Agent monitoring and metrics
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── config/
│   ├── agent_config.yaml
│   ├── workflow_config.yaml
│   └── integration_config.yaml
├── requirements.txt
├── requirements-dev.txt
├── README.md
├── Dockerfile
├── docker-compose.yml
└── examples/
    ├── multi_agent_analysis.py
    ├── workflow_examples.py
    └── mcp_integration.py

Core Dependencies

# requirements.txt
# BeeAI Framework
bee-agent-framework>=0.1.0  # or latest version
bee-agent-framework[typescript]

# Core dependencies
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.5.0
httpx>=0.25.0
websockets>=12.0
redis>=5.0.0

# MCP integration
mcp>=1.0.0

# OpenAI compatibility
openai>=1.3.0
tiktoken>=0.5.0

# Multi-agent coordination
celery>=5.3.0
kombu>=5.3.0

# Monitoring and observability
prometheus-client>=0.19.0
structlog>=23.2.0

Configuration

Agent Configuration

# config/agent_config.yaml
beeai:
  framework_version: "latest"
  runtime_mode: "production"  # development, staging, production
  
agents:
  analyst:
    type: "data_analyst"
    capabilities: ["data_processing", "statistical_analysis", "visualization"]
    model: "gpt-4"
    temperature: 0.1
    
  researcher:
    type: "research_assistant"
    capabilities: ["web_search", "document_analysis", "fact_checking"]
    model: "gpt-3.5-turbo"
    temperature: 0.3
    
  coordinator:
    type: "workflow_coordinator"
    capabilities: ["agent_management", "task_distribution", "result_aggregation"]
    model: "gpt-4"
    temperature: 0.0

workflows:
  collaborative_analysis:
    agents: ["researcher", "analyst", "coordinator"]
    pattern: "sequential_with_feedback"
    max_iterations: 3
    timeout: 300

mcp_integration:
  auto_discover_tools: true
  tool_timeout: 30
  max_concurrent_tools: 10

Endpoint Configuration

# config/integration_config.yaml
openai_endpoint:
  host: "0.0.0.0"
  port: 8001
  path_prefix: "/openai/v1"
  api_key_required: true
  rate_limiting:
    requests_per_minute: 100
    burst_size: 20

a2a_endpoint:
  host: "0.0.0.0" 
  port: 8002
  path_prefix: "/a2a/v1"
  authentication:
    type: "bearer_token"
    token_validation: true
  
monitoring:
  metrics_enabled: true
  prometheus_port: 8003
  log_level: "INFO"
  
redis:
  url: "redis://localhost:6379"
  db: 0
  key_prefix: "beeai_agent:"

Agent Workflows

Multi-Agent Data Analysis Workflow

# Example workflow implementation
class CollaborativeAnalysisWorkflow:
    def __init__(self):
        self.researcher = ResearcherAgent()
        self.analyst = AnalystAgent()
        self.coordinator = CoordinatorAgent()
    
    async def execute(self, task: str) -> Dict[str, Any]:
        # Phase 1: Research
        research_results = await self.researcher.gather_information(task)
        
        # Phase 2: Analysis  
        analysis_results = await self.analyst.analyze_data(
            data=research_results,
            context=task
        )
        
        # Phase 3: Coordination and synthesis
        final_results = await self.coordinator.synthesize_results(
            research=research_results,
            analysis=analysis_results
        )
        
        return final_results

BeeAI Framework Integration

# BeeAI Framework integration
from bee_agent_framework import Agent, MultiAgentSystem
from bee_agent_framework.integrations.mcp import MCPToolProvider

class BeeAIIntegration:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.multi_agent_system = MultiAgentSystem()
        self.mcp_tools = MCPToolProvider()
    
    async def create_agent_system(self) -> MultiAgentSystem:
        # Create specialized agents
        analyst = Agent(
            name="analyst",
            role="Data Analyst",
            tools=await self.mcp_tools.discover_tools("data_analysis")
        )
        
        researcher = Agent(
            name="researcher", 
            role="Research Assistant",
            tools=await self.mcp_tools.discover_tools("research")
        )
        
        # Add agents to system
        self.multi_agent_system.add_agent(analyst)
        self.multi_agent_system.add_agent(researcher)
        
        return self.multi_agent_system

OpenAI Integration

Chat Completions Handler

@app.post("/openai/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint"""
    
    # Convert OpenAI request to BeeAI workflow
    workflow_task = convert_openai_to_beeai(request)
    
    # Execute multi-agent workflow
    results = await beeai_system.execute_workflow(
        task=workflow_task,
        agents=determine_required_agents(request),
        workflow_type=request.get("workflow", "collaborative")
    )
    
    # Convert results back to OpenAI format
    return format_openai_response(results)

A2A Integration

Agent Invocation Handler

@app.post("/a2a/v1/agents/{agent_id}/invoke")
async def invoke_agent(agent_id: str, request: A2AInvokeRequest):
    """A2A agent invocation endpoint"""
    
    # Get agent configuration
    agent_config = get_agent_config(agent_id)
    
    # Execute BeeAI multi-agent workflow
    results = await beeai_system.execute_collaborative_task(
        task=request.input,
        agent_config=agent_config,
        context=request.context
    )
    
    return A2AResponse(
        agent_id=agent_id,
        output=results,
        metadata={
            "execution_time": results.get("execution_time"),
            "agents_used": results.get("agents_used"),
            "workflow_pattern": results.get("workflow_pattern")
        }
    )

Advanced Features

Enterprise Capabilities

  • High Availability: Multi-instance deployment with load balancing
  • Security: Role-based access control and audit logging
  • Compliance: Enterprise governance and regulatory compliance
  • Integration: Native MCP client with tool auto-discovery
  • Monitoring: Comprehensive observability and performance metrics

Multi-Agent Patterns

  • Hierarchical: Supervisor-worker agent relationships
  • Collaborative: Peer-to-peer agent collaboration
  • Pipeline: Sequential agent processing chains
  • Swarm: Distributed agent coordination
  • Federated: Cross-organization agent collaboration

Testing Strategy

Integration Tests

# Test OpenAI endpoint compatibility
async def test_openai_integration():
    client = AsyncOpenAI(base_url="http://localhost:8001/openai/v1")
    
    response = await client.chat.completions.create(
        model="beeai-multi-agent",
        messages=[{"role": "user", "content": "Analyze sales data trends"}],
        tools=[{"type": "function", "function": {"name": "beeai_workflow"}}]
    )
    
    assert response.choices[0].message.tool_calls is not None

# Test A2A endpoint
async def test_a2a_integration():
    response = await httpx.post(
        "http://localhost:8002/a2a/v1/agents/beeai-system/invoke",
        json={
            "input": {"task": "Multi-agent document analysis"},
            "context": {"user_id": "test-user"}
        }
    )
    
    assert response.status_code == 200
    assert "agents_used" in response.json()["metadata"]

Acceptance Criteria

  • Python agent runtime using IBM BeeAI Framework
  • OpenAI-compatible endpoint (/openai/v1/chat/completions)
  • A2A endpoint integration (/a2a/v1/agents/{agent_id}/invoke)
  • Multi-agent system orchestration and coordination
  • Native MCP client integration with tool discovery
  • Production-grade features (monitoring, scaling, security)
  • Agent workflow patterns (sequential, parallel, hierarchical)
  • Comprehensive configuration management
  • Docker containerization support
  • Integration tests for both endpoint types
  • Performance monitoring and observability
  • Complete documentation with workflow examples

Priority

High - Showcases IBM's enterprise-grade multi-agent framework

Dependencies

  • IBM BeeAI Framework (Python/TypeScript)
  • MCP client library
  • FastAPI for endpoint implementation
  • Redis for agent coordination
  • OpenAI client compatibility

Use Cases

  • Enterprise multi-agent workflows
  • Collaborative document analysis
  • Complex decision-making processes
  • Distributed problem solving
  • Agent-based automation systems
  • Research and analysis pipelines
  • Customer service automation
  • Business process orchestration

Metadata

Metadata

Assignees

No one assigned

    Labels

    agentsAgent SamplesenhancementNew feature or requestoicOpen Innovation Community Contributions

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions