A type-safe Go library for consuming messages from AWS SQS with support for custom message adapters, middleware chains, configurable worker pools, and OpenTelemetry observability.
- Type-safe message processing with Go generics
- Flexible message adapters - JSON adapter included, custom adapters supported
- Middleware support for cross-cutting concerns (logging, metrics, error handling)
- Configurable worker pools for polling and processing
- Built-in error handling with configurable thresholds
- OpenTelemetry observability - distributed tracing and metrics with automatic instrumentation
go get github.com/vmyroslav/sqs-go
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/vmyroslav/sqs-go/consumer"
)
type MyMessage struct {
ID string `json:"id"`
Body string `json:"body"`
}
func main() {
ctx := context.Background()
// Load AWS config
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
sqsClient := sqs.NewFromConfig(cfg)
queueURL := "https://sqs.region.amazonaws.com/account/queue-name"
// Create JSON message adapter
adapter := consumer.NewJSONMessageAdapter[MyMessage]()
// Configure consumer
cfg, err := consumer.NewConfig(queueURL)
if err != nil {
log.Fatal(err)
}
sqsConsumer := consumer.NewSQSConsumer[MyMessage](
cfg,
sqsClient,
adapter,
nil, // middleware
nil, // logger
)
// Define message handler
handler := consumer.HandlerFunc[MyMessage](func(ctx context.Context, msg MyMessage) error {
log.Printf("Processing message: %+v", msg)
return nil
})
// Start consuming
if err = sqsConsumer.Consume(ctx, queueURL, handler); err != nil {
log.Fatal(err)
}
}
Use functional options to configure the consumer:
config, err := consumer.NewConfig(queueURL,
consumer.WithProcessorWorkerPoolSize(10), // Number of worker goroutines for processing messages
consumer.WithPollerWorkerPoolSize(2), // Number of worker goroutines for polling SQS
consumer.WithMaxNumberOfMessages(10), // Max messages to receive in a single request (1-10)
consumer.WithWaitTimeSeconds(20), // Long polling wait time (0-20 seconds)
consumer.WithVisibilityTimeout(30), // Message visibility timeout in seconds
consumer.WithErrorNumberThreshold(5), // Max consecutive errors before stopping
consumer.WithGracefulShutdownTimeout(30), // Graceful shutdown timeout in seconds
)
if err != nil {
log.Fatal(err)
}
Use NewConfig(queueURL)
without options for defaults:
- 10 processor workers
- 2 poller workers
- 1-second long polling
- 30-second visibility timeout
- Graceful shutdown enabled
Enable OpenTelemetry-based observability with distributed tracing and metrics:
import (
"github.com/vmyroslav/sqs-go/consumer"
"github.com/vmyroslav/sqs-go/consumer/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/metric"
)
// Setup OpenTelemetry providers
traceExporter, _ := jaeger.New(jaeger.WithCollectorEndpoint())
tracerProvider := trace.NewTracerProvider(trace.WithBatcher(traceExporter))
metricExporter, _ := prometheus.New()
meterProvider := metric.NewMeterProvider(metric.WithReader(metricExporter))
// Configure observability
obsConfig := observability.NewConfig(
observability.WithServiceName("my-sqs-consumer"),
observability.WithServiceVersion("1.0.0"),
observability.WithTracerProvider(tracerProvider),
observability.WithMeterProvider(meterProvider),
)
// Create consumer with observability
config, err := consumer.NewConfig(queueURL,
consumer.WithObservability(obsConfig),
)
The library automatically collects these metrics:
- Message counters:
sqs_messages_total
(success/error status) - Processing duration:
sqs_processing_duration_seconds
- Polling duration:
sqs_polling_duration_seconds
- Active workers:
sqs_active_workers
(poller/processor) - Buffer size:
sqs_buffer_size
- Acknowledgment duration:
sqs_acknowledgment_duration_seconds
Traces are automatically created for:
- Poll operations: SQS message polling
- Message processing: End-to-end message handling
- Message transformation: Adapter operations
- Acknowledgment: Message ack/reject operations
Context propagation works automatically with SQS message attributes.
Observability is disabled by default. When disabled, noop providers are used with minimal performance impact.
For JSON messages, use the built-in JSON adapter:
type UserEvent struct {
UserID string `json:"user_id"`
Action string `json:"action"`
}
adapter := consumer.NewJSONMessageAdapter[UserEvent]()
Implement the MessageAdapter
interface for custom message formats:
type CustomAdapter struct{}
func (a CustomAdapter) Transform(ctx context.Context, msg sqstypes.Message) (MyType, error) {
// Custom message transformation logic
return MyType{
ID: *msg.MessageId,
Body: *msg.Body,
}, nil
}
// Or use MessageAdapterFunc for simple cases
adapter := consumer.MessageAdapterFunc[MyType](func(ctx context.Context, msg sqstypes.Message) (MyType, error) {
return MyType{
ID: *msg.MessageId,
Body: *msg.Body,
}, nil
})
Middleware enables cross-cutting concerns like logging, metrics, and error handling:
// Logging middleware
func loggingMiddleware[T any](logger *slog.Logger) consumer.Middleware[T] {
return func(next consumer.HandlerFunc[T]) consumer.HandlerFunc[T] {
return func(ctx context.Context, msg T) error {
logger.Info("Processing message", "message", msg)
err := next.Handle(ctx, msg)
if err != nil {
logger.Error("Failed to process message", "error", err)
}
return err
}
}
}
// Apply middleware
middlewares := []consumer.Middleware[MyMessage]{
loggingMiddleware[MyMessage](logger),
metricsMiddleware[MyMessage](),
}
sqsConsumer := consumer.NewSQSConsumer[MyMessage](
config, sqsClient, adapter, middlewares, logger,
)
For more examples including observability setups, see the /examples
directory:
examples/simple/
- Basic consumer usage with LocalStackexamples/json/
- JSON message processing with middlewareexamples/observability/
- Complete observability stack with OpenTelemetry
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
This project is licensed under the MIT License.