Documentation ¶
Overview ¶
Package eskit provides an event sourcing toolkit for Go using the decider pattern.
The decider pattern models domain logic as pure functions:
Command → Decide(state, command) → []Event
State + Event → Evolve(state, event) → State
This separation makes domain logic easy to test, compose, and reason about.
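As a rough illustration of the two functions above, here is a minimal, self-contained sketch. The `Cart`, `AddItem`, and `ItemAdded` types and the `decide`, `evolve`, and `replay` helpers are hypothetical names invented for this example; they are not part of eskit and do not show how the package's own Decider type is declared.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical domain types for illustration only; not part of eskit.
type Cart struct{ Items int }

type AddItem struct{ SKU string }

type ItemAdded struct{ SKU string }

// decide validates a command against the current state and returns the
// events to record. It performs no I/O and mutates nothing.
func decide(state Cart, cmd AddItem) ([]ItemAdded, error) {
	if cmd.SKU == "" {
		return nil, errors.New("sku must not be empty")
	}
	if state.Items >= 3 {
		return nil, errors.New("cart is full")
	}
	return []ItemAdded{{SKU: cmd.SKU}}, nil
}

// evolve applies one event to the state and returns the new state.
func evolve(state Cart, _ ItemAdded) Cart {
	state.Items++
	return state
}

// replay folds a stream of events into the current state.
func replay(events []ItemAdded) Cart {
	var state Cart
	for _, e := range events {
		state = evolve(state, e)
	}
	return state
}

func main() {
	state := replay([]ItemAdded{{SKU: "shoe"}, {SKU: "hat"}})
	events, err := decide(state, AddItem{SKU: "sock"})
	fmt.Println(state.Items, len(events), err)
}
```

Because both functions are pure, a test only needs to build a state from past events, issue a command, and compare the returned events.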
registry_store.go provides helpers for wiring EventRegistry into store deserialization.
The core problem: when E is an interface (heterogeneous event stream), json.Unmarshal doesn't know which concrete type to create. The registry solves this by mapping event type names → factory functions, enabling correct deserialization.
Two paths:
- No registry: json.Unmarshal into E directly (works for concrete struct E)
- With registry: create the concrete type from the registry, unmarshal into it, then type-assert the result to E
EventBus provides a publish/subscribe mechanism for distributing events across components or nodes. Use ChannelEventBus for single-process deployments and natseventbus.NATSEventBus (from git.nullsoft.is/ash/eskit/natseventbus) for multi-node clusters.
EventBus is separate from EventStore — the store is the source of truth, the bus is the notification mechanism. Events are published AFTER successful store append, and subscribers should be idempotent (events may be delivered more than once, or missed if a subscriber was offline).
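One common way to make a subscriber idempotent is to remember which event IDs it has already applied. The sketch below assumes each delivered event carries a unique ID; the `delivered` type and `idempotentHandler` are hypothetical and are not eskit's EventEnvelope or any eskit API.

```go
package main

import (
	"fmt"
	"sync"
)

// delivered is a hypothetical stand-in for an event received from a bus.
type delivered struct {
	ID   string
	Data string
}

// idempotentHandler remembers which event IDs it has already processed.
type idempotentHandler struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	applied []string
}

func newIdempotentHandler() *idempotentHandler {
	return &idempotentHandler{seen: make(map[string]struct{})}
}

// Handle applies the event once and ignores redeliveries of the same ID.
// It reports whether the event was applied.
func (h *idempotentHandler) Handle(e delivered) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, dup := h.seen[e.ID]; dup {
		return false
	}
	h.seen[e.ID] = struct{}{}
	h.applied = append(h.applied, e.Data)
	return true
}

func main() {
	h := newIdempotentHandler()
	for _, e := range []delivered{{"1", "a"}, {"2", "b"}, {"1", "a"}} {
		fmt.Println(e.ID, h.Handle(e))
	}
}
```

The in-memory set here grows without bound and is lost on restart, so a real subscriber would likely persist its position or seen IDs. Deduplication also does not recover missed events; for those, a subscriber would need to catch up from the store.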
register.go provides the generic Register function for type-safe event registration.
The wire name is derived from the Go type: "{package}.{TypeName}". This is deterministic, zero-duplication, and derived at registration time.
reg := eskit.NewEventRegistry()
eskit.Register[OrderPlaced](reg) // → "sales.OrderPlaced"
eskit.Register[ItemAddedToCart](reg) // → "sales.ItemAddedToCart"
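A name of the form "{package}.{TypeName}" can be derived with the standard reflect package. The sketch below shows one plausible approach; `wireName` is a hypothetical helper and may differ from how eskit computes the name internally.

```go
package main

import (
	"fmt"
	"path"
	"reflect"
)

// OrderPlaced is a hypothetical event type for illustration.
type OrderPlaced struct{ OrderID string }

// wireName derives "{package}.{TypeName}" from the type parameter E.
// It panics for anything that is not a named struct type.
func wireName[E any]() string {
	// Going through *E lets reflect see interface types as well.
	t := reflect.TypeOf((*E)(nil)).Elem()
	if t.Name() == "" || t.Kind() != reflect.Struct {
		panic("wireName: E must be a named struct type")
	}
	// PkgPath is the full import path; keep only its last element.
	return path.Base(t.PkgPath()) + "." + t.Name()
}

func main() {
	fmt.Println(wireName[OrderPlaced]())
}
```

Since the name depends on the package and type identifiers, renaming or moving a type changes its wire name. That is the situation RegisterAs is meant to cover.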
Index ¶
- Constants
- Variables
- func Continue[E any](ctx context.Context, source Event[E], principal Principal) context.Context
- func CurrentSchemaVersion(upcasters *UpcasterRegistry, eventType string) int
- func DeriveTypeName[E any]() string
- func DeserializeWithCodec[E any](registry *EventRegistry, eventType string, data []byte, ...) (E, error)
- func DeserializeWithPool[E any](pool *pooledEventRegistry, eventType string, data []byte) (E, error)
- func DeserializeWithRegistry[E any](registry *EventRegistry, eventType string, data []byte) (E, error)
- func DeserializeWithUpcasting[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, ...) (E, int, error)
- func DeserializeWithUpcastingCodec[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, ...) (E, int, error)
- func GenerateKey() ([]byte, error)
- func NewChain(ctx context.Context, principal Principal) context.Context
- func NewChainWithID(ctx context.Context, correlationID string, principal Principal) context.Context
- func ProjectEvents[E any](t *testing.T, handler func(ctx context.Context, event Event[E]) error, ...)
- func RebuildSnapshots[S any, E any](ctx context.Context, store EventStore[E], snapStore SnapshotStore[S], ...) error
- func Register[E any](reg *EventRegistry)
- func RegisterAs[E any](reg *EventRegistry, name string)
- func RegisterRawUpcaster(registry *UpcasterRegistry, eventType string, fromVersion, toVersion int, ...)
- func RegisterTypedUpcaster[From any, To any](registry *UpcasterRegistry, eventType string, fromVersion, toVersion int, ...)
- func ResolveEventType(registry *EventRegistry, event any) (string, error)
- func ScopeStreamID(tenant TenantID, streamID string) string
- func ServeChanges(ctx context.Context, notifier *ChangeNotifier, projection string, ...) error
- func ServeChangesWatch(ctx context.Context, notifier *ChangeNotifier, projection, streamID string, ...) error
- func StreamExists[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (bool, error)
- func StreamVersion[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (int, error)
- func SubscribeTo[D any](d *EventDispatcher[any], name string, ...)
- func TypeName[E any](e Event[E]) string
- func ValidateTenantID(id TenantID) error
- func WithMeta(ctx context.Context, meta Metadata) context.Context
- func WithTenant(ctx context.Context, tenant TenantID) context.Context
- type AppendOptions
- type AuditEntry
- type AuditLogger
- type AutomationRunner
- type BusSubscription
- type Change
- type ChangeNotifier
- type ChangeNotifierOption
- type ChangeRelay
- type ChannelEventBus
- func (b *ChannelEventBus) Close()
- func (b *ChannelEventBus) Publish(_ context.Context, streamID string, events []EventEnvelope) error
- func (b *ChannelEventBus) Subscribe(_ context.Context, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)
- func (b *ChannelEventBus) SubscribeStream(_ context.Context, streamID string, handler func(events []EventEnvelope)) (BusSubscription, error)
- type ChannelEventBusOption
- type CommandHandler
- type DLQ
- func (q *DLQ[E]) Close() error
- func (q *DLQ[E]) Delete(ctx context.Context, id string) error
- func (q *DLQ[E]) Get(ctx context.Context, id string) (*DLQEntry[E], error)
- func (q *DLQ[E]) Len() int
- func (q *DLQ[E]) List(ctx context.Context, limit, offset int) ([]DLQEntry[E], error)
- func (q *DLQ[E]) Replay(ctx context.Context, id string) error
- func (q *DLQ[E]) Send(ctx context.Context, entry DLQEntry[E]) error
- type DLQConfig
- type DLQEntry
- type Decider
- type DeciderTest
- func (dt *DeciderTest[S, C, E]) Given(events ...E) *DeciderTest[S, C, E]
- func (dt *DeciderTest[S, C, E]) ThenError(msg string)
- func (dt *DeciderTest[S, C, E]) ThenExpect(expected ...E)
- func (dt *DeciderTest[S, C, E]) ThenExpectWith(opts []cmp.Option, expected ...E)
- func (dt *DeciderTest[S, C, E]) ThenNoEvents()
- func (dt *DeciderTest[S, C, E]) When(cmd C) *DeciderTest[S, C, E]
- type EncryptedEventStore
- func (s *EncryptedEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, ...) ([]Event[E], error)
- func (s *EncryptedEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)
- func (s *EncryptedEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
- type EncryptedStoreConfig
- type Encryptor
- type ErrUnknownEventType
- type Event
- type EventBus
- type EventDispatcher
- func (d *EventDispatcher[E]) Dispatch(ctx context.Context, events []Event[E]) error
- func (d *EventDispatcher[E]) HandlerCount() int
- func (d *EventDispatcher[E]) HandlesEvent(typeName string) bool
- func (d *EventDispatcher[E]) Register(sub Subscription[E])
- func (d *EventDispatcher[E]) Replay(ctx context.Context, store EventStore[E], streamType string, ...) error
- type EventEnvelope
- type EventRegistry
- type EventStore
- type EventStoreWithAppendOptions
- type EventStoreWithArchival
- type EventStoreWithDeletion
- type EventStoreWithOptions
- type HandleFunc
- type HandlerOption
- func WithConflictRetry[S any, C any, E any](maxRetries int) HandlerOption[S, C, E]
- func WithConflictRetryDelay[S any, C any, E any](baseDelay time.Duration) HandlerOption[S, C, E]
- func WithDispatcher[S any, C any, E any](d *EventDispatcher[E]) HandlerOption[S, C, E]
- func WithSnapshots[S any, C any, E any](store SnapshotStore[S], opts ...snapshot.Option) HandlerOption[S, C, E]
- type InstrumentedEventStore
- func (s *InstrumentedEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, ...) ([]Event[E], error)
- func (s *InstrumentedEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)
- func (s *InstrumentedEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
- func (s *InstrumentedEventStore[E]) Ping(ctx context.Context) error
- type KeyStore
- type LoadOptions
- type LockRegistry
- type MemoryKeyStore
- type MemoryLockRegistry
- type MemorySnapshotStore
- func (s *MemorySnapshotStore[S]) Invalidate(_ context.Context, streamType, streamID string) error
- func (s *MemorySnapshotStore[S]) InvalidateAll(_ context.Context) error
- func (s *MemorySnapshotStore[S]) LoadSnapshot(_ context.Context, streamType, streamID string) (*Snapshot[S], error)
- func (s *MemorySnapshotStore[S]) SaveSnapshot(_ context.Context, snapshot Snapshot[S]) error
- type MemoryStore
- func (s *MemoryStore[E]) Append(_ context.Context, streamType, streamID string, expectedVersion int, ...) ([]Event[E], error)
- func (s *MemoryStore[E]) AppendWithOptions(_ context.Context, streamType, streamID string, expectedVersion int, ...) ([]Event[E], error)
- func (s *MemoryStore[E]) Archive(ctx context.Context, streamType, streamID string, target EventStore[E]) error
- func (s *MemoryStore[E]) ArchiveStream(ctx context.Context, streamType, streamID string) error
- func (s *MemoryStore[E]) Delete(_ context.Context, streamType, streamID string) error
- func (s *MemoryStore[E]) DeleteStream(ctx context.Context, streamType, streamID string) error
- func (s *MemoryStore[E]) IsTombstoned(_ context.Context, streamType, streamID string) (*Tombstone, error)
- func (s *MemoryStore[E]) LatestSequence(_ context.Context) (uint64, error)
- func (s *MemoryStore[E]) Load(_ context.Context, streamType, streamID string) ([]Event[E], error)
- func (s *MemoryStore[E]) LoadFrom(_ context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
- func (s *MemoryStore[E]) LoadRaw(_ context.Context, streamType, streamID string) ([]*RawEvent, error)
- func (s *MemoryStore[E]) LoadWithOptions(_ context.Context, streamType, streamID string, opts LoadOptions) ([]Event[E], error)
- func (s *MemoryStore[E]) ReadByStreamType(_ context.Context, streamType string, fromSequence uint64, limit int) ([]Event[E], error)
- func (s *MemoryStore[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]Event[E], error)
- func (s *MemoryStore[E]) ReadFromWithOptions(_ context.Context, fromSequence uint64, limit int, opts LoadOptions) ([]Event[E], error)
- func (s *MemoryStore[E]) Restore(ctx context.Context, streamType, streamID string, source EventStore[E]) error
- func (s *MemoryStore[E]) RestoreStream(ctx context.Context, streamType, streamID string) error
- func (s *MemoryStore[E]) StreamStatus(_ context.Context, streamType, streamID string) (StreamState, error)
- func (s *MemoryStore[E]) Tombstone(_ context.Context, streamType, streamID string, reason string) error
- func (s *MemoryStore[E]) TombstoneStream(ctx context.Context, streamType, streamID string) error
- type MemoryStoreOption
- type Metadata
- type Middleware
- func LoggingMiddleware[S any, C any, E any](logger *slog.Logger) Middleware[S, C, E]
- func ProfilerMiddleware[S any, C any, E any](profiler *Profiler) Middleware[S, C, E]
- func ProfilerMiddlewareFunc[S any, C any, E any](profiler *Profiler, nameFunc func(C) string) Middleware[S, C, E]
- func SingleWriterMiddleware[S any, C any, E any](registry LockRegistry, reject bool) Middleware[S, C, E]
- func WithLogging[S any, C any, E any](logger *slog.Logger) Middleware[S, C, E]
- func WithMetrics[S any, C any, E any]() Middleware[S, C, E]
- func WithRetry[S any, C any, E any](maxRetries int, baseDelay time.Duration) Middleware[S, C, E]
- type MultiOption
- type MultiSubscription
- type NopAuditLogger
- type OperationStats
- type Principal
- type PrincipalKind
- type Profiler
- func (p *Profiler) AllStats() []OperationStats
- func (p *Profiler) Degrading() []OperationStats
- func (p *Profiler) Handler() http.Handler
- func (p *Profiler) Record(operation string, duration time.Duration)
- func (p *Profiler) Reset()
- func (p *Profiler) Slowest(n int) []OperationStats
- func (p *Profiler) Stats(operation string) OperationStats
- type ProjectionRunner
- type ProjectionTest
- type QueueSubscriber
- type RawEvent
- type ResilientDispatcher
- type ResilientDispatcherConfig
- type ServeOption
- type Snapshot
- type SnapshotStore
- type StateView
- type Stream
- type StreamLifecycle
- type StreamState
- type StreamTypeReader
- type Subscription
- type TenantEventStore
- func (s *TenantEventStore[E]) Append(ctx context.Context, streamType, streamID string, expectedVersion int, ...) ([]Event[E], error)
- func (s *TenantEventStore[E]) Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)
- func (s *TenantEventStore[E]) LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
- type TenantID
- type Tombstone
- type TypeCache
- type TypeNamer (deprecated)
- type UnmarshalFunc
- type UpcasterFunc
- type UpcasterRegistry
- func (r *UpcasterRegistry) ChainLength(eventType string) int
- func (r *UpcasterRegistry) HasUpcaster(eventType string, fromVersion int) bool
- func (r *UpcasterRegistry) LatestVersion(eventType string) int
- func (r *UpcasterRegistry) Register(eventType string, fromVersion, toVersion int, fn UpcasterFunc)
- func (r *UpcasterRegistry) RegisteredTypes() []string
- func (r *UpcasterRegistry) Upcast(eventType string, data json.RawMessage, fromVersion, targetVersion int) (json.RawMessage, error)
- type WaitFunc
- type WithMetadata
- type WithPrincipal
Constants ¶
const (
	// StreamTypeLengthMax limits the length of stream type strings.
	StreamTypeLengthMax = 256
	// StreamIDLengthMax limits the length of stream ID strings.
	StreamIDLengthMax = 256
	// EventTypeLengthMax limits the length of event type strings.
	EventTypeLengthMax = 256
	// BatchSizeMax limits batch operations to prevent unbounded memory usage.
	BatchSizeMax = 10000
	// MetadataEntriesMax limits the number of metadata entries per event.
	MetadataEntriesMax = 100
	// MetadataMaxSizeBytes limits the total size of all metadata keys and values (64KB).
	MetadataMaxSizeBytes = 64 * 1024
)
Bounds — TigerStyle: every resource must be bounded.
const AppendAny = -1
AppendAny is a sentinel expectedVersion value meaning "skip optimistic concurrency check." Use for append-only streams where multiple writers are expected (e.g., integration event streams).
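The optimistic concurrency check that AppendAny bypasses can be sketched as follows. This is a simplified in-memory model; `stream`, `appendEvents`, `appendAny`, and `errConflict` are hypothetical names for this example, not eskit's store implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// appendAny mirrors the sentinel described above: skip the version check.
const appendAny = -1

var errConflict = errors.New("concurrency conflict")

// stream is a hypothetical single-stream store; the version of a stream
// is modeled as the number of events it holds.
type stream struct{ events []string }

// appendEvents appends only if expectedVersion matches the current
// version, unless the caller passes appendAny.
func (s *stream) appendEvents(expectedVersion int, events ...string) (int, error) {
	current := len(s.events)
	if expectedVersion != appendAny && expectedVersion != current {
		return current, errConflict
	}
	s.events = append(s.events, events...)
	return len(s.events), nil
}

func main() {
	s := &stream{}
	v, err := s.appendEvents(0, "created")
	fmt.Println(v, err)
	v, err = s.appendEvents(0, "stale write") // version is now 1
	fmt.Println(v, err)
	v, err = s.appendEvents(appendAny, "integration event")
	fmt.Println(v, err)
}
```

A writer that loaded the stream at version 0 and lost the race gets a conflict and can reload and retry, while a writer using the sentinel always appends.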
Variables ¶
var (
	// ErrKeyNotFound is returned when a key ID is not in the store.
	ErrKeyNotFound = fmt.Errorf("eskit/crypto: key not found")
	// ErrNoActiveKey is returned when no active key is configured.
	ErrNoActiveKey = fmt.Errorf("eskit/crypto: no active key")
	// ErrDecryptionFailed is returned when authenticated decryption fails (tampered data).
	ErrDecryptionFailed = fmt.Errorf("eskit/crypto: decryption failed")
	// ErrInvalidEnvelope is returned for malformed encrypted envelopes.
	ErrInvalidEnvelope = fmt.Errorf("eskit/crypto: invalid envelope")
	// ErrPayloadTooLarge is returned when data exceeds maxPayloadSize.
	ErrPayloadTooLarge = fmt.Errorf("eskit/crypto: payload too large")
)
var (
	// ErrStreamDeleted is returned when attempting to append to a tombstoned stream.
	ErrStreamDeleted = errors.New("eskit: stream deleted")
	// ErrStreamArchived is returned when attempting to append to an archived stream.
	ErrStreamArchived = errors.New("eskit: stream archived")
)
var (
	// ErrDLQMessageNotFound is returned when a DLQ entry doesn't exist.
	ErrDLQMessageNotFound = fmt.Errorf("eskit/dlq: message not found")
	// ErrDLQFull is returned when the DLQ has reached its capacity.
	ErrDLQFull = fmt.Errorf("eskit/dlq: queue full")
	// ErrDLQClosed is returned when operating on a closed DLQ.
	ErrDLQClosed = fmt.Errorf("eskit/dlq: closed")
)
var (
	// ErrStreamIDEmpty is returned when a stream ID is empty or blank.
	ErrStreamIDEmpty = errors.New("eskit: stream ID must not be empty")
	// ErrStreamIDTooLong is returned when a stream ID exceeds the maximum allowed length.
	ErrStreamIDTooLong = errors.New("eskit: stream ID too long")
	// ErrEventsEmpty is returned when an Append call provides no events.
	ErrEventsEmpty = errors.New("eskit: events must not be empty")
	// ErrTooManyEvents is returned when an Append call exceeds the maximum batch size.
	ErrTooManyEvents = errors.New("eskit: too many events in batch")
	// ErrNegativeVersion is returned when a negative expectedVersion is provided.
	ErrNegativeVersion = errors.New("eskit: expected version must not be negative")
	// ErrNilHandler is returned when a nil handler or callback is provided
	// where a non-nil value is required.
	ErrNilHandler = errors.New("eskit: handler must not be nil")
	// ErrNilStore is returned when a nil event store is provided.
	ErrNilStore = errors.New("eskit: store must not be nil")
	// ErrAlreadyRunning is returned when Start is called on something already running.
	ErrAlreadyRunning = errors.New("eskit: already running")
	// ErrBusClosed is returned when publishing or subscribing on a closed event bus.
	ErrBusClosed = errors.New("eskit: bus is closed")
	// ErrTooManySubscribers is returned when a subscriber limit is exceeded.
	ErrTooManySubscribers = errors.New("eskit: too many subscribers")
	// ErrEventTypeNotRegistered is returned when an event type string cannot be resolved
	// to a registered Go type in the EventRegistry (not the TypeCache).
	ErrEventTypeNotRegistered = errors.New("eskit: event type not registered")
	// ErrNullByteInID is returned when a stream ID or stream type contains a null byte.
	// Null bytes are forbidden because they are used as internal key separators.
	ErrNullByteInID = errors.New("eskit: stream ID or type must not contain null bytes")
)
var (
	// ErrConcurrencyConflict is returned when the expected version doesn't match
	// the actual version in the store. This indicates a concurrent modification.
	ErrConcurrencyConflict = errors.New("eskit: concurrency conflict")
	// ErrStreamNotFound is returned when no events exist for a stream.
	ErrStreamNotFound = errors.New("eskit: stream not found")
	// ErrMetadataTooManyEntries is returned when metadata exceeds MetadataEntriesMax.
	ErrMetadataTooManyEntries = errors.New("eskit: metadata too many entries")
	// ErrMetadataTooLarge is returned when metadata exceeds MetadataMaxSizeBytes.
	ErrMetadataTooLarge = errors.New("eskit: metadata too large")
	// ErrStreamTypeEmpty is returned when stream type is empty.
	ErrStreamTypeEmpty = errors.New("eskit: stream type must not be empty")
	// ErrStreamTypeTooLong is returned when stream type exceeds StreamTypeLengthMax.
	ErrStreamTypeTooLong = errors.New("eskit: stream type too long")
)
var (
	// ErrTenantRequired is returned when a tenant ID is expected but not in context.
	ErrTenantRequired = fmt.Errorf("eskit/tenant: tenant ID required in context")
	// ErrInvalidTenantID is returned for tenant IDs that don't match the allowed pattern.
	ErrInvalidTenantID = fmt.Errorf("eskit/tenant: invalid tenant ID")
)
var DefaultBaseTime = time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC)
DefaultBaseTime is a stable timestamp for deterministic projection tests. Using a fixed time avoids flaky tests that depend on time.Now().
Functions ¶
func Continue ¶
func Continue[E any](ctx context.Context, source Event[E], principal Principal) context.Context
Continue propagates metadata from a source event into the context. Copies CorrelationID and Originator from the event, sets CausationID to the source event's identity, and sets Initiator to the given principal.
The processor framework calls this automatically before invoking handlers. You typically don't need to call this yourself.
ctx = eskit.Continue(ctx, sourceEvent, eskit.Principal{Kind: eskit.PrincipalAutomation, ID: "charger"})
func CurrentSchemaVersion ¶
func CurrentSchemaVersion(upcasters *UpcasterRegistry, eventType string) int
CurrentSchemaVersion returns the latest schema version for an event type. Returns 1 if no upcasters are registered (default schema version).
func DeriveTypeName ¶
func DeriveTypeName[E any]() string
DeriveTypeName returns the wire name for a Go type without registering it. Format: "{package}.{TypeName}" with PascalCase type name.
Panics if E is not a named type (anonymous structs, interfaces, etc.).
eskit.DeriveTypeName[OrderPlaced]() // → "sales.OrderPlaced"
func DeserializeWithCodec ¶
func DeserializeWithCodec[E any](registry *EventRegistry, eventType string, data []byte, unmarshal UnmarshalFunc) (E, error)
DeserializeWithCodec deserializes event data using a custom unmarshal function and the registry for type resolution. This enables stores to use any codec (CBOR, msgpack, etc.) instead of hardcoded JSON.
func DeserializeWithPool ¶
func DeserializeWithPool[E any](pool *pooledEventRegistry, eventType string, data []byte) (E, error)
DeserializeWithPool deserializes JSON event data using a pooled type registry. This avoids allocating a new instance for every event, reusing objects via sync.Pool.
The returned value is a copy (not pooled), so it's safe to hold indefinitely. The pooled instance is returned to the pool after copying.
Performance: avoids 1 allocation per event on the deserialization hot path.
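The pooling idea can be sketched with sync.Pool from the standard library: decode into a reused instance, copy the value out, and return the instance to the pool. The `orderPool` variable and `decodePooled` function are hypothetical and handle a single concrete type, whereas the real pooled registry is keyed by event type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// OrderPlaced is a hypothetical event type for illustration.
type OrderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// orderPool hands out reusable *OrderPlaced decode targets.
var orderPool = sync.Pool{New: func() any { return new(OrderPlaced) }}

// decodePooled unmarshals into a pooled instance and returns a value copy.
func decodePooled(data []byte) (OrderPlaced, error) {
	p := orderPool.Get().(*OrderPlaced)
	defer orderPool.Put(p)
	*p = OrderPlaced{} // clear fields left over from a previous use
	if err := json.Unmarshal(data, p); err != nil {
		return OrderPlaced{}, err
	}
	return *p, nil // the copy is made before the deferred Put runs
}

func main() {
	e, err := decodePooled([]byte(`{"order_id":"o-1","total":5}`))
	fmt.Println(e, err)
}
```

Resetting the instance before each use matters: without it, a field absent from the next payload would keep the previous event's value.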
func DeserializeWithRegistry ¶
func DeserializeWithRegistry[E any](registry *EventRegistry, eventType string, data []byte) (E, error)
DeserializeWithRegistry deserializes JSON event data using the registry to create the correct concrete type, then returns it as E.
Steps:
- Look up the factory for eventType in the registry
- Create a new instance (pointer to concrete type)
- json.Unmarshal into the pointer
- Dereference to get the value, assert to E
Why dereference? The registry factory returns *ConcreteType (pointer) so json.Unmarshal can write into it. But E is typically the interface (not pointer), so we dereference before the type assertion.
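The four steps above can be sketched roughly as follows. The `registry` map type, the `Event` interface, and the `deserialize` function are hypothetical stand-ins written for this example; they are not eskit's EventRegistry or DeserializeWithRegistry.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Event is a hypothetical interface for a heterogeneous event stream.
type Event interface{ isEvent() }

type OrderPlaced struct {
	OrderID string `json:"order_id"`
}

func (OrderPlaced) isEvent() {}

// registry maps wire names to factories that return *ConcreteType.
type registry map[string]func() any

func deserialize[E any](reg registry, eventType string, data []byte) (E, error) {
	var zero E
	factory, ok := reg[eventType] // 1. look up the factory
	if !ok {
		return zero, fmt.Errorf("event type %q not registered", eventType)
	}
	ptr := factory() // 2. new instance, assumed to be a pointer
	if err := json.Unmarshal(data, ptr); err != nil { // 3. fill it in
		return zero, err
	}
	// 4. dereference (*OrderPlaced → OrderPlaced), then assert to E.
	val := reflect.ValueOf(ptr).Elem().Interface()
	e, ok := val.(E)
	if !ok {
		return zero, fmt.Errorf("%T does not satisfy the requested event type", val)
	}
	return e, nil
}

func main() {
	reg := registry{"sales.OrderPlaced": func() any { return new(OrderPlaced) }}
	ev, err := deserialize[Event](reg, "sales.OrderPlaced", []byte(`{"order_id":"o-1"}`))
	fmt.Printf("%#v %v\n", ev, err)
}
```

Without the dereference, the assertion would see *OrderPlaced. That still satisfies Event here, but callers that type-switch on OrderPlaced values would stop matching.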
func DeserializeWithUpcasting ¶
func DeserializeWithUpcasting[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, data []byte, schemaVersion int) (E, int, error)
DeserializeWithUpcasting deserializes JSON event data, applying upcasters first if needed. If upcasters is nil or no upcaster is registered for this event type, falls through to standard DeserializeWithRegistry (zero overhead on the hot path).
Flow: raw JSON (schema v_old) → upcast chain → raw JSON (schema v_latest) → unmarshal → E
func DeserializeWithUpcastingCodec ¶
func DeserializeWithUpcastingCodec[E any](registry *EventRegistry, upcasters *UpcasterRegistry, eventType string, data []byte, schemaVersion int, unmarshal UnmarshalFunc) (E, int, error)
DeserializeWithUpcastingCodec is like DeserializeWithUpcasting but accepts a custom unmarshal function for codec-agnostic deserialization.
func GenerateKey ¶
func GenerateKey() ([]byte, error)
GenerateKey generates a cryptographically secure random 32-byte key.
func NewChain ¶
func NewChain(ctx context.Context, principal Principal) context.Context
NewChain starts a new metadata chain in the context. Generates a random correlation ID and sets the principal as both initiator and originator (they're the same at the entry point).
Use in HTTP middleware or API entry points:
ctx = eskit.NewChain(ctx, eskit.Principal{Kind: eskit.PrincipalUser, ID: userID})
func NewChainWithID ¶
func NewChainWithID(ctx context.Context, correlationID string, principal Principal) context.Context
NewChainWithID starts a new metadata chain with a specific correlation ID. Use when the caller provides their own correlation ID (e.g., X-Correlation-ID header).
ctx = eskit.NewChainWithID(ctx, requestID, eskit.Principal{Kind: eskit.PrincipalAPI, ID: apiKey})
func ProjectEvents ¶
func ProjectEvents[E any](t *testing.T, handler func(ctx context.Context, event Event[E]) error, streamID string, baseTime time.Time, events ...E)
ProjectEvents feeds events through a handler function for projection unit tests. Events are wrapped in Event[E] with incremental versions and timestamps.
This is a standalone function for simple cases. For multi-stream or assertion-rich tests, prefer the fluent TestProjection API.
Parameters:
- streamID: the stream identifier (e.g., "geozone-1", "location-42")
- baseTime: starting timestamp; each event is offset by 1 second
- events: the domain event data to project
Example:
eskit.ProjectEvents(t, handler, "order-1", eskit.DefaultBaseTime,
	OrderCreated{ID: "order-1"},
	ItemAdded{SKU: "shoe"},
)
func RebuildSnapshots ¶
func RebuildSnapshots[S any, E any](
	ctx context.Context,
	store EventStore[E],
	snapStore SnapshotStore[S],
	decider Decider[S, any, E],
	opts ...snapshot.Option,
) error
RebuildSnapshots replays all events for all streams and saves fresh snapshots. Use after deploying schema changes or Evolve logic changes.
This is a batch operation intended for deployment scripts or maintenance tasks. It loads all events from the store, replays them through the decider, and saves new snapshots with the configured schema version.
eskit.RebuildSnapshots(ctx, eventStore, snapStore, decider,
	snapshot.SchemaVersion(2),
	snapshot.Every(100),
)
func Register ¶
func Register[E any](reg *EventRegistry)
Register adds a type to the EventRegistry using the Go type name as the wire name. The wire name format is "{package}.{TypeName}" — e.g., "sales.OrderPlaced".
This is the preferred registration method. It eliminates string duplication and derives the name from the Go type system (single source of truth).
Panics if:
- reg is nil
- E is not a named struct type (interfaces, maps, slices, anonymous structs are rejected)
- the type is already registered
- the registry is full
func RegisterAs ¶
func RegisterAs[E any](reg *EventRegistry, name string)
RegisterAs adds a type to the EventRegistry with an explicit wire name. Use this for backward compatibility when migrating from manual string-based registration — the wire name must match existing stored events.
eskit.RegisterAs[OrderPlaced](reg, "OrderPlaced") // legacy wire name
Panics if reg is nil, the type is already registered, or the registry is full.
func RegisterRawUpcaster ¶
func RegisterRawUpcaster(
	registry *UpcasterRegistry,
	eventType string,
	fromVersion, toVersion int,
	fn UpcasterFunc,
)
RegisterRawUpcaster registers an upcaster that works directly with json.RawMessage. Use this for maximum performance when you don't need typed access to the event, or when the intermediate types no longer exist in your codebase.
Example — renaming a field:
RegisterRawUpcaster(registry, "UserCreated", 1, 2, func(data json.RawMessage) (json.RawMessage, error) {
	// Replace "name" with "full_name" using direct byte manipulation
	return bytes.Replace(data, []byte(`"name"`), []byte(`"full_name"`), 1), nil
})
func RegisterTypedUpcaster ¶
func RegisterTypedUpcaster[From any, To any](
	registry *UpcasterRegistry,
	eventType string,
	fromVersion, toVersion int,
	fn func(From) (To, error),
)
RegisterTypedUpcaster is a generic helper that registers a type-safe upcaster. It handles JSON marshaling/unmarshaling so callers work with Go types directly.
Performance: Uses sync.Pool for marshal buffers to minimize allocations. The upcaster function itself is allocation-free if the caller doesn't allocate.
Example:
RegisterTypedUpcaster[OrderCreatedV1, OrderCreatedV2](registry, "OrderCreated", 1, 2,
	func(old OrderCreatedV1) (OrderCreatedV2, error) {
		return OrderCreatedV2{
			OrderID:  old.OrderID,
			Customer: old.Customer,
			Currency: "USD", // new field with default
		}, nil
	},
)
func ResolveEventType ¶
func ResolveEventType(registry *EventRegistry, event any) (string, error)
ResolveEventType determines the type name for an event value. Priority: TypeNamer interface (deprecated) > registry reverse lookup > error.
Preferred path: register types with eskit.Register[E](reg), then the registry reverse lookup handles everything. TypeNamer is only checked for backward compatibility.
func ScopeStreamID ¶
func ScopeStreamID(tenant TenantID, streamID string) string
ScopeStreamID returns "tenantID.streamID" — the scoped stream name. Panics if either argument is empty.
func ServeChanges ¶
func ServeChanges(ctx context.Context, notifier *ChangeNotifier, projection string, renderFn func(ctx context.Context) error, opts ...ServeOption) error
ServeChanges subscribes to a projection on notifier and runs a render loop. It calls renderFn immediately (initial render), then again each time a change notification arrives. The function blocks until ctx is cancelled or the notifier is closed. Subscribe/unsubscribe is handled automatically.
If renderFn returns an error, the error is logged but the loop continues — a single failed render must not tear down the connection.
By default a 5 s fallback poll is enabled (see WithServePollFallback).
Example:
eskit.ServeChanges(ctx, notifier, "orders.order-list",
	func(ctx context.Context) error {
		orders, err := reader.ListOrders(ctx)
		if err != nil {
			return err
		}
		return sse.MergeFragmentTempl(OrderTable(orders),
			datastar.WithSelectorID("order-table"))
	},
)
func ServeChangesWatch ¶
func ServeChangesWatch(ctx context.Context, notifier *ChangeNotifier, projection, streamID string, renderFn func(ctx context.Context) error, opts ...ServeOption) error
ServeChangesWatch is like ServeChanges but watches a specific entity identified by streamID within the given projection.
Example:
eskit.ServeChangesWatch(ctx, notifier, "orders.order-detail", orderID,
	func(ctx context.Context) error {
		order, err := reader.GetOrder(ctx, orderID)
		if err != nil {
			return err
		}
		return sse.MergeFragmentTempl(OrderDetail(order),
			datastar.WithSelectorID("order-detail"))
	},
)
func StreamExists ¶
func StreamExists[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (bool, error)
StreamExists returns true if the stream has any events.
func StreamVersion ¶
func StreamVersion[E any](ctx context.Context, store EventStore[E], streamType, streamID string) (int, error)
StreamVersion returns the current version of a stream (latest event's version). Returns 0 if the stream has no events.
func SubscribeTo ¶
func SubscribeTo[D any](d *EventDispatcher[any], name string, handle func(ctx context.Context, evt Event[any], data D) error)
SubscribeTo registers a type-safe subscription for a specific event data type. The event type name is derived automatically from D — no strings, no manual type assertions.
The handler receives the already-typed event data as the third argument. Events that don't match type D are skipped automatically.
Example:
eskit.SubscribeTo(dispatcher, "shipping.ship_when_paid",
	func(ctx context.Context, evt eskit.Event[any], e PaymentCompleted) error {
		return bus.Send(ctx, CreateShipment{OrderID: e.OrderID})
	},
)
func TypeName ¶
func TypeName[E any](e Event[E]) string
TypeName returns the Go type name of an event's data payload. Useful for type-switch based event handling in projections. Returns "" for nil data.
func ValidateTenantID ¶
func ValidateTenantID(id TenantID) error
ValidateTenantID checks whether a tenant ID is valid. It returns an error if the ID is empty, too long, or does not match the allowed pattern.
Types ¶
type AppendOptions ¶
type AppendOptions struct {
	// IdempotencyKey deduplicates retried appends. If a previous append
	// with this key succeeded, return the same events without re-appending.
	// Empty means no idempotency check.
	IdempotencyKey string
	// Timestamp overrides the auto-generated timestamp. Useful for migrations
	// and replaying events from another system. Zero value means use current time.
	Timestamp time.Time
	// Metadata is the event metadata (replaces variadic metadata parameter).
	Metadata Metadata
}
AppendOptions configures optional behavior for AppendWithOptions.
type AuditEntry ¶
type AuditEntry struct {
	Timestamp time.Time
	Action    string // "read" or "write"
	StreamID  string
	Principal Principal
	EventIDs  []string // events read or written
	Extra     map[string]string
}
AuditEntry records a single audit log entry for compliance.
type AuditLogger ¶
type AuditLogger interface {
	// LogRead records that events were read from a stream.
	LogRead(ctx context.Context, entry AuditEntry) error
	// LogWrite records that events were written to a stream.
	LogWrite(ctx context.Context, entry AuditEntry) error
}
AuditLogger records read and write access to event streams for compliance. Implementations should be non-blocking — audit failures must not break the write path.
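One way to keep audit logging off the write path is to hand entries to a buffered channel and drop them (while counting the drops) when the buffer is full. The sketch below shows that idea only; `asyncAudit` and `entry` are hypothetical names, and the type does not implement the AuditLogger interface above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// entry is a hypothetical, trimmed-down audit record.
type entry struct{ Action, StreamID string }

// asyncAudit queues entries for a background writer and never blocks
// the caller.
type asyncAudit struct {
	ch      chan entry
	dropped atomic.Int64
}

func newAsyncAudit(buffer int) *asyncAudit {
	return &asyncAudit{ch: make(chan entry, buffer)}
}

// log enqueues the entry if there is room; otherwise it records a drop
// and returns immediately.
func (a *asyncAudit) log(e entry) {
	select {
	case a.ch <- e:
	default:
		a.dropped.Add(1)
	}
}

func main() {
	a := newAsyncAudit(1)
	a.log(entry{Action: "write", StreamID: "order-1"})
	a.log(entry{Action: "write", StreamID: "order-2"}) // buffer full: dropped
	fmt.Println(len(a.ch), a.dropped.Load())
}
```

Dropping entries may be unacceptable under some compliance regimes. In that case the usual alternatives are a larger buffer, a durable queue, or surfacing the drop counter as an alert.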
type AutomationRunner ¶
type AutomationRunner[E any] struct {
	// contains filtered or unexported fields
}
AutomationRunner subscribes to an EventBus and feeds events to an EventDispatcher. In cluster mode with queue groups, each event is processed by exactly ONE node — preventing duplicate side effects.
func NewAutomationRunner ¶
func NewAutomationRunner[E any](bus EventBus, dispatcher *EventDispatcher[E]) *AutomationRunner[E]
NewAutomationRunner creates a runner that feeds bus events to the dispatcher.
func (*AutomationRunner[E]) Start ¶
func (r *AutomationRunner[E]) Start(ctx context.Context, queueGroup string) error
Start subscribes to the event bus and begins feeding events to the dispatcher.
func (*AutomationRunner[E]) Stop ¶
func (r *AutomationRunner[E]) Stop() error
Stop unsubscribes from the event bus.
type BusSubscription ¶
type BusSubscription interface {
Unsubscribe() error
}
BusSubscription represents an active event bus subscription. Call Unsubscribe to stop receiving events and release resources.
type Change ¶
type Change struct {
// Projection is the state-view name (e.g., "geography.geozone-list").
Projection string
// StreamID identifies the entity that changed (e.g., "geozone-42").
StreamID string
// EventType is the event that caused the change (e.g., "GeozoneRegistered").
EventType string
}
Change describes a single state-view update. Emitted after a projection's Evolve succeeds for an event.
type ChangeNotifier ¶
type ChangeNotifier struct {
// contains filtered or unexported fields
}
ChangeNotifier provides in-process pub/sub for projection state changes. SSE handlers subscribe to projections and receive notifications when state-views are updated, triggering re-reads and client pushes.
Architecture:
- Close-channel broadcast: Notify() closes a shared channel to wake all waiters in O(1), then allocates a new one. Cost is constant regardless of subscriber count — no fan-out loop.
- Ring buffer per projection stores recent changes. Watch subscribers wake on broadcast, scan the ring for their streamID, and go back to sleep if nothing matches.
- Natural coalescing: if 5 changes arrive while a subscriber is rendering, it wakes once, sees the sequence jumped by 5, and renders once. No silent drops, no lost notifications.
- Zero per-subscriber channel allocation. Memory scales with projections, not connections.
Supports two subscription modes:
- Subscribe(projection): receives all changes for a projection
- Watch(projection, streamID): receives changes for a specific entity only
Safe for concurrent use.
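The close-channel broadcast and coalescing ideas listed above can be sketched in a few lines. This is a simplified illustration under assumed names (`notifier`, `notify`, `snapshot`); it omits the ring buffer and per-projection state and is not the ChangeNotifier implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical, stripped-down broadcast hub.
type notifier struct {
	mu   sync.Mutex
	wake chan struct{} // closed on every notify, then replaced
	seq  uint64        // number of changes seen so far
}

func newNotifier() *notifier { return &notifier{wake: make(chan struct{})} }

// notify wakes every waiter by closing the shared channel, then installs a
// fresh one. The cost does not depend on how many goroutines are waiting.
func (n *notifier) notify() {
	n.mu.Lock()
	n.seq++
	close(n.wake)
	n.wake = make(chan struct{})
	n.mu.Unlock()
}

// snapshot returns the current sequence and the channel to wait on.
func (n *notifier) snapshot() (uint64, <-chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.seq, n.wake
}

func main() {
	n := newNotifier()
	seen, ch := n.snapshot()
	for i := 0; i < 5; i++ {
		n.notify() // five changes arrive while the subscriber is busy
	}
	<-ch // the channel was closed by the first notify, so this returns at once
	now, _ := n.snapshot()
	fmt.Printf("woke once, sequence advanced by %d\n", now-seen)
}
```

The subscriber wakes a single time and learns from the sequence gap that five changes happened, which is the coalescing behavior described in the list.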
func NewChangeNotifier ¶
func NewChangeNotifier(opts ...ChangeNotifierOption) *ChangeNotifier
NewChangeNotifier creates a notification hub for projection changes.
func (*ChangeNotifier) Close ¶
func (n *ChangeNotifier) Close()
Close shuts down the notifier and wakes all waiting subscribers.
func (*ChangeNotifier) Notify ¶
func (n *ChangeNotifier) Notify(change Change)
Notify broadcasts a change to all subscribers of the named projection.
O(1) cost regardless of subscriber count — no channel iteration, no allocation per subscriber. The only allocation is one channel (to replace the wake channel).
Watch subscribers are woken too; they filter by streamID after waking.
func (*ChangeNotifier) SubCount ¶
func (n *ChangeNotifier) SubCount() int64
SubCount returns the total number of active subscriptions across all projections.
func (*ChangeNotifier) Subscribe ¶
func (n *ChangeNotifier) Subscribe(projection string) (WaitFunc, func())
Subscribe returns a WaitFunc that blocks until the projection has new changes. Each call blocks until at least one new change arrives since the previous call.
Natural coalescing: if 10 changes arrive while the caller is busy, the next call returns immediately and the caller re-renders once, not 10 times.
The returned cancel function releases resources. After cancel, the WaitFunc returns false immediately.
Usage:
wait, cancel := notifier.Subscribe("orders.order-list")
defer cancel()
for wait(ctx) {
orders := reader.ListOrders(ctx)
sse.MergeFragmentTempl(OrderTable(orders))
}
func (*ChangeNotifier) Watch ¶
func (n *ChangeNotifier) Watch(projection, streamID string) (WaitFunc, func())
Watch returns a WaitFunc that blocks until the projection has a new change for the specified streamID. Like [Subscribe], but filters by entity.
Uses the ring buffer to check which streamIDs were affected. False wakeups (changes to other entities in the same projection) cause a brief wake + ring scan + back to sleep, costing ~10ns per ring slot checked.
Usage:
wait, cancel := notifier.Watch("orders.order-detail", orderID)
defer cancel()
for wait(ctx) {
order := reader.GetOrder(ctx, orderID)
sse.MergeFragmentTempl(OrderDetail(order))
}
type ChangeNotifierOption ¶
type ChangeNotifierOption func(*ChangeNotifier)
ChangeNotifierOption configures a ChangeNotifier.
func WithRingBits ¶
func WithRingBits(bits uint) ChangeNotifierOption
WithRingBits sets the ring buffer size as a power of 2. Default: 7 (128 slots). Minimum: 4 (16 slots). Maximum: 16 (65536 slots).
Larger rings help Watch subscribers that fall far behind, at the cost of more memory per projection (~48 bytes × ring size). For most workloads the default of 128 is plenty.
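The sizing arithmetic can be checked with a short sketch. `ringSize` and `slot` are hypothetical helpers; in particular, clamping out-of-range values (rather than rejecting them) is an assumption, and the 48-byte figure is the approximate per-slot cost quoted above.

```go
package main

import "fmt"

// ringSize returns the slot count for a bit width, clamped to the documented
// 4..16 range. Clamping is an assumption made for this sketch.
func ringSize(bits uint) int {
	if bits < 4 {
		bits = 4
	}
	if bits > 16 {
		bits = 16
	}
	return 1 << bits
}

// slot maps a monotonically increasing sequence number onto a ring index.
// A power-of-two size lets a bit mask replace the modulo operation.
func slot(seq uint64, bits uint) uint64 {
	return seq & (uint64(1)<<bits - 1)
}

func main() {
	size := ringSize(7)
	// slots, approximate bytes per projection, index of sequence 130
	fmt.Println(size, size*48, slot(130, 7)) // 128 6144 2
}
```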
type ChangeRelay ¶
type ChangeRelay interface {
// Broadcast sends a change notification to all servers (including self).
// Called from OnChange after a shared-consumer projection processes an event.
// Must be non-blocking — implementations should not wait for delivery.
Broadcast(change Change)
// Start begins listening for remote change notifications and forwarding
// them to the local ChangeNotifier. Blocks until ctx is cancelled.
// Auto-reconnects on transient failures.
Start(ctx context.Context)
// Close stops the relay and releases resources.
Close() error
}
ChangeRelay broadcasts projection changes across server instances. Use for shared-consumer projections where only one server processes each event and other servers need to be notified.
Implementations use transport-specific mechanisms:
- pgstore.PGChangeRelay: PostgreSQL LISTEN/NOTIFY
- sqlitestore.SQLiteChangeRelay: polling
The relay forwards received changes to a local ChangeNotifier so that SSE handlers on all servers receive notifications.
type ChannelEventBus ¶
type ChannelEventBus struct {
// contains filtered or unexported fields
}
ChannelEventBus is an in-process EventBus backed by Go channels. Suitable for single-node deployments and testing. Fan-out: every subscriber receives every published event.
func NewChannelEventBus ¶
func NewChannelEventBus(opts ...ChannelEventBusOption) *ChannelEventBus
NewChannelEventBus creates a new in-process event bus.
func (*ChannelEventBus) Close ¶
func (b *ChannelEventBus) Close()
Close shuts down the bus, closing all subscriber channels.
func (*ChannelEventBus) Publish ¶
func (b *ChannelEventBus) Publish(_ context.Context, streamID string, events []EventEnvelope) error
Publish fans out events to all matching subscribers. Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber (slow consumer protection — subscribers should be fast or use buffering).
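The drop-on-full behavior corresponds to a non-blocking channel send. The sketch below shows that Go idiom with a hypothetical `publish` helper over plain string channels; it is an illustration of the technique, not ChannelEventBus itself.

```go
package main

import "fmt"

// publish offers ev to every subscriber without blocking and reports how
// many subscribers missed it because their buffer was full.
func publish(subs []chan string, ev string) (dropped int) {
	for _, ch := range subs {
		select {
		case ch <- ev:
		default:
			dropped++ // buffer full: the slow consumer does not get this event
		}
	}
	return dropped
}

func main() {
	fast := make(chan string, 2)
	slow := make(chan string, 1)
	subs := []chan string{fast, slow}
	fmt.Println(publish(subs, "e1"), publish(subs, "e2")) // 0 1
}
```

Because a dropped event is never retried, subscribers that need every event should re-read from the store rather than rely on the bus alone.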
func (*ChannelEventBus) Subscribe ¶
func (b *ChannelEventBus) Subscribe(_ context.Context, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)
Subscribe registers a handler for events on all streams.
func (*ChannelEventBus) SubscribeStream ¶
func (b *ChannelEventBus) SubscribeStream(_ context.Context, streamID string, handler func(events []EventEnvelope)) (BusSubscription, error)
SubscribeStream registers a handler for events on a specific stream.
type ChannelEventBusOption ¶
type ChannelEventBusOption func(*ChannelEventBus)
ChannelEventBusOption configures a ChannelEventBus.
func WithChannelBufferSize ¶
func WithChannelBufferSize(size int) ChannelEventBusOption
WithChannelBufferSize sets the channel buffer size for each subscriber. Default: 256.
type CommandHandler ¶
type CommandHandler[S any, C any, E any] struct {
	Decider       Decider[S, C, E]
	Store         EventStore[E]
	Snapshots     SnapshotStore[S]    // optional, may be nil
	SnapshotEvery int                 // take snapshot every N events, 0 = disabled (deprecated: use NewDeciderHandler + WithSnapshots)
	Dispatcher    *EventDispatcher[E] // optional, may be nil; dispatches events after persist
	// contains filtered or unexported fields
}
CommandHandler processes commands against a stream, persisting resulting events. This is the main entry point for executing domain logic.
Create with NewDeciderHandler for the functional options API, or initialize the struct directly for simple cases.
func NewDeciderHandler ¶
func NewDeciderHandler[S any, C any, E any](
	store EventStore[E],
	decider Decider[S, C, E],
	opts ...HandlerOption[S, C, E],
) *CommandHandler[S, C, E]
NewDeciderHandler creates a CommandHandler with functional options. Snapshots are OFF by default — use WithSnapshots to enable.
Minimal usage:
handler := eskit.NewDeciderHandler(store, decider)
With snapshots:
handler := eskit.NewDeciderHandler(store, decider,
eskit.WithSnapshots[MyState, MyCmd, MyEvent](snapStore,
snapshot.Every(100),
snapshot.SchemaVersion(2),
),
)
With event dispatch:
handler := eskit.NewDeciderHandler(store, decider,
eskit.WithDispatcher[MyState, MyCmd, MyEvent](dispatcher),
)
func (*CommandHandler[S, C, E]) Handle ¶
func (h *CommandHandler[S, C, E]) Handle(ctx context.Context, streamType, streamID string, command C) (S, []Event[E], error)
Handle executes a command against the stream identified by streamID. It loads events (optionally from a snapshot), runs the decider, and appends new events. Returns the new state and the produced events.
func (*CommandHandler[S, C, E]) Use ¶
func (h *CommandHandler[S, C, E]) Use(mw Middleware[S, C, E])
Use adds middleware to the command handler. Middleware executes in the order added (first added = outermost). Panics if adding would exceed maxMiddleware.
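"First added = outermost" means the first middleware sees the command before any other and sees the result last. A minimal sketch of that ordering follows; `handleFunc`, `middleware`, `chain`, and `tag` are hypothetical simplified types, since the real Middleware and HandleFunc signatures are not shown here.

```go
package main

import "fmt"

// handleFunc and middleware are simplified, hypothetical shapes.
type handleFunc func(cmd string) string
type middleware func(next handleFunc) handleFunc

// chain wraps base so that the first middleware in mws ends up outermost.
func chain(base handleFunc, mws ...middleware) handleFunc {
	h := base
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that marks its position around the inner result.
func tag(name string) middleware {
	return func(next handleFunc) handleFunc {
		return func(cmd string) string {
			return name + "(" + next(cmd) + ")"
		}
	}
}

func main() {
	h := chain(func(cmd string) string { return cmd }, tag("auth"), tag("log"))
	fmt.Println(h("SubmitOrder")) // auth(log(SubmitOrder))
}
```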
type DLQ ¶
type DLQ[E any] struct {
	// contains filtered or unexported fields
}
DLQ is a generic dead letter queue for failed projection events. Thread-safe: all methods are safe for concurrent use.
func (*DLQ[E]) List ¶
List returns DLQ entries in insertion order (newest last). limit is clamped to maxDLQListLimit. offset skips entries from the start.
type DLQConfig ¶
type DLQConfig[E any] struct {
	// MaxSize is the maximum number of entries. 0 = default (100000).
	MaxSize int
	// ReplayFn is called when replaying a DLQ entry. If nil, replay returns an error.
	ReplayFn func(ctx context.Context, entry DLQEntry[E]) error
}
DLQConfig configures the dead letter queue.
type DLQEntry ¶
type DLQEntry[E any] struct {
	// ID is the unique identifier for this DLQ entry.
	ID string
	// Event is the original event that failed processing.
	Event Event[E]
	// HandlerName identifies which projection handler failed.
	HandlerName string
	// Error is the error message from the last failure.
	Error string
	// Attempts is the number of times processing was attempted.
	Attempts int
	// FirstFailure is when the event first failed.
	FirstFailure time.Time
	// LastFailure is when the event most recently failed.
	LastFailure time.Time
}
DLQEntry represents a failed event in the dead letter queue.
type Decider ¶
type Decider[S any, C any, E any] struct {
	// Decide takes the current state and a command, returning events or an error.
	Decide func(state S, command C) ([]E, error)
	// Evolve applies an event to the current state, returning the new state.
	Evolve func(state S, event E) S
	// InitialState returns the zero/initial state for the decider.
	InitialState func() S
}
Decider defines the core event sourcing decider pattern using generics. It encapsulates domain logic as pure functions over commands, events, and state.
Type parameters:
- S: the decider state type
- C: the command type
- E: the event type
func (Decider[S, C, E]) Fold ¶
func (d Decider[S, C, E]) Fold(events []E) S
Fold replays a sequence of events from the initial state, returning the final state.
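To make the Decide/Evolve/Fold relationship concrete, the sketch below re-declares the Decider struct shape locally and implements Fold from its description (start at InitialState, apply Evolve per event). The counter domain (`counter`, `add`, `added`, `newCounter`) is invented for illustration, and the local Fold is a re-implementation, not the library source.

```go
package main

import (
	"errors"
	"fmt"
)

// Decider mirrors the documented struct shape so the example is self-contained.
type Decider[S, C, E any] struct {
	Decide       func(state S, command C) ([]E, error)
	Evolve       func(state S, event E) S
	InitialState func() S
}

// Fold replays events from the initial state, as the description states.
func (d Decider[S, C, E]) Fold(events []E) S {
	state := d.InitialState()
	for _, e := range events {
		state = d.Evolve(state, e)
	}
	return state
}

// Hypothetical domain: a counter that only accepts positive increments.
type counter struct{ n int }
type add struct{ by int }
type added struct{ by int }

func newCounter() Decider[counter, add, added] {
	return Decider[counter, add, added]{
		Decide: func(s counter, c add) ([]added, error) {
			if c.by <= 0 {
				return nil, errors.New("by must be positive")
			}
			return []added{{by: c.by}}, nil
		},
		Evolve:       func(s counter, e added) counter { return counter{n: s.n + e.by} },
		InitialState: func() counter { return counter{} },
	}
}

func main() {
	d := newCounter()
	state := d.Fold([]added{{by: 2}, {by: 3}}) // rebuild state from history
	events, err := d.Decide(state, add{by: 4}) // decide against that state
	fmt.Println(state.n, len(events), err)
}
```

Because all three functions are pure, the same decider can be exercised in tests without any store.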
type DeciderTest ¶
DeciderTest provides a Given/When/Then fluent API for testing deciders. Each test is a specification that maps directly to event modeling:
eskit.Test(t, decider).
Given(OrderCreated{}, ItemAdded{SKU: "shoe"}).
When(SubmitOrder{}).
ThenExpect(OrderSubmitted{})
Internally: InitialState → Fold Given events → Decide(state, When command) → assert.
func (*DeciderTest[S, C, E]) Given ¶
func (dt *DeciderTest[S, C, E]) Given(events ...E) *DeciderTest[S, C, E]
Given sets the historical events that establish the initial state.
func (*DeciderTest[S, C, E]) ThenError ¶
func (dt *DeciderTest[S, C, E]) ThenError(msg string)
ThenError asserts that Decide returns an error containing the given substring.
func (*DeciderTest[S, C, E]) ThenExpect ¶
func (dt *DeciderTest[S, C, E]) ThenExpect(expected ...E)
ThenExpect asserts that Decide produces exactly the expected events. Uses cmp.Diff for readable diffs on mismatch.
func (*DeciderTest[S, C, E]) ThenExpectWith ¶
func (dt *DeciderTest[S, C, E]) ThenExpectWith(opts []cmp.Option, expected ...E)
ThenExpectWith asserts that Decide produces exactly the expected events, using the provided cmp.Options for comparison. Useful for ignoring fields like timestamps:
ThenExpectWith([]cmp.Option{cmpopts.IgnoreFields(Event{}, "Timestamp")}, expected...)
func (*DeciderTest[S, C, E]) ThenNoEvents ¶
func (dt *DeciderTest[S, C, E]) ThenNoEvents()
ThenNoEvents asserts that Decide produces zero events and no error.
func (*DeciderTest[S, C, E]) When ¶
func (dt *DeciderTest[S, C, E]) When(cmd C) *DeciderTest[S, C, E]
When sets the command to execute against the state built from Given events.
type EncryptedEventStore ¶
type EncryptedEventStore[E any] struct {
	// contains filtered or unexported fields
}
EncryptedEventStore wraps an EventStore to transparently encrypt/decrypt event data. Events whose type name (via typeNameFn) matches the encryption config are encrypted on Append and decrypted on Load/LoadFrom. Other events pass through unchanged.
Type parameter E must be serializable to/from JSON for encryption.
func NewEncryptedEventStore ¶
func NewEncryptedEventStore[E any](cfg EncryptedStoreConfig[E]) *EncryptedEventStore[E]
NewEncryptedEventStore creates an encrypted wrapper around an event store. Panics if required config fields are nil — these are programmer errors.
type EncryptedStoreConfig ¶
type EncryptedStoreConfig[E any] struct {
	// Store is the underlying event store. Required.
	Store EventStore[E]
	// Encryptor handles encryption/decryption. Required.
	Encryptor *Encryptor
	// ShouldEncrypt returns true if this specific event should be encrypted.
	// If nil, all events are encrypted.
	ShouldEncrypt func(event E) bool
	// Serialize converts an event to bytes. Required.
	Serialize func(event E) ([]byte, error)
	// Deserialize converts bytes back to an event. Required.
	Deserialize func(data []byte) (E, error)
}
EncryptedStoreConfig configures an EncryptedEventStore.
type Encryptor ¶
type Encryptor struct {
// contains filtered or unexported fields
}
Encryptor handles AES-256-GCM envelope encryption with key rotation support.
func NewEncryptor ¶
NewEncryptor creates an encryptor backed by the given key store. Panics if keyStore is nil — this is a configuration error.
func (*Encryptor) Decrypt ¶
Decrypt decrypts an envelope back to plaintext. Automatically resolves the correct key from the envelope's key ID.
func (*Encryptor) Encrypt ¶
Encrypt encrypts plaintext using envelope encryption.
Envelope format (v1):
[version:1][keyIDLen:1][keyID:N][encryptedDEK:nonce(12)+ciphertext(32)+tag(16)=60][nonce:12][ciphertext+tag:...]
Returns the envelope bytes. Each call generates a unique random DEK, so encrypting the same plaintext twice produces different ciphertext.
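One possible reading of the v1 layout is sketched below using only the Go standard library: a random 32-byte DEK encrypts the payload, and the DEK itself is sealed under the master key into a 60-byte field. The helper names (`newGCM`, `seal`, `open`, `encrypt`, `decrypt`) and the `lookup` callback are assumptions for illustration; the actual Encryptor may differ in details such as additional authenticated data.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

const wrappedLen = 12 + 32 + 16 // nonce + encrypted DEK + GCM tag

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal returns nonce || ciphertext || tag under AES-256-GCM.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal and fails if the data was tampered with.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("sealed data too short")
	}
	return gcm.Open(nil, sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():], nil)
}

// encrypt builds [version][keyIDLen][keyID][wrapped DEK][nonce+ciphertext+tag].
func encrypt(keyID string, kek, plaintext []byte) ([]byte, error) {
	if len(keyID) > 255 {
		return nil, errors.New("key ID too long")
	}
	dek := make([]byte, 32) // fresh data encryption key per call
	if _, err := rand.Read(dek); err != nil {
		return nil, err
	}
	wrapped, err := seal(kek, dek)
	if err != nil {
		return nil, err
	}
	body, err := seal(dek, plaintext)
	if err != nil {
		return nil, err
	}
	out := append([]byte{1, byte(len(keyID))}, keyID...)
	out = append(out, wrapped...)
	return append(out, body...), nil
}

// decrypt resolves the master key from the embedded key ID, then unwraps.
func decrypt(lookup func(id string) []byte, env []byte) ([]byte, error) {
	if len(env) < 2 || env[0] != 1 {
		return nil, errors.New("unsupported envelope")
	}
	n := int(env[1])
	if len(env) < 2+n+wrappedLen {
		return nil, errors.New("truncated envelope")
	}
	dek, err := open(lookup(string(env[2:2+n])), env[2+n:2+n+wrappedLen])
	if err != nil {
		return nil, err
	}
	return open(dek, env[2+n+wrappedLen:])
}

func main() {
	kek := make([]byte, 32)
	if _, err := rand.Read(kek); err != nil {
		panic(err)
	}
	lookup := func(string) []byte { return kek }
	a, _ := encrypt("k1", kek, []byte("secret"))
	b, _ := encrypt("k1", kek, []byte("secret"))
	pt, err := decrypt(lookup, a)
	fmt.Println(string(pt), err, bytes.Equal(a, b)) // secret <nil> false
}
```

Storing the key ID in the envelope is what makes rotation workable: old envelopes keep decrypting with their original key while new ones use the active key.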
type ErrUnknownEventType ¶
type ErrUnknownEventType struct {
EventType string
}
ErrUnknownEventType is returned when an event type is not registered.
func (ErrUnknownEventType) Error ¶
func (e ErrUnknownEventType) Error() string
type Event ¶
type Event[E any] struct {
	// ID is the unique identifier for this event record.
	ID string
	// StreamType identifies the type/category of stream (e.g., "order", "account").
	// Set by the store from the Decider's StreamType or passed explicitly.
	StreamType string
	// StreamID identifies the stream this event belongs to (just the business ID, e.g., "123").
	StreamID string
	// Version is the sequential version number within the stream (StreamPosition).
	Version int
	// EventType is the string name of the event (e.g., "OrderCreated").
	// Stores set this from the EventRegistry on append and load.
	// When no registry is configured, this may be empty.
	EventType string
	// GlobalSequence is the position in the global ordered stream across all aggregates.
	// Zero means not assigned (e.g., memory store doesn't track global sequence).
	GlobalSequence uint64
	// SchemaVersion tracks the event schema for upcasting. Default: 1.
	SchemaVersion int
	// Timestamp is when the event was recorded.
	Timestamp time.Time
	// Metadata holds optional event metadata (correlation, causation, principal, etc.).
	Metadata Metadata
	// Data is the domain event payload.
	Data E
}
Event represents a stored event in the event store.
type EventBus ¶
type EventBus interface {
// Publish sends events to all subscribers. Called after successful store append.
// streamID identifies which stream the events belong to.
Publish(ctx context.Context, streamID string, events []EventEnvelope) error
// Subscribe registers a handler for all events on all streams.
// Returns a BusSubscription; call Unsubscribe when it is no longer needed.
Subscribe(ctx context.Context, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)
// SubscribeStream registers a handler for events on a specific stream.
// Returns a BusSubscription; call Unsubscribe when it is no longer needed.
SubscribeStream(ctx context.Context, streamID string, handler func(events []EventEnvelope)) (BusSubscription, error)
}
EventBus distributes events to interested subscribers after they are persisted. This is the notification layer — not the source of truth (that's EventStore).
type EventDispatcher ¶
type EventDispatcher[E any] struct {
	// contains filtered or unexported fields
}
EventDispatcher fans out events to registered subscriptions filtered by event type.
Each subscription declares which event types it handles via EventTypes. Events with no matching subscription are skipped entirely — zero overhead. Subscriptions execute in registration order, synchronously, fail-fast.
EventDispatcher is the unified engine behind [Projector] (read models) and [Reactor] (side effects). Use the semantic alias that fits your intent.
Thread-safe for concurrent Dispatch calls. Register should be called during setup; concurrent Register and Dispatch is safe but not recommended.
func NewEventDispatcher ¶
func NewEventDispatcher[E any]() *EventDispatcher[E]
NewEventDispatcher creates an EventDispatcher with no registered subscriptions.
func (*EventDispatcher[E]) Dispatch ¶
func (d *EventDispatcher[E]) Dispatch(ctx context.Context, events []Event[E]) error
Dispatch fans out events to matching subscriptions.
For each event, only subscriptions whose EventTypes includes that event's type name are called. Match-all subscriptions (empty EventTypes) are always called.
Execution is fail-fast: the first error stops processing. No further subscriptions or events are processed after a failure.
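The filtering and fail-fast rules can be modeled in a few lines. The `event` and `subscription` types below are simplified, non-generic stand-ins (the field names Name, EventTypes, and Handler follow the Register documentation; the handler signature is an assumption), and `dispatch` is a sketch of the described semantics rather than the library code.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// event and subscription are simplified stand-ins for Event[E] and Subscription[E].
type event struct{ Type string }

type subscription struct {
	Name       string
	EventTypes []string // empty means match all
	Handler    func(event) error
}

func matches(sub subscription, typ string) bool {
	if len(sub.EventTypes) == 0 {
		return true
	}
	for _, t := range sub.EventTypes {
		if t == typ {
			return true
		}
	}
	return false
}

// dispatch calls matching subscriptions in registration order and stops at
// the first error; later subscriptions and events are not processed.
func dispatch(subs []subscription, events []event) error {
	for _, ev := range events {
		for _, sub := range subs {
			if !matches(sub, ev.Type) {
				continue
			}
			if err := sub.Handler(ev); err != nil {
				return fmt.Errorf("%s: %w", sub.Name, err)
			}
		}
	}
	return nil
}

func main() {
	var calls []string
	record := func(name string) func(event) error {
		return func(ev event) error {
			calls = append(calls, name+":"+ev.Type)
			return nil
		}
	}
	subs := []subscription{
		{Name: "orders", EventTypes: []string{"OrderPlaced"}, Handler: record("orders")},
		{Name: "audit", Handler: record("audit")}, // match-all
		{Name: "broken", EventTypes: []string{"OrderCancelled"}, Handler: func(event) error { return errBoom }},
	}
	err := dispatch(subs, []event{{Type: "OrderPlaced"}, {Type: "OrderCancelled"}, {Type: "OrderPlaced"}})
	fmt.Println(len(calls), err) // the third event is never dispatched
}
```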
func (*EventDispatcher[E]) HandlerCount ¶
func (d *EventDispatcher[E]) HandlerCount() int
HandlerCount returns the number of registered subscriptions.
func (*EventDispatcher[E]) HandlesEvent ¶
func (d *EventDispatcher[E]) HandlesEvent(typeName string) bool
HandlesEvent reports whether any registered subscription handles the given event type name.
func (*EventDispatcher[E]) Register ¶
func (d *EventDispatcher[E]) Register(sub Subscription[E])
Register adds a subscription to the dispatcher. Subscriptions execute in registration order.
Panics if Name is empty, Handler is nil, EventTypes contains empty strings, or maxHandlers would be exceeded.
func (*EventDispatcher[E]) Replay ¶
func (d *EventDispatcher[E]) Replay(ctx context.Context, store EventStore[E], streamType string, streamIDs []string) error
Replay loads all events from a store and dispatches them through all handlers. Useful for rebuilding read models from scratch.
type EventEnvelope ¶
type EventEnvelope struct {
ID string `json:"id"`
StreamID string `json:"stream_id"`
Version int `json:"version"`
Type string `json:"type,omitempty"`
Data []byte `json:"data"`
Timestamp int64 `json:"timestamp"` // unix nano
}
EventEnvelope is a serialization-friendly event wrapper for the event bus. Unlike Event[E] which is generic, EventEnvelope uses []byte data so it can cross process boundaries without knowing the concrete event type.
type EventRegistry ¶
type EventRegistry struct {
// contains filtered or unexported fields
}
EventRegistry maps event type names to factory functions for deserialization. This allows stores to create the correct Go type when loading events.
func NewEventRegistry ¶
func NewEventRegistry() *EventRegistry
NewEventRegistry creates an empty event registry.
func (*EventRegistry) Has ¶
func (r *EventRegistry) Has(eventType string) bool
Has returns true if the event type is registered.
func (*EventRegistry) New ¶
func (r *EventRegistry) New(eventType string) (any, error)
New creates a new instance of the registered event type. Returns an error if the type is not registered.
func (*EventRegistry) Register ¶
func (r *EventRegistry) Register(eventType string, factory func() any)
Register maps an event type name to a factory function. The factory should return a pointer to a new zero-valued instance.
registry.Register("OrderCreated", func() any { return &OrderCreated{} })
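The reason the factory should return a pointer becomes clear when the value is handed to json.Unmarshal. The sketch below uses a plain map as a hypothetical stand-in for EventRegistry and an invented `decode` helper; it shows the lookup, allocate, unmarshal sequence, not the store's actual deserialization path.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registry is a hypothetical stand-in for EventRegistry: type name -> factory.
type registry map[string]func() any

// OrderCreated is an illustrative event type.
type OrderCreated struct {
	OrderID string `json:"order_id"`
}

// decode looks up the factory for eventType, creates a fresh pointer value,
// and unmarshals the JSON payload into it.
func decode(reg registry, eventType string, data []byte) (any, error) {
	factory, ok := reg[eventType]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}
	target := factory() // pointer to a zero value, so Unmarshal can fill it
	if err := json.Unmarshal(data, target); err != nil {
		return nil, err
	}
	return target, nil
}

func main() {
	reg := registry{"OrderCreated": func() any { return &OrderCreated{} }}
	ev, err := decode(reg, "OrderCreated", []byte(`{"order_id":"o-1"}`))
	fmt.Printf("%T %+v %v\n", ev, ev, err)
}
```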
func (*EventRegistry) TypeName ¶
func (r *EventRegistry) TypeName(event any) string
TypeName returns the registered name for an event value. Uses the concrete type (dereferencing pointers). Returns empty string if not registered.
func (*EventRegistry) Types ¶
func (r *EventRegistry) Types() []string
Types returns all registered event type names.
type EventStore ¶
type EventStore[E any] interface {
	// Append persists new events for a stream. expectedVersion is the version
	// of the last known event (0 means new stream). Returns ErrConcurrencyConflict
	// if the current version doesn't match expectedVersion.
	// Pass AppendAny (-1) to skip the version check entirely (append-only mode).
	// The optional metadata parameter (first element used if provided) is applied to all events.
	Append(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, metadata ...Metadata) ([]Event[E], error)
	// Load returns all events for a stream in version order.
	// Returns an empty slice (not an error) if the stream doesn't exist.
	Load(ctx context.Context, streamType, streamID string) ([]Event[E], error)
	// LoadFrom returns events for a stream starting from a given version (inclusive).
	LoadFrom(ctx context.Context, streamType, streamID string, fromVersion int) ([]Event[E], error)
}
EventStore is the interface for persisting and loading events. Implementations must support optimistic concurrency control via versioning.
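The optimistic concurrency rule for Append can be shown with a toy single-stream model. `stream`, `appendEvents`, `errConflict`, and `appendAny` are hypothetical names that mirror the documented ErrConcurrencyConflict and AppendAny (-1) semantics; this is a sketch, not a store implementation.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("concurrency conflict")

const appendAny = -1 // skip the version check

// stream is a toy single-stream model; its version is the event count.
type stream struct{ events []string }

// appendEvents succeeds only if the caller's expected version matches the
// current version, unless appendAny is passed. It returns the new version.
func (s *stream) appendEvents(expectedVersion int, events ...string) (int, error) {
	if expectedVersion != appendAny && expectedVersion != len(s.events) {
		return len(s.events), errConflict
	}
	s.events = append(s.events, events...)
	return len(s.events), nil
}

func main() {
	s := &stream{}
	v, _ := s.appendEvents(0, "OrderCreated") // 0 means new stream
	_, err := s.appendEvents(0, "ItemAdded")  // stale writer: stream is at version 1
	fmt.Println(v, err)
	v, err = s.appendEvents(appendAny, "ItemAdded")
	fmt.Println(v, err)
}
```

A writer that hits the conflict is expected to reload the stream, rebuild state, and decide again before retrying.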
type EventStoreWithAppendOptions ¶
type EventStoreWithAppendOptions[E any] interface {
	// AppendWithOptions persists new events with additional options.
	// If IdempotencyKey matches a previous successful append, returns the
	// original events without re-appending.
	AppendWithOptions(ctx context.Context, streamType, streamID string, expectedVersion int, events []E, opts AppendOptions) ([]Event[E], error)
}
EventStoreWithAppendOptions extends EventStore with advanced append capabilities. Stores that support idempotency and custom timestamps implement this interface.
type EventStoreWithArchival ¶
type EventStoreWithArchival[E any] interface {
	// Archive moves a stream's events to cold storage. The stream becomes
	// read-only from the archive store. Future Append calls return ErrStreamArchived.
	// Load on the primary store returns empty; load from the archive returns events.
	Archive(ctx context.Context, streamType, streamID string, target EventStore[E]) error
	// Restore moves an archived stream back to the primary store.
	// Removes the tombstone and deletes from the source store.
	Restore(ctx context.Context, streamType, streamID string, source EventStore[E]) error
}
EventStoreWithArchival extends EventStore with cold storage archival. Archive moves events to a target store and tombstones the primary. Restore moves events back from the source store.
type EventStoreWithDeletion ¶
type EventStoreWithDeletion[E any] interface {
	// Delete permanently removes all events for a stream.
	// Returns ErrStreamNotFound if stream doesn't exist.
	// Also removes associated snapshots and checkpoints.
	Delete(ctx context.Context, streamType, streamID string) error
	// Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted.
	// Load returns empty. The tombstone record remains for audit.
	Tombstone(ctx context.Context, streamType, streamID string, reason string) error
	// IsTombstoned checks if a stream has been tombstoned.
	// Returns nil, nil if the stream is not tombstoned.
	IsTombstoned(ctx context.Context, streamType, streamID string) (*Tombstone, error)
}
EventStoreWithDeletion extends EventStore with stream deletion and tombstoning. Implementations should also clean up associated snapshots and checkpoints.
type EventStoreWithOptions ¶
type EventStoreWithOptions[E any] interface {
	EventStore[E]
	// LoadWithOptions loads events with optional filtering.
	LoadWithOptions(ctx context.Context, streamType, streamID string, opts LoadOptions) ([]Event[E], error)
}
EventStoreWithOptions extends EventStore with filtered loading. Stores that support server-side filtering implement this for performance.
type HandleFunc ¶
HandleFunc is the signature for the next handler in the middleware chain.
type HandlerOption ¶
type HandlerOption[S any, C any, E any] func(*CommandHandler[S, C, E])
HandlerOption configures a CommandHandler created with NewDeciderHandler.
func WithConflictRetry ¶
func WithConflictRetry[S any, C any, E any](maxRetries int) HandlerOption[S, C, E]
WithConflictRetry sets the maximum number of automatic retries on ErrConcurrencyConflict. Default is 0 (no retries, preserving backward compatibility). On each retry, the handler reloads events, re-evolves state, and re-decides.
func WithConflictRetryDelay ¶
WithConflictRetryDelay sets the base delay for exponential backoff between conflict retries. The actual delay is baseDelay * 2^attempt, capped at 1 second. Default is 10ms if retries are enabled but no delay is set.
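The delay formula can be worked through numerically. `retryDelay` below is a hypothetical helper implementing baseDelay * 2^attempt with a one-second cap; treating the first retry as attempt 0 is an assumption, since the text does not say where counting starts.

```go
package main

import (
	"fmt"
	"time"
)

const maxDelay = time.Second

// retryDelay returns baseDelay * 2^attempt, capped at maxDelay. Doubling in a
// loop and stopping at the cap avoids overflow for large attempt values.
func retryDelay(baseDelay time.Duration, attempt int) time.Duration {
	d := baseDelay
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

func main() {
	// With the 10ms default: 10ms, 20ms, 40ms, ... 640ms, then capped at 1s.
	for attempt := 0; attempt <= 7; attempt++ {
		fmt.Println(attempt, retryDelay(10*time.Millisecond, attempt))
	}
}
```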
func WithDispatcher ¶
func WithDispatcher[S any, C any, E any](d *EventDispatcher[E]) HandlerOption[S, C, E]
WithDispatcher sets the event dispatcher for the handler. Events are dispatched to all matching subscriptions after successful persist.
eskit.WithDispatcher[S, C, E](dispatcher)
func WithSnapshots ¶
func WithSnapshots[S any, C any, E any](store SnapshotStore[S], opts ...snapshot.Option) HandlerOption[S, C, E]
WithSnapshots enables snapshot support with optional configuration. If no options are provided, sensible defaults are used (every 100 events, schema version 1).
Minimal — just enable with defaults:
eskit.WithSnapshots[S, C, E](snapStore)
Custom threshold:
eskit.WithSnapshots[S, C, E](snapStore, snapshot.Every(500))
Full configuration:
eskit.WithSnapshots[S, C, E](snapStore,
snapshot.Every(500),
snapshot.SchemaVersion(2),
snapshot.MinAge(5 * time.Minute),
snapshot.Async(true),
)
type InstrumentedEventStore ¶
type InstrumentedEventStore[E any] struct {
	// contains filtered or unexported fields
}
InstrumentedEventStore wraps an EventStore with logging and optional profiling.
func NewInstrumentedEventStore ¶
func NewInstrumentedEventStore[E any](store EventStore[E], logger *slog.Logger, profiler *Profiler) *InstrumentedEventStore[E]
NewInstrumentedEventStore wraps a store with observability. Both logger and profiler are optional (pass nil to skip).
type KeyStore ¶
type KeyStore interface {
// GetKey returns the key bytes for the given ID.
GetKey(id string) ([]byte, error)
// ActiveKeyID returns the ID of the current encryption key.
ActiveKeyID() (string, error)
}
KeyStore provides access to encryption keys. Must be safe for concurrent use. Implementations: MemoryKeyStore (testing), or your own backed by Vault/KMS/etc.
type LoadOptions ¶
type LoadOptions struct {
// EventTypes filters by event type name. Empty means all types.
EventTypes []string
// FromVersion loads events starting from this version (inclusive). 0 means all.
FromVersion int
// Limit caps the number of events returned. 0 means no limit.
Limit int
}
LoadOptions configures optional filtering for event loading. Zero values mean "no filter" — backward compatible.
type LockRegistry ¶
type LockRegistry interface {
// Acquire blocks until the lock for the stream is obtained, then returns
// a release function. The caller MUST call release when done.
// Returns an error if the context is cancelled or the lock cannot be obtained.
Acquire(ctx context.Context, streamType, streamID string) (release func(), err error)
// TryAcquire attempts to obtain the lock without blocking.
// Returns a release function and true if successful, or nil and false if the lock is held.
TryAcquire(streamType, streamID string) (release func(), ok bool)
}
LockRegistry manages per-stream locks for single-writer concurrency control. Implementations must be safe for concurrent use.
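A custom LockRegistry needs a blocking Acquire that honors context cancellation and a non-blocking TryAcquire. The sketch below satisfies both method signatures with a one-slot channel per stream, which is a different technique from the mutex-plus-goroutine approach described for MemoryLockRegistry; `lockRegistry`, `newLockRegistry`, and `slot` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// lockRegistry is a hypothetical implementation using one-slot channels.
type lockRegistry struct {
	mu    sync.Mutex
	locks map[string]chan struct{} // a full channel means the lock is held
}

func newLockRegistry() *lockRegistry {
	return &lockRegistry{locks: make(map[string]chan struct{})}
}

// slot returns the semaphore channel for a stream, creating it on first use.
func (r *lockRegistry) slot(streamType, streamID string) chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := streamType + "/" + streamID
	ch, ok := r.locks[key]
	if !ok {
		ch = make(chan struct{}, 1)
		r.locks[key] = ch
	}
	return ch
}

// Acquire blocks until the lock is free or ctx is done.
func (r *lockRegistry) Acquire(ctx context.Context, streamType, streamID string) (func(), error) {
	ch := r.slot(streamType, streamID)
	select {
	case ch <- struct{}{}:
		return func() { <-ch }, nil // release must be called exactly once
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// TryAcquire returns immediately, reporting whether the lock was obtained.
func (r *lockRegistry) TryAcquire(streamType, streamID string) (func(), bool) {
	ch := r.slot(streamType, streamID)
	select {
	case ch <- struct{}{}:
		return func() { <-ch }, true
	default:
		return nil, false
	}
}

func main() {
	r := newLockRegistry()
	release, _ := r.Acquire(context.Background(), "order", "42")
	_, ok := r.TryAcquire("order", "42")
	fmt.Println("second writer got lock:", ok)
	release()
	_, ok = r.TryAcquire("order", "42")
	fmt.Println("after release:", ok)
}
```

This sketch never removes idle entries from the map, so a long-running process with many short-lived streams would need some cleanup policy.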
type MemoryKeyStore ¶
type MemoryKeyStore struct {
// contains filtered or unexported fields
}
MemoryKeyStore is an in-memory KeyStore for testing and development.
func NewMemoryKeyStore ¶
func NewMemoryKeyStore() *MemoryKeyStore
NewMemoryKeyStore creates a new in-memory key store.
func (*MemoryKeyStore) ActiveKeyID ¶
func (s *MemoryKeyStore) ActiveKeyID() (string, error)
func (*MemoryKeyStore) AddKey ¶
func (s *MemoryKeyStore) AddKey(keyID string, key []byte)
AddKey adds a key to the store. The key must be exactly 32 bytes (AES-256). Panics if keyID is empty or key is wrong size — these are programmer errors.
func (*MemoryKeyStore) SetActiveKey ¶
func (s *MemoryKeyStore) SetActiveKey(keyID string)
SetActiveKey sets which key ID to use for new encryptions. Panics if the key hasn't been added yet — this is a configuration error.
type MemoryLockRegistry ¶
type MemoryLockRegistry struct {
// contains filtered or unexported fields
}
MemoryLockRegistry is an in-process LockRegistry backed by per-stream mutexes. Suitable for single-process deployments where all writes go through one instance.
func NewMemoryLockRegistry ¶
func NewMemoryLockRegistry() *MemoryLockRegistry
NewMemoryLockRegistry creates a new in-process lock registry.
func (*MemoryLockRegistry) Acquire ¶
func (r *MemoryLockRegistry) Acquire(ctx context.Context, streamType, streamID string) (func(), error)
Acquire blocks until the lock for streamID is obtained. Respects context cancellation via a background goroutine.
func (*MemoryLockRegistry) TryAcquire ¶
func (r *MemoryLockRegistry) TryAcquire(streamType, streamID string) (func(), bool)
TryAcquire attempts a non-blocking lock acquisition.
type MemorySnapshotStore ¶
type MemorySnapshotStore[S any] struct {
	// contains filtered or unexported fields
}
MemorySnapshotStore is an in-memory SnapshotStore implementation. Useful for testing and single-process applications. Safe for concurrent use.
func NewMemorySnapshotStore ¶
func NewMemorySnapshotStore[S any]() *MemorySnapshotStore[S]
NewMemorySnapshotStore creates a new in-memory snapshot store.
func (*MemorySnapshotStore[S]) Invalidate ¶
func (s *MemorySnapshotStore[S]) Invalidate(_ context.Context, streamType, streamID string) error
Invalidate removes the snapshot for a single stream.
func (*MemorySnapshotStore[S]) InvalidateAll ¶
func (s *MemorySnapshotStore[S]) InvalidateAll(_ context.Context) error
InvalidateAll removes all snapshots.
func (*MemorySnapshotStore[S]) LoadSnapshot ¶
func (*MemorySnapshotStore[S]) SaveSnapshot ¶
func (s *MemorySnapshotStore[S]) SaveSnapshot(_ context.Context, snapshot Snapshot[S]) error
type MemoryStore ¶
type MemoryStore[E any] struct {
	// contains filtered or unexported fields
}
MemoryStore is an in-memory EventStore implementation, useful for testing.
func NewMemoryStore ¶
func NewMemoryStore[E any](opts ...MemoryStoreOption[E]) *MemoryStore[E]
NewMemoryStore creates a new in-memory event store.
func (*MemoryStore[E]) AppendWithOptions ¶
func (s *MemoryStore[E]) AppendWithOptions(_ context.Context, streamType, streamID string, expectedVersion int, events []E, opts AppendOptions) ([]Event[E], error)
AppendWithOptions persists new events with idempotency and custom timestamp support.
func (*MemoryStore[E]) Archive ¶
func (s *MemoryStore[E]) Archive(ctx context.Context, streamType, streamID string, target EventStore[E]) error
Archive moves a stream to the target store and tombstones the primary.
func (*MemoryStore[E]) ArchiveStream ¶
func (s *MemoryStore[E]) ArchiveStream(ctx context.Context, streamType, streamID string) error
ArchiveStream marks a stream as archived. Future appends are rejected with ErrStreamArchived.
func (*MemoryStore[E]) Delete ¶
func (s *MemoryStore[E]) Delete(_ context.Context, streamType, streamID string) error
Delete permanently removes all events for a stream from the memory store. Returns ErrStreamNotFound if stream does not exist.
func (*MemoryStore[E]) DeleteStream ¶
func (s *MemoryStore[E]) DeleteStream(ctx context.Context, streamType, streamID string) error
DeleteStream permanently removes all events in a stream. Irreversible.
func (*MemoryStore[E]) IsTombstoned ¶
func (s *MemoryStore[E]) IsTombstoned(_ context.Context, streamType, streamID string) (*Tombstone, error)
IsTombstoned checks if a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.
func (*MemoryStore[E]) LatestSequence ¶
func (s *MemoryStore[E]) LatestSequence(_ context.Context) (uint64, error)
LatestSequence returns the highest global sequence, or 0 if empty.
func (*MemoryStore[E]) LoadRaw ¶
func (s *MemoryStore[E]) LoadRaw(_ context.Context, streamType, streamID string) ([]*RawEvent, error)
LoadRaw loads events without deserializing the Data field.
func (*MemoryStore[E]) LoadWithOptions ¶
func (s *MemoryStore[E]) LoadWithOptions(_ context.Context, streamType, streamID string, opts LoadOptions) ([]Event[E], error)
LoadWithOptions loads events with optional filtering for the memory store.
func (*MemoryStore[E]) ReadByStreamType ¶
func (s *MemoryStore[E]) ReadByStreamType(_ context.Context, streamType string, fromSequence uint64, limit int) ([]Event[E], error)
ReadByStreamType reads global events filtered by stream type.
func (*MemoryStore[E]) ReadFrom ¶
func (s *MemoryStore[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]Event[E], error)
ReadFrom implements GlobalReader — reads events by global sequence.
func (*MemoryStore[E]) ReadFromWithOptions ¶
func (s *MemoryStore[E]) ReadFromWithOptions(_ context.Context, fromSequence uint64, limit int, opts LoadOptions) ([]Event[E], error)
ReadFromWithOptions reads global events with optional event type filtering.
func (*MemoryStore[E]) Restore ¶
func (s *MemoryStore[E]) Restore(ctx context.Context, streamType, streamID string, source EventStore[E]) error
Restore moves an archived stream back from the source store to the primary.
func (*MemoryStore[E]) RestoreStream ¶
func (s *MemoryStore[E]) RestoreStream(ctx context.Context, streamType, streamID string) error
RestoreStream brings an archived stream back to active state by removing the tombstone.
func (*MemoryStore[E]) StreamStatus ¶
func (s *MemoryStore[E]) StreamStatus(_ context.Context, streamType, streamID string) (StreamState, error)
StreamStatus returns the current lifecycle state of a stream.
func (*MemoryStore[E]) Tombstone ¶
func (s *MemoryStore[E]) Tombstone(_ context.Context, streamType, streamID string, reason string) error
Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted. Load returns empty. The tombstone record remains for audit.
func (*MemoryStore[E]) TombstoneStream ¶
func (s *MemoryStore[E]) TombstoneStream(ctx context.Context, streamType, streamID string) error
TombstoneStream marks a stream as deleted. Future appends are rejected with ErrStreamDeleted.
type MemoryStoreOption ¶
type MemoryStoreOption[E any] func(*MemoryStore[E])
MemoryStoreOption configures a MemoryStore.
func WithMemoryMarshal ¶
func WithMemoryMarshal[E any](marshal func(any) ([]byte, error)) MemoryStoreOption[E]
WithMemoryMarshal sets a custom marshal function for event serialization. This accepts any codec.Codec-compatible marshal function. Defaults to encoding/json if not set.
func WithMemoryRegistry ¶
func WithMemoryRegistry[E any](reg *EventRegistry) MemoryStoreOption[E]
WithMemoryRegistry enables type registry for heterogeneous event deserialization.
func WithMemoryUpcasters ¶
func WithMemoryUpcasters[E any](u *UpcasterRegistry) MemoryStoreOption[E]
WithMemoryUpcasters enables event upcasting for schema evolution during Load.
type Metadata ¶
type Metadata struct {
// CorrelationID groups related events across streams/services.
CorrelationID string `json:"correlation_id,omitempty"`
// CausationID identifies the event or command that caused this event.
CausationID string `json:"causation_id,omitempty"`
// Initiator is who directly caused this event (e.g., the user clicking "submit").
Initiator Principal `json:"initiator,omitempty"`
// Originator is who started the causal chain (e.g., the user who created the order,
// even if this event was produced by an automation triggered by that order).
Originator Principal `json:"originator,omitempty"`
// IdempotencyKey is an optional key for idempotent appends.
// When set, pgstore will deduplicate: if events with this key already exist
// for the stream, Append returns the existing events instead of inserting duplicates.
IdempotencyKey string `json:"idempotency_key,omitempty"`
// Extra holds arbitrary key-value pairs for extensibility.
// Bounded: max MetadataEntriesMax entries, max MetadataMaxSizeBytes total size.
Extra map[string]string `json:"extra,omitempty"`
}
Metadata holds contextual information about an event for tracing, audit, and debugging.
func MetaFromContext ¶
MetaFromContext reads metadata from the context. Returns zero Metadata and false if not set.
The command bus calls this internally — you typically don't need to.
func (*Metadata) MetadataRange ¶
MetadataRange iterates over Extra entries without allocating a copy. Return false from fn to stop iteration early.
type Middleware ¶
type Middleware[S any, C any, E any] func(ctx context.Context, streamType, streamID string, cmd C, next HandleFunc[S, E]) (S, []Event[E], error)
Middleware intercepts command handling. It receives the command context and calls next to continue the chain. Middleware can:
- Validate or enrich commands before handling
- Log, meter, or trace command execution
- Transform or filter the result after handling
The next function executes the rest of the middleware chain and the actual handler.
func LoggingMiddleware ¶
LoggingMiddleware returns a middleware that logs command handling using slog.
func ProfilerMiddleware ¶
func ProfilerMiddleware[S any, C any, E any](profiler *Profiler) Middleware[S, C, E]
ProfilerMiddleware returns a middleware that records command handling durations. The operation name is "CommandHandler.Handle" for all commands. For per-command-type profiling, use ProfilerMiddlewareFunc.
func ProfilerMiddlewareFunc ¶
func ProfilerMiddlewareFunc[S any, C any, E any](profiler *Profiler, nameFunc func(C) string) Middleware[S, C, E]
ProfilerMiddlewareFunc returns a middleware that uses a custom function to derive the operation name from the command. This allows per-command-type profiling.
func SingleWriterMiddleware ¶
func SingleWriterMiddleware[S any, C any, E any](registry LockRegistry, reject bool) Middleware[S, C, E]
SingleWriterMiddleware returns a middleware that serializes command handling per stream using the provided LockRegistry. This eliminates optimistic concurrency retries — if you hold the lock, no one else can write.
When reject is true, commands that can't immediately acquire the lock are rejected with an error instead of blocking.
func WithLogging ¶
WithLogging returns middleware that logs command handling using the provided slog.Logger. It logs the command type, stream ID, duration, number of events produced, and any error.
func WithMetrics ¶
func WithMetrics[S any, C any, E any]() Middleware[S, C, E]
WithMetrics is a placeholder for OpenTelemetry metrics middleware. Since OTel is not a core dependency, this serves as documentation for how to add metrics to your command handling pipeline.
To implement metrics with OpenTelemetry, create middleware like:
func WithOTelMetrics[S any, C any, E any](meter metric.Meter) eskit.Middleware[S, C, E] {
commandDuration, _ := meter.Float64Histogram("eskit.command.duration",
metric.WithUnit("s"),
metric.WithDescription("Command handling duration"),
)
commandCount, _ := meter.Int64Counter("eskit.command.count",
metric.WithDescription("Total commands handled"),
)
return func(ctx context.Context, streamType, streamID string, cmd C, next eskit.HandleFunc[S, E]) (S, []eskit.Event[E], error) {
start := time.Now()
state, events, err := next(ctx)
attrs := []attribute.KeyValue{
attribute.String("command", fmt.Sprintf("%T", cmd)),
attribute.Bool("error", err != nil),
}
commandDuration.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes(attrs...))
commandCount.Add(ctx, 1, metric.WithAttributes(attrs...))
return state, events, err
}
}
type MultiOption ¶
type MultiOption func(*multiSubConfig)
MultiOption configures a MultiSubscription.
func OnChange ¶
func OnChange(projection string, handler func()) MultiOption
OnChange registers a handler that fires whenever any stream in the named projection changes. Use this for list views, dashboards, and summary pages.
eskit.OnChange("orders.order-list", func() {
orders := repo.ListOrders(ctx)
sse.PatchElementTempl(OrderList(orders))
})
func OnChangeWithKey ¶
func OnChangeWithKey(projection, key string, handler func()) MultiOption
OnChangeWithKey registers a handler that fires only when the specific stream identified by key changes within the named projection. Use this for detail pages and entity-specific panels where you only care about one stream.
eskit.OnChangeWithKey("orders.order-detail", orderID, func() {
order := repo.GetOrder(ctx, orderID)
sse.PatchElementTempl(OrderDetail(order))
})
func WithDebounce ¶
func WithDebounce(d time.Duration) MultiOption
WithDebounce enables change coalescing. When set, after receiving a change notification the MultiSubscription waits for the debounce duration collecting additional changes, then fires each affected handler exactly once.
This is useful when a single user action triggers many events (e.g., bulk import) and you want to avoid redundant re-renders.
Set to 0 (default) to disable debouncing.
func WithPollFallback ¶
func WithPollFallback(d time.Duration) MultiOption
WithPollFallback enables periodic synthetic notifications as a safety net, ensuring SSE handlers re-read from the database even if push notifications are lost. The poll timer resets after every real notification, preventing spurious renders when the system is healthy.
Set to 0 (default) to disable polling.
type MultiSubscription ¶
type MultiSubscription struct {
// contains filtered or unexported fields
}
MultiSubscription manages multiple projection subscriptions with fan-in dispatch, optional debounce coalescing, and poll fallback. It simplifies SSE handlers that need to react to changes from several projections by consolidating them into a single Run loop.
Example usage:
notifier := eskit.NewChangeNotifier()
ms := eskit.NewMultiSubscription(notifier,
eskit.OnChange("orders.order-list", func() {
// re-render order list
}),
eskit.OnChangeWithKey("orders.order-detail", "order-42", func() {
// re-render order 42 detail
}),
eskit.WithPollFallback(5*time.Second),
eskit.WithDebounce(100*time.Millisecond),
)
ms.RenderAll() // initial render
go ms.Run(ctx) // Run blocks until ctx is cancelled or Close is called, so start it in a goroutine
// later:
ms.Close()
func NewMultiSubscription ¶
func NewMultiSubscription(notifier *ChangeNotifier, opts ...MultiOption) *MultiSubscription
NewMultiSubscription creates a MultiSubscription that manages fan-in dispatch for multiple projection subscriptions. At least one OnChange or OnChangeWithKey option must be provided.
Panics if notifier is nil or no subscriptions are configured.
func (*MultiSubscription) Close ¶
func (m *MultiSubscription) Close()
Close stops all subscriptions and fan-in goroutines. Safe to call multiple times; subsequent calls are no-ops.
func (*MultiSubscription) RenderAll ¶
func (m *MultiSubscription) RenderAll()
RenderAll fires every registered handler exactly once. Typically called before Run to perform the initial render of all projections.
func (*MultiSubscription) Run ¶
func (m *MultiSubscription) Run(ctx context.Context)
Run subscribes to all configured projections and blocks, dispatching handlers as changes arrive. It returns when ctx is cancelled or Close is called.
If debounce is configured, rapid changes to the same projection are coalesced so each handler fires at most once per debounce window.
If poll fallback is configured, the poll timer resets after every real dispatch, preventing spurious renders when notifications are flowing.
type NopAuditLogger ¶
type NopAuditLogger struct{}
NopAuditLogger is an AuditLogger that discards all entries.
func (NopAuditLogger) LogRead ¶
func (NopAuditLogger) LogRead(_ context.Context, _ AuditEntry) error
func (NopAuditLogger) LogWrite ¶
func (NopAuditLogger) LogWrite(_ context.Context, _ AuditEntry) error
type OperationStats ¶
type OperationStats struct {
Name string `json:"name"`
Count int64 `json:"count"`
TotalMs float64 `json:"total_ms"`
AvgMs float64 `json:"avg_ms"`
P50Ms float64 `json:"p50_ms"`
P95Ms float64 `json:"p95_ms"`
P99Ms float64 `json:"p99_ms"`
MaxMs float64 `json:"max_ms"`
MinMs float64 `json:"min_ms"`
LastMs float64 `json:"last_ms"`
// Trend is the ratio of recent P95 to older P95. >1.0 means getting slower.
// 0 if insufficient data.
Trend float64 `json:"trend"`
}
OperationStats contains timing statistics for a named operation.
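To make the percentile and Trend fields concrete, here is a minimal sketch of how such numbers could be derived from a window of samples. The nearest-rank percentile method and the half-and-half split between "older" and "recent" samples are assumptions for illustration; the Profiler may compute them differently.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// percentile returns the nearest-rank percentile p (0 < p <= 100) of samples.
func percentile(samples []float64, p float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	sorted := append([]float64(nil), samples...)
	sort.Float64s(sorted)
	rank := int(math.Ceil(p / 100 * float64(len(sorted))))
	if rank < 1 {
		rank = 1
	}
	return sorted[rank-1]
}

// trend compares the P95 of the newer half of the window with the older half.
// It returns 0 when there is too little data, mirroring the Trend field's contract.
func trend(samples []float64) float64 {
	if len(samples) < 4 {
		return 0
	}
	mid := len(samples) / 2
	older := percentile(samples[:mid], 95)
	if older == 0 {
		return 0
	}
	return percentile(samples[mid:], 95) / older
}

func main() {
	window := []float64{10, 10, 10, 10, 20, 20, 20, 20} // milliseconds, oldest first
	fmt.Println("p50:", percentile(window, 50))
	fmt.Println("p95:", percentile(window, 95))
	fmt.Println("trend:", trend(window)) // above 1.0 means getting slower
}
```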
func (OperationStats) TrendLabel ¶
func (s OperationStats) TrendLabel() string
TrendLabel returns a human-readable trend indicator.
type Principal ¶
type Principal struct {
Kind PrincipalKind `json:"kind,omitempty"`
ID string `json:"id,omitempty"`
}
Principal represents who is performing an action. Initiator is who directly caused it; Originator is who started the chain.
type PrincipalKind ¶
type PrincipalKind string
PrincipalKind defines the type of actor initiating an action.
const (
PrincipalUser PrincipalKind = "user"
PrincipalAutomation PrincipalKind = "automation"
PrincipalService PrincipalKind = "service"
PrincipalSystem PrincipalKind = "system"
PrincipalScheduler PrincipalKind = "scheduler"
PrincipalAPI PrincipalKind = "api"
)
type Profiler ¶
type Profiler struct {
// contains filtered or unexported fields
}
Profiler tracks operation timings with a rolling window and computes percentile stats.
func NewProfiler ¶
func NewProfiler() *Profiler
NewProfiler creates a profiler with the default window size.
func NewProfilerWithWindow ¶
NewProfilerWithWindow creates a profiler with a custom rolling window size.
func (*Profiler) AllStats ¶
func (p *Profiler) AllStats() []OperationStats
AllStats returns statistics for all tracked operations, sorted by name.
func (*Profiler) Degrading ¶
func (p *Profiler) Degrading() []OperationStats
Degrading returns operations whose recent P95 is trending higher than historical.
func (*Profiler) Slowest ¶
func (p *Profiler) Slowest(n int) []OperationStats
Slowest returns the top N operations by P95 latency.
func (*Profiler) Stats ¶
func (p *Profiler) Stats(operation string) OperationStats
Stats returns statistics for a single operation. Returns zero stats if not found.
type ProjectionRunner ¶
type ProjectionRunner[E any] struct {
// contains filtered or unexported fields
}
ProjectionRunner subscribes to an EventBus and feeds events to an EventDispatcher. In single-node mode, all events reach all subscriptions. In cluster mode with a QueueSubscriber (e.g., NATS event bus), use queue groups so each event is processed by exactly ONE node.
func NewProjectionRunner ¶
func NewProjectionRunner[E any](bus EventBus, dispatcher *EventDispatcher[E]) *ProjectionRunner[E]
NewProjectionRunner creates a runner that feeds bus events to the dispatcher.
func (*ProjectionRunner[E]) Start ¶
func (r *ProjectionRunner[E]) Start(ctx context.Context, queueGroup string) error
Start subscribes to the event bus and begins feeding events to the dispatcher.
func (*ProjectionRunner[E]) Stop ¶
func (r *ProjectionRunner[E]) Stop() error
Stop unsubscribes from the event bus.
type ProjectionTest ¶
type ProjectionTest[E any] struct {
// contains filtered or unexported fields
}
ProjectionTest provides a Given/Then fluent API for testing projection handlers. Each test is a specification that maps directly to event modeling read models:
eskit.TestProjection(t, handler).
Given("order-1",
OrderCreated{ID: "order-1", Total: 100},
ItemAdded{OrderID: "order-1", SKU: "shoe"},
).
Then(func(t *testing.T) {
// Query your read model and assert results
})
Internally: wraps events in Event[E] with incremental versions and timestamps, then feeds them through the handler.
func TestProjection ¶
func TestProjection[E any](t *testing.T, handler func(ctx context.Context, event Event[E]) error) *ProjectionTest[E]
TestProjection creates a new projection test with the Given/Then fluent API. This is the projection counterpart to Test for deciders.
func (*ProjectionTest[E]) Given ¶
func (pt *ProjectionTest[E]) Given(streamID string, events ...E) *ProjectionTest[E]
Given feeds events for a stream through the handler. Events are wrapped in Event[E] with incremental versions (starting at 1) and timestamps offset by 1 second from DefaultBaseTime.
Call Given multiple times for events from different streams:
eskit.TestProjection(t, handler).
Given("order-1", OrderCreated{}).
Given("order-2", OrderCreated{}).
Then(func(t *testing.T) { ... })
func (*ProjectionTest[E]) Then ¶
func (pt *ProjectionTest[E]) Then(fn func(t *testing.T))
Then projects all Given events through the handler, then calls fn for assertions. If any event fails to project, the test fails immediately.
type QueueSubscriber ¶
type QueueSubscriber interface {
SubscribeQueue(ctx context.Context, queueGroup string, handler func(streamID string, events []EventEnvelope)) (BusSubscription, error)
}
QueueSubscriber is an optional interface for EventBus implementations that support queue groups (e.g., NATS). Queue groups ensure each event is processed by exactly ONE subscriber in the group — useful for distributed projections and processors.
type RawEvent ¶
type RawEvent struct {
ID string
StreamType string
StreamID string
Version int
EventType string
GlobalSequence uint64
SchemaVersion int
Timestamp time.Time
Metadata Metadata
// Codec is the name of the codec used to serialize this event.
// Empty string defaults to "json" for backward compatibility.
Codec string
// contains filtered or unexported fields
}
RawEvent holds an event with deferred deserialization. The Data field is only deserialized when Decode() is called. This avoids allocation overhead when you only need metadata or a subset of events.
func (*RawEvent) Decode ¶
Decode deserializes the event data into the target type. If a registry is configured, it uses it for type resolution. The result is cached — subsequent calls return the same value.
func (*RawEvent) DecodeWithRegistry ¶
DecodeWithRegistry deserializes using the event registry (for interface types). Result is cached after first call.
func (*RawEvent) SetRawData ¶
func (r *RawEvent) SetRawData(data []byte, registry *EventRegistry)
SetRawData sets the raw JSON bytes and optional registry (used by stores).
type ResilientDispatcher ¶
type ResilientDispatcher[E any] struct {
// contains filtered or unexported fields
}
ResilientDispatcher wraps an EventDispatcher with retry and DLQ support. On handler failure, it retries with exponential backoff. If all retries are exhausted, the event goes to the DLQ instead of blocking.
func NewResilientDispatcher ¶
func NewResilientDispatcher[E any](cfg ResilientDispatcherConfig[E]) *ResilientDispatcher[E]
NewResilientDispatcher creates a dispatcher with retry and DLQ support. Panics if Dispatcher or DLQ is nil — these are configuration errors.
func (*ResilientDispatcher[E]) Dispatch ¶
func (rd *ResilientDispatcher[E]) Dispatch(ctx context.Context, events []Event[E]) error
Dispatch processes events with retry and DLQ support. Events that fail all retry attempts are sent to the DLQ. Returns nil even if events go to the DLQ — the caller should not be blocked. Returns an error only if the DLQ itself fails.
type ResilientDispatcherConfig ¶
type ResilientDispatcherConfig[E any] struct {
// Dispatcher is the underlying event dispatcher. Required.
Dispatcher *EventDispatcher[E]
// DLQ receives events that fail all retries. Required.
DLQ *DLQ[E]
// MaxRetries before sending to DLQ. 0 = default (5).
MaxRetries int
// BaseDelay for exponential backoff. 0 = default (100ms).
BaseDelay time.Duration
// MaxDelay caps the backoff. 0 = default (30s).
MaxDelay time.Duration
}
ResilientDispatcherConfig configures retry and DLQ behavior.
type ServeOption ¶
type ServeOption func(*serveConfig)
ServeOption configures the behaviour of ServeChanges and ServeChangesWatch.
func WithServePollFallback ¶
func WithServePollFallback(d time.Duration) ServeOption
WithServePollFallback sets the fallback poll interval used to guarantee periodic re-renders even when no real change notifications arrive. The default is 5 s. Set to 0 to disable fallback polling entirely.
type Snapshot ¶
type Snapshot[S any] struct {
StreamType string
StreamID string
Version int
State S
SchemaVersion int // tracks schema for invalidation
CreatedAt time.Time // when the snapshot was saved
}
Snapshot represents a point-in-time snapshot of decider state.
type SnapshotStore ¶
type SnapshotStore[S any] interface {
// SaveSnapshot persists a snapshot of decider state at a given version.
// If a snapshot already exists for the stream, it is overwritten.
SaveSnapshot(ctx context.Context, snapshot Snapshot[S]) error
// LoadSnapshot loads the latest snapshot for a stream.
// Returns nil, nil if no snapshot exists (not an error).
LoadSnapshot(ctx context.Context, streamType, streamID string) (*Snapshot[S], error)
// Invalidate deletes the snapshot for a single stream.
// Returns nil if no snapshot existed.
Invalidate(ctx context.Context, streamType, streamID string) error
// InvalidateAll deletes all snapshots. Use after deploying schema changes
// or Evolve logic changes that affect all streams.
InvalidateAll(ctx context.Context) error
}
SnapshotStore is an optional interface for persisting decider state snapshots. Implementations must be safe for concurrent use.
type StateView ¶
type StateView[E any] struct {
// Name identifies this projection (used for checkpoint tracking).
Name string
// EventTypes lists the event types this view cares about.
// Empty = all events (not recommended).
EventTypes []string
// Evolve applies a single event to the read model.
Evolve func(ctx context.Context, event Event[E]) error
// Setup initializes the read model (e.g., CREATE TABLE). Optional.
Setup func(ctx context.Context) error
// Reset clears the read model for rebuild (e.g., TRUNCATE TABLE). Optional.
Reset func(ctx context.Context) error
// OnChange is called after Evolve succeeds for each event.
// Use this to notify SSE handlers or other watchers that the projection
// has been updated for a specific stream. Optional.
//
// Implementations must be non-blocking: slow consumers should be dropped
// rather than delaying event processing.
OnChange func(streamID string, eventType string)
// AtomicCheckpoint indicates that the Evolve function saves the checkpoint
// within its own transaction, making Evolve + checkpoint atomic.
// When true, the subscription will NOT save the checkpoint separately;
// Evolve is responsible for persisting it (using CheckpointFromContext
// from the subscription package to get consumer ID and sequence).
//
// This prevents double processing of non-idempotent side effects
// (e.g., sending emails, charging payments) on crash recovery.
AtomicCheckpoint bool
}
StateView defines a read-model projection with lifecycle hooks. Used with the subscription package for durable, checkpoint-based projections.
StateView is a configuration struct — not a dispatcher. Wire it into the event stream with subscription.FromStateView().
type Stream ¶
type Stream struct {
// Type is the aggregate category (e.g., "order", "account"). Required.
Type string
// ID is the business identifier (e.g., "123", "acc-456"). Required.
ID string
}
Stream identifies a stream by type and business ID.
type StreamLifecycle ¶
type StreamLifecycle[E any] interface {
// ArchiveStream moves a stream to cold storage. Archived streams cannot receive new events.
ArchiveStream(ctx context.Context, streamType, streamID string) error
// RestoreStream brings an archived stream back to active state.
RestoreStream(ctx context.Context, streamType, streamID string) error
// TombstoneStream marks a stream as deleted. Future appends are rejected.
// Events can still be read (for audit). Use DeleteStream for hard delete.
TombstoneStream(ctx context.Context, streamType, streamID string) error
// DeleteStream permanently removes all events in a stream. Irreversible.
DeleteStream(ctx context.Context, streamType, streamID string) error
// StreamStatus returns the current lifecycle state of a stream.
StreamStatus(ctx context.Context, streamType, streamID string) (StreamState, error)
}
StreamLifecycle provides stream lifecycle management.
type StreamState ¶
type StreamState int
StreamState represents the lifecycle state of a stream.
const (
// StreamActive is the default state: the stream accepts appends.
StreamActive StreamState = iota
// StreamArchived means the stream is in cold storage. Appends are rejected.
StreamArchived
// StreamTombstoned means the stream is soft-deleted. Appends are rejected.
// Events may still be readable for audit purposes.
StreamTombstoned
)
func (StreamState) String ¶
func (s StreamState) String() string
String returns a human-readable representation of the stream state.
type StreamTypeReader ¶
type StreamTypeReader[E any] interface {
// ReadByStreamType returns events filtered by stream type, starting from
// the given global sequence (inclusive), up to limit events.
ReadByStreamType(ctx context.Context, streamType string, fromSequence uint64, limit int) ([]Event[E], error)
}
StreamTypeReader reads events filtered by stream type. Stores that support indexed stream type queries implement this.
type Subscription ¶
type Subscription[E any] struct {
// Name identifies this subscription for error messages and debugging. Required.
Name string
// EventTypes lists event type names this subscription handles (from [TypeName]).
// Empty = all events (match-all).
EventTypes []string
// Handler processes a matched event. Required.
Handler func(ctx context.Context, event Event[E]) error
}
Subscription defines a named event handler with optional type filtering.
EventTypes declares which event type names this subscription handles. Events with no matching subscription are skipped — zero overhead. Empty EventTypes means ALL events (match-all).
Use Subscription for both read-model projections and side-effect reactions. The EventDispatcher treats them identically; semantics are up to you.
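The type-filtering rule (listed types match, an empty list matches everything) can be sketched with a small index. The `event`, `subscription`, and `dispatcher` types below are simplified stand-ins invented for this illustration, not the EventDispatcher implementation.

```go
package main

import "fmt"

type event struct {
	Type string
	Data string
}

type subscription struct {
	Name       string
	EventTypes []string // empty means match-all
	Handler    func(event) error
}

// dispatcher indexes subscriptions by event type once, so that dispatching
// an event with no interested subscription is a single failed map lookup.
type dispatcher struct {
	byType   map[string][]subscription
	matchAll []subscription
}

func newDispatcher(subs ...subscription) *dispatcher {
	d := &dispatcher{byType: make(map[string][]subscription)}
	for _, s := range subs {
		if len(s.EventTypes) == 0 {
			d.matchAll = append(d.matchAll, s)
			continue
		}
		for _, t := range s.EventTypes {
			d.byType[t] = append(d.byType[t], s)
		}
	}
	return d
}

// dispatch runs type-specific handlers first, then match-all handlers.
func (d *dispatcher) dispatch(ev event) error {
	matched := append(append([]subscription(nil), d.byType[ev.Type]...), d.matchAll...)
	for _, s := range matched {
		if err := s.Handler(ev); err != nil {
			return fmt.Errorf("subscription %q: %w", s.Name, err)
		}
	}
	return nil
}

func main() {
	d := newDispatcher(
		subscription{Name: "order-list", EventTypes: []string{"OrderPlaced"},
			Handler: func(ev event) error { fmt.Println("order-list saw", ev.Type); return nil }},
		subscription{Name: "audit",
			Handler: func(ev event) error { fmt.Println("audit saw", ev.Type); return nil }},
	)
	_ = d.dispatch(event{Type: "OrderPlaced"})
	_ = d.dispatch(event{Type: "ItemAddedToCart"})
}
```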
type TenantEventStore ¶
type TenantEventStore[E any] struct {
// contains filtered or unexported fields
}
TenantEventStore wraps an EventStore to add tenant-scoping. All stream IDs are transparently prefixed with the tenant ID from context. If no tenant is in context, operations return ErrTenantRequired.
func NewTenantEventStore ¶
func NewTenantEventStore[E any](inner EventStore[E]) *TenantEventStore[E]
NewTenantEventStore wraps an event store with tenant-scoping. Panics if inner is nil.
type TenantID ¶
type TenantID string
TenantID is a validated tenant identifier.
func MustTenantFrom ¶
MustTenantFrom extracts the tenant ID or panics if not present. Use in handlers where tenant is always required (after middleware validation).
type Tombstone ¶
type Tombstone struct {
// StreamType is the type of the deleted stream.
StreamType string
// StreamID is the deleted stream.
StreamID string
// Reason explains why the stream was tombstoned (e.g., "gdpr_request", "archived").
Reason string
// Timestamp is when the tombstone was created.
Timestamp time.Time
// ArchivedTo identifies where the stream was archived (empty if not archived).
ArchivedTo string
}
Tombstone records the deletion of a stream for audit purposes.
type TypeCache ¶
type TypeCache[T any] struct {
// contains filtered or unexported fields
}
TypeCache provides near-zero latency pooled type acquisition for registered event types. It caches the sync.Pool lookup per type, avoiding map lookups on the hot path.
Usage:
registry := eskit.NewEventRegistry()
registry.Register("OrderCreated", func() any { return &OrderCreated{} })
cache := eskit.NewTypeCache[OrderCreated](registry, "OrderCreated")
obj := cache.Acquire() // ~19 ns/op from pool
// use obj...
cache.Release(obj) // return to pool
Thread-safe. The Acquire/Release methods never allocate on the hot path when the pool has available objects.
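The pooling idea behind Acquire/Release can be sketched with sync.Pool from the standard library. The `typedPool` type is a hypothetical stand-in that skips the registry entirely; resetting the value on Release is a choice made for this sketch, not documented TypeCache behavior.

```go
package main

import (
	"fmt"
	"sync"
)

type OrderCreated struct {
	ID    string
	Total int
}

// typedPool wraps sync.Pool so callers get *T without type assertions.
type typedPool[T any] struct {
	pool sync.Pool
}

func newTypedPool[T any]() *typedPool[T] {
	return &typedPool[T]{pool: sync.Pool{New: func() any { return new(T) }}}
}

// Acquire returns a pooled *T, allocating only when the pool is empty.
func (p *typedPool[T]) Acquire() *T {
	return p.pool.Get().(*T)
}

// Release zeroes the value so stale fields never leak, then returns it to the pool.
func (p *typedPool[T]) Release(v *T) {
	var zero T
	*v = zero
	p.pool.Put(v)
}

func main() {
	cache := newTypedPool[OrderCreated]()

	obj := cache.Acquire()
	obj.ID, obj.Total = "order-1", 100
	fmt.Println(obj.ID, obj.Total)
	cache.Release(obj)

	again := cache.Acquire() // may or may not be the same pointer
	fmt.Println(again.ID == "", again.Total == 0)
}
```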
func NewTypeCache ¶
func NewTypeCache[T any](r *EventRegistry, eventType string) *TypeCache[T]
NewTypeCache creates a cached type lookup for maximum performance. The eventType must already be registered in the registry. Panics if the type is not registered.
type TypeNamer
deprecated
type TypeNamer interface {
EventType() string
}
TypeNamer is an optional interface events can implement to provide their type name. If not implemented, the registry's reverse lookup (reflect.Type → name) is used.
Deprecated: Use eskit.Register[E](reg) instead. The registry derives the wire name from the Go type automatically. TypeNamer remains for backward compatibility.
type UnmarshalFunc ¶
UnmarshalFunc is a function that unmarshals data into a target. Used to thread custom codecs through the deserialization pipeline.
type UpcasterFunc ¶
type UpcasterFunc func(data json.RawMessage) (json.RawMessage, error)
UpcasterFunc transforms raw event JSON from one schema version to the next. Input and output are both JSON bytes — this keeps the chain composable without requiring intermediate Go types to exist in the codebase.
Why JSON bytes instead of typed values? When upcasting v1→v2→v3, the v1 and v2 Go structs may have been deleted. Working with raw JSON means we only need the transformation logic, not the types.
type UpcasterRegistry ¶
type UpcasterRegistry struct {
// contains filtered or unexported fields
}
UpcasterRegistry manages per-event-type version chains. Thread-safe: all methods are safe for concurrent use.
Performance: Uses a map[int] index per event type for O(1) chain lookup instead of linear scan. The index is rebuilt on Register (cold path) so Upcast (hot path) pays zero overhead.
func NewUpcasterRegistry ¶
func NewUpcasterRegistry() *UpcasterRegistry
NewUpcasterRegistry creates an empty upcaster registry.
func (*UpcasterRegistry) ChainLength ¶
func (r *UpcasterRegistry) ChainLength(eventType string) int
ChainLength returns the number of upcasters registered for an event type. Returns 0 if no upcasters are registered. Useful for testing and diagnostics.
func (*UpcasterRegistry) HasUpcaster ¶
func (r *UpcasterRegistry) HasUpcaster(eventType string, fromVersion int) bool
HasUpcaster returns true if an upcaster is registered for the given event type and fromVersion. O(1) via index.
func (*UpcasterRegistry) LatestVersion ¶
func (r *UpcasterRegistry) LatestVersion(eventType string) int
LatestVersion returns the highest version reachable from any registered upcaster for the given event type. Returns 0 if no upcasters are registered.
Why this exists: callers need to know the target version for Upcast() without hardcoding it. The registry knows because the chain's last toVersion is the latest.
func (*UpcasterRegistry) Register ¶
func (r *UpcasterRegistry) Register(eventType string, fromVersion, toVersion int, fn UpcasterFunc)
Register adds an upcaster that converts eventType from fromVersion to toVersion. Upcasters are chained: registering v1→v2 and v2→v3 allows automatic v1→v3.
Panics on programmer errors (nil fn, invalid versions, duplicate registration). These are configuration bugs that must be caught at startup, not runtime.
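A rough sketch of fail-fast registration is shown below. The `registry` type is a stand-in, and the concrete version rules (fromVersion at least 1, toVersion greater than fromVersion) are assumptions; the page does not state which version combinations eskit rejects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// upcasterFunc is an assumed shape for eskit's UpcasterFunc: JSON in, JSON out.
type upcasterFunc func(json.RawMessage) (json.RawMessage, error)

// registry is an illustrative stand-in, not eskit's UpcasterRegistry.
type registry struct {
	steps map[string]map[int]upcasterFunc // eventType -> fromVersion -> fn
}

// register panics on configuration bugs so they surface at startup.
// The version rules below (from >= 1, to > from) are assumptions.
func (r *registry) register(eventType string, from, to int, fn upcasterFunc) {
	if fn == nil {
		panic(fmt.Sprintf("upcaster %s v%d->v%d: nil function", eventType, from, to))
	}
	if from < 1 || to <= from {
		panic(fmt.Sprintf("upcaster %s: invalid versions v%d->v%d", eventType, from, to))
	}
	if _, dup := r.steps[eventType][from]; dup {
		panic(fmt.Sprintf("upcaster %s v%d: already registered", eventType, from))
	}
	if r.steps == nil {
		r.steps = make(map[string]map[int]upcasterFunc)
	}
	if r.steps[eventType] == nil {
		r.steps[eventType] = make(map[int]upcasterFunc)
	}
	r.steps[eventType][from] = fn
}

// panics reports whether f panicked; handy in startup tests.
func panics(f func()) (ok bool) {
	defer func() { ok = recover() != nil }()
	f()
	return false
}

func identity(data json.RawMessage) (json.RawMessage, error) { return data, nil }

func main() {
	r := &registry{}
	r.register("sales.OrderPlaced", 1, 2, identity)
	dup := panics(func() { r.register("sales.OrderPlaced", 1, 2, identity) })
	fmt.Println("duplicate panics:", dup) // duplicate panics: true
}
```

Registering every upcaster from a single setup function called in `main` or `init` means a misconfigured chain stops the process before it serves traffic.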
func (*UpcasterRegistry) RegisteredTypes ¶
func (r *UpcasterRegistry) RegisteredTypes() []string
RegisteredTypes returns all event types that have upcasters registered.
func (*UpcasterRegistry) Upcast ¶
func (r *UpcasterRegistry) Upcast(eventType string, data json.RawMessage, fromVersion, targetVersion int) (json.RawMessage, error)
Upcast transforms event data from fromVersion to targetVersion by chaining registered upcasters. Returns the original data unchanged if fromVersion == targetVersion.
Hot path — zero allocations beyond what the upcaster functions themselves allocate. Chain lookup is O(1) via pre-built index.
type WaitFunc ¶
WaitFunc blocks until a relevant change arrives, then returns true. Returns false when the context is cancelled or the notifier is closed.
type WithMetadata ¶
type WithMetadata interface {
EventMetadata() Metadata
}
WithMetadata is an optional interface for commands that carry metadata to propagate through to stored events.
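Optional interfaces of this kind are typically detected with a type assertion. The sketch below shows that pattern under stated assumptions: the local `Metadata` type is a placeholder for eskit's real type, and `metadataOf`, `placeOrder`, and `cancelOrder` are hypothetical names.

```go
package main

import "fmt"

// Metadata is a placeholder; eskit's real Metadata type is not reproduced here.
type Metadata map[string]string

// WithMetadata matches the interface documented above.
type WithMetadata interface {
	EventMetadata() Metadata
}

// metadataOf returns the command's metadata, or nil if the command does not
// implement the optional interface.
func metadataOf(cmd any) Metadata {
	if m, ok := cmd.(WithMetadata); ok {
		return m.EventMetadata()
	}
	return nil
}

// placeOrder is a hypothetical command that carries metadata.
type placeOrder struct{ requestID string }

func (c placeOrder) EventMetadata() Metadata {
	return Metadata{"request_id": c.requestID}
}

// cancelOrder is a hypothetical command that carries none.
type cancelOrder struct{}

func main() {
	fmt.Println(metadataOf(placeOrder{requestID: "r-42"})["request_id"]) // r-42
	fmt.Println(metadataOf(cancelOrder{}) == nil)                        // true
}
```

Commands that do not need metadata stay free of boilerplate, since they simply omit the method.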
type WithPrincipal ¶
WithPrincipal is an optional interface for commands that identify who is performing the action. The CommandBus auto-propagates this.
Source Files ¶
- append_options.go
- audit.go
- changenotifier.go
- context.go
- crypto.go
- decider.go
- decidertest.go
- deletion.go
- deserialize.go
- dispatcher.go
- dlq.go
- errors.go
- event.go
- event_registry.go
- eventbus.go
- eventbus_channel.go
- handler.go
- helpers.go
- lifecycle.go
- memory.go
- memory_deletion.go
- memory_lifecycle.go
- middleware.go
- middleware_builtin.go
- multi_subscription.go
- observability.go
- profiler.go
- projectiontest.go
- raw_event.go
- register.go
- runner.go
- serve_changes.go
- singlewriter.go
- snapshot_store.go
- stateview.go
- store.go
- stream.go
- tenant.go
- typecache.go
- upcaster.go
Directories ¶
| Path | Synopsis |
|---|---|
| cmd | |
| cmd/sse-stress (command) | Command sse-stress benchmarks eskit SSE infrastructure. |
| codec (module) | |
| command | Package command provides an in-process CommandBus with single-writer guarantee per stream. |
| commandlog | Package commandlog provides an interface and middleware for recording every command dispatched through a CommandBus. |
| commandqueue (module) | |
| conformance | Package conformance provides an end-to-end conformance suite that tests the full event sourcing pipeline: |
| dcb | Package dcb implements Dynamic Consistency Boundary (DCB) event sourcing. |
| embeddednats (module) | |
| eventstoretest | Package eventstoretest provides a reusable conformance test suite for EventStore implementations. |
| gdpr | Package gdpr provides GDPR-compliant crypto-shredding for event-sourced systems. |
| hooks | Package hooks provides composable lifecycle hooks for eskit event stores, command dispatch, and projection processing. |
| id | Package id provides distributed, time-ordered unique ID generation using snowflake IDs. |
| liveprojection | Package liveprojection provides a real-time, topic-based event projection with fan-out to subscribers. |
| middleware | Package middleware provides composable middleware for eskit EventStore and CommandBus. |
| natsstore (module) | |
| otelkit (module) | |
| pgstore (module) | |
| pgview (module) | |
| processor | Package processor provides event-driven processing for eskit. |
| projection | Package projection provides tools for managing and rebuilding projections. |
| rebuild | Package rebuild provides zero-downtime projection rebuilds using shadow tables. |
| snapshot | Package snapshot provides configuration options for decider snapshot behavior. |
| sqlitestore (module) | |
| sqlview | Package sqlview provides SQL-backed StateView projections with transactional event processing. |
| subscription | Package subscription provides guaranteed event delivery for event-sourced systems. |