audit

v0.1.0 Latest
Published: May 20, 2025 | License: Unlicense | Imports: 26 | Imported by: 0

Documentation

Overview

Package audit provides an in-memory publish-subscribe bus for managing and processing audit events. It targets high-throughput, low-latency event processing and includes features for resilience, observability, and data integrity: asynchronous event delivery, historical event storage, spillover to disk under backpressure, rate limiting, and circuit breaking.

Core Concepts:

The central component of this package is the `Bus`, which implements the publish-subscribe pattern.

  • Event: The `Event` interface represents a single auditable occurrence. It defines methods to access common attributes such as `ID()`, `Type()`, `Time()`, `Source()`, `ContextID()`, and `Payload()`. The `BasicEvent` struct provides a simple, concrete implementation of this interface. Events also carry `SpanContext()` for distributed tracing correlation.

  • EventType: An `EventType` is a string identifier for the kind of an audit event (e.g., "http_request_received", "auth_login"). The special `EventAny` constant allows subscribing to all events.

  • Handler: A `Handler` is a function type `func(evt Event) error` that processes an incoming `Event`. Handlers are subscribed to specific `EventType`s or to `EventAny`.

  • BusConfig: The `BusConfig` struct holds all configurable parameters for the `Bus`, allowing fine-grained control over its behavior, including buffer sizes, worker counts, asynchronous behavior, sampling rates, spillover, memory limits, circuit breaker settings, rate limiting, error handling, metrics, transport, and access control. `DefaultBusConfig()` provides sensible defaults.
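
To make the relationship between `Event`, `Handler`, and `EventAny` concrete, here is a minimal sketch of synchronous publish-subscribe dispatch. It uses only the standard library; `miniBus`, `event`, `handler`, and their methods are hypothetical stand-ins and say nothing about how the package itself is implemented (for example, the order in which type-specific and wildcard handlers run is an assumption).

```go
package main

import "fmt"

// eventType and handler mirror the shapes of EventType and Handler described
// above; every name in this file is a hypothetical stand-in.
type eventType string

const eventAny eventType = "*"

type event struct {
	Type    eventType
	Payload interface{}
}

type handler func(evt event) error

// miniBus maps event types to their subscribed handlers.
type miniBus struct {
	handlers map[eventType][]handler
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[eventType][]handler)}
}

func (b *miniBus) subscribe(et eventType, h handler) {
	b.handlers[et] = append(b.handlers[et], h)
}

// publishSync runs the type-specific handlers and then the wildcard handlers
// in the caller's goroutine. It returns the number of handlers invoked.
func (b *miniBus) publishSync(evt event) int {
	n := 0
	for _, h := range b.handlers[evt.Type] {
		_ = h(evt) // a real bus would report this error somewhere
		n++
	}
	for _, h := range b.handlers[eventAny] {
		_ = h(evt)
		n++
	}
	return n
}

func main() {
	bus := newMiniBus()
	bus.subscribe("auth_login", func(evt event) error {
		fmt.Println("login handler:", evt.Payload)
		return nil
	})
	bus.subscribe(eventAny, func(evt event) error {
		fmt.Println("wildcard handler:", evt.Type)
		return nil
	})
	bus.publishSync(event{Type: "auth_login", Payload: "user123"})
}
```

The sketch is not safe for concurrent use; a shared bus would need a mutex around the handler map.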

Key Features:

  1. Event Publishing: Events can be published to the bus using several methods:
       - `Publish(evt Event)`: Publishes an event. If the bus is configured for asynchronous delivery (`Async: true`), the event is placed into an internal queue for processing by workers. If the queue is full, the event may be dropped or spilled to disk. If `Async` is `false`, it acts like `PublishSync`.
       - `PublishSync(evt Event)`: Publishes an event synchronously, meaning all subscribed handlers for that event type and global handlers are executed immediately within the calling goroutine.
       - `PublishWithTimeout(evt Event, timeout time.Duration)`: Publishes an event asynchronously but blocks until the event is accepted into the internal queue or the specified timeout is reached. If the timeout is exceeded, an `ErrPublishTimeout` is returned, and the event may be spilled.
  2. Event Subscription: Handlers can be registered to receive events using `Subscribe(et EventType, h Handler)`. Multiple handlers can subscribe to the same event type. Handlers can also subscribe to `EventAny` to receive all events.
  3. Event History: The `Bus` maintains an in-memory history of recently published events, capped by `HistoryCap`.
       - `History(ctx context.Context)`: Retrieves a slice of the stored historical events.
       - `SetHistoryCap(n int)`: Dynamically adjusts the capacity of the history buffer.
     The history also respects a `MaxMemoryMB` limit to prevent excessive memory consumption, estimating event sizes to manage the memory footprint.
  4. Spillover to Disk: When the internal event queue is full or the circuit breaker is open, events can be spilled to disk if `SpilloverDir` is configured in `BusConfig`. The `spilloverHandler` writes events as JSON lines to a log file. The `RecoverSpillover()` method attempts to re-publish these spilled events when the bus is operating normally and the queue has capacity.
  5. Rate Limiting: The bus can enforce a publishing rate limit using the `RateLimit` and `RateBurst` parameters in `BusConfig`. Events exceeding this rate are dropped or spilled.
  6. Circuit Breaker: A `circuitBreaker` mechanism is integrated to protect the bus from continuously failing handlers or external transports. If `CircuitMaxFails` consecutive handler errors occur within a `CircuitTimeout`, the circuit opens, and events are dropped or spilled instead of being processed, allowing the failing component to recover.
  7. Metrics Integration: The `BusMetrics` interface defines a contract for reporting metrics such as published events, dropped events, and handler latency. `PrometheusMetrics` provides a Prometheus-compatible implementation, and `nopMetrics` is a no-operation implementation for when metrics are not needed. Metrics can be configured using `WithMetrics` or `WithMetricsRegisterer`.
  8. External Transport: The `Transport` interface allows integrating external systems (e.g., message queues like Kafka) for event persistence or further processing. `KafkaTransport` is provided as a concrete implementation for sending events to Kafka, supporting retries and asynchronous sending. A transport can be set via `WithTransport`.

Event Definition and Types:

The package defines a comprehensive set of predefined `EventType` constants and corresponding `New...` helper functions for common application events. These include:

  • HTTP Events: `EventTypeHTTPRequestReceived`, `EventTypeHTTPResponseSent`, `EventTypeHTTPRouteNotFound`, `EventTypeHTTPMethodNotAllowed`.
  • Authentication Events: `EventTypeAuthRegister`, `EventTypeAuthLogin`, `EventTypeAuthLogout`, `EventTypeAuthTokenIssued`, `EventTypeAuthTokenRevoked`, `EventTypeAuthCredentialsChecked`.
  • Database Events: `EventTypeDBConnected`, `EventTypeDBInit`, `EventTypeDBError`, `EventTypeDBQuery`, `EventTypeDBExec`, `EventTypeDBTxStarted`, `EventTypeDBTxCommitted`, `EventTypeDBTxRolledBack`.
  • Work Item Events: `EventTypeWorkItemCreated`, `EventTypeWorkItemUpdated`, `EventTypeWorkItemDeleted`, `EventTypeWorkItemAssigned`, `EventTypeWorkItemUnassigned`, `EventTypeCustomFieldSet`.
  • Comment & Attachment Events: `EventTypeCommentAdded`, `EventTypeCommentDeleted`, `EventTypeAttachmentAdded`, `EventTypeAttachmentRemoved`.
  • User Events: `EventTypeUserCreated`, `EventTypeUserUpdated`, `EventTypeUserDeleted`, `EventTypeUserLoggedIn`, `EventTypeUserLoggedOut`.
  • Team Events: `EventTypeTeamCreated`, `EventTypeTeamUpdated`, `EventTypeTeamDeleted`, `EventTypeTeamMemberAdded`, `EventTypeTeamMemberRemoved`.

Generic Typed Events: The `EventT[T any]` interface and `BasicEventT[T any]` struct provide support for creating events with a specific, strongly-typed payload, enhancing compile-time safety and readability. For instance, `NewHTTPRequestReceivedT` creates an event with `HTTPRequestPayload`.

Persistence and Observability:

  1. Logging Integration: The `Logger` struct provides a high-level API for publishing various log-level events (Info, Warning, Debug, Error, Fatal, AssertionFailed) to the `Bus`. `SetupLogging` configures persistent storage for audit events:
       - File-based logging using `lumberjack` for log rotation, compression, and retention.
       - Database persistence to a SQL database (e.g., PostgreSQL, MySQL, SQLite) with batch insertion and retry logic.
     `SetupDatabase` initializes the necessary `audit` table and indexes. The `LogOption` functional options allow detailed configuration of file and database handlers.
  2. Schema Validation: `EventSchema` allows defining expected fields and their types for an event's payload. `RegisterSchema` is used to register these schemas, and `validatePayload` ensures that published events conform to their registered schema, catching data inconsistencies early. Predefined event types have their schemas registered during package initialization.

Security and Data Integrity:

  1. Access Control for History: The `AccessControlFunc` type defines a function that can be used to enforce permissions before allowing access to the event history. It is set with the `WithAccessControl` option, which populates the `AccessControl` field of `BusConfig`. `CheckHistoryAccess` provides a default role-based check (`"admin"`) if no custom function is provided.
  2. Payload Sanitization: The `SanitizePayload` function automatically redacts sensitive information (e.g., "email", "password") from event payloads before they are stored or processed further. Custom `Sanitizer` functions can be defined and applied.
  3. Event Encryption: `EncryptEvent` provides a mechanism to encrypt event payloads using AES-256-GCM, ensuring that sensitive data is protected at rest or in transit. `GenerateAESKey` can be used to generate strong encryption keys.

Configuration:

The `Bus` is initialized using `NewBus` with variadic `BusOption` functions. These options cover various aspects such as history capacity (`WithHistoryCap`), async buffer size (`WithBufferSize`), worker count (`WithWorkerCount`), asynchronous delivery (`WithAsync`), sampling rate (`WithSampleRate`), spillover directory (`WithSpilloverDir`), maximum memory usage (`WithMaxMemoryMB`), circuit breaker parameters (`WithCircuitBreaker`), metrics implementation (`WithMetrics`, `WithMetricsRegisterer`), external transport (`WithTransport`), access control for history (`WithAccessControl`), and rate limiting (`WithRateLimit`).

Configuration can also be loaded from environment variables using `LoadConfigFromEnv()`.

Usage Patterns:

  1. Initializing the Audit Bus:

	bus, err := audit.NewBus(
		audit.WithHistoryCap(1000),
		audit.WithAsync(true),
		audit.WithBufferSize(500),
		audit.WithSpilloverDir("/var/log/audit"),
		audit.WithMaxMemoryMB(50),
		audit.WithMetricsRegisterer(prometheus.DefaultRegisterer),
	)
	if err != nil {
		log.Fatalf("Failed to create audit bus: %v", err)
	}
	defer bus.Close()

  2. Subscribing a Handler:

	bus.Subscribe(audit.EventTypeAuthLogin, func(evt audit.Event) error {
		fmt.Printf("User %s logged in at %s\n", evt.Payload().(map[string]interface{})["user_id"], evt.Time())
		return nil
	})

  3. Publishing an Event:

	ctx := context.Background()
	// Assuming a user ID and source are available
	loginEvent := audit.NewAuthLogin(ctx, "my-service", "user123")
	bus.Publish(loginEvent)

  4. Setting up Persistent Logging:

	db, err := sql.Open("sqlite3", "./audit.db") // Example using SQLite
	if err != nil {
		log.Fatalf("Failed to open database: %v", err)
	}
	if err := audit.SetupDatabase(db); err != nil {
		log.Fatalf("Failed to setup audit database: %v", err)
	}
	closers, err := audit.SetupLogging(bus, db,
		audit.WithFilePath("/var/log/app_audit.log"),
		audit.WithDBBatchSize(50),
	)
	if err != nil {
		log.Fatalf("Failed to setup logging: %v", err)
	}
	for _, closer := range closers {
		defer closer()
	}

  5. Using the Logger API:

	logger := audit.NewLogger(bus)
	logger.Info(ctx, "my-service", "Application started", map[string]string{"version": "1.0.0"})
	logger.Error(ctx, "my-service", "Database connection failed", fmt.Errorf("connection refused"), nil)

Concurrency:

The `Bus` and its components are designed to be safe for concurrent use. Internal synchronization primitives (mutexes, atomic operations, channels, and wait groups) are used to protect shared data and manage concurrent access. Asynchronous event processing is handled by a worker pool.

Error Handling:

Errors during event processing by handlers or internal bus operations are typically reported via the `ErrorFunc` configured in `BusConfig`, which defaults to logging the error. For methods like `PublishWithTimeout`, specific errors are returned to the caller.

This package aims to provide a comprehensive solution for managing audit events within Go applications, offering flexibility, performance, and resilience for critical operational insights.

Index

Constants

This section is empty.

Variables

var ErrPublishTimeout = fmt.Errorf("audit bus: publish timeout")

Functions

func CheckHistoryAccess

func CheckHistoryAccess(ctx context.Context) error

CheckHistoryAccess verifies access to history data.

func ContextIDFrom

func ContextIDFrom(ctx context.Context) string

ContextIDFrom retrieves the correlation ID from the context, returning an empty string if not set.

func GenerateAESKey

func GenerateAESKey() ([]byte, error)

GenerateAESKey generates a 32-byte AES key for encryption.

func RegisterSchema

func RegisterSchema(et EventType, s EventSchema)

RegisterSchema registers a schema for an event type.

func SetupDatabase

func SetupDatabase(db *sql.DB) error

SetupDatabase initializes the audit table and indexes in the database.

func SetupLogging

func SetupLogging(bus *Bus, db *sql.DB, opts ...LogOption) ([]func() error, error)

SetupLogging configures file and database persistence for audit events.

func WithContextID

func WithContextID(ctx context.Context, id string) context.Context

WithContextID attaches a correlation ID to the context for event correlation.
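
As an illustration of how a correlation ID can travel through a `context.Context`, the following sketch stores the ID under a private struct key and reads it back. The lowercase names (`contextIDKey`, `withContextID`, `contextIDFrom`, `root`) are hypothetical stand-ins written with the standard library only; the package's own implementation may differ.

```go
package main

import (
	"context"
	"fmt"
)

// contextIDKey is a private struct type used as the context key, which avoids
// collisions with keys defined by other packages.
type contextIDKey struct{}

// root is the parent context used by the demo in main.
var root = context.Background()

// withContextID returns a child context that carries the correlation ID.
func withContextID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, contextIDKey{}, id)
}

// contextIDFrom returns the stored ID, or "" when none was attached.
func contextIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(contextIDKey{}).(string)
	return id
}

func main() {
	ctx := withContextID(root, "req-42")
	fmt.Println(contextIDFrom(ctx))         // req-42
	fmt.Printf("%q\n", contextIDFrom(root)) // ""
}
```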

Types

type AccessControlFunc

type AccessControlFunc func(ctx context.Context) error

AccessControlFunc defines a function to check history access permissions.
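
A sketch of how a function of this shape can gate history reads is shown here. How the real package determines the caller's role is not documented on this page, so storing the role in the context under `roleKey` is purely an assumption, as are the names `withRole`, `requireAdmin`, `readHistory`, and `errForbidden`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// accessControlFunc has the same shape as AccessControlFunc.
type accessControlFunc func(ctx context.Context) error

// roleKey is a hypothetical context key for the caller's role.
type roleKey struct{}

var errForbidden = errors.New("history access denied")

// root is the parent context used by the demo in main.
var root = context.Background()

func withRole(ctx context.Context, role string) context.Context {
	return context.WithValue(ctx, roleKey{}, role)
}

// requireAdmin allows access only when the context carries the "admin" role.
func requireAdmin(ctx context.Context) error {
	if role, _ := ctx.Value(roleKey{}).(string); role != "admin" {
		return errForbidden
	}
	return nil
}

// readHistory consults the access check before returning a copy of events.
func readHistory(ctx context.Context, check accessControlFunc, events []string) ([]string, error) {
	if err := check(ctx); err != nil {
		return nil, err
	}
	return append([]string(nil), events...), nil
}

func main() {
	events := []string{"auth_login", "db_query"}

	_, err := readHistory(root, requireAdmin, events)
	fmt.Println(err) // history access denied

	got, _ := readHistory(withRole(root, "admin"), requireAdmin, events)
	fmt.Println(got) // [auth_login db_query]
}
```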

type AuthRegisterPayload

type AuthRegisterPayload struct {
	UserID string
	Email  string
}

AuthRegisterPayload is the payload for user registration events.

type BasicEvent

type BasicEvent struct {
	IDVal        string
	TypeVal      EventType
	TimeVal      time.Time
	SourceVal    string
	ContextIDVal string
	PayloadVal   interface{}
	SpanCtx      trace.SpanContext
}

BasicEvent is a simple implementation of Event.

func NewBasicEvent

func NewBasicEvent(t EventType, source, contextID string, payload interface{}, spanCtx trace.SpanContext) BasicEvent

func (BasicEvent) ContextID

func (e BasicEvent) ContextID() string

func (BasicEvent) ID

func (e BasicEvent) ID() string

func (BasicEvent) Payload

func (e BasicEvent) Payload() interface{}

func (BasicEvent) Source

func (e BasicEvent) Source() string

func (BasicEvent) SpanContext

func (e BasicEvent) SpanContext() trace.SpanContext

func (BasicEvent) Time

func (e BasicEvent) Time() time.Time

func (BasicEvent) Type

func (e BasicEvent) Type() EventType

type BasicEventT

type BasicEventT[T any] struct {
	BasicEvent
	// contains filtered or unexported fields
}

BasicEventT is a generic implementation of EventT.

func NewAuthRegisterT

func NewAuthRegisterT(ctx context.Context, source, userID, email string) BasicEventT[AuthRegisterPayload]

NewAuthRegisterT creates a typed Event for user registration.

func NewHTTPRequestReceivedT

func NewHTTPRequestReceivedT(ctx context.Context, source, method, path string) BasicEventT[HTTPRequestPayload]

NewHTTPRequestReceivedT creates a typed Event for an incoming HTTP request.

func NewHTTPResponseSentT

func NewHTTPResponseSentT(ctx context.Context, source string, status int, duration time.Duration) BasicEventT[HTTPResponsePayload]

NewHTTPResponseSentT creates a typed Event for an HTTP response sent.

func (BasicEventT[T]) TypedPayload

func (e BasicEventT[T]) TypedPayload() T

TypedPayload returns the typed payload.

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is an in-memory publish/subscribe bus for audit events.

func DefaultBus

func DefaultBus() *Bus

func NewBus

func NewBus(opts ...BusOption) (*Bus, error)

func (*Bus) Close

func (b *Bus) Close()

func (*Bus) History

func (b *Bus) History(ctx context.Context) ([]Event, error)

func (*Bus) Publish

func (b *Bus) Publish(evt Event)

func (*Bus) PublishSync

func (b *Bus) PublishSync(evt Event)

func (*Bus) PublishWithTimeout

func (b *Bus) PublishWithTimeout(evt Event, timeout time.Duration) error
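
The blocking-with-deadline behavior described for `PublishWithTimeout` in the overview can be sketched with a buffered channel and a timer. This is a standard-library illustration only; `enqueueWithTimeout`, `errPublishTimeout`, and `demoTimeout` are hypothetical names, and the real method may additionally spill the event to disk.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary value chosen for this demo.
const demoTimeout = 10 * time.Millisecond

// errPublishTimeout plays the role of ErrPublishTimeout in this sketch.
var errPublishTimeout = errors.New("publish timeout")

// enqueueWithTimeout blocks until evt fits into the bounded queue or the
// timeout elapses, whichever happens first.
func enqueueWithTimeout(queue chan<- string, evt string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case queue <- evt:
		return nil
	case <-timer.C:
		return errPublishTimeout
	}
}

func main() {
	queue := make(chan string, 1) // capacity 1 stands in for BufferSize

	fmt.Println(enqueueWithTimeout(queue, "first", demoTimeout)) // <nil>

	// The queue is full and nothing drains it, so this call times out.
	err := enqueueWithTimeout(queue, "second", demoTimeout)
	fmt.Println(errors.Is(err, errPublishTimeout)) // true
}
```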

func (*Bus) RecoverSpillover

func (b *Bus) RecoverSpillover() error
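
The overview states that spilled events are written as JSON lines and later re-published. A minimal sketch of that write-then-replay cycle follows. The record layout (`spilledEvent`) and the helper names are assumptions, and an in-memory buffer stands in for the spillover file so the example stays self-contained.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// spilledEvent is a hypothetical on-disk shape; the real record layout is not
// documented on this page.
type spilledEvent struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

// spill appends one event as a single JSON line.
func spill(w io.Writer, evt spilledEvent) error {
	// Encode terminates every value with a newline, which yields JSON lines.
	return json.NewEncoder(w).Encode(evt)
}

// replay reads the log back line by line and returns the decoded events.
func replay(r io.Reader) ([]spilledEvent, error) {
	var out []spilledEvent
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		if len(sc.Bytes()) == 0 {
			continue // skip blank lines
		}
		var evt spilledEvent
		if err := json.Unmarshal(sc.Bytes(), &evt); err != nil {
			return out, err
		}
		out = append(out, evt)
	}
	return out, sc.Err()
}

// roundTrip spills events into an in-memory buffer and replays them.
func roundTrip(events []spilledEvent) ([]spilledEvent, error) {
	var buf bytes.Buffer
	for _, evt := range events {
		if err := spill(&buf, evt); err != nil {
			return nil, err
		}
	}
	return replay(&buf)
}

func main() {
	got, err := roundTrip([]spilledEvent{
		{ID: "1", Type: "auth_login"},
		{ID: "2", Type: "db_query"},
	})
	fmt.Println(got, err)
}
```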

func (*Bus) SetHistoryCap

func (b *Bus) SetHistoryCap(n int)

func (*Bus) SetSampleRate

func (b *Bus) SetSampleRate(rate float64)

func (*Bus) Subscribe

func (b *Bus) Subscribe(et EventType, h Handler)

type BusConfig

type BusConfig struct {
	HistoryCap      int
	BufferSize      int
	WorkerCount     int
	Async           bool
	SampleRate      float64
	SpilloverDir    string
	MaxMemoryMB     int
	CircuitTimeout  time.Duration
	CircuitMaxFails int
	RateLimit       int
	RateBurst       int
	ErrorFunc       func(error, Event)
	Metrics         BusMetrics
	Transport       Transport
	AccessControl   AccessControlFunc
}

BusConfig holds parameters to configure a Bus.

func DefaultBusConfig

func DefaultBusConfig() BusConfig

type BusMetrics

type BusMetrics interface {
	EventPublished(et EventType)
	EventDropped(et EventType)
	HandlerLatency(et EventType, d time.Duration)
}

BusMetrics defines the interface for audit bus metrics.

type BusOption

type BusOption func(*BusConfig)

BusOption defines a functional option for configuring Bus.
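
The functional-option pattern behind `BusOption` can be sketched as follows. The `config` struct, its default values, and the lowercase option names are hypothetical; only the shape `func(*Config)` is taken from the signature above.

```go
package main

import "fmt"

// config is a trimmed-down, hypothetical stand-in for BusConfig.
type config struct {
	HistoryCap int
	BufferSize int
	Async      bool
}

// option mutates a config, mirroring the shape of BusOption.
type option func(*config)

func withHistoryCap(n int) option {
	return func(c *config) { c.HistoryCap = n }
}

func withAsync(async bool) option {
	return func(c *config) { c.Async = async }
}

// defaultConfig returns illustrative defaults; the real defaults are unknown.
func defaultConfig() config {
	return config{HistoryCap: 100, BufferSize: 64}
}

// build starts from the defaults and applies options in order, so a later
// option overrides an earlier one that touches the same field.
func build(opts ...option) config {
	cfg := defaultConfig()
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := build(withHistoryCap(1000), withAsync(true))
	fmt.Printf("%+v\n", cfg) // {HistoryCap:1000 BufferSize:64 Async:true}
}
```

Because options are plain functions, a caller can assemble them from several sources (for example, a slice returned by a loader plus explicit overrides) before passing them on.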

func LoadConfigFromEnv

func LoadConfigFromEnv() []BusOption

LoadConfigFromEnv loads configuration from environment variables.

func WithAccessControl

func WithAccessControl(f AccessControlFunc) BusOption

WithAccessControl sets the access control function for history access.

func WithAsync

func WithAsync(async bool) BusOption

WithAsync enables or disables async delivery.

func WithBufferSize

func WithBufferSize(n int) BusOption

WithBufferSize sets the async queue size.

func WithCircuitBreaker

func WithCircuitBreaker(timeout time.Duration, maxFails int) BusOption

WithCircuitBreaker sets circuit breaker parameters.
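
For readers unfamiliar with the pattern, here is a minimal circuit-breaker sketch driven by a failure threshold and a timeout. It treats the timeout as a cool-down period after the circuit opens, which is one common interpretation and an assumption about this package. All names are hypothetical, timestamps are passed in explicitly so the demo is deterministic, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary cool-down chosen for this demo.
const demoTimeout = 5 * time.Second

var errHandler = errors.New("handler failed")

// at returns a deterministic timestamp sec seconds after a fixed epoch, so
// the demo does not depend on the wall clock.
func at(sec int) time.Time {
	return time.Unix(int64(sec), 0)
}

// breaker opens after maxFails consecutive failures and stays open for
// timeout; after that it lets calls through again.
type breaker struct {
	maxFails int
	timeout  time.Duration
	fails    int
	open     bool
	openedAt time.Time
}

func newBreaker(maxFails int, timeout time.Duration) *breaker {
	return &breaker{maxFails: maxFails, timeout: timeout}
}

// allow reports whether a call may proceed at time now.
func (b *breaker) allow(now time.Time) bool {
	if b.open && now.Sub(b.openedAt) >= b.timeout {
		b.open = false // cool-down elapsed: close the circuit
		b.fails = 0
	}
	return !b.open
}

// record updates the failure streak with the outcome of a call.
func (b *breaker) record(err error, now time.Time) {
	if err == nil {
		b.fails = 0 // any success resets the streak
		return
	}
	b.fails++
	if b.fails >= b.maxFails {
		b.open = true
		b.openedAt = now
	}
}

func main() {
	b := newBreaker(2, demoTimeout)
	b.record(errHandler, at(0))
	b.record(errHandler, at(1)) // second consecutive failure opens the circuit
	fmt.Println(b.allow(at(2))) // false: still cooling down
	fmt.Println(b.allow(at(6))) // true: 5s have passed since it opened
}
```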

func WithHistoryCap

func WithHistoryCap(n int) BusOption

WithHistoryCap sets the history buffer size.
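
A capped history can be sketched as a slice that discards its oldest entries whenever it exceeds the capacity. The `history` type here is a hypothetical illustration that ignores the `MaxMemoryMB` limit and concurrency, both of which the overview says the real bus handles.

```go
package main

import "fmt"

// history keeps at most capacity events, oldest first.
type history struct {
	capacity int
	events   []string
}

// trim drops the oldest entries until at most capacity remain. Copying into
// a fresh slice lets the discarded entries be garbage collected.
func (h *history) trim() {
	if over := len(h.events) - h.capacity; over > 0 {
		h.events = append([]string(nil), h.events[over:]...)
	}
}

// add appends an event and evicts the oldest one if the cap is exceeded.
func (h *history) add(evt string) {
	h.events = append(h.events, evt)
	h.trim()
}

// setCap changes the capacity at run time and trims immediately.
func (h *history) setCap(n int) {
	if n < 0 {
		n = 0
	}
	h.capacity = n
	h.trim()
}

func main() {
	h := &history{capacity: 2}
	for _, e := range []string{"a", "b", "c"} {
		h.add(e)
	}
	fmt.Println(h.events) // [b c]

	h.setCap(1)
	fmt.Println(h.events) // [c]
}
```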

func WithMaxMemoryMB

func WithMaxMemoryMB(mb int) BusOption

WithMaxMemoryMB sets the memory limit in MB.

func WithMetrics

func WithMetrics(metrics BusMetrics) BusOption

WithMetrics sets the metrics implementation.

func WithMetricsRegisterer

func WithMetricsRegisterer(registerer prometheus.Registerer) BusOption

WithMetricsRegisterer sets the Prometheus registerer for metrics.

func WithRateLimit

func WithRateLimit(rate, burst int) BusOption

WithRateLimit sets the rate limit for event publishing (events per second) and burst size.
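
The interaction between a per-second rate and a burst size is commonly modeled as a token bucket; the sketch below illustrates that model. Whether the package uses a token bucket internally is not stated here, so treat this as an assumption. The names `bucket`, `newBucket`, `allow`, and `at` are hypothetical, and time is injected to keep the demo deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// at returns a deterministic timestamp sec seconds after a fixed epoch.
func at(sec int) time.Time {
	return time.Unix(int64(sec), 0)
}

// bucket is a simple token bucket; it is not safe for concurrent use.
type bucket struct {
	rate   float64 // tokens added per second
	burst  float64 // maximum number of stored tokens
	tokens float64
	last   time.Time
}

// newBucket starts with a full bucket so an initial burst is allowed.
func newBucket(rate, burst int, now time.Time) *bucket {
	return &bucket{rate: float64(rate), burst: float64(burst), tokens: float64(burst), last: now}
}

// allow refills the bucket for the elapsed time and consumes one token if
// one is available.
func (b *bucket) allow(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newBucket(1, 2, at(0)) // 1 event per second, burst of 2

	fmt.Println(b.allow(at(0)), b.allow(at(0)), b.allow(at(0))) // true true false
	fmt.Println(b.allow(at(1)))                                 // true
}
```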

func WithSampleRate

func WithSampleRate(rate float64) BusOption

WithSampleRate sets the sampling rate.
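
This page does not define the sampling rate further. Assuming it is a fraction between 0 and 1 of events to keep, probabilistic sampling can be sketched as follows; `sampler`, `newSampler`, `keep`, and `countKept` are hypothetical names, and the seeded generator only serves to make the demo repeatable.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler keeps each event with probability rate. It is not safe for
// concurrent use because *rand.Rand is not.
type sampler struct {
	rate float64
	rnd  *rand.Rand
}

func newSampler(rate float64, seed int64) *sampler {
	return &sampler{rate: rate, rnd: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next event should be processed. Float64 returns a
// value in [0, 1), so rate 1 keeps everything and rate 0 keeps nothing.
func (s *sampler) keep() bool {
	return s.rnd.Float64() < s.rate
}

// countKept samples n events and returns how many were kept.
func countKept(s *sampler, n int) int {
	kept := 0
	for i := 0; i < n; i++ {
		if s.keep() {
			kept++
		}
	}
	return kept
}

func main() {
	fmt.Println(countKept(newSampler(1.0, 1), 1000)) // 1000
	fmt.Println(countKept(newSampler(0.0, 1), 1000)) // 0
	fmt.Println(countKept(newSampler(0.1, 1), 1000)) // roughly 100
}
```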

func WithSpilloverDir

func WithSpilloverDir(dir string) BusOption

WithSpilloverDir sets the spillover directory.

func WithTransport

func WithTransport(transport Transport) BusOption

WithTransport sets the external transport.

func WithWorkerCount

func WithWorkerCount(n int) BusOption

WithWorkerCount sets the number of workers.

type ContextIDKey

type ContextIDKey struct{}

ContextIDKey is used to store correlation IDs in context.

type Event

type Event interface {
	ID() string
	Type() EventType
	Time() time.Time
	Source() string
	ContextID() string
	Payload() interface{}
	SpanContext() trace.SpanContext
}

Event represents an occurrence to be audited.

func EncryptEvent

func EncryptEvent(evt Event, key []byte) (Event, error)

EncryptEvent encrypts an event's payload using AES.
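
For orientation, the sketch below shows AES-256-GCM encryption of a serialized payload using only the Go standard library. It is not the package's code: `generateKey`, `seal`, and `open` are hypothetical helpers, and prepending the nonce to the ciphertext is a common convention assumed here rather than a documented wire format.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// generateKey returns 32 random bytes, the key size that selects AES-256.
func generateKey() ([]byte, error) {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return nil, err
	}
	return key, nil
}

// newGCM builds an AES-GCM AEAD from the key.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal encrypts plaintext and prepends the random nonce so that open can
// recover it later.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal and fails if the data was modified.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ct := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ct, nil)
}

func main() {
	key, err := generateKey()
	if err != nil {
		panic(err)
	}
	sealed, err := seal(key, []byte(`{"user_id":"user123"}`))
	if err != nil {
		panic(err)
	}
	plain, err := open(key, sealed)
	fmt.Println(string(plain), err)
}
```

GCM authenticates as well as encrypts, so a modified ciphertext is rejected by `open` instead of decrypting to garbage.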

func NewAssertionFailed

func NewAssertionFailed(ctx context.Context, source, message, detail string) Event

NewAssertionFailed creates an assertion-failed logging event.

func NewAttachmentAdded

func NewAttachmentAdded(ctx context.Context, source, attachmentID, itemID, filename string) Event

NewAttachmentAdded creates an Event when an attachment is added.

func NewAttachmentRemoved

func NewAttachmentRemoved(ctx context.Context, source, attachmentID, itemID string) Event

NewAttachmentRemoved creates an Event when an attachment is removed.

func NewAuthCredentialsChecked

func NewAuthCredentialsChecked(ctx context.Context, source, email string, success bool) Event

NewAuthCredentialsChecked creates an Event after credential verification.

func NewAuthLogin

func NewAuthLogin(ctx context.Context, source, userID string) Event

NewAuthLogin creates an Event for a successful login.

func NewAuthLogout

func NewAuthLogout(ctx context.Context, source, userID string) Event

NewAuthLogout creates an Event for user logout.

func NewAuthRegister

func NewAuthRegister(ctx context.Context, source, userID, email string) Event

NewAuthRegister creates an Event for user registration.

func NewAuthTokenIssued

func NewAuthTokenIssued(ctx context.Context, source, userID string, expiresIn time.Duration) Event

NewAuthTokenIssued creates an Event when a token is issued.

func NewAuthTokenRevoked

func NewAuthTokenRevoked(ctx context.Context, source, tokenID string) Event

NewAuthTokenRevoked creates an Event when a token is revoked.

func NewCommentAdded

func NewCommentAdded(ctx context.Context, source, commentID, itemID, content string) Event

NewCommentAdded creates an Event for a new comment.

func NewCommentDeleted

func NewCommentDeleted(ctx context.Context, source, commentID, itemID string) Event

NewCommentDeleted creates an Event when a comment is deleted.

func NewCustomFieldSet

func NewCustomFieldSet(ctx context.Context, source, itemID, field string, value interface{}) Event

NewCustomFieldSet creates an Event when a custom field is set.

func NewDBConnected

func NewDBConnected(ctx context.Context, source, driver, dsn string) Event

NewDBConnected creates an Event for successful DB connection.

func NewDBError

func NewDBError(ctx context.Context, source, query string, err error) Event

NewDBError creates an Event on database error.

func NewDBExec

func NewDBExec(ctx context.Context, source, stmt string, rowsAffected int64) Event

NewDBExec creates an Event after executing a statement.

func NewDBInit

func NewDBInit(ctx context.Context, source, schema string) Event

NewDBInit creates an Event when DB schema initialization completes.

func NewDBQuery

func NewDBQuery(ctx context.Context, source, query string, duration time.Duration) Event

NewDBQuery creates an Event after executing a query.

func NewDBTxCommitted

func NewDBTxCommitted(ctx context.Context, source, txID string) Event

NewDBTxCommitted creates an Event when a transaction commits.

func NewDBTxRolledBack

func NewDBTxRolledBack(ctx context.Context, source, txID, reason string) Event

NewDBTxRolledBack creates an Event when a transaction rolls back.

func NewDBTxStarted

func NewDBTxStarted(ctx context.Context, source, txID string) Event

NewDBTxStarted creates an Event when a transaction begins.

func NewDebug

func NewDebug(ctx context.Context, source, message string, fields map[string]string) Event

NewDebug creates a debug-level logging event.

func NewError

func NewError(ctx context.Context, source, message string, err error, fields map[string]string) Event

NewError creates an error-level logging event.

func NewFatal

func NewFatal(ctx context.Context, source, message string, fields map[string]string) Event

NewFatal creates a fatal-level logging event.

func NewHTTPMethodNotAllowed

func NewHTTPMethodNotAllowed(ctx context.Context, source, method string) Event

NewHTTPMethodNotAllowed creates an Event when the HTTP method is not allowed.

func NewHTTPRequestReceived

func NewHTTPRequestReceived(ctx context.Context, source, method, path string) Event

NewHTTPRequestReceived creates an Event for an incoming HTTP request.

func NewHTTPResponseSent

func NewHTTPResponseSent(ctx context.Context, source string, status int, duration time.Duration) Event

NewHTTPResponseSent creates an Event when an HTTP response is sent.

func NewHTTPRouteNotFound

func NewHTTPRouteNotFound(ctx context.Context, source, path string) Event

NewHTTPRouteNotFound creates an Event when no route matches.

func NewInfo

func NewInfo(ctx context.Context, source, message string, fields map[string]string) Event

NewInfo creates an info-level logging event.

func NewTeamCreated

func NewTeamCreated(ctx context.Context, source, teamID, name string) Event

NewTeamCreated creates an Event for a new team.

func NewTeamDeleted

func NewTeamDeleted(ctx context.Context, source, teamID string) Event

NewTeamDeleted creates an Event when a team is deleted.

func NewTeamMemberAdded

func NewTeamMemberAdded(ctx context.Context, source, teamID, userID string) Event

NewTeamMemberAdded creates an Event when a user joins a team.

func NewTeamMemberRemoved

func NewTeamMemberRemoved(ctx context.Context, source, teamID, userID string) Event

NewTeamMemberRemoved creates an Event when a user leaves a team.

func NewTeamUpdated

func NewTeamUpdated(ctx context.Context, source, teamID string, changes map[string]interface{}) Event

NewTeamUpdated creates an Event for team updates.

func NewUserCreated

func NewUserCreated(ctx context.Context, source, userID, email string) Event

NewUserCreated creates an Event for a new user.

func NewUserDeleted

func NewUserDeleted(ctx context.Context, source, userID string) Event

NewUserDeleted creates an Event when a user is deleted.

func NewUserLoggedIn

func NewUserLoggedIn(ctx context.Context, source, userID string) Event

NewUserLoggedIn creates an Event when a user logs in.

func NewUserLoggedOut

func NewUserLoggedOut(ctx context.Context, source, userID string) Event

NewUserLoggedOut creates an Event when a user logs out.

func NewUserUpdated

func NewUserUpdated(ctx context.Context, source, userID string, changes map[string]interface{}) Event

NewUserUpdated creates an Event for user updates.

func NewWarning

func NewWarning(ctx context.Context, source, message string, fields map[string]string) Event

NewWarning creates a warning-level logging event.

func NewWorkItemAssigned

func NewWorkItemAssigned(ctx context.Context, source, itemID, assigneeID string) Event

NewWorkItemAssigned creates an Event when a work item is assigned.

func NewWorkItemCreated

func NewWorkItemCreated(ctx context.Context, source, id string, metadata map[string]string) Event

NewWorkItemCreated creates an Event for a new work item.

func NewWorkItemDeleted

func NewWorkItemDeleted(ctx context.Context, source, id string) Event

NewWorkItemDeleted creates an Event for deletion of a work item.

func NewWorkItemUnassigned

func NewWorkItemUnassigned(ctx context.Context, source, itemID, assigneeID string) Event

NewWorkItemUnassigned creates an Event when a work item is unassigned.

func NewWorkItemUpdated

func NewWorkItemUpdated(ctx context.Context, source, id string, changes map[string]interface{}) Event

NewWorkItemUpdated creates an Event for a work item update.

func SanitizePayload

func SanitizePayload(evt Event) Event

SanitizePayload sanitizes sensitive fields in an event's payload.
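
Key-based redaction with a function shaped like `Sanitizer` might look like the sketch below. The redaction marker `[REDACTED]`, the case-insensitive key match, and the restriction to top-level map fields are all assumptions made for illustration; `sanitize` and `redactSensitive` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizer has the same shape as the Sanitizer type.
type sanitizer func(key string, value interface{}) interface{}

// redactSensitive replaces the values of well-known sensitive keys.
func redactSensitive(key string, value interface{}) interface{} {
	switch strings.ToLower(key) {
	case "email", "password":
		return "[REDACTED]"
	}
	return value
}

// sanitize returns a copy of payload with s applied to every top-level
// field; the input map is left untouched.
func sanitize(payload map[string]interface{}, s sanitizer) map[string]interface{} {
	out := make(map[string]interface{}, len(payload))
	for k, v := range payload {
		out[k] = s(k, v)
	}
	return out
}

func main() {
	payload := map[string]interface{}{
		"user_id": "user123",
		"email":   "user@example.com",
	}
	fmt.Println(sanitize(payload, redactSensitive))
}
```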

type EventSchema

type EventSchema struct {
	RequiredFields []string
	FieldTypes     map[string]reflect.Type
}

EventSchema defines the schema for an event's payload.
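
To show how `RequiredFields` and `FieldTypes` could drive validation, here is a sketch that checks a map payload against a schema of the same shape. The `validate` function, the `loginSchema` value, and the assumption that payloads are `map[string]interface{}` are illustrative only; the unexported `validatePayload` mentioned in the overview may behave differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// schema mirrors the fields of EventSchema.
type schema struct {
	RequiredFields []string
	FieldTypes     map[string]reflect.Type
}

// loginSchema is a hypothetical schema for a login payload.
var loginSchema = schema{
	RequiredFields: []string{"user_id"},
	FieldTypes:     map[string]reflect.Type{"user_id": reflect.TypeOf("")},
}

// validate checks that all required fields are present and that every field
// with a declared type holds a value of exactly that type.
func validate(s schema, payload map[string]interface{}) error {
	for _, f := range s.RequiredFields {
		if _, ok := payload[f]; !ok {
			return fmt.Errorf("missing required field %q", f)
		}
	}
	for f, want := range s.FieldTypes {
		v, ok := payload[f]
		if !ok {
			continue // optional field that was not supplied
		}
		if got := reflect.TypeOf(v); got != want {
			return fmt.Errorf("field %q: got type %v, want %v", f, got, want)
		}
	}
	return nil
}

func main() {
	fmt.Println(validate(loginSchema, map[string]interface{}{"user_id": "user123"}))
	fmt.Println(validate(loginSchema, map[string]interface{}{"user_id": 42}))
	fmt.Println(validate(loginSchema, map[string]interface{}{}))
}
```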

type EventT

type EventT[T any] interface {
	Event
	TypedPayload() T
}

EventT is a generic Event interface with typed payloads.
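
The benefit of a typed payload is that consumers avoid a run-time type assertion. The following sketch reproduces the embedding pattern with hypothetical lowercase types (`event`, `typedEvent`, `basicEvent`, `basicEventT`); it requires Go 1.18 or later and is a simplified illustration, not the package's definitions.

```go
package main

import "fmt"

// event is a reduced stand-in for the Event interface.
type event interface {
	Type() string
	Payload() interface{}
}

// typedEvent adds a statically typed accessor, like EventT.
type typedEvent[T any] interface {
	event
	TypedPayload() T
}

type basicEvent struct {
	typ     string
	payload interface{}
}

func (e basicEvent) Type() string         { return e.typ }
func (e basicEvent) Payload() interface{} { return e.payload }

// basicEventT embeds basicEvent and stores the payload with its real type.
type basicEventT[T any] struct {
	basicEvent
	typed T
}

func (e basicEventT[T]) TypedPayload() T { return e.typed }

type httpRequestPayload struct {
	Method string
	Path   string
}

func newHTTPRequestReceivedT(method, path string) basicEventT[httpRequestPayload] {
	p := httpRequestPayload{Method: method, Path: path}
	return basicEventT[httpRequestPayload]{
		basicEvent: basicEvent{typ: "http_request_received", payload: p},
		typed:      p,
	}
}

// describe accepts any event whose typed payload is an httpRequestPayload.
func describe(evt typedEvent[httpRequestPayload]) string {
	p := evt.TypedPayload() // no type assertion needed
	return p.Method + " " + p.Path
}

func main() {
	evt := newHTTPRequestReceivedT("GET", "/health")
	fmt.Println(describe(evt)) // GET /health
}
```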

type EventType

type EventType string

EventType identifies the kind of an audit event.

const (
	EventTypeHTTPRequestReceived  EventType = "http_request_received"
	EventTypeHTTPResponseSent     EventType = "http_response_sent"
	EventTypeHTTPRouteNotFound    EventType = "http_route_not_found"
	EventTypeHTTPMethodNotAllowed EventType = "http_method_not_allowed"
)

HTTP Event Types

const (
	EventTypeAuthRegister           EventType = "auth_register"
	EventTypeAuthLogin              EventType = "auth_login"
	EventTypeAuthLogout             EventType = "auth_logout"
	EventTypeAuthTokenIssued        EventType = "auth_token_issued"
	EventTypeAuthTokenRevoked       EventType = "auth_token_revoked"
	EventTypeAuthCredentialsChecked EventType = "auth_credentials_checked"
)

Authentication Event Types

const (
	EventTypeDBConnected    EventType = "db_connected"
	EventTypeDBInit         EventType = "db_init"
	EventTypeDBError        EventType = "db_error"
	EventTypeDBQuery        EventType = "db_query"
	EventTypeDBExec         EventType = "db_exec"
	EventTypeDBTxStarted    EventType = "db_tx_started"
	EventTypeDBTxCommitted  EventType = "db_tx_committed"
	EventTypeDBTxRolledBack EventType = "db_tx_rolled_back"
)

Database Event Types

const (
	EventTypeWorkItemCreated    EventType = "work_item_created"
	EventTypeWorkItemUpdated    EventType = "work_item_updated"
	EventTypeWorkItemDeleted    EventType = "work_item_deleted"
	EventTypeWorkItemAssigned   EventType = "work_item_assigned"
	EventTypeWorkItemUnassigned EventType = "work_item_unassigned"
	EventTypeCustomFieldSet     EventType = "custom_field_set"
)

Work Item Event Types

const (
	EventTypeCommentAdded      EventType = "comment_added"
	EventTypeCommentDeleted    EventType = "comment_deleted"
	EventTypeAttachmentAdded   EventType = "attachment_added"
	EventTypeAttachmentRemoved EventType = "attachment_removed"
)

Comment & Attachment Event Types

const (
	EventTypeUserCreated   EventType = "user_created"
	EventTypeUserUpdated   EventType = "user_updated"
	EventTypeUserDeleted   EventType = "user_deleted"
	EventTypeUserLoggedIn  EventType = "user_logged_in"
	EventTypeUserLoggedOut EventType = "user_logged_out"
)

User Event Types

const (
	EventTypeTeamCreated       EventType = "team_created"
	EventTypeTeamUpdated       EventType = "team_updated"
	EventTypeTeamDeleted       EventType = "team_deleted"
	EventTypeTeamMemberAdded   EventType = "team_member_added"
	EventTypeTeamMemberRemoved EventType = "team_member_removed"
)

Team Event Types

const (
	EventTypeLogInfo            EventType = "log_info"
	EventTypeLogWarning         EventType = "log_warning"
	EventTypeLogDebug           EventType = "log_debug"
	EventTypeLogError           EventType = "log_error"
	EventTypeLogFatal           EventType = "log_fatal"
	EventTypeLogAssertionFailed EventType = "log_assertion_failed"
)

Logging Event Types

const EventAny EventType = "*"

EventAny is used to subscribe to all events.

type HTTPRequestPayload

type HTTPRequestPayload struct {
	Method string
	Path   string
}

HTTPRequestPayload is the payload for HTTP request events.

type HTTPResponsePayload

type HTTPResponsePayload struct {
	Status     int
	DurationMS int64
}

HTTPResponsePayload is the payload for HTTP response events.

type Handler

type Handler func(evt Event) error

Handler processes an incoming Event.

type KafkaOption

type KafkaOption func(*KafkaTransport)

KafkaOption configures KafkaTransport.

func WithKafkaAsync

func WithKafkaAsync(async bool) KafkaOption

WithKafkaAsync enables asynchronous producing.

func WithKafkaRetries

func WithKafkaRetries(n int) KafkaOption

WithKafkaRetries sets the number of retries.

func WithKafkaRetryDelay

func WithKafkaRetryDelay(d time.Duration) KafkaOption

WithKafkaRetryDelay sets the initial retry delay.

type KafkaTransport

type KafkaTransport struct {
	// contains filtered or unexported fields
}

KafkaTransport implements Transport using Kafka.

func NewKafkaTransport

func NewKafkaTransport(brokers []string, topic string, opts ...KafkaOption) (*KafkaTransport, error)

NewKafkaTransport creates a Kafka transport.

func (*KafkaTransport) Close

func (t *KafkaTransport) Close() error

Close shuts down the transport.

func (*KafkaTransport) Send

func (t *KafkaTransport) Send(evt Event) error

Send sends an event to Kafka with retry logic.

func (*KafkaTransport) Start

func (t *KafkaTransport) Start() error

Start initializes the transport.

type LogConfig

type LogConfig struct {
	FilePath      string
	MaxSizeMB     int
	MaxBackups    int
	MaxAgeDays    int
	Compress      bool
	DBBatchSize   int
	FlushInterval time.Duration
	RetryCount    int
	RetryDelay    time.Duration
}

LogConfig holds configuration for logging handlers.

func DefaultLogConfig

func DefaultLogConfig() LogConfig

DefaultLogConfig returns a default logging configuration.

type LogOption

type LogOption func(*LogConfig)

LogOption is a functional option for configuring logging.

func WithCompress

func WithCompress(compress bool) LogOption

func WithDBBatchSize

func WithDBBatchSize(size int) LogOption

func WithFilePath

func WithFilePath(path string) LogOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) LogOption

func WithMaxAgeDays

func WithMaxAgeDays(days int) LogOption

func WithMaxBackups

func WithMaxBackups(backups int) LogOption

func WithMaxSizeMB

func WithMaxSizeMB(size int) LogOption

func WithRetryCount

func WithRetryCount(count int) LogOption

func WithRetryDelay

func WithRetryDelay(delay time.Duration) LogOption

type LogPayload

type LogPayload struct {
	Message string
	Fields  map[string]string
	Error   string // Only for error and fatal events
	Detail  string // Only for assertion failed events
}

LogPayload is the payload structure for logging events.

type Logger

type Logger struct {
	Bus     *Bus
	Metrics BusMetrics
}

func NewLogger

func NewLogger(bus *Bus) *Logger

func (*Logger) AssertEqual

func (l *Logger) AssertEqual(ctx context.Context, source, name string, got, want interface{})

func (*Logger) AssertNoError

func (l *Logger) AssertNoError(ctx context.Context, source string, err error)

func (*Logger) AssertTrue

func (l *Logger) AssertTrue(ctx context.Context, source, name string, cond bool)

func (*Logger) Debug

func (l *Logger) Debug(ctx context.Context, source, message string, fields map[string]string)

func (*Logger) Error

func (l *Logger) Error(ctx context.Context, source, message string, err error, fields map[string]string)

func (*Logger) Fatal

func (l *Logger) Fatal(ctx context.Context, source, message string, fields map[string]string)

func (*Logger) Info

func (l *Logger) Info(ctx context.Context, source, message string, fields map[string]string)

func (*Logger) Warning

func (l *Logger) Warning(ctx context.Context, source, message string, fields map[string]string)

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements BusMetrics with Prometheus.

func NewPrometheusMetrics

func NewPrometheusMetrics(registerer prometheus.Registerer) *PrometheusMetrics

NewPrometheusMetrics creates a new PrometheusMetrics instance.

func (*PrometheusMetrics) EventDropped

func (m *PrometheusMetrics) EventDropped(et EventType)

EventDropped increments the dropped counter.

func (*PrometheusMetrics) EventPublished

func (m *PrometheusMetrics) EventPublished(et EventType)

EventPublished increments the published counter.

func (*PrometheusMetrics) HandlerLatency

func (m *PrometheusMetrics) HandlerLatency(et EventType, d time.Duration)

HandlerLatency records the handler latency.

type Sanitizer

type Sanitizer func(key string, value interface{}) interface{}

Sanitizer defines a function to sanitize sensitive data.

type Transport

type Transport interface {
	Start() error
	Send(evt Event) error
	Close() error
}

Transport defines an interface for external event transport.
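
A custom transport only needs the three methods above. As a sketch, an in-memory implementation that could serve as a test double is shown here; `memoryTransport`, `errNotStarted`, and the reduced `event` type are hypothetical, and rejecting sends before `Start` is a design choice of this example rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// event is a reduced stand-in for the Event interface.
type event struct{ ID string }

// transport mirrors the method set of the Transport interface.
type transport interface {
	Start() error
	Send(evt event) error
	Close() error
}

var errNotStarted = errors.New("transport not started")

// memoryTransport records sent events in memory.
type memoryTransport struct {
	mu      sync.Mutex
	started bool
	sent    []event
}

// Compile-time check that memoryTransport satisfies transport.
var _ transport = (*memoryTransport)(nil)

func (t *memoryTransport) Start() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.started = true
	return nil
}

// Send stores the event, or fails if the transport is not running.
func (t *memoryTransport) Send(evt event) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.started {
		return errNotStarted
	}
	t.sent = append(t.sent, evt)
	return nil
}

func (t *memoryTransport) Close() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.started = false
	return nil
}

func main() {
	var tr transport = &memoryTransport{}
	fmt.Println(tr.Send(event{ID: "1"})) // transport not started
	_ = tr.Start()
	fmt.Println(tr.Send(event{ID: "2"})) // <nil>
	_ = tr.Close()
}
```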
