# TaskQueue-Go
A production-ready distributed background task processing system built with Go, PostgreSQL, and Kubernetes.
Key Features:
- Priority-based task scheduling, with FIFO ordering within each priority
- Automatic retries with exponential backoff
- Concurrent task processing with a worker pool (5 goroutines per instance)
- Real-time monitoring dashboard with Server-Sent Events
- Distributed lock-based task claiming (prevents duplicate execution)
- Full Kubernetes deployment support with Helm
- Comprehensive integration tests (11/11 passing)
- Row-level locking with PostgreSQL `FOR UPDATE SKIP LOCKED`
## Table of Contents
- About
- Architecture
- Design Choices
- Quick Start
- Testing
- API Reference
- Configuration
- Development
- Monitoring
- Why This Design?
- Contributing
- License
## About
TaskQueue-Go is a production-ready distributed task queue system designed for reliability, scalability, and ease of use. Built with Go and PostgreSQL, it provides robust background job processing with automatic retries, real-time monitoring, and horizontal scalability.
Perfect for:
- Background job processing (emails, reports, data processing)
- Scheduled tasks and cron-like operations
- Distributed workflows requiring retry logic
- Systems needing audit trails and task history
- Microservices requiring asynchronous task processing
Why TaskQueue-Go?
- **Production-Ready**: Comprehensive error handling, monitoring, and logging
- **Reliable**: PostgreSQL-backed with ACID guarantees and row-level locking
- **Scalable**: Horizontal scaling with multiple worker instances
- **Developer-Friendly**: Clear API, excellent documentation, easy deployment
- **Well-Tested**: 11 comprehensive integration tests covering all scenarios
## Architecture

System Overview

```
+---------------+          +-------------------+          +----------------+
|               |   POST   |                   |  CLAIM   |                |
|  API Server   |--------->|    PostgreSQL     |<---------|    Workers     |
|  (Producer)   |  Tasks   |  (Durable Queue)  |  Tasks   |  (Consumers)   |
+---------------+          +-------------------+          +----------------+
```
Components
| Component | Purpose | Technology |
|---|---|---|
| API Server | REST API for task management | Go, Chi router |
| PostgreSQL | Durable task queue and state storage | PostgreSQL 16 |
| Workers | Execute tasks with retry logic | Go, worker pool |
| Dashboard | Real-time monitoring UI | HTML/JS with SSE |
How It Works

1. Client submits a task via the REST API (`POST /api/tasks`)
2. API server validates and stores the task in PostgreSQL with status `queued`
3. Worker polls the database using `SELECT ... FOR UPDATE SKIP LOCKED`
4. Worker claims the task (sets the `locked_until` timestamp)
5. Worker executes the task handler
6. Worker updates the status to `succeeded`, or schedules a retry on failure
7. Dashboard displays real-time updates via Server-Sent Events
## Design Choices

1. PostgreSQL as the Queue

Problem: A dedicated message broker would add another moving part to deploy and operate

Solution: Use PostgreSQL as both the durable queue and the state store

Benefits:
- Simplicity: one database for everything
- Durability: no separate persistence layer needed
- Rich queries: task history, statistics, complex filtering

2. Dispatcher Pattern
Problem: Multiple workers polling the database creates a "thundering herd"

Solution: A single dispatcher goroutine feeding a worker pool
Worker Process:

```
Dispatcher (1 goroutine) --polls DB--> claims 1 task
    |
    +--> Buffered Channel (size 10)
             |
             +--> Worker Pool (5 goroutines)
                      +--> Worker 1
                      +--> Worker 2
                      +--> Worker 3
                      +--> Worker 4
                      +--> Worker 5
```
Benefits:
- 1 DB query instead of 50 per poll cycle
- Buffered channel provides backpressure
- No worker starvation - dispatcher ensures fair distribution
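The worker process above can be sketched in plain Go. This is an illustrative sketch, not the project's actual worker code: `Task` and `RunPool` are hypothetical names, and a simple loop stands in for the PostgreSQL poll.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Task stands in for the real task model.
type Task struct{ ID int }

// RunPool has one dispatcher feed n tasks through a buffered channel
// (capacity 10, providing backpressure) to a pool of `workers` goroutines,
// and returns how many tasks were executed.
func RunPool(n, workers int) int {
	tasks := make(chan Task, 10)
	var done int64
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range tasks {
				atomic.AddInt64(&done, 1) // "execute" the task
			}
		}()
	}

	// Dispatcher: in the real system this loop polls PostgreSQL and claims
	// one task per iteration; here it just enqueues fake tasks.
	for i := 1; i <= n; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks)
	wg.Wait()
	return int(atomic.LoadInt64(&done))
}

func main() {
	fmt.Println(RunPool(20, 5)) // 5 workers drain 20 tasks from 1 dispatcher
}
```

Because only the dispatcher touches the database, the number of polling queries is independent of the worker count.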
3. Exponential Backoff with Jitter

Formula:

```
backoff = min(2^retry_count, 2^20) seconds ± 25% jitter
minimum = 1 second
```
Example retry schedule:
- Attempt 1: ~1-1.5s delay
- Attempt 2: ~2-3s delay
- Attempt 3: ~4-6s delay
- Attempt 4: Max retries reached, task marked `failed`
Why jitter?
- Prevents retry storms (many tasks retrying simultaneously)
- Spreads load over time instead of synchronized spikes
4. Lock Expiration

Problem: Worker crashes while holding a lock → task stuck forever

Solution: 30-second lock timeout

```sql
WHERE (status = 'queued')
   OR (status = 'running' AND locked_until < NOW())
ORDER BY
    CASE WHEN status = 'running' THEN 0 ELSE 1 END,  -- Expired locks first
    priority DESC,
    created_at ASC
```
Why prioritize expired locks?
- Prevents task starvation
- Failed workers don't block the queue
- Respects original priority after recovery
5. SELECT FOR UPDATE SKIP LOCKED

Problem: Multiple workers trying to claim the same task

Solution: PostgreSQL row-level locking with SKIP LOCKED

```sql
SELECT * FROM tasks
WHERE status = 'queued'
ORDER BY priority DESC
LIMIT 1
FOR UPDATE SKIP LOCKED
```

How it works:
- Worker A locks a row → Worker B skips it and tries the next row
- Zero contention between workers
- No deadlocks or retries needed
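Combining the two queries in this section, the whole claim step could in principle be written as one atomic statement. This is an illustrative sketch assuming the column names shown above, not necessarily the project's actual query:

```sql
-- Atomically pick one runnable task (queued, or running with an expired
-- lock) and mark it claimed for the 30-second lock window.
UPDATE tasks
SET status = 'running',
    locked_until = NOW() + INTERVAL '30 seconds'
WHERE id = (
    SELECT id FROM tasks
    WHERE status = 'queued'
       OR (status = 'running' AND locked_until < NOW())
    ORDER BY
        CASE WHEN status = 'running' THEN 0 ELSE 1 END,  -- expired locks first
        priority DESC,
        created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
```

The subquery holds the row lock only for the duration of the statement, so concurrent workers never block each other.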
## Quick Start
Prerequisites
- Docker & Docker Compose
- Go 1.21+ (for local development)
- Node.js 18+ (for integration tests)
- kubectl, kind & Helm (for Kubernetes testing)
Run with Docker Compose

```bash
# Start all services
docker-compose up -d

# Wait for startup
sleep 15

# Verify health
curl http://localhost:8080/health

# Open dashboard
open http://localhost:8080
```
Run with Kubernetes (kind)

```bash
# Deploy to kind cluster with Bitnami PostgreSQL Helm chart
make run-k8s
# Automatically:
# - Creates kind cluster with port mappings
# - Deploys PostgreSQL via Bitnami Helm chart
# - Deploys server (2 replicas) and worker (3 replicas)
# - Configures NodePort service for external access

# Access the application
open http://localhost:8080

# View deployment status
kubectl get pods -n task-queue

# Check logs
kubectl logs -f deployment/task-queue-server -n task-queue
kubectl logs -f deployment/task-queue-worker -n task-queue
```
For detailed Kubernetes documentation, see k8s/README.md.
## Testing

Run All Integration Tests

```bash
# Test Docker Compose deployment
make test-integration-docker

# Test Kubernetes deployment
make test-integration-k8s
```
Test Results
All 11 integration tests pass successfully:
| Test | Description |
|---|---|
| Basic lifecycle | Task creation β execution β completion |
| Priority ordering | High-priority tasks processed first |
| Task failures | Retry logic and backoff |
| Failure types | Errors vs timeouts |
| Timeout retries | Slow tasks retry correctly |
| Retry history | Complete audit trail |
| Concurrent processing | 20 tasks with 5 workers |
| Statistics API | Metrics accuracy |
| Invalid task type | Error handling |
| Missing task | 404 responses |
Test Coverage:
- Task lifecycle (create, queue, run, complete)
- Priority scheduling
- Retry logic with exponential backoff
- Timeout handling
- Concurrent processing (no race conditions)
- Error handling
- Real-time statistics
Manual Testing

```bash
# Create a task
curl -X POST http://localhost:8080/api/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Test Email",
    "type": "send_email",
    "priority": 5,
    "payload": {
      "to": "[email protected]",
      "subject": "Test",
      "body": "This is a test"
    }
  }'

# Get task status
curl http://localhost:8080/api/tasks/{task_id}

# Get task history
curl http://localhost:8080/api/tasks/{task_id}/history

# View statistics
curl http://localhost:8080/api/stats
```
## API Reference

Create Task

```
POST /api/tasks
```

```json
{
  "name": "Send Welcome Email",
  "type": "send_email",
  "priority": 5,
  "payload": {
    "to": "[email protected]",
    "subject": "Welcome!",
    "body": "Thanks for signing up."
  }
}
```

Response:

```json
{
  "id": "uuid",
  "name": "Send Welcome Email",
  "type": "send_email",
  "status": "queued",
  "priority": 5,
  "retry_count": 0,
  "max_retries": 3,
  "created_at": "2025-12-06T10:00:00Z"
}
```
Get Task

```
GET /api/tasks/:id
```

Response:

```json
{
  "id": "uuid",
  "status": "succeeded",
  "retry_count": 1,
  "started_at": "2025-12-06T10:00:05Z",
  "completed_at": "2025-12-06T10:00:15Z"
}
```
Get Task History

```
GET /api/tasks/:id/history
```

Response:

```json
[
  {
    "event_type": "task_queued",
    "status": "queued",
    "created_at": "2025-12-06T10:00:00Z"
  },
  {
    "event_type": "task_started",
    "status": "running",
    "worker_id": "worker-123",
    "created_at": "2025-12-06T10:00:05Z"
  },
  {
    "event_type": "task_succeeded",
    "status": "succeeded",
    "created_at": "2025-12-06T10:00:15Z"
  }
]
```
Get Statistics

```
GET /api/stats
```

Response:

```json
{
  "total_tasks": 1000,
  "queued_tasks": 10,
  "running_tasks": 5,
  "succeeded_tasks": 950,
  "failed_tasks": 35,
  "avg_retry_count": 0.45,
  "tasks_with_retries": 300
}
```
Health Check

```
GET /health
```

Response:

```json
{
  "status": "healthy",
  "database": "connected"
}
```
## Configuration

Environment Variables

| Variable | Default | Description |
|---|---|---|
| `DB_HOST` | `localhost` | PostgreSQL host |
| `DB_PORT` | `5432` | PostgreSQL port |
| `DB_USERNAME` | `admin` | Database user |
| `DB_PASSWORD` | `admin` | Database password |
| `DB_DATABASE` | `tasks` | Database name |
| `SERVER_PORT` | `8080` | API server port |
| `WORKER_CONCURRENCY` | `5` | Worker pool size |
| `WORKER_POLL_INTERVAL` | `1` | Poll interval (seconds) |
| `WORKER_TIMEOUT` | `30` | Task timeout (seconds) |
Docker Compose

Edit docker-compose.yml to adjust configuration:

```yaml
services:
  worker:
    environment:
      WORKER_CONCURRENCY: "10"
      WORKER_POLL_INTERVAL: "2"
```
Kubernetes

Edit k8s/manifests/worker-deployment.yaml:

```yaml
spec:
  replicas: 5  # Number of worker pods
  template:
    spec:
      containers:
        - name: worker
          env:
            - name: WORKER_CONCURRENCY
              value: "10"
```

Configuration files are organized in k8s/:
- `manifests/` - Kubernetes YAML files (deployments, services, configs)
- `scripts/` - Deployment automation scripts
## Development
Project Structure

```
.
├── cmd/
│   ├── server/           # API server entry point
│   └── worker/           # Worker entry point
├── internal/
│   ├── api/              # HTTP handlers and routes
│   ├── config/           # Configuration
│   ├── models/           # Domain models (Task, History)
│   ├── storage/          # Data access layer
│   │   └── postgres/     # PostgreSQL implementation
│   └── worker/           # Worker pool and task handlers
│       ├── worker.go     # Dispatcher + worker pool
│       ├── registry.go   # Handler registration
│       └── handlers/     # Task type implementations
├── db/
│   └── migrations/       # SQL schema migrations
├── k8s/                  # Kubernetes deployment
│   ├── manifests/        # YAML files (deployments, services, configs)
│   ├── scripts/          # Deployment automation scripts
│   └── README.md         # Kubernetes documentation
├── tests/                # Integration tests
├── web/                  # Dashboard UI
│   ├── static/           # CSS, JavaScript
│   └── templates/        # HTML templates
├── docker-compose.yml
└── Makefile
```
Please review these documents before contributing:
- CONTRIBUTING.md β guidelines, development setup, and workflow
- CODE_OF_CONDUCT.md β community standards and enforcement
- SECURITY.md β how to report vulnerabilities
Makefile Commands

```bash
# Quick Start
make help                      # Show all commands
make docker-up                 # Start Docker Compose
make run-k8s                   # Deploy to Kubernetes (one command!)
make test-integration          # Run integration tests

# Build
make build                     # Build server and worker binaries
make docker-build              # Build Docker images

# Code quality
make fmt                       # Format Go code
make lint                      # Run linters (golangci-lint)

# Testing
make test-integration-docker   # Test Docker Compose
make test-integration-k8s      # Test Kubernetes
make check-tools               # Verify prerequisites

# Kubernetes
make k8s-down                  # Stop Kubernetes

# Database
make migrate-up                # Run migrations
make migrate-down              # Rollback last migration

# Cleanup
make clean                     # Remove build artifacts
```
## Monitoring
Dashboard
Access the real-time dashboard at: http://localhost:8080/
Features:
- Live task statistics (updated via Server-Sent Events)
- Success rate visualization
- Retry metrics
- Auto-refresh every 5 seconds
Logs

Docker Compose:

```bash
docker-compose logs -f server
docker-compose logs -f worker
```

Kubernetes:

```bash
kubectl logs -f deployment/task-queue-worker -n task-queue
kubectl logs -f deployment/task-queue-server -n task-queue

# View all pods
kubectl get pods -n task-queue

# Describe a specific pod
kubectl describe pod <pod-name> -n task-queue
```
## Why This Design?

Key Architectural Decisions

1. **PostgreSQL**
   - Simplicity: One database for everything
   - Durability: No separate persistence layer needed
   - Rich queries: Task history, statistics, complex filtering
2. **Dispatcher Pattern**
   - Prevents a database connection storm
   - Reduces DB load by 98% (1 query vs 50 queries/second)
   - Buffered channel provides natural backpressure
3. **Exponential Backoff + Jitter**
   - Prevents retry storms
   - Gradually increases delay for persistent failures
   - Jitter spreads load over time
4. **Lock Expiration**
   - Auto-recovery from worker crashes
   - No manual intervention needed
   - Tasks never stuck permanently
5. **Context Cancellation**
   - Graceful shutdown
   - In-flight tasks complete properly
   - Safe for Kubernetes rolling updates
6. **Kubernetes with kind & Bitnami PostgreSQL**
   - Production-ready: Bitnami Helm chart with security updates
   - Easy local testing: kind runs K8s in Docker containers
   - Horizontal scaling: Server (2 replicas) + Worker (3 replicas)
   - Self-healing: Automatic pod restart on failures
   - Organized structure: Separate manifests/ and scripts/ directories
## Contributing

Contributions are welcome! Here's how you can help:

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
Development Setup

```bash
make setup            # Install dependencies
make test-integration # Run tests
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Author
Amit Basuri
- GitHub: @amitbasuri
## Show Your Support

Give a ⭐ if this project helped you!

Built with ❤️ using Go, PostgreSQL, and Kubernetes