Published: Mar 13, 2026 License: MIT Imports: 27 Imported by: 0


eskit — Event Sourcing Toolkit for Go


A pragmatic, generic event sourcing framework for Go. Built around the decider pattern with first-class support for all three event modeling patterns: State Changes, State Views, and Automations.

go get git.nullsoft.is/ash/eskit

Quick Start — Counter

package main

import (
	"context"
	"fmt"

	"git.nullsoft.is/ash/eskit"
)

type Incremented struct{}
type Decremented struct{}
type Increment struct{}
type Decrement struct{}

var counter = eskit.Decider[int, any, any]{
	InitialState: func() int { return 0 },
	Decide: func(state int, cmd any) ([]any, error) {
		switch cmd.(type) {
		case Increment:
			return []any{Incremented{}}, nil
		case Decrement:
			return []any{Decremented{}}, nil
		default:
			return nil, fmt.Errorf("unknown command: %T", cmd)
		}
	},
	Evolve: func(state int, event any) int {
		switch event.(type) {
		case Incremented:
			return state + 1
		case Decremented:
			return state - 1
		default:
			return state
		}
	},
}

func main() {
	ctx := context.Background()
	store := eskit.NewMemoryStore[any]()
	handler := eskit.NewDeciderHandler(store, counter)

	state, _, _ := handler.Handle(ctx, "counter", "1", Increment{})
	state, _, _ = handler.Handle(ctx, "counter", "1", Increment{})
	state, _, _ = handler.Handle(ctx, "counter", "1", Decrement{})

	fmt.Println("Counter:", state) // Counter: 1
}

See examples/counter/ for the runnable version.

Features

  • Decider pattern — pure functions: Decide(state, cmd) → events, Evolve(state, event) → state
  • Generic stores — MemoryStore, SQLite, PostgreSQL, NATS JetStream
  • Subscriptions — durable, checkpointed event delivery via subscription.Subscription
  • EventDispatcher — unified event dispatch engine with O(1) type filtering for both projections and processors
  • Subscription — single struct for registering event handlers (name, event type filter, handler function)
  • State Views — StateView[E] struct for durable checkpoint-based projections, with sqlview helper for SQL-backed read models and OnChange callback for real-time notifications
  • Change notifications — ChangeNotifier for in-process pub/sub of projection changes, ChangeRelay interface for cross-server broadcast (PG LISTEN/NOTIFY implementation included)
  • Real-time SSE — ServeChanges and MultiSubscription for wiring projections to Server-Sent Events with fallback polling, debounce, and Datastar integration
  • Snapshots — configurable snapshot intervals with schema versioning
  • Middleware — logging, metrics, retry, single-writer, custom
  • Codecs — JSON, jsoniter, CBOR, Protobuf (pluggable via codec.Codec, multi-codec migration support)
  • Ordering guarantees — strict global sequence delivery with gap detection, self-healing SequenceChecker, and safe-by-default blocking (see docs/ordering-and-gaps.md)
  • Schema evolution — upcasters for event versioning
  • GDPR — crypto-shredding for PII in events (envelope encryption with AES-256-GCM, sync.Pool-optimized cipher and nonce reuse)
  • Multi-tenancy — tenant isolation middleware
  • Observability — OpenTelemetry tracing + metrics (separate otelkit module)
  • NATS clustering — distributed commands, event bus, locks
  • Dynamic Consistency Boundary (DCB) — cross-stream decision logic
  • Health checks — HTTP health endpoint for projection lag monitoring, suitable for load balancer checks (see docs/operations.md)
  • Testing — Given/When/Then helpers, conformance suites
  • Store Parity — feature parity matrix across all backends (STORE_PARITY.md)

Sub-Modules

Core eskit has zero heavy dependencies (only go-cmp for test helpers). Store backends, codecs, and integrations are separate modules — you only pull in what you use.

The core module includes: deciders, MemoryStore, subscriptions, projections, live projections, middleware, snapshots, commands, GDPR crypto-shredding, DCB, processor, conformance testing, and all interfaces. Everything that doesn't require CGO or heavy third-party dependencies.

Separate modules for heavy dependencies:

# Store backends
go get git.nullsoft.is/ash/eskit/sqlitestore   # SQLite
go get git.nullsoft.is/ash/eskit/pgstore       # PostgreSQL
go get git.nullsoft.is/ash/eskit/natsstore     # NATS JetStream

# Serialization
go get git.nullsoft.is/ash/eskit/codec         # jsoniter, CBOR, Protobuf

# Command queue
go get git.nullsoft.is/ash/eskit/sqlitequeue   # SQLite command queue (durable, single-server)
go get git.nullsoft.is/ash/eskit/pgqueue       # PostgreSQL command queue

# NATS integrations
go get git.nullsoft.is/ash/eskit/natseventbus  # Event bus
go get git.nullsoft.is/ash/eskit/natscommand   # Distributed CommandBus
go get git.nullsoft.is/ash/eskit/natscluster   # Cluster command routing
go get git.nullsoft.is/ash/eskit/natsbatch     # Batch event processing
go get git.nullsoft.is/ash/eskit/natslock      # Distributed locks
go get git.nullsoft.is/ash/eskit/natsnotifier  # Event notifications
go get git.nullsoft.is/ash/eskit/embeddednats  # Embedded server (testing)

# Observability
go get git.nullsoft.is/ash/eskit/otelkit       # OpenTelemetry tracing + metrics

# Metrics
go get git.nullsoft.is/ash/eskit/metrics       # Recorder interface (zero deps)
go get git.nullsoft.is/ash/eskit/prommetrics   # Prometheus implementation

# Runtime
go get git.nullsoft.is/ash/eskit/runner        # Concurrent service runner

# Event processors
go get git.nullsoft.is/ash/eskit/processor     # Event-reactive processors & todolist pattern

Append Modes

By default, Append uses optimistic concurrency control — pass the expected stream version to detect conflicts. For append-only streams where ordering doesn't matter, use AppendAny:

// Standard — optimistic concurrency check
store.Append(ctx, "order", "123", 3, events)

// Append-only mode — skip version check (useful for log/audit streams)
store.Append(ctx, "audit", "log-1", eskit.AppendAny, events)

See docs/stores.md for details on when to use each mode.

Type Registration

Events are plain structs — no interfaces needed. Register them once, and the wire name is derived from the Go type:

reg := eskit.NewEventRegistry()
eskit.Register[OrderPlaced](reg)      // → "sales.OrderPlaced"
eskit.Register[PaymentProcessed](reg) // → "finance.PaymentProcessed"

store, _ := sqlitestore.New[any](db, sqlitestore.WithRegistry[any](reg))

Commands are plain structs — no interface needed. Register them with the bus, providing the stream type and an ID extractor:

type PlaceOrder struct {
    OrderID string
    Items   []Item
}

command.Register(bus, store, orderDecider, "order", func(c PlaceOrder) string {
    return c.OrderID
})

No string names. No EventType() methods. No CommandName() methods. The Go type system is the single source of truth.

The Decider Pattern

Domain logic as pure functions — easy to test, compose, and reason about:

Command → Decide(state, command) → []Event
State + Event → Evolve(state, event) → State

Test with Given/When/Then:

eskit.Given(decider, pastEvents...).
    When(command).
    Then(t, expectedEvents...)

Documentation

Topic Link
Store backends & comparison docs/stores.md
Serialization & codecs docs/serialization.md
Middleware docs/middleware.md
Subscriptions & watchers docs/subscriptions.md
Ordering guarantees & gap detection docs/ordering-and-gaps.md
Snapshots docs/snapshots.md
Advanced (GDPR, clustering, multi-tenancy, etc.) docs/advanced.md
Architecture docs/architecture.md
Processors (reactive + TodoProcessor) docs/processor.md
Metadata, correlation & causation docs/metadata.md
Observability (OTel tracing + metrics) docs/observability.md
State Views (green slices) & OnChange docs/state-views.md
Zero-downtime projection rebuild docs/rebuild.md
Change notifications (ChangeNotifier, ChangeRelay) docs/subscriptions.md
Real-time SSE & Datastar integration docs/sse-guide.md
Quick start guide docs/quickstart.md
Performance docs/performance.md
Testing guide docs/testing.md
FAQ docs/faq.md
Schema separation (eventstore + readmodel) docs/schema-separation.md
Production tuning (pools, checkpoints, Postgres) docs/production-tuning.md
Known limitations & improvement opportunities docs/limitations.md
Store feature parity matrix STORE_PARITY.md

Sub-Packages

Package Description
typereg Thread-safe type registry for wire deserialization of commands and events
commandqueue Transport-agnostic command queue interface with in-memory, SQLite, Postgres, and NATS implementations
processor Event-reactive processors with typed handlers and todolist pattern

See docs/command-queue.md for the command queue architecture and migration guide.

Conformance Suite

The eventstoretest package provides a comprehensive conformance test suite that any EventStore implementation must pass. Run it against your store with a single call:

func TestMyStore(t *testing.T) {
    eventstoretest.RunSuite(t, func(t *testing.T) eskit.EventStore[eventstoretest.TestEvent] {
        return mystore.New(t)
    })
}

The suite covers:

Category Tests
Basic CRUD Append single/multiple, read back, empty streams, stream isolation, sequential appends
Concurrency Optimistic concurrency control, wrong version detection, concurrent appends
Ordering Insertion order preserved, monotonically increasing versions
Read Patterns LoadFrom version offsets, beyond-end reads, empty stream reads
AppendAny Unconditional appends to new/existing streams, concurrent AppendAny
Stream Type Isolation Events scoped to stream types, cross-type isolation
Event Fields StreamType, EventType, StreamID, GlobalSequence all populated correctly
Special Stream Names Hyphens, underscores, UUIDs, slashes, unicode in stream IDs
Metadata Preservation, many extra keys, batch application, unicode values
Metadata Edge Cases Large metadata maps, batch metadata propagation, unicode metadata values
Idempotency Duplicate version append rejection
Edge Cases Large payloads, empty data, many streams, many events, unicode data, binary-like data, long stream IDs, unique event IDs, timestamp validation
Stress Concurrent appends (different streams), concurrent OCC (same stream), concurrent readers + writers

A companion RunBenchmarkSuite provides standardized performance benchmarks across stores.

Fuzz Testing

Fuzz tests cover codec deserialization (JSON, CBOR, Protobuf) and core operations (stream IDs, event types) to catch panics on untrusted input:

cd codec && go test -fuzz=FuzzJSONUnmarshal -fuzztime=30s
cd codec && go test -fuzz=FuzzCBORUnmarshal -fuzztime=30s
go test -fuzz=FuzzStreamID -fuzztime=30s
go test -fuzz=FuzzEventTypeName -fuzztime=30s

Lifecycle Hooks

The hooks package provides composable middleware hooks at three boundaries: event append, command dispatch, and projection processing.

Event Append Hooks

Wrap any EventStore with before/after append hooks:

import "git.nullsoft.is/ash/eskit/hooks"

// Validation hook — reject events that fail business rules
store := hooks.WrapStore(innerStore,
    hooks.WithBeforeAppend(func(ctx context.Context, streamID string, events []MyEvent) error {
        if streamID == "" {
            return errors.New("stream ID required")
        }
        return nil // allow append
    }),
    hooks.WithAfterAppend(func(ctx context.Context, streamID string, events []eskit.Event[MyEvent]) {
        log.Printf("appended %d events to %s", len(events), streamID)
    }),
)
  • BeforeAppend can reject (return error stops the append, no events written)
  • AfterAppend is fire-and-forget (panics are recovered, never breaks the append)
  • Multiple hooks per point, executed in registration order
  • Zero overhead when no hooks are registered (returns unwrapped store)
Command Dispatch Hooks
mw := hooks.CommandHookMiddleware(
    []hooks.BeforeCommandFunc{
        func(ctx context.Context, cmd any) error {
            log.Printf("dispatching: %T", cmd)
            return nil
        },
    },
    []hooks.AfterCommandFunc{
        func(ctx context.Context, cmd any, err error) {
            if err != nil {
                log.Printf("command failed: %v", err)
            }
        },
    },
)
Projection Hooks

Handle projection errors with Skip/Retry/Halt strategies, and get notified when caught up:

handler := hooks.NewProjectionHandler(innerHandler,
    hooks.WithOnError(func(ctx context.Context, event eskit.Event[MyEvent], err error) hooks.ErrorAction {
        if isTransient(err) {
            return hooks.Retry
        }
        return hooks.Skip // skip poison events
    }),
    hooks.WithOnCaughtUp[MyEvent](func(ctx context.Context, position uint64) {
        log.Printf("projection caught up to position %d", position)
    }),
    hooks.WithMaxRetries[MyEvent](5),
)
Atomic Checkpoint (Crash-Safe Projections)

For non-idempotent side effects (sending emails, charging payments), use CheckpointInTx to save the checkpoint within the same database transaction as Evolve. If the process crashes after Evolve but before commit, both the projection update and checkpoint are rolled back — preventing double processing on restart.

var EmailView = pgview.Config[domain.Event]{
    Name:           "email-sender",
    EventTypes:     []string{"OrderConfirmed"},
    CheckpointInTx: true, // checkpoint saved in same TX as Evolve
    Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
        // Record the email in the DB (same transaction)
        _, err := tx.Exec(ctx, `INSERT INTO sent_emails ...`, ...)
        return err
    },
}

Works with both pgview and sqlview. The subscription automatically injects checkpoint info via context — no manual wiring needed.

Leader Election (Multi-Node)

Running N instances means N copies of every processor and projection — duplicate emails, duplicate payments. eskit solves this with leader election via Postgres advisory locks.

Why Not Just Checkpoints?

The checkpoint is a bookmark, not a lock. Two nodes with the same checkpoint name both read the same position, process the same events, and execute duplicate side effects. The checkpoint tracks progress — it doesn't coordinate access.

How It Works
  1. On Start(), the processor calls LeaderLock.Acquire(ctx, "processor:send-emails")
  2. Winner: holds the lock, reads events, executes side effects, advances checkpoint
  3. Others: block on Acquire, waiting for the lock — zero CPU, zero duplicates
  4. Graceful shutdown (SIGTERM/context cancel): lock is released via defer release() — instant handoff, next instance acquires immediately
  5. Hard crash: Postgres detects dead connection, releases advisory lock (~30s depending on TCP keepalive). New leader replays from checkpoint, rebuilds TodoList, resumes.

Rolling deploy scenario:

  1. Old node receives SIGTERM → defer release() runs → lock released
  2. New node was blocking on Acquire → acquires instantly
  3. Replays from shared checkpoint → rebuilds TodoList → executes pending items
  4. Zero downtime for normal deploys. 30s gap only on hard crashes.

⚠️ At-least-once guarantee: If a crash occurs after a side effect executes but before the checkpoint saves, the event replays and the side effect runs again. All Execute handlers and subscription Handlers with side effects must be idempotent. Use idempotency keys, INSERT ... ON CONFLICT DO NOTHING, or check-before-act patterns.

Configuration

Single-node (development) — no changes needed:

store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
    Name:     "send-emails",
    Execute:  sendEmail,
    Interval: time.Second,
    // No leader lock → all instances process (fine for single node)
})

Multi-node (production) — add leader election:

store := processor.NewMemoryStore[string]()
proc := processor.NewTodoProcessor(store, processor.TodoConfig[string]{
    Name:       "send-emails",
    Execute:    sendEmail,
    LeaderLock: pgstore.NewPgLockRegistry(pool), // advisory lock — one leader
})

Subscription-only (projections):

sub, _ := subscription.New(subscription.Config[MyEvent]{
    ConsumerID: "order-summary",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
    Handler:    updateProjection,
    LeaderLock: pgstore.NewPgLockRegistry(pool),
})

Everything in Postgres. No extra infrastructure. No extra tables. Advisory locks use the same connection pool.

Batch checkpointing reduces DB writes during catch-up — save every 100 events instead of every event:

cfg := subscription.FromStateView(view, reader, cp,
    subscription.WithCheckpointEvery[MyEvent](100),           // save every 100 events
    subscription.WithCheckpointFlushInterval[MyEvent](time.Second), // or every 1s
)

See docs/subscriptions.md for details.

Projections (State Views) with Leader Election

Projections are inherently safer than processors because they're idempotent — they UPSERT to a read model. If an event replays, the projection writes the same data again. No harm done.

Single-node: no LeaderLock needed. The subscription reads, updates the projection, checkpoints. Crash → replay from checkpoint → re-UPSERT → same result.

Multi-node: add LeaderLock to prevent duplicate work (wasted CPU, not data corruption):

// Define the projection (pgview handles Evolve + CheckpointInTx)
var OrderSummary = pgview.Config[domain.Event]{
    Name:           "order-summary",
    EventTypes:     []string{"OrderPlaced", "OrderShipped"},
    CheckpointInTx: true,  // checkpoint in same TX as projection update
    Evolve: func(ctx context.Context, tx pgx.Tx, event eskit.Event[domain.Event]) error {
        _, err := tx.Exec(ctx, `INSERT INTO order_summaries ... ON CONFLICT DO UPDATE ...`)
        return err
    },
}

// Wire it with LeaderLock via subscription
sub, _ := subscription.New(subscription.Config[domain.Event]{
    ConsumerID: "order-summary",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "order-summary"),
    Handler:    OrderSummary.Handler(pool),
    LeaderLock: pgstore.NewPgLockRegistry(pool),  // only one node runs this projection
})

Three levels of safety:

Level Config Replay window Use when
Basic default Handler → checkpoint save Single node, idempotent projections
Atomic CheckpointInTx: true Zero (same TX) Multi-node projections, paranoid mode
Leader + Atomic Both Zero + single writer Production multi-node (recommended)

CheckpointInTx eliminates the replay window. LeaderLock eliminates duplicate processing. Together: one node, zero gaps, zero duplicates.

When to Use Subscription vs Automation

Plain subscription + LeaderLock — for simple side effects (trigger → execute):

// Trigger event arrives → execute immediately → checkpoint advances
sub, _ := subscription.New(subscription.Config[MyEvent]{
    ConsumerID: "send-welcome-email",
    Reader:     pgStore,
    Checkpoint: pgstore.NewPgCheckpoint(pool, "send-welcome-email"),
    LeaderLock: pgstore.NewPgLockRegistry(pool),
    Handler: func(ctx context.Context, event GlobalEvent[MyEvent]) error {
        if event.EventType == "UserRegistered" {
            return sendWelcomeEmail(ctx, event.Data)  // execute inline
        }
        return nil
    },
})

Handler returns nil → checkpoint advances. Handler returns error → event replays. Crash → event replays. Simple, correct, no extra state.

Automation — for deferred execution, completion tracking, or retry with backoff:

The processor package adds a TodoList between event reading and execution. The checkpoint advances only after side effects execute — never before. This means:

  • Crash before execute → events replay, side effects re-run (at-least-once)
  • Failed items retry up to MaxAttempts → move to DLQ → checkpoint advances
  • Completion events can cancel pending items before they execute

Use Automation when you need to batch items, wait for external completion signals, or have complex retry logic. For everything else, prefer a plain subscription.

Processor

The processor package provides two patterns for event-driven processing: typed event handlers for immediate reactions, and the todolist pattern for stateful, filtered processing.

Pattern 1: Typed Event Handlers

Use processor.New with On[T] handlers when you want to react to specific events immediately. The processor subscribes only to the event types you register.

import "git.nullsoft.is/ash/eskit/processor"

p := processor.New(pgStore, checkpoint, bus, []processor.Handler{
    processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
        return bus.Send(ctx, ShipOrder{OrderID: e.OrderID})
    }),
    processor.On[PaymentFailed](func(ctx context.Context, e PaymentFailed) error {
        return bus.Send(ctx, CancelOrder{OrderID: e.OrderID})
    }),
}, processor.WithName("order-processor"))

go p.Start(ctx)
Pattern 2: TodoProcessor

Use NewTodoProcessor with a Store when you need to claim and process work items from a state-view table. Exactly-once across multiple servers via FOR UPDATE SKIP LOCKED.

import (
    "git.nullsoft.is/ash/eskit/processor"
    "git.nullsoft.is/ash/eskit/pgprocessor"
)

// Store claims from a state-view table (pgview populates it)
store := pgprocessor.NewStore[ChargeItem](pool, pgprocessor.Config{
    Table:      "charges",
    ClaimWhere: "pending = true AND attempts < 3",
    MarkSQL:    "UPDATE charges SET pending = false WHERE item_key = $1",
})

// Same processor works with MemStore (tests) or Postgres (production)
charger := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
    Name:    "charge-processor",
    Workers: 4,
    Execute: func(ctx context.Context, key string, item ChargeItem) error {
        // ctx has metadata (correlation, causation) — flows to commands automatically
        return bus.Send(ctx, ChargeCard{OrderID: key, Amount: item.Amount})
    },
})

go charger.Start(ctx)

Batch execution — process multiple items per commit for higher throughput:

proc := processor.NewTodoProcessor(store, processor.TodoConfig[ChargeItem]{
    Name:      "bulk-charger",
    BatchSize: 50,
    ExecuteBatch: func(ctx context.Context, items []processor.BatchItem[ChargeItem]) error {
        for _, it := range items {
            if err := chargeCard(ctx, it.Key, it.Item); err != nil {
                return err // entire batch rolls back
            }
        }
        return nil
    },
})

See docs/processor.md for batch configuration, benchmarks, and trade-offs.

Context Metadata — Zero Boilerplate

Metadata (correlation, causation, principal) flows through context.Context automatically. Set it once at the HTTP boundary:

// HTTP middleware — start a chain
ctx = eskit.NewChain(ctx, eskit.Principal{Kind: eskit.PrincipalUser, ID: userID})

// Handlers — just send commands, metadata flows through ctx
bus.Send(ctx, PlaceOrder{OrderID: "123"})

// Reactive processors — auto-propagated, zero boilerplate
processor.On[OrderPlaced](func(ctx context.Context, e OrderPlaced) error {
    return bus.Send(ctx, ChargePayment{OrderID: e.OrderID})
})

See Metadata Guide for details.

When to Use Which
Pattern Best for
On[T] handlers Immediate reactions: event → command, no state needed
TodoProcessor Work queues: claim items, retry on failure, exactly-once across servers
Observability

Full OpenTelemetry instrumentation via the separate otelkit module:

bus.Use(otelkit.CommandTracing(tracer))
bus.Use(otelkit.CommandMetrics(meter))
store = otelkit.WithTracing(otelkit.WithMetrics(store, meter), tracer)

See Observability Guide for full wiring with all components.

Command Persistence

The commandlog package records every command dispatched through the CommandBus — invaluable for production debugging, audit trails, and replay analysis.

Setup
import (
    "git.nullsoft.is/ash/eskit/commandlog"
    "git.nullsoft.is/ash/eskit/commandlog/memlog"   // or sqlitelog
)

// Create a store (memory for dev, SQLite for production).
store := memlog.New()

// Add middleware to the command bus.
bus.Use(commandlog.Middleware(store))
With Correlation and Metadata
bus.Use(commandlog.Middleware(store,
    commandlog.WithCorrelationFunc(func(ctx context.Context, cmd command.Command) string {
        return correlationIDFromContext(ctx)
    }),
    commandlog.WithMetadataFunc(func(ctx context.Context, cmd command.Command) map[string]string {
        return map[string]string{"user": userFromContext(ctx)}
    }),
))
Querying the Log
// Get a specific command record.
record, _ := store.Get(ctx, "cmd-id")

// Find all commands in a saga/correlation chain.
chain, _ := store.FindByCorrelation(ctx, "saga-42")

// List recent commands with pagination.
recent, _ := store.List(ctx, commandlog.QueryOpts{Limit: 50})

// Find by command type with time range filtering.
orders, _ := store.FindByType(ctx, "PlaceOrder", commandlog.QueryOpts{
    After: time.Now().Add(-24 * time.Hour),
})
Backends
Backend Module Use Case
Memory commandlog/memlog Testing, development
SQLite commandlog/sqlitelog Production, single-node

Write your own by implementing commandlog.CommandLogStore.

Examples

CLI Tool

The eskit CLI provides management and inspection of eskit event stores.

Installation
go install git.nullsoft.is/ash/eskit/cmd/eskit@latest
Usage
# Connect to a SQLite event store
eskit --store sqlite:///path/to/app.db <command>

# Output as JSON for scripting
eskit --store sqlite:///path/to/app.db --json streams list

# Decode CBOR payloads
eskit --store sqlite:///path/to/app.db --codec cbor events show 42

# Override stored codec (force JSON decode)
eskit --store sqlite:///path/to/app.db --codec json events show 42

# Show raw hex dump (skip decoding entirely)
eskit --store sqlite:///path/to/app.db --raw events show 42
Commands
Command Description
streams list List all streams with event counts
streams show <id> Show events in a stream, decoded
events search --correlation <id> Find events by correlation ID
events search --type <event-type> Find events by event type
events show <global-pos> Show single event with full detail
projections list Show projection checkpoint positions
projections rebuild <name> Reset a projection checkpoint to 0
commands list List recent commands (from command log)
commands show <id> Show command with resulting events
store stats Event count, stream count, DB size
store health Connectivity check
Flags
Flag Description
--store Store DSN (sqlite:///path or postgres://user:pass@host/db)
--json Output as JSON instead of table
--codec Payload codec (json, cbor, protobuf); auto-detect if omitted. Overrides the codec stored in each event row.
--raw Show raw payload bytes as hex dump (skip all decoding)
--limit Max results to return
Examples
# List all streams
$ eskit --store sqlite:///tmp/myapp.db streams list
STREAM      EVENTS  LATEST VERSION  LAST EVENT
----------  ------  --------------  --------------------------
account-1   5       5               2025-01-15T10:30:00Z
order-42    3       3               2025-01-15T11:00:00Z

# Search events by correlation ID (JSON output)
$ eskit --store sqlite:///tmp/myapp.db --json events search --correlation req-abc123
[
  {"id": 1, "stream_id": "account-1", "version": 1, "event_type": "AccountOpened", ...},
  {"id": 5, "stream_id": "order-42", "version": 1, "event_type": "OrderCreated", ...}
]

# Check store health
$ eskit --store sqlite:///tmp/myapp.db store health
Store is healthy.

# Get store statistics
$ eskit --store sqlite:///tmp/myapp.db store stats
Events:  1523
Streams: 42
DB Size: 2.3 MB
Store Migration

The migrate subcommand copies all event data between eskit stores, preserving stream IDs, event order, versions, codecs, timestamps, and all metadata.

# Basic migration between SQLite databases
eskit migrate --from sqlite:///source.db --to sqlite:///target.db

# Custom batch size (default 500)
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --batch-size 1000

# Dry-run: count events/streams without writing
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --dry-run

# Verify event counts match after migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --verify

# Resumable migration with checkpoint file
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --checkpoint migrate.ckpt

# Re-encode events from JSON to CBOR during migration
eskit migrate --from sqlite:///source.db --to sqlite:///target.db --recode-to cbor
Flag Description
--from Source store DSN (required)
--to Target store DSN (required)
--batch-size Events per write transaction (default 500, max 100000)
--dry-run Validate source readable, count events/streams, write nothing
--verify Post-migration comparison of event counts per stream
--checkpoint File to track completed streams for resumable migrations
--recode-to Re-encode events to json or cbor during migration

Progress is reported to stderr: [42/128 streams] [15,234 events] [stream: order-abc123]

The migration architecture uses MigrationSource and MigrationTarget interfaces, making it straightforward to add new backends (Postgres, NATS, etc.).

Codec Support

The CLI automatically decodes event payloads based on the codec name stored in each event row:

Codec Decoding
json, jsoniter Standard JSON decode
cbor CBOR binary decode (maps normalized for JSON output)
protobuf Generic wire-format decode (field numbers + values, no schema needed)
gob Hex dump (requires registered types not available in CLI)
Unknown/custom Hex dump with codec name shown

Use --codec to override the stored codec (e.g., when metadata is wrong). Use --raw to skip decoding entirely and see the raw hex dump.

License

MIT — see LICENSE.

Documentation

Overview

Package eskit provides an event sourcing toolkit for Go using the decider pattern.

The decider pattern models domain logic as pure functions:

Command → Decide(state, command) → []Event
State + Event → Evolve(state, event) → State

This separation makes domain logic easy to test, compose, and reason about.

registry_store.go provides helpers for wiring EventRegistry into store deserialization.

The core problem: when E is an interface (heterogeneous event stream), json.Unmarshal doesn't know which concrete type to create. The registry solves this by mapping event type names → factory functions, enabling correct deserialization.

Two paths:

  • No registry: json.Unmarshal into E directly (works for concrete struct E)
  • With registry: create concrete type from registry, unmarshal into it, cast to E

EventBus provides a publish/subscribe mechanism for distributing events across components or nodes. Use ChannelEventBus for single-process deployments and natseventbus.NATSEventBus (from git.nullsoft.is/ash/eskit/natseventbus) for multi-node clusters.

EventBus is separate from EventStore — the store is the source of truth, the bus is the notification mechanism. Events are published AFTER successful store append, and subscribers should be idempotent (events may be delivered more than once, or missed if a subscriber was offline).

register.go provides the generic Register function for type-safe event registration.

The wire name is derived from the Go type: "{package}.{TypeName}". This is deterministic, zero-duplication, and derived at registration time.

reg := eskit.NewEventRegistry()
eskit.Register[OrderPlaced](reg)      // → "sales.OrderPlaced"
eskit.Register[ItemAddedToCart](reg)   // → "sales.ItemAddedToCart"

Index

Constants

View Source
const (
	// StreamTypeLengthMax limits the length of stream type strings.
	StreamTypeLengthMax = 256

	// StreamIDLengthMax limits the length of stream ID strings.
	StreamIDLengthMax = 256

	// EventTypeLengthMax limits the length of event type strings.
	EventTypeLengthMax = 256

	// BatchSizeMax limits batch operations to prevent unbounded memory usage.
	BatchSizeMax = 10000

	// MetadataEntriesMax limits the number of metadata entries per event.
	MetadataEntriesMax = 100

	// MetadataMaxSizeBytes limits the total size of all metadata keys and values (64KB).
	MetadataMaxSizeBytes = 64 * 1024
)

Bounds — TigerStyle: every resource must be bounded.

View Source
const AppendAny = -1

AppendAny is a sentinel expectedVersion value meaning "skip optimistic concurrency check." Use for append-only streams where multiple writers are expected (e.g., integration event streams).
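Conceptually, the optimistic concurrency check compares the caller's expected version against the stream's actual version before writing; AppendAny bypasses that comparison. A toy sketch of the check (not the eskit API):

```go
package main

import (
	"errors"
	"fmt"
)

const AppendAny = -1 // sentinel: skip the optimistic concurrency check

var ErrConcurrencyConflict = errors.New("concurrency conflict")

// stream is a toy in-memory stream illustrating the version check.
type stream struct{ version int }

func (s *stream) appendEvents(expectedVersion, n int) error {
	if expectedVersion != AppendAny && expectedVersion != s.version {
		return ErrConcurrencyConflict // another writer got there first
	}
	s.version += n
	return nil
}

func main() {
	s := &stream{}
	fmt.Println(s.appendEvents(0, 1))         // <nil>: version matched
	fmt.Println(s.appendEvents(0, 1))         // conflict: stream is now at version 1
	fmt.Println(s.appendEvents(AppendAny, 1)) // <nil>: check skipped
}
```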

Variables

View Source
var (
	// ErrKeyNotFound is returned when a key ID is not in the store.
	ErrKeyNotFound = fmt.Errorf("eskit/crypto: key not found")

	// ErrNoActiveKey is returned when no active key is configured.
	ErrNoActiveKey = fmt.Errorf("eskit/crypto: no active key")

	// ErrDecryptionFailed is returned when authenticated decryption fails (tampered data).
	ErrDecryptionFailed = fmt.Errorf("eskit/crypto: decryption failed")

	// ErrInvalidEnvelope is returned for malformed encrypted envelopes.
	ErrInvalidEnvelope = fmt.Errorf("eskit/crypto: invalid envelope")

	// ErrPayloadTooLarge is returned when data exceeds maxPayloadSize.
	ErrPayloadTooLarge = fmt.Errorf("eskit/crypto: payload too large")
)
View Source
var (
	// ErrStreamDeleted is returned when attempting to append to a tombstoned stream.
	ErrStreamDeleted = errors.New("eskit: stream deleted")

	// ErrStreamArchived is returned when attempting to append to an archived stream.
	ErrStreamArchived = errors.New("eskit: stream archived")
)
View Source
var (
	// ErrDLQMessageNotFound is returned when a DLQ entry doesn't exist.
	ErrDLQMessageNotFound = fmt.Errorf("eskit/dlq: message not found")

	// ErrDLQFull is returned when the DLQ has reached its capacity.
	ErrDLQFull = fmt.Errorf("eskit/dlq: queue full")

	// ErrDLQClosed is returned when operating on a closed DLQ.
	ErrDLQClosed = fmt.Errorf("eskit/dlq: closed")
)
View Source
var (
	// ErrStreamIDEmpty is returned when a stream ID is empty or blank.
	ErrStreamIDEmpty = errors.New("eskit: stream ID must not be empty")

	// ErrStreamIDTooLong is returned when a stream ID exceeds the maximum allowed length.
	ErrStreamIDTooLong = errors.New("eskit: stream ID too long")

	// ErrEventsEmpty is returned when an Append call provides no events.
	ErrEventsEmpty = errors.New("eskit: events must not be empty")

	// ErrTooManyEvents is returned when an Append call exceeds the maximum batch size.
	ErrTooManyEvents = errors.New("eskit: too many events in batch")

	// ErrNegativeVersion is returned when a negative expectedVersion is provided.
	ErrNegativeVersion = errors.New("eskit: expected version must not be negative")

	// ErrNilHandler is returned when a nil handler or callback is provided
	// where a non-nil value is required.
	ErrNilHandler = errors.New("eskit: handler must not be nil")

	// ErrNilStore is returned when a nil event store is provided.
	ErrNilStore = errors.New("eskit: store must not be nil")

	// ErrAlreadyRunning is returned when Start is called on something already running.
	ErrAlreadyRunning = errors.New("eskit: already running")

	// ErrBusClosed is returned when publishing or subscribing on a closed event bus.
	ErrBusClosed = errors.New("eskit: bus is closed")

	// ErrTooManySubscribers is returned when a subscriber limit is exceeded.
	ErrTooManySubscribers = errors.New("eskit: too many subscribers")

	// ErrEventTypeNotRegistered is returned when an event type string cannot be resolved
	// to a registered Go type in the EventRegistry (not the TypeCache).
	ErrEventTypeNotRegistered = errors.New("eskit: event type not registered")

	// ErrNullByteInID is returned when a stream ID or stream type contains a null byte.
	// Null bytes are forbidden because they are used as internal key separators.
	ErrNullByteInID = errors.New("eskit: stream ID or type must not contain null bytes")
)
View Source
var (
	// ErrConcurrencyConflict is returned when the expected version doesn't match
	// the actual version in the store. This indicates a concurrent modification.
	ErrConcurrencyConflict = errors.New("eskit: concurrency conflict")

	// ErrStreamNotFound is returned when no events exist for a stream.
	ErrStreamNotFound = errors.New("eskit: stream not found")

	// ErrMetadataTooManyEntries is returned when metadata exceeds MetadataEntriesMax.
	ErrMetadataTooManyEntries = errors.New("eskit: metadata too many entries")

	// ErrMetadataTooLarge is returned when metadata exceeds MetadataMaxSizeBytes.
	ErrMetadataTooLarge = errors.New("eskit: metadata too large")

	// ErrStreamTypeEmpty is returned when stream type is empty.
	ErrStreamTypeEmpty = errors.New("eskit: stream type must not be empty")

	// ErrStreamTypeTooLong is returned when stream type exceeds StreamTypeLengthMax.
	ErrStreamTypeTooLong = errors.New("eskit: stream type too long")
)
View Source
var (
	// ErrTenantRequired is returned when a tenant ID is expected but not in context.
	ErrTenantRequired = fmt.Errorf("eskit/tenant: tenant ID required in context")

	// ErrInvalidTenantID is returned for tenant IDs that don't match the allowed pattern.
	ErrInvalidTenantID = fmt.Errorf("eskit/tenant: invalid tenant ID")
)
View Source
var DefaultBaseTime = time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC)

DefaultBaseTime is a stable timestamp for deterministic projection tests. Using a fixed time avoids flaky tests that depend on time.Now().

Functions

func Continue

func Continue[E any](ctx context.Context, source Event[E], principal Principal) context.Context

Continue propagates metadata from a source event into the context. Copies CorrelationID and Originator from the event, sets CausationID to the source event's identity, and sets Initiator to the given principal.

The processor framework calls this automatically before invoking handlers. You typically don't need to call this yourself.

ctx = eskit.Continue(ctx, sourceEvent, eskit.Principal{Kind: eskit.PrincipalAutomation, ID: "charger"})

func CurrentSchemaVersion

func CurrentSchemaVersion(upcasters *UpcasterRegistry, eventType string) int

CurrentSchemaVersion returns the latest schema version for an event type. Returns 1 if no upcasters are registered (default schema version).

func DeriveTypeName

func DeriveTypeName[E any]() string

DeriveTypeName returns the wire name for a Go type without registering it. Format: "{package}.{TypeName}" with PascalCase type name.

Panics if E is not a named type (anonymous structs, interfaces, etc.).

eskit.DeriveTypeName[OrderPlaced]() // → "sales.OrderPlaced"

func DeserializeWithCodec

func DeserializeWithCodec[E any](registry *EventRegistry, eventType string, data []byte, unmarshal UnmarshalFunc) (E, error)

DeserializeWithCodec deserializes event data using a custom unmarshal function and the registry for type resolution. This enables stores to use any codec (CBOR, msgpack, etc.) instead of hardcoded JSON.

func DeserializeWithPool

func DeserializeWithPool[E any](pool *pooledEventRegistry, eventType string, data []byte) (E, error)

DeserializeWithPool deserializes JSON event data using a pooled type registry. This avoids allocating a new instance for every event, reusing objects via sync.Pool.

The returned value is a copy (not pooled), so it's safe to hold indefinitely. The pooled instance is returned to the pool after copying.

Performance: avoids 1 allocation per event on the deserialization hot path.

func DeserializeWithRegistry

func DeserializeWithRegistry[E any](registry *EventRegistry, eventType string, data []byte) (E, error)

DeserializeWithRegistry deserializes JSON event data using the registry to create the correct concrete type, then returns it as E.

Steps:

  1. Look up the factory for eventType in the registry
  2. Create a new instance (pointer to concrete type)
  3. json.Unmarshal into the pointer
  4. Dereference to get the value, assert to E

Why dereference? The registry factory returns *ConcreteType (pointer) so json.Unmarshal can write into it. But E is typically the interface (not pointer), so we dereference before the type assertion.
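The four steps can be sketched with a plain map standing in for the registry (the real EventRegistry is bounded and validated; the wire name below is illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type OrderPlaced struct {
	OrderID string `json:"order_id"`
}

// Registry stand-in: wire name to factory returning a pointer to the
// concrete type, so json.Unmarshal has something writable.
var registry = map[string]func() any{
	"sales.OrderPlaced": func() any { return new(OrderPlaced) },
}

func deserialize(eventType string, data []byte) (any, error) {
	ptr := registry[eventType]() // steps 1 and 2: look up, create *OrderPlaced
	if err := json.Unmarshal(data, ptr); err != nil { // step 3
		return nil, err
	}
	// Step 4: dereference the pointer so callers get the value, not *T.
	return reflect.ValueOf(ptr).Elem().Interface(), nil
}

func main() {
	ev, err := deserialize("sales.OrderPlaced", []byte(`{"order_id":"o-42"}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", ev, ev) // main.OrderPlaced {o-42}
}
```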

func DeserializeWithUpcasting

func DeserializeWithUpcasting[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, data []byte, schemaVersion int) (E, int, error)

DeserializeWithUpcasting deserializes JSON event data, applying upcasters first if needed. If upcasters is nil or no upcaster is registered for this event type, falls through to standard DeserializeWithRegistry (zero overhead on the hot path).

Flow: raw JSON (schema v_old) → upcast chain → raw JSON (schema v_latest) → unmarshal → E
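A minimal sketch of that flow, with illustrative stand-ins for the upcaster registry (the real UpcasterRegistry chains versions internally):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UpcasterFunc stand-in: raw JSON at one schema version in, next version out.
type UpcasterFunc func(json.RawMessage) (json.RawMessage, error)

// Registered steps, keyed by eventType plus fromVersion (illustrative key scheme).
var steps = map[string]UpcasterFunc{
	// v1 to v2: rename "name" to "full_name"
	"UserCreated.v1": func(d json.RawMessage) (json.RawMessage, error) {
		var m map[string]any
		if err := json.Unmarshal(d, &m); err != nil {
			return nil, err
		}
		m["full_name"] = m["name"]
		delete(m, "name")
		return json.Marshal(m)
	},
}

// upcast applies each step in order until the latest version is reached.
func upcast(eventType string, data json.RawMessage, from, latest int) (json.RawMessage, error) {
	for v := from; v < latest; v++ {
		step, ok := steps[fmt.Sprintf("%s.v%d", eventType, v)]
		if !ok {
			return nil, fmt.Errorf("no upcaster from v%d", v)
		}
		var err error
		if data, err = step(data); err != nil {
			return nil, err
		}
	}
	return data, nil
}

func main() {
	out, err := upcast("UserCreated", []byte(`{"name":"Ada"}`), 1, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"full_name":"Ada"}
}
```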

func DeserializeWithUpcastingCodec

func DeserializeWithUpcastingCodec[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, data []byte, schemaVersion int, unmarshal UnmarshalFunc) (E, int, error)

DeserializeWithUpcastingCodec is like DeserializeWithUpcasting but accepts a custom unmarshal function for codec-agnostic deserialization.

func GenerateKey

func GenerateKey() ([]byte, error)

GenerateKey generates a cryptographically secure random 32-byte key.

func NewChain

func NewChain(ctx context.Context, principal Principal) context.Context

NewChain starts a new metadata chain in the context. Generates a random correlation ID and sets the principal as both initiator and originator (they're the same at the entry point).

Use in HTTP middleware or API entry points:

ctx = eskit.NewChain(ctx, eskit.Principal{Kind: eskit.PrincipalUser, ID: userID})

func NewChainWithID

func NewChainWithID(ctx context.Context, correlationID string, principal Principal) context.Context

NewChainWithID starts a new metadata chain with a specific correlation ID. Use when the caller provides their own correlation ID (e.g., X-Correlation-ID header).

ctx = eskit.NewChainWithID(ctx, requestID, eskit.Principal{Kind: eskit.PrincipalAPI, ID: apiKey})

func ProjectEvents

func ProjectEvents[E any](t *testing.T, handler func(ctx context.Context, event Event[E]) error, streamID string, baseTime time.Time, events ...E)

ProjectEvents feeds events through a handler function for projection unit tests. Events are wrapped in Event[E] with incremental versions and timestamps.

This is a standalone function for simple cases. For multi-stream or assertion-rich tests, prefer the fluent TestProjection API.

Parameters:

  • streamID: the stream identifier (e.g., "geozone-1", "location-42")
  • baseTime: starting timestamp; each event is offset by 1 second
  • events: the domain event data to project

Example:

eskit.ProjectEvents(t, handler, "order-1", eskit.DefaultBaseTime,
    OrderCreated{ID: "order-1"},
    ItemAdded{SKU: "shoe"},
)

func RebuildSnapshots

func RebuildSnapshots[S any, E any](
	ctx context.Context,
	store EventStore[E],
	snapStore SnapshotStore[S],
	decider Decider[S, any, E],
	opts ...snapshot.Option,
) error

RebuildSnapshots replays all events for all streams and saves fresh snapshots. Use after deploying schema changes or Evolve logic changes.

This is a batch operation intended for deployment scripts or maintenance tasks. It loads all events from the store, replays them through the decider, and saves new snapshots with the configured schema version.

eskit.RebuildSnapshots(ctx, eventStore, snapStore, decider,
    snapshot.SchemaVersion(2),
    snapshot.Every(100),
)

func Register

func Register[E any](reg *EventRegistry)

Register adds a type to the EventRegistry using the Go type name as the wire name. The wire name format is "{package}.{TypeName}" — e.g., "sales.OrderPlaced".

This is the preferred registration method. It eliminates string duplication and derives the name from the Go type system (single source of truth).

Panics if:

  • reg is nil
  • E is not a named struct type (interfaces, maps, slices, anonymous structs are rejected)
  • the type is already registered
  • the registry is full

func RegisterAs

func RegisterAs[E any](reg *EventRegistry, name string)

RegisterAs adds a type to the EventRegistry with an explicit wire name. Use this for backward compatibility when migrating from manual string-based registration — the wire name must match existing stored events.

eskit.RegisterAs[OrderPlaced](reg, "OrderPlaced")  // legacy wire name

Panics if reg is nil, the type is already registered, or the registry is full.

func RegisterRawUpcaster

func RegisterRawUpcaster(
	registry *UpcasterRegistry,
	eventType string,
	fromVersion, toVersion int,
	fn UpcasterFunc,
)

RegisterRawUpcaster registers an upcaster that works directly with json.RawMessage. Use this for maximum performance when you don't need typed access to the event, or when the intermediate types no longer exist in your codebase.

Example — renaming a field:

RegisterRawUpcaster(registry, "UserCreated", 1, 2, func(data json.RawMessage) (json.RawMessage, error) {
    // Replace "name" with "full_name" using direct byte manipulation
    return bytes.Replace(data, []byte(`"name"`), []byte(`"full_name"`), 1), nil
})

func RegisterTypedUpcaster

func RegisterTypedUpcaster[From any, To any](
	registry *UpcasterRegistry,
	eventType string,
	fromVersion, toVersion int,
	fn func(From) (To, error),
)

RegisterTypedUpcaster is a generic helper that registers a type-safe upcaster. It handles JSON marshaling/unmarshaling so callers work with Go types directly.

Performance: Uses sync.Pool for marshal buffers to minimize allocations. The upcaster function itself is allocation-free if the caller doesn't allocate.

Example:

RegisterTypedUpcaster[OrderCreatedV1, OrderCreatedV2](registry, "OrderCreated", 1, 2,
    func(old OrderCreatedV1) (OrderCreatedV2, error) {
        return OrderCreatedV2{
            OrderID:  old.OrderID,
            Customer: old.Customer,
            Currency: "USD", // new field with default
        }, nil
    },
)

func ResolveEventType

func ResolveEventType(registry *EventRegistry, event any) (string, error)

ResolveEventType determines the type name for an event value. Priority: TypeNamer interface (deprecated) > registry reverse lookup > error.

Preferred path: register types with eskit.Register[E](reg), then the registry reverse lookup handles everything. TypeNamer is only checked for backward compatibility.

func ScopeStreamID

func ScopeStreamID(tenant TenantID, streamID string) string

ScopeStreamID returns "tenantID.streamID" — the scoped stream name. Panics if either argument is empty.

func ServeChanges

func ServeChanges(ctx context.Context, notifier *ChangeNotifier, projection string,
	renderFn func(ctx context.Context) error, opts ...ServeOption) error

ServeChanges subscribes to a projection on notifier and runs a render loop. It calls renderFn immediately (initial render), then again each time a change notification arrives. The function blocks until ctx is cancelled or the notifier is closed. Subscribe/unsubscribe is handled automatically.

If renderFn returns an error, the error is logged but the loop continues — a single failed render must not tear down the connection.

By default, a 5-second fallback poll is enabled (see WithServePollFallback).

Example:

eskit.ServeChanges(ctx, notifier, "orders.order-list",
    func(ctx context.Context) error {
        orders, _ := reader.ListOrders(ctx)
        return sse.MergeFragmentTempl(OrderTable(orders),
            datastar.WithSelectorID("order-table"))
    },
)

func ServeChangesWatch

func ServeChangesWatch(ctx context.Context, notifier *ChangeNotifier,
	projection, streamID string,
	renderFn func(ctx context.Context) error, opts ...ServeOption) error

ServeChangesWatch is like ServeChanges but watches a specific entity identified by streamID within the given projection.

Example:

eskit.ServeChangesWatch(ctx, notifier, "orders.order-detail", orderID,
    func(ctx context.Context) error {
        order, _ := reader.GetOrder(ctx, orderID)
        return sse.MergeFragmentTempl(OrderDetail(order),
            datastar.WithSelectorID("order-detail"))
    },
)

func StreamExists

func StreamExists[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (bool, error)

StreamExists returns true if the stream has any events.

func StreamVersion

func StreamVersion[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (int, error)

StreamVersion returns the current version of a stream (latest event's version). Returns 0 if the stream has no events.

func SubscribeTo

func SubscribeTo[D any](d *EventDispatcher[any], name string, handle func(ctx context.Context, evt Event[any], data D) error)

SubscribeTo registers a type-safe subscription for a specific event data type. The event type name is derived automatically from D — no strings, no manual type assertions.

The handler receives the already-typed event data as the third argument. Events that don't match type D are skipped automatically.

Example:

eskit.SubscribeTo(dispatcher, "shipping.ship_when_paid",
    func(ctx context.Context, evt eskit.Event[any], e PaymentCompleted) error {
        return bus.Send(ctx, CreateShipment{OrderID: e.OrderID})
    },
)

func TypeName

func TypeName[E any](e Event[E]) string

TypeName returns the Go type name of an event's data payload. Useful for type-switch based event handling in projections. Returns "" for nil data.

func ValidateTenantID

func ValidateTenantID(id TenantID) error

ValidateTenantID checks if a tenant ID is valid. Returns an error if empty, too long, or doesn't match the allowed pattern.

func WithMeta

func WithMeta(ctx context.Context, meta Metadata) context.Context

WithMeta sets full metadata in the context. Use when you need complete control.

ctx = eskit.WithMeta(ctx, eskit.Metadata{CorrelationID: "custom", ...})

func WithTenant

func WithTenant(ctx context.Context, tenant TenantID) context.Context

WithTenant returns a new context with the tenant ID set. Panics if the tenant ID is invalid — this is a programmer error.

Types

type AppendOptions

type AppendOptions struct {
	// IdempotencyKey deduplicates retried appends. If a previous append
	// with this key succeeded, return the same events without re-appending.
	// Empty means no idempotency check.
	IdempotencyKey string

	// Timestamp overrides the auto-generated timestamp. Useful for migrations
	// and replaying events from another system. Zero value means use current time.
	Timestamp time.Time

	// Metadata is the event metadata (replaces variadic metadata parameter).
	Metadata Metadata
}

AppendOptions configures optional behavior for AppendWithOptions.

type AuditEntry

type AuditEntry struct {
	Timestamp time.Time
	Action    string // "read" or "write"
	StreamID  string
	Principal Principal
	EventIDs  []string // events read or written
	Extra     map[string]string
}

AuditEntry records a single audit log entry for compliance.

type AuditLogger

type AuditLogger interface {
	// LogRead records that events were read from a stream.
	LogRead(ctx context.Context, entry AuditEntry) error

	// LogWrite records that events were written to a stream.
	LogWrite(ctx context.Context, entry AuditEntry) error
}

AuditLogger records read and write access to event streams for compliance. Implementations should be non-blocking — audit failures must not break the write path.

type AutomationRunner

type AutomationRunner[E any] struct {
	// contains filtered or unexported fields
}

AutomationRunner subscribes to an EventBus and feeds events to an EventDispatcher. In cluster mode with queue groups, each event is processed by exactly ONE node — preventing duplicate side effects.

func NewAutomationRunner

func NewAutomationRunner[E any](bus EventBus, dispatcher *EventDispatcher[E]) *AutomationRunner[E]

NewAutomationRunner creates a runner that feeds bus events to the dispatcher.

func (*AutomationRunner[E]) Start

func (r *AutomationRunner[E]) Start(ctx context.Context, queueGroup string) error

Start subscribes to the event bus and begins feeding events to the dispatcher.

func (*AutomationRunner[E]) Stop

func (r *AutomationRunner[E]) Stop() error

Stop unsubscribes from the event bus.

type BusSubscription

type BusSubscription interface {
	Unsubscribe() error
}

BusSubscription represents an active event bus subscription. Call Unsubscribe to stop receiving events and release resources.

type Change

type Change struct {
	// Projection is the state-view name (e.g., "geography.geozone-list").
	Projection string

	// StreamID identifies the entity that changed (e.g., "geozone-42").
	StreamID string

	// EventType is the event that caused the change (e.g., "GeozoneRegistered").
	EventType string
}

Change describes a single state-view update. Emitted after a projection's Evolve succeeds for an event.

type ChangeNotifier

type ChangeNotifier struct {
	// contains filtered or unexported fields
}

ChangeNotifier provides in-process pub/sub for projection state changes. SSE handlers subscribe to projections and receive notifications when state-views are updated, triggering re-reads and client pushes.

Architecture:

  • Close-channel broadcast: Notify() closes a shared channel to wake all waiters in O(1), then allocates a new one. Cost is constant regardless of subscriber count — no fan-out loop.
  • Ring buffer per projection stores recent changes. Watch subscribers wake on broadcast, scan the ring for their streamID, and go back to sleep if nothing matches.
  • Natural coalescing: if 5 changes arrive while a subscriber is rendering, it wakes once, sees the sequence jumped by 5, and renders once. No silent drops, no lost notifications.
  • Zero per-subscriber channel allocation. Memory scales with projections, not connections.

Supports two subscription modes:

  • Subscribe(projection): receives all changes for a projection
  • Watch(projection, streamID): receives changes for a specific entity only

Safe for concurrent use.
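The close-channel broadcast at the heart of this design can be sketched in isolation (simplified: one global wake channel and sequence counter, no per-projection state or ring buffer):

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster illustrates the close-channel wake pattern: closing a channel
// wakes every waiter at once, then a fresh channel is swapped in. A sequence
// counter lets waiters coalesce any number of missed changes into one wake.
type broadcaster struct {
	mu   sync.Mutex
	wake chan struct{}
	seq  uint64
}

func newBroadcaster() *broadcaster {
	return &broadcaster{wake: make(chan struct{})}
}

func (b *broadcaster) notify() {
	b.mu.Lock()
	b.seq++
	close(b.wake)                // wake all current waiters in O(1)
	b.wake = make(chan struct{}) // future waiters block on the new channel
	b.mu.Unlock()
}

// wait blocks until seq advances past last, returning the new sequence.
func (b *broadcaster) wait(last uint64) uint64 {
	for {
		b.mu.Lock()
		seq, ch := b.seq, b.wake
		b.mu.Unlock()
		if seq > last {
			return seq // one wake covers every change since `last`
		}
		<-ch
	}
}

func main() {
	b := newBroadcaster()
	b.notify()
	b.notify()
	b.notify()
	fmt.Println(b.wait(0)) // 3: three changes observed in a single wake
}
```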

func NewChangeNotifier

func NewChangeNotifier(opts ...ChangeNotifierOption) *ChangeNotifier

NewChangeNotifier creates a notification hub for projection changes.

func (*ChangeNotifier) Close

func (n *ChangeNotifier) Close()

Close shuts down the notifier and wakes all waiting subscribers.

func (*ChangeNotifier) Notify

func (n *ChangeNotifier) Notify(change Change)

Notify broadcasts a change to all subscribers of the named projection.

O(1) cost regardless of subscriber count — no channel iteration, no allocation per subscriber. The only allocation is one channel (to replace the wake channel).

Watch subscribers are woken too; they filter by streamID after waking.

func (*ChangeNotifier) SubCount

func (n *ChangeNotifier) SubCount() int64

SubCount returns the total number of active subscriptions across all projections.

func (*ChangeNotifier) Subscribe

func (n *ChangeNotifier) Subscribe(projection string) (WaitFunc, func())

Subscribe returns a WaitFunc that blocks until the projection has new changes. Each call blocks until at least one new change arrives since the previous call.

Natural coalescing: if 10 changes arrive while the caller is busy, the next call returns immediately and the caller re-renders once, not 10.

The returned cancel function releases resources. After cancel, the WaitFunc returns false immediately.

Usage:

wait, cancel := notifier.Subscribe("orders.order-list")
defer cancel()
for wait(ctx) {
    orders := reader.ListOrders(ctx)
    sse.MergeFragmentTempl(OrderTable(orders))
}

func (*ChangeNotifier) Watch

func (n *ChangeNotifier) Watch(projection, streamID string) (WaitFunc, func())

Watch returns a WaitFunc that blocks until the projection has a new change for the specified streamID. Like [Subscribe], but filters by entity.

Uses the ring buffer to check which streamIDs were affected. False wakeups (changes to other entities in the same projection) cause a brief wake + ring scan + back to sleep, costing ~10ns per ring slot checked.

Usage:

wait, cancel := notifier.Watch("orders.order-detail", orderID)
defer cancel()
for wait(ctx) {
    order := reader.GetOrder(ctx, orderID)
    sse.MergeFragmentTempl(OrderDetail(order))
}

type ChangeNotifierOption

type ChangeNotifierOption func(*ChangeNotifier)

ChangeNotifierOption configures a ChangeNotifier.

func WithRingBits

func WithRingBits(bits uint) ChangeNotifierOption

WithRingBits sets the ring buffer size as a power of 2. Default: 7 (128 slots). Minimum: 4 (16 slots). Maximum: 16 (65536 slots).

Larger rings help Watch subscribers that fall far behind, at the cost of more memory per projection (~48 bytes × ring size). For most workloads the default of 128 is plenty.
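A power-of-2 ring lets a monotonically growing sequence number be mapped to a slot with a single bitwise AND instead of a modulo. A small illustration of that indexing (the slot layout here is illustrative):

```go
package main

import "fmt"

func main() {
	const bits = 7
	const size = 1 << bits // 128 slots, the default
	// seq & (size-1) wraps the sequence into the ring.
	for _, seq := range []uint64{0, 127, 128, 300} {
		fmt.Println(seq & (size - 1)) // 0, 127, 0, 44
	}
}
```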

type ChangeRelay

type ChangeRelay interface {
	// Broadcast sends a change notification to all servers (including self).
	// Called from OnChange after a shared-consumer projection processes an event.
	// Must be non-blocking — implementations should not wait for delivery.
	Broadcast(change Change)

	// Start begins listening for remote change notifications and forwarding
	// them to the local ChangeNotifier. Blocks until ctx is cancelled.
	// Auto-reconnects on transient failures.
	Start(ctx context.Context)

	// Close stops the relay and releases resources.
	Close() error
}

ChangeRelay broadcasts projection changes across server instances. Use for shared-consumer projections where only one server processes each event and other servers need to be notified.

Implementations use transport-specific mechanisms:

  • pgstore.PGChangeRelay: PostgreSQL LISTEN/NOTIFY
  • sqlitestore.SQLiteChangeRelay: polling

The relay forwards received changes to a local ChangeNotifier so that SSE handlers on all servers receive notifications.

type ChannelEventBus

type ChannelEventBus struct {
	// contains filtered or unexported fields
}

ChannelEventBus is an in-process EventBus backed by Go channels. Suitable for single-node deployments and testing. Fan-out: every subscriber receives every published event.

func NewChannelEventBus

func NewChannelEventBus(opts ...ChannelEventBusOption) *ChannelEventBus

NewChannelEventBus creates a new in-process event bus.

func (*ChannelEventBus) Close

func (b *ChannelEventBus) Close()

Close shuts down the bus, closing all subscriber channels.

func (*ChannelEventBus) Publish

func (b *ChannelEventBus) Publish(_ context.Context, streamID string, events []EventEnvelope) error

Publish fans out events to all matching subscribers. Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber (slow consumer protection — subscribers should be fast or use buffering).
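The drop-when-full behavior is the classic non-blocking channel send. A self-contained sketch of the pattern (not the eskit internals):

```go
package main

import "fmt"

// trySend shows slow-consumer protection: a non-blocking send that
// drops the value when the subscriber's buffer is full.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: drop rather than block the publisher
	}
}

func main() {
	ch := make(chan int, 2) // subscriber buffer of 2, nobody draining it
	delivered := 0
	for v := 0; v < 5; v++ {
		if trySend(ch, v) {
			delivered++
		}
	}
	fmt.Println(delivered) // 2: the rest were dropped
}
```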

func (*ChannelEventBus) Subscribe

func (b *ChannelEventBus) Subscribe(_ context.Context, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)

Subscribe registers a handler for events on all streams.

func (*ChannelEventBus) SubscribeStream

func (b *ChannelEventBus) SubscribeStream(_ context.Context, streamID string, handler func(events []EventEnvelope)) (BusSubscription, error)

SubscribeStream registers a handler for events on a specific stream.

type ChannelEventBusOption

type ChannelEventBusOption func(*ChannelEventBus)

ChannelEventBusOption configures a ChannelEventBus.

func WithChannelBufferSize

func WithChannelBufferSize(size int) ChannelEventBusOption

WithChannelBufferSize sets the channel buffer size for each subscriber. Default: 256.

type CommandHandler

type CommandHandler[S any, C any, E any] struct {
	Decider       Decider[S, C, E]
	Store         EventStore[E]
	Snapshots     SnapshotStore[S]    // optional, may be nil
	SnapshotEvery int                 // take snapshot every N events, 0 = disabled (deprecated: use NewDeciderHandler + WithSnapshots)
	Dispatcher    *EventDispatcher[E] // optional, may be nil — dispatches events after persist
	// contains filtered or unexported fields
}

CommandHandler processes commands against a stream, persisting resulting events. This is the main entry point for executing domain logic.

Create with NewDeciderHandler for the functional options API, or initialize the struct directly for simple cases.

func NewDeciderHandler

func NewDeciderHandler[S any, C any, E any](
	store EventStore[E],
	decider Decider[S, C, E],
	opts ...HandlerOption[S, C, E],
) *CommandHandler[S, C, E]

NewDeciderHandler creates a CommandHandler with functional options. Snapshots are OFF by default — use WithSnapshots to enable.

Minimal usage:

handler := eskit.NewDeciderHandler(store, decider)

With snapshots:

handler := eskit.NewDeciderHandler(store, decider,
    eskit.WithSnapshots[MyState, MyCmd, MyEvent](snapStore,
        snapshot.Every(100),
        snapshot.SchemaVersion(2),
    ),
)

With event dispatch:

handler := eskit.NewDeciderHandler(store, decider,
    eskit.WithDispatcher[MyState, MyCmd, MyEvent](dispatcher),
)

func (*CommandHandler[S, C, E]) Handle

func (h *CommandHandler[S, C, E]) Handle(ctx context.Context, streamType, streamID string, command C) (S, []Event[E], error)

Handle executes a command against the stream identified by streamID. It loads events (optionally from a snapshot), runs the decider, and appends new events. Returns the new state and the produced events.

func (*CommandHandler[S, C, E]) Use

func (h *CommandHandler[S, C, E]) Use(mw Middleware[S, C, E])

Use adds middleware to the command handler. Middleware executes in the order added (first added = outermost). Panics if adding would exceed maxMiddleware.
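The wrapping order can be sketched with simplified, non-generic stand-ins for the handler and middleware types (the real ones are generic over S, C, E):

```go
package main

import "fmt"

// Handler and Middleware stand-ins for illustration.
type Handler func(cmd string) string
type Middleware func(Handler) Handler

// tag wraps a handler so its output records the wrapping order.
func tag(name string) Middleware {
	return func(next Handler) Handler {
		return func(cmd string) string { return name + "(" + next(cmd) + ")" }
	}
}

// chain wraps base so the first middleware added is outermost,
// matching the order in which Use applies them.
func chain(base Handler, mws ...Middleware) Handler {
	h := base
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	h := chain(func(cmd string) string { return cmd }, tag("first"), tag("second"))
	fmt.Println(h("cmd")) // first(second(cmd))
}
```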

type DLQ

type DLQ[E any] struct {
	// contains filtered or unexported fields
}

DLQ is a generic dead letter queue for failed projection events. Thread-safe: all methods are safe for concurrent use.

func NewDLQ

func NewDLQ[E any](cfg DLQConfig[E]) *DLQ[E]

NewDLQ creates a new dead letter queue.

func (*DLQ[E]) Close

func (q *DLQ[E]) Close() error

Close marks the DLQ as closed. Subsequent operations return ErrDLQClosed.

func (*DLQ[E]) Delete

func (q *DLQ[E]) Delete(ctx context.Context, id string) error

Delete removes a DLQ entry without replaying it.

func (*DLQ[E]) Get

func (q *DLQ[E]) Get(ctx context.Context, id string) (*DLQEntry[E], error)

Get retrieves a DLQ entry by ID.

func (*DLQ[E]) Len

func (q *DLQ[E]) Len() int

Len returns the current number of entries in the DLQ.

func (*DLQ[E]) List

func (q *DLQ[E]) List(ctx context.Context, limit, offset int) ([]DLQEntry[E], error)

List returns DLQ entries in insertion order (newest last). limit is clamped to maxDLQListLimit. offset skips entries from the start.

func (*DLQ[E]) Replay

func (q *DLQ[E]) Replay(ctx context.Context, id string) error

Replay re-processes a DLQ entry via the configured replay function. On success, the entry is removed from the DLQ. Thread-safe: concurrent Replay calls on the same entry are serialized — only one will execute replayFn, the other gets ErrDLQMessageNotFound.

func (*DLQ[E]) Send

func (q *DLQ[E]) Send(ctx context.Context, entry DLQEntry[E]) error

Send adds a failed event to the DLQ.

type DLQConfig

type DLQConfig[E any] struct {
	// MaxSize is the maximum number of entries. 0 = default (100000).
	MaxSize int

	// ReplayFn is called when replaying a DLQ entry. If nil, replay returns an error.
	ReplayFn func(ctx context.Context, entry DLQEntry[E]) error
}

DLQConfig configures the dead letter queue.

type DLQEntry

type DLQEntry[E any] struct {
	// ID is the unique identifier for this DLQ entry.
	ID string

	// Event is the original event that failed processing.
	Event Event[E]

	// HandlerName identifies which projection handler failed.
	HandlerName string

	// Error is the error message from the last failure.
	Error string

	// Attempts is the number of times processing was attempted.
	Attempts int

	// FirstFailure is when the event first failed.
	FirstFailure time.Time

	// LastFailure is when the event most recently failed.
	LastFailure time.Time
}

DLQEntry represents a failed event in the dead letter queue.

type Decider

type Decider[S any, C any, E any] struct {
	// Decide takes the current state and a command, returning events or an error.
	Decide func(state S, command C) ([]E, error)

	// Evolve applies an event to the current state, returning the new state.
	Evolve func(state S, event E) S

	// InitialState returns the zero/initial state for the decider.
	InitialState func() S
}

Decider defines the core event sourcing decider pattern using generics. It encapsulates domain logic as pure functions over commands, events, and state.

Type parameters:

  • S: the decider state type
  • C: the command type
  • E: the event type

func (Decider[S, C, E]) Fold

func (d Decider[S, C, E]) Fold(events []E) S

Fold replays a sequence of events from the initial state, returning the final state.

func (Decider[S, C, E]) FoldFrom

func (d Decider[S, C, E]) FoldFrom(state S, events []E) S

FoldFrom replays events starting from a given state.
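As an illustration, using the counter decider from the Quick Start (a snippet, not a full program):

```go
// Rebuild state by folding the full history from the initial state.
state := counter.Fold([]any{Incremented{}, Incremented{}, Decremented{}})
// state == 1

// Or continue folding from an already-known state, e.g. a snapshot.
state = counter.FoldFrom(5, []any{Decremented{}})
// state == 4
```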

func (Decider[S, C, E]) Validate

func (d Decider[S, C, E]) Validate() error

Validate checks that all required functions are set on the Decider. Returns an error describing any missing functions. Call this during setup to catch misconfigurations early instead of panicking at runtime.

type DeciderTest

type DeciderTest[S any, C any, E any] struct {
	// contains filtered or unexported fields
}

DeciderTest provides a Given/When/Then fluent API for testing deciders. Each test is a specification that maps directly to event modeling:

eskit.Test(t, decider).
    Given(OrderCreated{}, ItemAdded{SKU: "shoe"}).
    When(SubmitOrder{}).
    ThenExpect(OrderSubmitted{})

Internally: InitialState → Fold Given events → Decide(state, When command) → assert.

func Test

func Test[S any, C any, E any](t *testing.T, d Decider[S, C, E]) *DeciderTest[S, C, E]

Test creates a new decider test with the Given/When/Then fluent API.

func (*DeciderTest[S, C, E]) Given

func (dt *DeciderTest[S, C, E]) Given(events ...E) *DeciderTest[S, C, E]

Given sets the historical events that establish the initial state.

func (*DeciderTest[S, C, E]) ThenError

func (dt *DeciderTest[S, C, E]) ThenError(msg string)

ThenError asserts that Decide returns an error containing the given substring.

func (*DeciderTest[S, C, E]) ThenExpect

func (dt *DeciderTest[S, C, E]) ThenExpect(expected ...E)

ThenExpect asserts that Decide produces exactly the expected events. Uses cmp.Diff for readable diffs on mismatch.

func (*DeciderTest[S, C, E]) ThenExpectWith

func (dt *DeciderTest[S, C, E]) ThenExpectWith(opts []cmp.Option, expected ...E)

ThenExpectWith asserts that Decide produces exactly the expected events, using the provided cmp.Options for comparison. Useful for ignoring fields like timestamps:

ThenExpectWith([]cmp.Option{cmpopts.IgnoreFields(Event{}, "Timestamp")}, expected...)

func (*DeciderTest[S, C, E]) ThenNoEvents

func (dt *DeciderTest[S, C, E]) ThenNoEvents()

ThenNoEvents asserts that Decide produces zero events and no error.

func (*DeciderTest[S, C, E]) When

func (dt *DeciderTest[S, C, E]) When(cmd C) *DeciderTest[S, C, E]

When sets the command to execute against the state built from Given events.

type EncryptedEventStore

type EncryptedEventStore[E any] struct {
	// contains filtered or unexported fields
}

EncryptedEventStore wraps an EventStore to transparently encrypt/decrypt event data. Events whose type name (via typeNameFn) matches the encryption config are encrypted on Append and decrypted on Load/LoadFrom. Other events pass through unchanged.

Type parameter E must be serializable to/from JSON for encryption.

func NewEncryptedEventStore

func NewEncryptedEventStore[E any](cfg EncryptedStoreConfig[E]) *EncryptedEventStore[E]

NewEncryptedEventStore creates an encrypted wrapper around an event store. Panics if required config fields are nil — these are programmer errors.

func (*EncryptedEventStore[E]) Append

func (s *EncryptedEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)

func (*EncryptedEventStore[E]) Load

func (s *EncryptedEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)

func (*EncryptedEventStore[E]) LoadFrom

func (s *EncryptedEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)

type EncryptedStoreConfig

type EncryptedStoreConfig[E any] struct {
	// Store is the underlying event store. Required.
	Store EventStore[E]
	// Encryptor handles encryption/decryption. Required.
	Encryptor *Encryptor
	// ShouldEncrypt returns true if this specific event should be encrypted.
	// If nil, all events are encrypted.
	ShouldEncrypt func(event E) bool
	// Serialize converts an event to bytes. Required.
	Serialize func(event E) ([]byte, error)
	// Deserialize converts bytes back to an event. Required.
	Deserialize func(data []byte) (E, error)
}

EncryptedStoreConfig configures an EncryptedEventStore.
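For example, wrapping a memory store with JSON serialization. This is a sketch: `PatientAdmitted` is a hypothetical event type, and `enc` is an *Encryptor obtained from NewEncryptor.

```go
type PatientAdmitted struct {
	Name string `json:"name"` // PII worth encrypting
}

store := eskit.NewEncryptedEventStore(eskit.EncryptedStoreConfig[PatientAdmitted]{
	Store:     eskit.NewMemoryStore[PatientAdmitted](),
	Encryptor: enc,
	// ShouldEncrypt left nil: encrypt every event.
	Serialize: func(e PatientAdmitted) ([]byte, error) { return json.Marshal(e) },
	Deserialize: func(data []byte) (PatientAdmitted, error) {
		var e PatientAdmitted
		err := json.Unmarshal(data, &e)
		return e, err
	},
})
```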

type Encryptor

type Encryptor struct {
	// contains filtered or unexported fields
}

Encryptor handles AES-256-GCM envelope encryption with key rotation support.

func NewEncryptor

func NewEncryptor(keyStore KeyStore) *Encryptor

NewEncryptor creates an encryptor backed by the given key store. Panics if keyStore is nil — this is a configuration error.

func (*Encryptor) Decrypt

func (e *Encryptor) Decrypt(envelope []byte) ([]byte, error)

Decrypt decrypts an envelope back to plaintext. Automatically resolves the correct key from the envelope's key ID.

func (*Encryptor) Encrypt

func (e *Encryptor) Encrypt(plaintext []byte) ([]byte, error)

Encrypt encrypts plaintext using envelope encryption.

Envelope format (v1):

[version:1][keyIDLen:1][keyID:N][encryptedDEK:nonce(12)+ciphertext(32)+tag(16)=60][nonce:12][ciphertext+tag:...]

Returns the envelope bytes. Each call generates a unique random DEK, so encrypting the same plaintext twice produces different ciphertext.
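A round-trip sketch using the in-memory key store (the zeroed key material is for illustration only; use random 32-byte keys in practice):

```go
ks := eskit.NewMemoryKeyStore()
ks.AddKey("2024-01", make([]byte, 32)) // must be exactly 32 bytes (AES-256)
ks.SetActiveKey("2024-01")

enc := eskit.NewEncryptor(ks)
envelope, err := enc.Encrypt([]byte(`{"ssn":"123-45-6789"}`))
if err != nil {
	log.Fatal(err)
}
// Decrypt resolves the key ID ("2024-01") from the envelope itself,
// so old envelopes keep decrypting after key rotation.
plaintext, err := enc.Decrypt(envelope)
```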

type ErrUnknownEventType

type ErrUnknownEventType struct {
	EventType string
}

ErrUnknownEventType is returned when an event type is not registered.

func (ErrUnknownEventType) Error

func (e ErrUnknownEventType) Error() string

type Event

type Event[E any] struct {
	// ID is the unique identifier for this event record.
	ID string

	// StreamType identifies the type/category of stream (e.g., "order", "account").
	// Set by the store from the Decider's StreamType or passed explicitly.
	StreamType string

	// StreamID identifies the stream this event belongs to (just the business ID, e.g., "123").
	StreamID string

	// Version is the sequential version number within the stream (StreamPosition).
	Version int

	// EventType is the string name of the event (e.g., "OrderCreated").
	// Stores set this from the EventRegistry on append and load.
	// When no registry is configured, this may be empty.
	EventType string

	// GlobalSequence is the position in the global ordered stream across all aggregates.
	// Zero means not assigned (e.g., memory store doesn't track global sequence).
	GlobalSequence uint64

	// SchemaVersion tracks the event schema for upcasting. Default: 1.
	SchemaVersion int

	// Timestamp is when the event was recorded.
	Timestamp time.Time

	// Metadata holds optional event metadata (correlation, causation, principal, etc.).
	Metadata Metadata

	// Data is the domain event payload.
	Data E
}

Event represents a stored event in the event store.

func DeserializeRaw

func DeserializeRaw[E any](raw *RawEvent) (Event[E], error)

DeserializeRaw converts a RawEvent into a typed Event by deserializing the raw data. Uses JSON unmarshaling. For custom codecs, use DeserializeRawWith.

type EventBus

type EventBus interface {
	// Publish sends events to all subscribers. Called after successful store append.
	// streamID identifies which stream the events belong to.
	Publish(ctx context.Context, streamID string, events []EventEnvelope) error

	// Subscribe registers a handler for all events on all streams.
	// Returns a Subscription that must be closed when no longer needed.
	Subscribe(ctx context.Context, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)

	// SubscribeStream registers a handler for events on a specific stream.
	// Returns a Subscription that must be closed when no longer needed.
	SubscribeStream(ctx context.Context, streamID string, handler func(events []EventEnvelope)) (BusSubscription, error)
}

EventBus distributes events to interested subscribers after they are persisted. This is the notification layer — not the source of truth (that's EventStore).

type EventDispatcher

type EventDispatcher[E any] struct {
	// contains filtered or unexported fields
}

EventDispatcher fans out events to registered subscriptions filtered by event type.

Each subscription declares which event types it handles via EventTypes. Events with no matching subscription are skipped entirely — zero overhead. Subscriptions execute in registration order, synchronously, fail-fast.

EventDispatcher is the unified engine behind [Projector] (read models) and [Reactor] (side effects). Use the semantic alias that fits your intent.

Thread-safe for concurrent Dispatch calls. Register should be called during setup; concurrent Register and Dispatch is safe but not recommended.

func NewEventDispatcher

func NewEventDispatcher[E any]() *EventDispatcher[E]

NewEventDispatcher creates an EventDispatcher with no registered subscriptions.

func (*EventDispatcher[E]) Dispatch

func (d *EventDispatcher[E]) Dispatch(ctx context.Context, events []Event[E]) error

Dispatch fans out events to matching subscriptions.

For each event, only subscriptions whose EventTypes includes that event's type name are called. Match-all subscriptions (empty EventTypes) are always called.

Execution is fail-fast: the first error stops processing. No further subscriptions or events are processed after a failure.

func (*EventDispatcher[E]) HandlerCount

func (d *EventDispatcher[E]) HandlerCount() int

HandlerCount returns the number of registered subscriptions.

func (*EventDispatcher[E]) HandlesEvent

func (d *EventDispatcher[E]) HandlesEvent(typeName string) bool

HandlesEvent reports whether any registered subscription handles the given event type name.

func (*EventDispatcher[E]) Register

func (d *EventDispatcher[E]) Register(sub Subscription[E])

Register adds a subscription to the dispatcher. Subscriptions execute in registration order.

Panics if Name is empty, Handler is nil, EventTypes contains empty strings, or maxHandlers would be exceeded.
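A registration sketch. The Handler signature shown here is an assumption; consult the Subscription type for the actual shape.

```go
d := eskit.NewEventDispatcher[any]()
d.Register(eskit.Subscription[any]{
	Name:       "order-read-model",
	EventTypes: []string{"OrderCreated", "OrderSubmitted"}, // O(1) type filter
	// Handler signature assumed for illustration.
	Handler: func(ctx context.Context, evt eskit.Event[any]) error {
		// update the read model...
		return nil
	},
})
```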

func (*EventDispatcher[E]) Replay

func (d *EventDispatcher[E]) Replay(ctx context.Context, store EventStore[E], streamType string, streamIDs []string) error

Replay loads all events from a store and dispatches them to matching subscriptions. Useful for rebuilding read models from scratch.

type EventEnvelope

type EventEnvelope struct {
	ID        string `json:"id"`
	StreamID  string `json:"stream_id"`
	Version   int    `json:"version"`
	Type      string `json:"type,omitempty"`
	Data      []byte `json:"data"`
	Timestamp int64  `json:"timestamp"` // unix nano
}

EventEnvelope is a serialization-friendly event wrapper for the event bus. Unlike Event[E] which is generic, EventEnvelope uses []byte data so it can cross process boundaries without knowing the concrete event type.

type EventRegistry

type EventRegistry struct {
	// contains filtered or unexported fields
}

EventRegistry maps event type names to factory functions for deserialization. This allows stores to create the correct Go type when loading events.

func NewEventRegistry

func NewEventRegistry() *EventRegistry

NewEventRegistry creates an empty event registry.

func (*EventRegistry) Has

func (r *EventRegistry) Has(eventType string) bool

Has returns true if the event type is registered.

func (*EventRegistry) New

func (r *EventRegistry) New(eventType string) (any, error)

New creates a new instance of the registered event type. Returns an error if the type is not registered.

func (*EventRegistry) Register

func (r *EventRegistry) Register(eventType string, factory func() any)

Register maps an event type name to a factory function. The factory should return a pointer to a new zero-valued instance.

registry.Register("OrderCreated", func() any { return &OrderCreated{} })

func (*EventRegistry) TypeName

func (r *EventRegistry) TypeName(event any) string

TypeName returns the registered name for an event value. Uses the concrete type (dereferencing pointers). Returns empty string if not registered.

func (*EventRegistry) Types

func (r *EventRegistry) Types() []string

Types returns all registered event type names.

type EventStore

type EventStore[E any] interface {
	// Append persists new events for a stream. expectedVersion is the version
	// of the last known event (0 means new stream). Returns ErrConcurrencyConflict
	// if the current version doesn't match expectedVersion.
	// Pass AppendAny (-1) to skip the version check entirely (append-only mode).
	// The optional metadata parameter (first element used if provided) is applied to all events.
	Append(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)

	// Load returns all events for a stream in version order.
	// Returns an empty slice (not an error) if the stream doesn't exist.
	Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)

	// LoadFrom returns events for a stream starting from a given version (inclusive).
	LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
}

EventStore is the interface for persisting and loading events. Implementations must support optimistic concurrency control via versioning.
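Optimistic concurrency in practice: two writers racing on the same stream (a sketch; OrderCreated, OrderCancelled, and LoginRecorded are hypothetical event types):

```go
store := eskit.NewMemoryStore[any]()

// First writer: the stream is new, so expectedVersion is 0.
if _, err := store.Append(ctx, "order", "42", 0, []any{OrderCreated{}}); err != nil {
	log.Fatal(err)
}

// Second writer still believes the stream is at version 0, so it conflicts.
_, err := store.Append(ctx, "order", "42", 0, []any{OrderCancelled{}})
// err is ErrConcurrencyConflict; reload, re-evolve, and retry the command.

// Append-only streams (e.g. audit logs) can skip the check with AppendAny.
_, _ = store.Append(ctx, "audit", "42", eskit.AppendAny, []any{LoginRecorded{}})
```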

type EventStoreWithAppendOptions

type EventStoreWithAppendOptions[E any] interface {
	// AppendWithOptions persists new events with additional options.
	// If IdempotencyKey matches a previous successful append, returns the
	// original events without re-appending.
	AppendWithOptions(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, opts AppendOptions) ([]Event[E], error)
}

EventStoreWithAppendOptions extends EventStore with advanced append capabilities. Stores that support idempotency and custom timestamps implement this interface.

type EventStoreWithArchival

type EventStoreWithArchival[E any] interface {
	// Archive moves a stream's events to cold storage. The stream becomes
	// read-only from the archive store. Future Append calls return ErrStreamArchived.
	// Load on the primary store returns empty; load from the archive returns events.
	Archive(ctx context.Context, streamType, streamID string, target EventStore[E]) error

	// Restore moves an archived stream back to the primary store.
	// Removes the tombstone and deletes from the source store.
	Restore(ctx context.Context, streamType, streamID string, source EventStore[E]) error
}

EventStoreWithArchival extends EventStore with cold storage archival. Archive moves events to a target store and tombstones the primary. Restore moves events back from the source store.

type EventStoreWithDeletion

type EventStoreWithDeletion[E any] interface {
	// Delete permanently removes all events for a stream.
	// Returns ErrStreamNotFound if stream doesn't exist.
	// Also removes associated snapshots and checkpoints.
	Delete(ctx context.Context, streamType, streamID string) error

	// Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted.
	// Load returns empty. The tombstone record remains for audit.
	Tombstone(ctx context.Context, streamType, streamID string, reason string) error

	// IsTombstoned checks if a stream has been tombstoned.
	// Returns nil, nil if the stream is not tombstoned.
	IsTombstoned(ctx context.Context, streamType, streamID string) (*Tombstone, error)
}

EventStoreWithDeletion extends EventStore with stream deletion and tombstoning. Implementations should also clean up associated snapshots and checkpoints.

type EventStoreWithOptions

type EventStoreWithOptions[E any] interface {
	EventStore[E]

	// LoadWithOptions loads events with optional filtering.
	LoadWithOptions(ctx context.Context, streamType, streamID string, opts LoadOptions) ([]Event[E], error)
}

EventStoreWithOptions extends EventStore with filtered loading. Stores that support server-side filtering implement this for performance.

type HandleFunc

type HandleFunc[S any, E any] func(ctx context.Context) (S, []Event[E], error)

HandleFunc is the signature for the next handler in the middleware chain.

type HandlerOption

type HandlerOption[S any, C any, E any] func(*CommandHandler[S, C, E])

HandlerOption configures a CommandHandler created with NewDeciderHandler.

func WithConflictRetry

func WithConflictRetry[S any, C any, E any](maxRetries int) HandlerOption[S, C, E]

WithConflictRetry sets the maximum number of automatic retries on ErrConcurrencyConflict. Default is 0 (no retries, preserving backward compatibility). On each retry, the handler reloads events, re-evolves state, and re-decides.

func WithConflictRetryDelay

func WithConflictRetryDelay[S any, C any, E any](baseDelay time.Duration) HandlerOption[S, C, E]

WithConflictRetryDelay sets the base delay for exponential backoff between conflict retries. The actual delay is baseDelay * 2^attempt, capped at 1 second. Default is 10ms if retries are enabled but no delay is set.
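Assuming options are passed variadically to NewDeciderHandler (as HandlerOption suggests), wiring retries looks like this sketch, reusing the counter decider from the Quick Start:

```go
handler := eskit.NewDeciderHandler(store, counter,
	eskit.WithConflictRetry[int, any, any](3),                        // up to 3 retries on conflict
	eskit.WithConflictRetryDelay[int, any, any](25*time.Millisecond), // 25ms, 50ms, 100ms backoff
)
```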

func WithDispatcher

func WithDispatcher[S any, C any, E any](d *EventDispatcher[E]) HandlerOption[S, C, E]

WithDispatcher sets the event dispatcher for the handler. Events are dispatched to all matching subscriptions after successful persist.

eskit.WithDispatcher[S, C, E](dispatcher)

func WithSnapshots

func WithSnapshots[S any, C any, E any](store SnapshotStore[S], opts ...snapshot.Option) HandlerOption[S, C, E]

WithSnapshots enables snapshot support with optional configuration. If no options are provided, sensible defaults are used (every 100 events, schema version 1).

Minimal — just enable with defaults:

eskit.WithSnapshots[S, C, E](snapStore)

Custom threshold:

eskit.WithSnapshots[S, C, E](snapStore, snapshot.Every(500))

Full configuration:

eskit.WithSnapshots[S, C, E](snapStore,
    snapshot.Every(500),
    snapshot.SchemaVersion(2),
    snapshot.MinAge(5 * time.Minute),
    snapshot.Async(true),
)

type InstrumentedEventStore

type InstrumentedEventStore[E any] struct {
	// contains filtered or unexported fields
}

InstrumentedEventStore wraps an EventStore with logging and optional profiling.

func NewInstrumentedEventStore

func NewInstrumentedEventStore[E any](store EventStore[E], logger *slog.Logger, profiler *Profiler) *InstrumentedEventStore[E]

NewInstrumentedEventStore wraps a store with observability. Both logger and profiler are optional (pass nil to skip).

func (*InstrumentedEventStore[E]) Append

func (s *InstrumentedEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)

func (*InstrumentedEventStore[E]) Load

func (s *InstrumentedEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)

func (*InstrumentedEventStore[E]) LoadFrom

func (s *InstrumentedEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)

func (*InstrumentedEventStore[E]) Ping

func (s *InstrumentedEventStore[E]) Ping(ctx context.Context) error

Ping checks store connectivity by loading events for a non-existent stream. If the underlying store implements a Ping method, it delegates to that.

type KeyStore

type KeyStore interface {
	// GetKey returns the key bytes for the given ID.
	GetKey(id string) ([]byte, error)

	// ActiveKeyID returns the ID of the current encryption key.
	ActiveKeyID() (string, error)
}

KeyStore provides access to encryption keys. Must be safe for concurrent use. Implementations: MemoryKeyStore (testing), or your own backed by Vault/KMS/etc.

type LoadOptions

type LoadOptions struct {
	// EventTypes filters by event type name. Empty means all types.
	EventTypes []string

	// FromVersion loads events starting from this version (inclusive). 0 means all.
	FromVersion int

	// Limit caps the number of events returned. 0 means no limit.
	Limit int
}

LoadOptions configures optional filtering for event loading. Zero values mean "no filter" — backward compatible.
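For example, against a store that implements EventStoreWithOptions (such as MemoryStore):

```go
events, err := store.LoadWithOptions(ctx, "order", "42", eskit.LoadOptions{
	EventTypes:  []string{"OrderCreated", "ItemAdded"}, // only these types
	FromVersion: 10,                                    // skip versions below 10
	Limit:       100,                                   // cap the result size
})
```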

type LockRegistry

type LockRegistry interface {
	// Acquire blocks until the lock for the stream is obtained, then returns
	// a release function. The caller MUST call release when done.
	// Returns an error if the context is cancelled or the lock cannot be obtained.
	Acquire(ctx context.Context, streamType, streamID string) (release func(), err error)

	// TryAcquire attempts to obtain the lock without blocking.
	// Returns a release function and true if successful, or nil and false if the lock is held.
	TryAcquire(streamType, streamID string) (release func(), ok bool)
}

LockRegistry manages per-stream locks for single-writer concurrency control. Implementations must be safe for concurrent use.
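Typical usage with the in-process implementation (sketch):

```go
registry := eskit.NewMemoryLockRegistry()

release, err := registry.Acquire(ctx, "order", "42")
if err != nil {
	return err // context cancelled before the lock was obtained
}
defer release() // the caller MUST release, or the stream stays locked

// ... single-writer critical section: load, decide, append ...
```

For non-blocking acquisition, TryAcquire returns `(nil, false)` immediately when the lock is held elsewhere.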

type MemoryKeyStore

type MemoryKeyStore struct {
	// contains filtered or unexported fields
}

MemoryKeyStore is an in-memory KeyStore for testing and development.

func NewMemoryKeyStore

func NewMemoryKeyStore() *MemoryKeyStore

NewMemoryKeyStore creates a new in-memory key store.

func (*MemoryKeyStore) ActiveKeyID

func (s *MemoryKeyStore) ActiveKeyID() (string, error)

func (*MemoryKeyStore) AddKey

func (s *MemoryKeyStore) AddKey(keyID string, key []byte)

AddKey adds a key to the store. The key must be exactly 32 bytes (AES-256). Panics if keyID is empty or key is wrong size — these are programmer errors.

func (*MemoryKeyStore) GetKey

func (s *MemoryKeyStore) GetKey(id string) ([]byte, error)

func (*MemoryKeyStore) SetActiveKey

func (s *MemoryKeyStore) SetActiveKey(keyID string)

SetActiveKey sets which key ID to use for new encryptions. Panics if the key hasn't been added yet — this is a configuration error.

type MemoryLockRegistry

type MemoryLockRegistry struct {
	// contains filtered or unexported fields
}

MemoryLockRegistry is an in-process LockRegistry backed by per-stream mutexes. Suitable for single-process deployments where all writes go through one instance.

func NewMemoryLockRegistry

func NewMemoryLockRegistry() *MemoryLockRegistry

NewMemoryLockRegistry creates a new in-process lock registry.

func (*MemoryLockRegistry) Acquire

func (r *MemoryLockRegistry) Acquire(ctx context.Context, streamType, streamID string) (func(), error)

Acquire blocks until the lock for streamID is obtained. Respects context cancellation via a background goroutine.

func (*MemoryLockRegistry) TryAcquire

func (r *MemoryLockRegistry) TryAcquire(streamType, streamID string) (func(), bool)

TryAcquire attempts a non-blocking lock acquisition.

type MemorySnapshotStore

type MemorySnapshotStore[S any] struct {
	// contains filtered or unexported fields
}

MemorySnapshotStore is an in-memory SnapshotStore implementation. Useful for testing and single-process applications. Safe for concurrent use.

func NewMemorySnapshotStore

func NewMemorySnapshotStore[S any]() *MemorySnapshotStore[S]

NewMemorySnapshotStore creates a new in-memory snapshot store.

func (*MemorySnapshotStore[S]) Invalidate

func (s *MemorySnapshotStore[S]) Invalidate(_ context.Context, streamType, streamID string) error

Invalidate removes the snapshot for a single stream.

func (*MemorySnapshotStore[S]) InvalidateAll

func (s *MemorySnapshotStore[S]) InvalidateAll(_ context.Context) error

InvalidateAll removes all snapshots.

func (*MemorySnapshotStore[S]) LoadSnapshot

func (s *MemorySnapshotStore[S]) LoadSnapshot(_ context.Context, streamType, streamID string) (*Snapshot[S], error)

func (*MemorySnapshotStore[S]) SaveSnapshot

func (s *MemorySnapshotStore[S]) SaveSnapshot(_ context.Context, snapshot Snapshot[S]) error

type MemoryStore

type MemoryStore[E any] struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory EventStore implementation, useful for testing.

func NewMemoryStore

func NewMemoryStore[E any](opts ...MemoryStoreOption[E]) *MemoryStore[E]

NewMemoryStore creates a new in-memory event store.

func (*MemoryStore[E]) Append

func (s *MemoryStore[E]) Append(_ context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)

func (*MemoryStore[E]) AppendWithOptions

func (s *MemoryStore[E]) AppendWithOptions(_ context.Context, streamType, streamID string, expectedVersion int, events []E, opts AppendOptions) ([]Event[E], error)

AppendWithOptions persists new events with idempotency and custom timestamp support.

func (*MemoryStore[E]) Archive

func (s *MemoryStore[E]) Archive(ctx context.Context, streamType, streamID string, target EventStore[E]) error

Archive moves a stream to the target store and tombstones the primary.

func (*MemoryStore[E]) ArchiveStream

func (s *MemoryStore[E]) ArchiveStream(ctx context.Context, streamType, streamID string) error

ArchiveStream marks a stream as archived. Future appends are rejected with ErrStreamArchived.

func (*MemoryStore[E]) Delete

func (s *MemoryStore[E]) Delete(_ context.Context, streamType, streamID string) error

Delete permanently removes all events for a stream from the memory store. Returns ErrStreamNotFound if stream does not exist.

func (*MemoryStore[E]) DeleteStream

func (s *MemoryStore[E]) DeleteStream(ctx context.Context, streamType, streamID string) error

DeleteStream permanently removes all events in a stream. Irreversible.

func (*MemoryStore[E]) IsTombstoned

func (s *MemoryStore[E]) IsTombstoned(_ context.Context, streamType, streamID string) (*Tombstone, error)

IsTombstoned checks if a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.

func (*MemoryStore[E]) LatestSequence

func (s *MemoryStore[E]) LatestSequence(_ context.Context) (uint64, error)

LatestSequence returns the highest global sequence, or 0 if empty.

func (*MemoryStore[E]) Load

func (s *MemoryStore[E]) Load(_ context.Context, streamType, streamID string) ([]Event[E], error)

func (*MemoryStore[E]) LoadFrom

func (s *MemoryStore[E]) LoadFrom(_ context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)

func (*MemoryStore[E]) LoadRaw

func (s *MemoryStore[E]) LoadRaw(_ context.Context, streamType, streamID string) ([]*RawEvent, error)

LoadRaw loads events without deserializing the Data field.

func (*MemoryStore[E]) LoadWithOptions

func (s *MemoryStore[E]) LoadWithOptions(_ context.Context, streamType, streamID string, opts LoadOptions) ([]Event[E], error)

LoadWithOptions loads events with optional filtering for the memory store.

func (*MemoryStore[E]) ReadByStreamType

func (s *MemoryStore[E]) ReadByStreamType(_ context.Context, streamType string, fromSequence uint64, limit int) ([]Event[E], error)

ReadByStreamType reads global events filtered by stream type.

func (*MemoryStore[E]) ReadFrom

func (s *MemoryStore[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]Event[E], error)

ReadFrom implements GlobalReader — reads events by global sequence.

func (*MemoryStore[E]) ReadFromWithOptions

func (s *MemoryStore[E]) ReadFromWithOptions(_ context.Context, fromSequence uint64, limit int, opts LoadOptions) ([]Event[E], error)

ReadFromWithOptions reads global events with optional event type filtering.

func (*MemoryStore[E]) Restore

func (s *MemoryStore[E]) Restore(ctx context.Context, streamType, streamID string, source EventStore[E]) error

Restore moves an archived stream back from the source store to the primary.

func (*MemoryStore[E]) RestoreStream

func (s *MemoryStore[E]) RestoreStream(ctx context.Context, streamType, streamID string) error

RestoreStream brings an archived stream back to active state by removing the tombstone.

func (*MemoryStore[E]) StreamStatus

func (s *MemoryStore[E]) StreamStatus(_ context.Context, streamType, streamID string) (StreamState, error)

StreamStatus returns the current lifecycle state of a stream.

func (*MemoryStore[E]) Tombstone

func (s *MemoryStore[E]) Tombstone(_ context.Context, streamType, streamID string, reason string) error

Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted. Load returns empty. The tombstone record remains for audit.

func (*MemoryStore[E]) TombstoneStream

func (s *MemoryStore[E]) TombstoneStream(ctx context.Context, streamType, streamID string) error

TombstoneStream marks a stream as deleted. Future appends are rejected with ErrStreamDeleted.

type MemoryStoreOption

type MemoryStoreOption[E any] func(*MemoryStore[E])

MemoryStoreOption configures a MemoryStore.

func WithMemoryMarshal

func WithMemoryMarshal[E any](marshal func(any) ([]byte, error)) MemoryStoreOption[E]

WithMemoryMarshal sets a custom marshal function for event serialization. This accepts any codec.Codec-compatible marshal function. Defaults to encoding/json if not set.

func WithMemoryRegistry

func WithMemoryRegistry[E any](reg *EventRegistry) MemoryStoreOption[E]

WithMemoryRegistry enables type registry for heterogeneous event deserialization.

func WithMemoryUpcasters

func WithMemoryUpcasters[E any](u *UpcasterRegistry) MemoryStoreOption[E]

WithMemoryUpcasters enables event upcasting for schema evolution during Load.

type Metadata

type Metadata struct {
	// CorrelationID groups related events across streams/services.
	CorrelationID string `json:"correlation_id,omitempty"`

	// CausationID identifies the event or command that caused this event.
	CausationID string `json:"causation_id,omitempty"`

	// Initiator is who directly caused this event (e.g., the user clicking "submit").
	Initiator Principal `json:"initiator,omitempty"`

	// Originator is who started the causal chain (e.g., the user who created the order,
	// even if this event was produced by an automation triggered by that order).
	Originator Principal `json:"originator,omitempty"`

	// IdempotencyKey is an optional key for idempotent appends.
	// When set, pgstore will deduplicate: if events with this key already exist
	// for the stream, Append returns the existing events instead of inserting duplicates.
	IdempotencyKey string `json:"idempotency_key,omitempty"`

	// Extra holds arbitrary key-value pairs for extensibility.
	// Bounded: max MetadataEntriesMax entries, max MetadataMaxSizeBytes total size.
	Extra map[string]string `json:"extra,omitempty"`
}

Metadata holds contextual information about an event for tracing, audit, and debugging.

func MetaFromContext

func MetaFromContext(ctx context.Context) (Metadata, bool)

MetaFromContext reads metadata from the context. Returns zero Metadata and false if not set.

The command bus calls this internally — you typically don't need to.

func (*Metadata) MetadataRange

func (m *Metadata) MetadataRange(fn func(key, value string) bool)

MetadataRange iterates over Extra entries without allocating a copy. Return false from fn to stop iteration early.

func (*Metadata) SetExtra

func (m *Metadata) SetExtra(key, value string) error

SetExtra sets a key-value pair in the Extra map, enforcing TigerStyle bounds. Returns an error if bounds would be exceeded.
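For example:

```go
md := eskit.Metadata{CorrelationID: "req-123"}
if err := md.SetExtra("tenant", "acme"); err != nil {
	return err // entry-count or byte-size bound exceeded
}
md.MetadataRange(func(key, value string) bool {
	fmt.Println(key, "=", value)
	return true // keep iterating
})
```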

type Middleware

type Middleware[S any, C any, E any] func(ctx context.Context, streamType, streamID string, cmd C, next HandleFunc[S, E]) (S, []Event[E], error)

Middleware intercepts command handling. It receives the command context and calls next to continue the chain. Middleware can:

  • Validate or enrich commands before handling
  • Log, meter, or trace command execution
  • Transform or filter the result after handling

The next function executes the rest of the middleware chain and the actual handler.
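For example, a hypothetical timing middleware (not part of eskit):

```go
func TimingMiddleware[S any, C any, E any](logger *slog.Logger) eskit.Middleware[S, C, E] {
	return func(ctx context.Context, streamType, streamID string, cmd C, next eskit.HandleFunc[S, E]) (S, []eskit.Event[E], error) {
		start := time.Now()
		state, events, err := next(ctx) // run the rest of the chain and the handler
		logger.Info("command handled",
			"command", fmt.Sprintf("%T", cmd),
			"stream", streamType+"/"+streamID,
			"duration", time.Since(start),
			"events", len(events),
		)
		return state, events, err
	}
}
```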

func LoggingMiddleware

func LoggingMiddleware[S any, C any, E any](logger *slog.Logger) Middleware[S, C, E]

LoggingMiddleware returns a middleware that logs command handling using slog.

func ProfilerMiddleware

func ProfilerMiddleware[S any, C any, E any](profiler *Profiler) Middleware[S, C, E]

ProfilerMiddleware returns a middleware that records command handling durations. The operation name is "CommandHandler.Handle" for all commands. For per-command-type profiling, use ProfilerMiddlewareWithType.

func ProfilerMiddlewareFunc

func ProfilerMiddlewareFunc[S any, C any, E any](profiler *Profiler, nameFunc func(C) string) Middleware[S, C, E]

ProfilerMiddlewareFunc returns a middleware that uses a custom function to derive the operation name from the command. This allows per-command-type profiling.

func SingleWriterMiddleware

func SingleWriterMiddleware[S any, C any, E any](registry LockRegistry, reject bool) Middleware[S, C, E]

SingleWriterMiddleware returns a middleware that serializes command handling per stream using the provided LockRegistry. This eliminates optimistic concurrency retries — if you hold the lock, no one else can write.

When reject is true, commands that can't immediately acquire the lock are rejected with an error instead of blocking.
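The per-stream locking idea can be illustrated with a minimal registry that hands out one mutex per stream key. This is a sketch of the concept only; eskit's LockRegistry interface and its reject semantics may differ:

```go
package main

import (
	"fmt"
	"sync"
)

// lockRegistry lazily creates one mutex per stream key.
// Reject mode could use l.TryLock() (Go 1.18+) instead of blocking.
type lockRegistry struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newLockRegistry() *lockRegistry {
	return &lockRegistry{locks: make(map[string]*sync.Mutex)}
}

func (r *lockRegistry) lockFor(key string) *sync.Mutex {
	r.mu.Lock()
	defer r.mu.Unlock()
	l, ok := r.locks[key]
	if !ok {
		l = &sync.Mutex{}
		r.locks[key] = l
	}
	return l
}

func main() {
	reg := newLockRegistry()
	l := reg.lockFor("order/1")
	l.Lock() // hold the stream's write lock while handling a command
	fmt.Println(reg.lockFor("order/1") == l) // true: same stream, same lock
	l.Unlock()
}
```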

func WithLogging

func WithLogging[S any, C any, E any](logger *slog.Logger) Middleware[S, C, E]

WithLogging returns middleware that logs command handling using the provided slog.Logger. It logs the command type, stream ID, duration, number of events produced, and any error.

func WithMetrics

func WithMetrics[S any, C any, E any]() Middleware[S, C, E]

WithMetrics is a placeholder for OpenTelemetry metrics middleware. Since OTel is not a core dependency, this serves as documentation for how to add metrics to your command handling pipeline.

To implement metrics with OpenTelemetry, create middleware like:

func WithOTelMetrics[S any, C any, E any](meter metric.Meter) eskit.Middleware[S, C, E] {
    commandDuration, _ := meter.Float64Histogram("eskit.command.duration",
        metric.WithUnit("s"),
        metric.WithDescription("Command handling duration"),
    )
    commandCount, _ := meter.Int64Counter("eskit.command.count",
        metric.WithDescription("Total commands handled"),
    )
    return func(ctx context.Context, streamType, streamID string, cmd C, next eskit.HandleFunc[S, E]) (S, []eskit.Event[E], error) {
        start := time.Now()
        state, events, err := next(ctx)
        attrs := []attribute.KeyValue{
            attribute.String("command", fmt.Sprintf("%T", cmd)),
            attribute.Bool("error", err != nil),
        }
        commandDuration.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes(attrs...))
        commandCount.Add(ctx, 1, metric.WithAttributes(attrs...))
        return state, events, err
    }
}

func WithRetry

func WithRetry[S any, C any, E any](maxRetries int, baseDelay time.Duration) Middleware[S, C, E]

WithRetry returns middleware that retries command handling on transient errors. It uses exponential backoff with jitter starting from baseDelay. Only errors wrapped with or matching ErrConcurrencyConflict are retried.

type MultiOption

type MultiOption func(*multiSubConfig)

MultiOption configures a MultiSubscription.

func OnChange

func OnChange(projection string, handler func()) MultiOption

OnChange registers a handler that fires whenever any stream in the named projection changes. Use this for list views, dashboards, and summary pages.

eskit.OnChange("orders.order-list", func() {
    orders := repo.ListOrders(ctx)
    sse.PatchElementTempl(OrderList(orders))
})

func OnChangeWithKey

func OnChangeWithKey(projection, key string, handler func()) MultiOption

OnChangeWithKey registers a handler that fires only when the specific stream identified by key changes within the named projection. Use this for detail pages and entity-specific panels where you only care about one stream.

eskit.OnChangeWithKey("orders.order-detail", orderID, func() {
    order := repo.GetOrder(ctx, orderID)
    sse.PatchElementTempl(OrderDetail(order))
})

func WithDebounce

func WithDebounce(d time.Duration) MultiOption

WithDebounce enables change coalescing. When set, after receiving a change notification the MultiSubscription waits for the debounce duration, collecting additional changes, then fires each affected handler exactly once.

This is useful when a single user action triggers many events (e.g., bulk import) and you want to avoid redundant re-renders.

Set to 0 (default) to disable debouncing.

func WithPollFallback

func WithPollFallback(d time.Duration) MultiOption

WithPollFallback enables periodic synthetic notifications as a safety net, ensuring SSE handlers re-read from the database even if push notifications are lost. The poll timer resets after every real notification, preventing spurious renders when the system is healthy.

Set to 0 (default) to disable polling.

type MultiSubscription

type MultiSubscription struct {
	// contains filtered or unexported fields
}

MultiSubscription manages multiple projection subscriptions with fan-in dispatch, optional debounce coalescing, and poll fallback. It simplifies SSE handlers that need to react to changes from several projections by consolidating them into a single Run loop.

Example usage:

notifier := eskit.NewChangeNotifier()
ms := eskit.NewMultiSubscription(notifier,
    eskit.OnChange("orders.order-list", func() {
        // re-render order list
    }),
    eskit.OnChangeWithKey("orders.order-detail", "order-42", func() {
        // re-render order 42 detail
    }),
    eskit.WithPollFallback(5*time.Second),
    eskit.WithDebounce(100*time.Millisecond),
)
ms.RenderAll() // initial render
go ms.Run(ctx) // blocks until ctx cancelled
// later:
ms.Close()

func NewMultiSubscription

func NewMultiSubscription(notifier *ChangeNotifier, opts ...MultiOption) *MultiSubscription

NewMultiSubscription creates a MultiSubscription that manages fan-in dispatch for multiple projection subscriptions. At least one OnChange or OnChangeWithKey option must be provided.

Panics if notifier is nil or no subscriptions are configured.

func (*MultiSubscription) Close

func (m *MultiSubscription) Close()

Close stops all subscriptions and fan-in goroutines. Safe to call multiple times; subsequent calls are no-ops.

func (*MultiSubscription) RenderAll

func (m *MultiSubscription) RenderAll()

RenderAll fires every registered handler exactly once. Typically called before Run to perform the initial render of all projections.

func (*MultiSubscription) Run

func (m *MultiSubscription) Run(ctx context.Context)

Run subscribes to all configured projections and blocks, dispatching handlers as changes arrive. It returns when ctx is cancelled or [Close] is called.

If debounce is configured, rapid changes to the same projection are coalesced so each handler fires at most once per debounce window.

If poll fallback is configured, the poll timer resets after every real dispatch, preventing spurious renders when notifications are flowing.

type NopAuditLogger

type NopAuditLogger struct{}

NopAuditLogger is an AuditLogger that discards all entries.

func (NopAuditLogger) LogRead

func (NopAuditLogger) LogWrite

type OperationStats

type OperationStats struct {
	Name    string  `json:"name"`
	Count   int64   `json:"count"`
	TotalMs float64 `json:"total_ms"`
	AvgMs   float64 `json:"avg_ms"`
	P50Ms   float64 `json:"p50_ms"`
	P95Ms   float64 `json:"p95_ms"`
	P99Ms   float64 `json:"p99_ms"`
	MaxMs   float64 `json:"max_ms"`
	MinMs   float64 `json:"min_ms"`
	LastMs  float64 `json:"last_ms"`
	// Trend is the ratio of recent P95 to older P95. >1.0 means getting slower.
	// 0 if insufficient data.
	Trend float64 `json:"trend"`
}

OperationStats contains timing statistics for a named operation.

func (OperationStats) TrendLabel

func (s OperationStats) TrendLabel() string

TrendLabel returns a human-readable trend indicator.

type Principal

type Principal struct {
	Kind PrincipalKind `json:"kind,omitempty"`
	ID   string        `json:"id,omitempty"`
}

Principal represents who is performing an action. Initiator is who directly caused it; Originator is who started the chain.

func (Principal) IsZero

func (p Principal) IsZero() bool

IsZero returns true if the principal is not set.

type PrincipalKind

type PrincipalKind string

PrincipalKind defines the type of actor initiating an action.

const (
	PrincipalUser       PrincipalKind = "user"
	PrincipalAutomation PrincipalKind = "automation"
	PrincipalService    PrincipalKind = "service"
	PrincipalSystem     PrincipalKind = "system"
	PrincipalScheduler  PrincipalKind = "scheduler"
	PrincipalAPI        PrincipalKind = "api"
)

type Profiler

type Profiler struct {
	// contains filtered or unexported fields
}

Profiler tracks operation timings with a rolling window and computes percentile stats.

func NewProfiler

func NewProfiler() *Profiler

NewProfiler creates a profiler with the default window size.

func NewProfilerWithWindow

func NewProfilerWithWindow(windowSize int) *Profiler

NewProfilerWithWindow creates a profiler with a custom rolling window size.

func (*Profiler) AllStats

func (p *Profiler) AllStats() []OperationStats

AllStats returns statistics for all tracked operations, sorted by name.

func (*Profiler) Degrading

func (p *Profiler) Degrading() []OperationStats

Degrading returns operations whose recent P95 is trending higher than historical.

func (*Profiler) Handler

func (p *Profiler) Handler() http.Handler

Handler returns an http.Handler that serves profiling stats as JSON.

func (*Profiler) Record

func (p *Profiler) Record(operation string, duration time.Duration)

Record adds a timing sample for an operation.

func (*Profiler) Reset

func (p *Profiler) Reset()

Reset clears all tracked data.

func (*Profiler) Slowest

func (p *Profiler) Slowest(n int) []OperationStats

Slowest returns the top N operations by P95 latency.

func (*Profiler) Stats

func (p *Profiler) Stats(operation string) OperationStats

Stats returns statistics for a single operation. Returns zero stats if not found.

type ProjectionRunner

type ProjectionRunner[E any] struct {
	// contains filtered or unexported fields
}

ProjectionRunner subscribes to an EventBus and feeds events to an EventDispatcher. In single-node mode, all events reach all subscriptions. In cluster mode with a QueueSubscriber (e.g., NATS event bus), use queue groups so each event is processed by exactly ONE node.

func NewProjectionRunner

func NewProjectionRunner[E any](bus EventBus, dispatcher *EventDispatcher[E]) *ProjectionRunner[E]

NewProjectionRunner creates a runner that feeds bus events to the dispatcher.

func (*ProjectionRunner[E]) Start

func (r *ProjectionRunner[E]) Start(ctx context.Context, queueGroup string) error

Start subscribes to the event bus and begins feeding events to the dispatcher.

func (*ProjectionRunner[E]) Stop

func (r *ProjectionRunner[E]) Stop() error

Stop unsubscribes from the event bus.

type ProjectionTest

type ProjectionTest[E any] struct {
	// contains filtered or unexported fields
}

ProjectionTest provides a Given/Then fluent API for testing projection handlers. Each test is a specification that maps directly to event modeling read models:

eskit.TestProjection(t, handler).
    Given("order-1",
        OrderCreated{ID: "order-1", Total: 100},
        ItemAdded{OrderID: "order-1", SKU: "shoe"},
    ).
    Then(func(t *testing.T) {
        // Query your read model and assert results
    })

Internally: wraps events in Event[E] with incremental versions and timestamps, then feeds them through the handler.

func TestProjection

func TestProjection[E any](t *testing.T, handler func(ctx context.Context, event Event[E]) error) *ProjectionTest[E]

TestProjection creates a new projection test with the Given/Then fluent API. This is the projection counterpart to Test for deciders.

func (*ProjectionTest[E]) Given

func (pt *ProjectionTest[E]) Given(streamID string, events ...E) *ProjectionTest[E]

Given feeds events for a stream through the handler. Events are wrapped in Event[E] with incremental versions (starting at 1) and timestamps offset by 1 second from DefaultBaseTime.

Call Given multiple times for events from different streams:

eskit.TestProjection(t, handler).
    Given("order-1", OrderCreated{}).
    Given("order-2", OrderCreated{}).
    Then(func(t *testing.T) { ... })

func (*ProjectionTest[E]) Then

func (pt *ProjectionTest[E]) Then(fn func(t *testing.T))

Then projects all Given events through the handler, then calls fn for assertions. If any event fails to project, the test fails immediately.

type QueueSubscriber

type QueueSubscriber interface {
	SubscribeQueue(ctx context.Context, queueGroup string, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)
}

QueueSubscriber is an optional interface for EventBus implementations that support queue groups (e.g., NATS). Queue groups ensure each event is processed by exactly ONE subscriber in the group — useful for distributed projections and processors.

type RawEvent

type RawEvent struct {
	ID             string
	StreamType     string
	StreamID       string
	Version        int
	EventType      string
	GlobalSequence uint64
	SchemaVersion  int
	Timestamp      time.Time
	Metadata       Metadata

	// Codec is the name of the codec used to serialize this event.
	// Empty string defaults to "json" for backward compatibility.
	Codec string
	// contains filtered or unexported fields
}

RawEvent holds an event with deferred deserialization. The Data field is only deserialized when Decode() is called. This avoids allocation overhead when you only need metadata or a subset of events.

func (*RawEvent) Decode

func (r *RawEvent) Decode(target any) error

Decode deserializes the event data into the target type. If a registry is configured, it uses it for type resolution. The result is cached — subsequent calls return the same value.
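The deferred-decode-with-caching pattern looks roughly like this. The struct, the factory field, and the OrderCreated event are illustrative assumptions, not eskit's actual RawEvent internals:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderCreated is a hypothetical event type for the demo.
type OrderCreated struct {
	ID string `json:"id"`
}

// rawEvent sketches deferred deserialization: keep raw bytes,
// decode on demand, cache the result.
type rawEvent struct {
	data    []byte
	factory func() any // from a registry, e.g. func() any { return &OrderCreated{} }
	cached  any
}

func (r *rawEvent) Decode() (any, error) {
	if r.cached != nil {
		return r.cached, nil // subsequent calls return the same value
	}
	v := r.factory()
	if err := json.Unmarshal(r.data, v); err != nil {
		return nil, err
	}
	r.cached = v
	return v, nil
}

func main() {
	r := &rawEvent{
		data:    []byte(`{"id":"order-1"}`),
		factory: func() any { return &OrderCreated{} },
	}
	v, _ := r.Decode()
	fmt.Println(v.(*OrderCreated).ID) // order-1
}
```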

func (*RawEvent) DecodeWithRegistry

func (r *RawEvent) DecodeWithRegistry() (any, error)

DecodeWithRegistry deserializes using the event registry (for interface types). Result is cached after first call.

func (*RawEvent) RawData

func (r *RawEvent) RawData() []byte

RawData returns the raw JSON bytes of the event data.

func (*RawEvent) SetRawData

func (r *RawEvent) SetRawData(data []byte, registry *EventRegistry)

SetRawData sets the raw JSON bytes and optional registry (used by stores).

type ResilientDispatcher

type ResilientDispatcher[E any] struct {
	// contains filtered or unexported fields
}

ResilientDispatcher wraps an EventDispatcher with retry and DLQ support. On handler failure, it retries with exponential backoff. If all retries are exhausted, the event goes to the DLQ instead of blocking.

func NewResilientDispatcher

func NewResilientDispatcher[E any](cfg ResilientDispatcherConfig[E]) *ResilientDispatcher[E]

NewResilientDispatcher creates a dispatcher with retry and DLQ support. Panics if Dispatcher or DLQ is nil — these are configuration errors.

func (*ResilientDispatcher[E]) Dispatch

func (rd *ResilientDispatcher[E]) Dispatch(ctx context.Context, events []Event[E]) error

Dispatch processes events with retry and DLQ support. Events that fail all retry attempts are sent to the DLQ. Returns nil even if events go to the DLQ — the caller should not be blocked. Returns an error only if the DLQ itself fails.

type ResilientDispatcherConfig

type ResilientDispatcherConfig[E any] struct {
	// Dispatcher is the underlying event dispatcher. Required.
	Dispatcher *EventDispatcher[E]
	// DLQ receives events that fail all retries. Required.
	DLQ *DLQ[E]
	// MaxRetries before sending to DLQ. 0 = default (5).
	MaxRetries int
	// BaseDelay for exponential backoff. 0 = default (100ms).
	BaseDelay time.Duration
	// MaxDelay caps the backoff. 0 = default (30s).
	MaxDelay time.Duration
}

ResilientDispatcherConfig configures retry and DLQ behavior.

type ServeOption

type ServeOption func(*serveConfig)

ServeOption configures the behaviour of ServeChanges and ServeChangesWatch.

func WithServePollFallback

func WithServePollFallback(d time.Duration) ServeOption

WithServePollFallback sets the fallback poll interval used to guarantee periodic re-renders even when no real change notifications arrive. The default is 5s. Set to 0 to disable fallback polling entirely.

type Snapshot

type Snapshot[S any] struct {
	StreamType    string
	StreamID      string
	Version       int
	State         S
	SchemaVersion int       // tracks schema for invalidation
	CreatedAt     time.Time // when the snapshot was saved
}

Snapshot represents a point-in-time snapshot of decider state.

type SnapshotStore

type SnapshotStore[S any] interface {
	// SaveSnapshot persists a snapshot of decider state at a given version.
	// If a snapshot already exists for the stream, it is overwritten.
	SaveSnapshot(ctx context.Context, snapshot Snapshot[S]) error

	// LoadSnapshot loads the latest snapshot for a stream.
	// Returns nil, nil if no snapshot exists (not an error).
	LoadSnapshot(ctx context.Context, streamType, streamID string) (*Snapshot[S], error)

	// Invalidate deletes the snapshot for a single stream.
	// Returns nil if no snapshot existed.
	Invalidate(ctx context.Context, streamType, streamID string) error

	// InvalidateAll deletes all snapshots. Use after deploying schema changes
	// or Evolve logic changes that affect all streams.
	InvalidateAll(ctx context.Context) error
}

SnapshotStore is an optional interface for persisting decider state snapshots. Implementations must be safe for concurrent use.

type StateView

type StateView[E any] struct {
	// Name identifies this projection (used for checkpoint tracking).
	Name string

	// EventTypes lists the event types this view cares about.
	// Empty = all events (not recommended).
	EventTypes []string

	// Evolve applies a single event to the read model.
	Evolve func(ctx context.Context, event Event[E]) error

	// Setup initializes the read model (e.g., CREATE TABLE). Optional.
	Setup func(ctx context.Context) error

	// Reset clears the read model for rebuild (e.g., TRUNCATE TABLE). Optional.
	Reset func(ctx context.Context) error

	// OnChange is called after Evolve succeeds for each event.
	// Use this to notify SSE handlers or other watchers that the projection
	// has been updated for a specific stream. Optional.
	//
	// Implementations must be non-blocking — slow consumers should be dropped
	// rather than delaying event processing.
	OnChange func(streamID string, eventType string)

	// AtomicCheckpoint indicates that the Evolve function saves the checkpoint
	// within its own transaction, making Evolve + checkpoint atomic.
	// When true, the subscription will NOT save the checkpoint separately —
	// Evolve is responsible for persisting it (using CheckpointFromContext
	// from the subscription package to get consumer ID and sequence).
	//
	// This prevents double processing of non-idempotent side effects
	// (e.g., sending emails, charging payments) on crash recovery.
	AtomicCheckpoint bool
}

StateView defines a read-model projection with lifecycle hooks. Used with the subscription package for durable, checkpoint-based projections.

StateView is a configuration struct — not a dispatcher. Wire it into the event stream with subscription.FromStateView().

type Stream

type Stream struct {
	// Type is the aggregate category (e.g., "order", "account"). Required.
	Type string

	// ID is the business identifier (e.g., "123", "acc-456"). Required.
	ID string
}

Stream identifies a stream by type and business ID.

func (Stream) String

func (s Stream) String() string

String returns "type/id".

func (Stream) Validate

func (s Stream) Validate() error

Validate checks that required fields are set.

type StreamLifecycle

type StreamLifecycle[E any] interface {
	// ArchiveStream moves a stream to cold storage. Archived streams cannot receive new events.
	ArchiveStream(ctx context.Context, streamType, streamID string) error

	// RestoreStream brings an archived stream back to active state.
	RestoreStream(ctx context.Context, streamType, streamID string) error

	// TombstoneStream marks a stream as deleted. Future appends are rejected.
	// Events can still be read (for audit). Use DeleteStream for hard delete.
	TombstoneStream(ctx context.Context, streamType, streamID string) error

	// DeleteStream permanently removes all events in a stream. Irreversible.
	DeleteStream(ctx context.Context, streamType, streamID string) error

	// StreamStatus returns the current lifecycle state of a stream.
	StreamStatus(ctx context.Context, streamType, streamID string) (StreamState, error)
}

StreamLifecycle provides stream lifecycle management.

type StreamState

type StreamState int

StreamState represents the lifecycle state of a stream.

const (
	// StreamActive is the default state — the stream accepts appends.
	StreamActive StreamState = iota
	// StreamArchived means the stream is in cold storage. Appends are rejected.
	StreamArchived
	// StreamTombstoned means the stream is soft-deleted. Appends are rejected.
	// Events may still be readable for audit purposes.
	StreamTombstoned
)

func (StreamState) String

func (s StreamState) String() string

String returns a human-readable representation of the stream state.

type StreamTypeReader

type StreamTypeReader[E any] interface {
	// ReadByStreamType returns events filtered by stream type, starting from
	// the given global sequence (inclusive), up to limit events.
	ReadByStreamType(ctx context.Context, streamType string, fromSequence uint64, limit int) ([]Event[E], error)
}

StreamTypeReader reads events filtered by stream type. Stores that support indexed stream type queries implement this.

type Subscription

type Subscription[E any] struct {
	// Name identifies this subscription for error messages and debugging. Required.
	Name string

	// EventTypes lists event type names this subscription handles (from [TypeName]).
	// Empty = all events (match-all).
	EventTypes []string

	// Handler processes a matched event. Required.
	Handler func(ctx context.Context, event Event[E]) error
}

Subscription defines a named event handler with optional type filtering.

EventTypes declares which event type names this subscription handles. Events with no matching subscription are skipped — zero overhead. Empty EventTypes means ALL events (match-all).

Use Subscription for both read-model projections and side-effect reactions. The EventDispatcher treats them identically; semantics are up to you.

type TenantEventStore

type TenantEventStore[E any] struct {
	// contains filtered or unexported fields
}

TenantEventStore wraps an EventStore to add tenant-scoping. All stream IDs are transparently prefixed with the tenant ID from context. If no tenant is in context, operations return ErrTenantRequired.

func NewTenantEventStore

func NewTenantEventStore[E any](inner EventStore[E]) *TenantEventStore[E]

NewTenantEventStore wraps an event store with tenant-scoping. Panics if inner is nil.

func (*TenantEventStore[E]) Append

func (s *TenantEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)

func (*TenantEventStore[E]) Load

func (s *TenantEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)

func (*TenantEventStore[E]) LoadFrom

func (s *TenantEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)

type TenantID

type TenantID string

TenantID is a validated tenant identifier.

func MustTenantFrom

func MustTenantFrom(ctx context.Context) TenantID

MustTenantFrom extracts the tenant ID or panics if not present. Use in handlers where tenant is always required (after middleware validation).

func TenantFrom

func TenantFrom(ctx context.Context) (TenantID, bool)

TenantFrom extracts the tenant ID from context. Returns empty string and false if not present.

type Tombstone

type Tombstone struct {
	// StreamType is the type of the deleted stream.
	StreamType string

	// StreamID is the deleted stream.
	StreamID string

	// Reason explains why the stream was tombstoned (e.g., "gdpr_request", "archived").
	Reason string

	// Timestamp is when the tombstone was created.
	Timestamp time.Time

	// ArchivedTo identifies where the stream was archived (empty if not archived).
	ArchivedTo string
}

Tombstone records the deletion of a stream for audit purposes.

type TypeCache

type TypeCache[T any] struct {
	// contains filtered or unexported fields
}

TypeCache provides near-zero latency pooled type acquisition for registered event types. It caches the sync.Pool lookup per type, avoiding map lookups on the hot path.

Usage:

registry := eskit.NewEventRegistry()
registry.Register("OrderCreated", func() any { return &OrderCreated{} })
cache := eskit.NewTypeCache[OrderCreated](registry, "OrderCreated")

obj := cache.Acquire()   // ~19 ns/op from pool
// use obj...
cache.Release(obj)       // return to pool

Thread-safe. The Acquire/Release methods never allocate on the hot path when the pool has available objects.

func NewTypeCache

func NewTypeCache[T any](r *EventRegistry, eventType string) *TypeCache[T]

NewTypeCache creates a cached type lookup for maximum performance. The eventType must already be registered in the registry. Panics if the type is not registered.

func (*TypeCache[T]) Acquire

func (tc *TypeCache[T]) Acquire() *T

Acquire returns a pooled instance of type T. The returned instance may contain stale data from a previous use — the caller must overwrite all fields before reading.

Performance: ~19 ns/op (no map lookup, direct pool access).

func (*TypeCache[T]) Release

func (tc *TypeCache[T]) Release(v *T)

Release returns an instance to the pool for reuse. The instance is zeroed before being returned to prevent data leaks.

Passing nil is a no-op.

type TypeNamer deprecated

type TypeNamer interface {
	EventType() string
}

TypeNamer is an optional interface events can implement to provide their type name. If not implemented, the registry's reverse lookup (reflect.Type → name) is used.

Deprecated: Use eskit.Register[E](reg) instead. The registry derives the wire name from the Go type automatically. TypeNamer remains for backward compatibility.

type UnmarshalFunc

type UnmarshalFunc func(data []byte, v any) error

UnmarshalFunc is a function that unmarshals data into a target. Used to thread custom codecs through the deserialization pipeline.

type UpcasterFunc

type UpcasterFunc func(data json.RawMessage) (json.RawMessage, error)

UpcasterFunc transforms raw event JSON from one schema version to the next. Input and output are both JSON bytes — this keeps the chain composable without requiring intermediate Go types to exist in the codebase.

Why JSON bytes instead of typed values? When upcasting v1→v2→v3, the v1 and v2 Go structs may have been deleted. Working with raw JSON means we only need the transformation logic, not the types.

type UpcasterRegistry

type UpcasterRegistry struct {
	// contains filtered or unexported fields
}

UpcasterRegistry manages per-event-type version chains. Thread-safe: all methods are safe for concurrent use.

Performance: Uses a map[int] index per event type for O(1) chain lookup instead of linear scan. The index is rebuilt on Register (cold path) so Upcast (hot path) pays zero overhead.

func NewUpcasterRegistry

func NewUpcasterRegistry() *UpcasterRegistry

NewUpcasterRegistry creates an empty upcaster registry.

func (*UpcasterRegistry) ChainLength

func (r *UpcasterRegistry) ChainLength(eventType string) int

ChainLength returns the number of upcasters registered for an event type. Returns 0 if no upcasters are registered. Useful for testing and diagnostics.

func (*UpcasterRegistry) HasUpcaster

func (r *UpcasterRegistry) HasUpcaster(eventType string, fromVersion int) bool

HasUpcaster returns true if an upcaster is registered for the given event type and fromVersion. O(1) via index.

func (*UpcasterRegistry) LatestVersion

func (r *UpcasterRegistry) LatestVersion(eventType string) int

LatestVersion returns the highest version reachable from any registered upcaster for the given event type. Returns 0 if no upcasters are registered.

Why this exists: callers need to know the target version for Upcast() without hardcoding it. The registry knows because the chain's last toVersion is the latest.

func (*UpcasterRegistry) Register

func (r *UpcasterRegistry) Register(eventType string, fromVersion, toVersion int, fn UpcasterFunc)

Register adds an upcaster that converts eventType from fromVersion to toVersion. Upcasters are chained: registering v1→v2 and v2→v3 allows automatic v1→v3.

Panics on programmer errors (nil fn, invalid versions, duplicate registration). These are configuration bugs that must be caught at startup, not runtime.

func (*UpcasterRegistry) RegisteredTypes

func (r *UpcasterRegistry) RegisteredTypes() []string

RegisteredTypes returns all event types that have upcasters registered.

func (*UpcasterRegistry) Upcast

func (r *UpcasterRegistry) Upcast(eventType string, data json.RawMessage, fromVersion, targetVersion int) (json.RawMessage, error)

Upcast transforms event data from fromVersion to targetVersion by chaining registered upcasters. Returns the original data unchanged if fromVersion == targetVersion.

Hot path — zero allocations beyond what the upcaster functions themselves allocate. Chain lookup is O(1) via pre-built index.

type WaitFunc

type WaitFunc func(ctx context.Context) bool

WaitFunc blocks until a relevant change arrives, then returns true. Returns false when the context is cancelled or the notifier is closed.

type WithMetadata

type WithMetadata interface {
	EventMetadata() Metadata
}

WithMetadata is an optional interface for commands that carry metadata to propagate through to stored events.

type WithPrincipal

type WithPrincipal interface {
	GetInitiator() Principal
	GetOriginator() Principal
}

WithPrincipal is an optional interface for commands that identify who is performing the action. The CommandBus auto-propagates this.

Directories

Path Synopsis
cmd
sse-stress command
Command sse-stress benchmarks eskit SSE infrastructure.
codec module
Package command provides an in-process CommandBus with single-writer guarantee per stream.
Package commandlog provides an interface and middleware for recording every command dispatched through a CommandBus.
commandqueue module
Package conformance provides an end-to-end conformance suite that tests the full event sourcing pipeline:
Package dcb implements Dynamic Consistency Boundary (DCB) event sourcing.
embeddednats module
Package eventstoretest provides a reusable conformance test suite for EventStore implementations.
Package gdpr provides GDPR-compliant crypto-shredding for event-sourced systems.
Package hooks provides composable lifecycle hooks for eskit event stores, command dispatch, and projection processing.
Package id provides distributed, time-ordered unique ID generation using snowflake IDs.
Package liveprojection provides a real-time, topic-based event projection with fan-out to subscribers.
Package middleware provides composable middleware for eskit EventStore and CommandBus.
natsstore module
otelkit module
pgstore module
pgview module
Package processor provides event-driven processing for eskit.
Package projection provides tools for managing and rebuilding projections.
Package rebuild provides zero-downtime projection rebuilds using shadow tables.
Package snapshot provides configuration options for decider snapshot behavior.
sqlitestore module
Package sqlview provides SQL-backed StateView projections with transactional event processing.
Package subscription provides guaranteed event delivery for event-sourced systems.