Forked from tmc/langgraphgo - Enhanced with streaming, visualization, observability, and production-ready features.
```bash
go get github.com/paulnegz/langgraphgo
```

- LangChain Compatible - Works with OpenAI, Anthropic, Google AI, and more (model setup sketched below)
- Graph Visualization - Export as Mermaid, DOT, or ASCII diagrams
- Real-time Streaming - Live progress updates with event listeners
- State Checkpointing - Pause and resume execution
- Langfuse Integration - Automatic observability and tracing for workflows
- Production Ready - Error handling, tracing, metrics, and backpressure
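The quick-start below calls `model.GenerateContent` on a langchaingo model. One way to construct such a model is sketched here with the OpenAI provider; this is an illustrative setup, not part of this library, and any provider implementing `llms.Model` works the same way.

```go
package main

import (
    "log"

    "github.com/tmc/langchaingo/llms/openai"
)

func main() {
    // The model used by the "generate" node in the quick-start below.
    // With no options, openai.New typically reads OPENAI_API_KEY from the environment.
    model, err := openai.New()
    if err != nil {
        log.Fatal(err)
    }
    _ = model // passed to the graph nodes in the snippet that follows
}
```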
```go
// Simple LLM pipeline
g := graph.NewMessageGraph()
g.AddNode("generate", func(ctx context.Context, state interface{}) (interface{}, error) {
    messages := state.([]llms.MessageContent)
    response, _ := model.GenerateContent(ctx, messages)
    return append(messages, llms.TextParts(llms.ChatMessageTypeAI, response.Choices[0].Content)), nil
})
g.AddEdge("generate", graph.END)
g.SetEntryPoint("generate")

// Compile and run
runnable, _ := g.Compile()
result, _ := runnable.Invoke(ctx, initialState)
```

- Basic LLM - Simple LangChain integration
- RAG Pipeline - Complete retrieval-augmented generation
- Streaming - Real-time progress updates
- Conditional Routing - Dynamic path selection
- Checkpointing - Save and resume state
- Visualization - Export graph diagrams
- Listeners - Progress, metrics, and logging
- Subgraphs - Nested graph composition
```mermaid
%%{init: {'theme':'dark'}}%%
flowchart TD
    START(["START"])
    query[["Query Classifier"]]
    retrieve["Retrieve Docs"]
    rerank["Rerank"]
    check{"Relevance?"}
    generate["Generate"]
    fallback["Web Search"]
    format["Format"]
    END(["END"])

    START --> query --> retrieve --> rerank --> check
    check -->|>0.7| generate
    check -->|≤0.7| fallback --> generate
    generate --> format --> END

    style START fill:#90EE90,stroke:#fff,stroke-width:2px
    style END fill:#FFB6C1,stroke:#fff,stroke-width:2px
    linkStyle default stroke:#fff,stroke-width:2px
```
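The diagram above maps onto the same graph primitives used throughout this README. As a rough sketch (the node handler functions and the `relevance` helper are placeholders, and the 0.7 threshold simply mirrors the diagram):

```go
// Sketch only: the handler funcs (classifyQuery, retrieveDocs, ...) and the
// relevance helper are placeholders, not part of the library.
g := graph.NewMessageGraph()
g.AddNode("query_classifier", classifyQuery)
g.AddNode("retrieve", retrieveDocs)
g.AddNode("rerank", rerankDocs)
g.AddNode("generate", generateAnswer)
g.AddNode("web_search", webSearch)
g.AddNode("format", formatAnswer)

g.SetEntryPoint("query_classifier")
g.AddEdge("query_classifier", "retrieve")
g.AddEdge("retrieve", "rerank")
g.AddConditionalEdge("rerank", func(ctx context.Context, state interface{}) string {
    if relevance(state) > 0.7 {
        return "generate" // relevant enough, answer from retrieved docs
    }
    return "web_search" // otherwise fall back to web search first
})
g.AddEdge("web_search", "generate")
g.AddEdge("generate", "format")
g.AddEdge("format", graph.END)
```

The exporter shown next can render a compiled graph like this back into Mermaid, DOT, or ASCII.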
```go
exporter := graph.NewGraphExporter(g)

mermaid := exporter.DrawMermaid() // Mermaid diagram
dot := exporter.DrawDOT()         // Graphviz DOT
ascii := exporter.DrawASCII()     // Terminal output
```

```go
g.AddConditionalEdge("router", func(ctx context.Context, state interface{}) string {
    if state.(Task).Priority == "high" {
        return "urgent_handler"
    }
    return "normal_handler"
})
```
```go
g := graph.NewCheckpointableMessageGraph()
g.SetCheckpointConfig(graph.CheckpointConfig{
    Store:    graph.NewMemoryCheckpointStore(),
    AutoSave: true,
})
```
```go
progress := graph.NewProgressListener().WithTiming(true)
metrics := graph.NewMetricsListener()

node.AddListener(progress)
node.AddListener(metrics)
```
```go
// Create a callback handler (e.g., for Langfuse tracing)
config := &graph.Config{
    Callbacks: []graph.CallbackHandler{myCallbackHandler},
    Tags:      []string{"operation-name"},
    Metadata: map[string]interface{}{
        "user_id":    "user123",
        "session_id": "session456",
    },
}

// Invoke with callbacks for automatic tracing
result, _ := runnable.InvokeWithConfig(ctx, initialState, config)
```
```go
// With a Langfuse callback adapter (see langfuse-go for implementation)
import langfuseCallbacks "github.com/paulnegz/langfuse-go/langchain"

handler := langfuseCallbacks.NewCallbackHandler()
handler.SetTraceParams("my-operation", "user123", "session456", metadata)

config := &graph.Config{
    Callbacks: []graph.CallbackHandler{handler},
}
result, _ := runnable.InvokeWithConfig(ctx, initialState, config)
```

- Graph Operations: ~14-94μs depending on format
- Tracing Overhead: ~4μs per execution
- Event Processing: 1000+ events/second
- Streaming Latency: <100ms
```bash
go test ./graph -v       # Run tests
go test ./graph -bench=. # Run benchmarks
```

- Core Graph - Basic graph operations
- Streaming - Real-time events
- Listeners - Event handlers
- Checkpointing - State persistence
- Visualization - Export formats
- Tracing - Execution tracing infrastructure
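To focus a run on one of these areas, the standard go test filters work; the name patterns below are illustrative and should be matched to the actual test and benchmark names in ./graph.

```bash
go test ./graph -run 'Checkpoint' -v     # only tests whose names match Checkpoint
go test ./graph -bench 'Graph' -benchmem # benchmarks with allocation stats
```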
This fork enhances tmc/langgraphgo with production features while maintaining API compatibility.
MIT License - see original repository for details.