gopipe

A lightweight, generic Go library for building composable data pipelines, message routers, and pub/sub systems.

gopipe provides three core packages for building robust, concurrent applications:

  • gopipe: Composable pipe orchestration with concurrency, batching, and error handling
  • channel: Channel utilities (Merge, Filter, Transform, GroupBy, etc.)
  • message: Message handling with pub/sub, routing, CQRS, and CloudEvents support

Core Components

gopipe - Pipe Orchestration

The foundation is simple pipe composition:

type Pipe[In, Out any] func(ctx context.Context, in <-chan In) <-chan Out

Build robust pipelines with:

  • Concurrency & Batching: Parallel processing with configurable workers and input batching
  • Context-awareness: Native cancellation and timeout support
  • Error Handling: Retry, recover, and error propagation
  • Middleware: Custom logging and metrics
  • Zero Dependencies: 100% Go standard library

Channel Package

Utilities for channel operations:

  • Transform: Filter, Transform, Process
  • Routing: Route, Merge, Broadcast
  • Batching: Collect, Buffer, GroupBy
  • Lifecycle: Sink, Drain, Cancel

See examples/ for usage patterns.

Message Package

Complete message handling system:

  • Pub/Sub: Publisher/Subscriber with multiple broker implementations
  • Routing: Attribute-based message dispatch with handlers
  • CQRS: Type-safe command and event handlers
  • CloudEvents: CloudEvents v1.0.2 HTTP Protocol Binding support
  • Middleware: Correlation IDs, logging, and custom middleware

Broker Implementations:

  • broker.NewChannelBroker() - In-memory for testing
  • broker.NewHTTPSender/Receiver() - HTTP webhooks with CloudEvents
  • broker.NewIOBroker() - Debug/logging broker (JSONL)

Why gopipe?

Manual channel wiring is error-prone and doesn't scale. gopipe provides:

  • Type-safe pipelines: Generic API works with any data type
  • Battle-tested patterns: Pub/sub, CQRS, event sourcing
  • Production-ready: Comprehensive testing and CloudEvents compliance
  • Zero dependencies: Pure Go, no external dependencies

Full Feature List of Pipe Options

  • WithConcurrency: Optional concurrency for parallel processing.
  • WithCancel: Optional cancellation logic.
  • WithBuffer: Optional buffered output channel.
  • WithTimeout: Optional processing timeout via context.
  • WithoutContextPropagation: Opt out of propagating the parent context to processing; each process call receives a background context instead.
  • WithLogConfig: Customizable logging via log/slog; defaults to success (debug), cancel (warn), and failure (error).
  • WithMetricsCollector: Optional per-input processing metrics that can be collected and evaluated individually.
  • WithMetadataProvider: Optional metadata enrichment for log messages and metrics based on input values.
  • WithMiddleware: Optional support for custom middleware.
  • WithRecover: Optional recovery on panics.
  • WithRetryConfig: Optional retry on failure with custom configuration.
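
A minimal sketch combining several of these options; Request, Response, and callBackend are hypothetical placeholders, not part of the library:

pipe := gopipe.NewTransformPipe(
	func(ctx context.Context, req Request) (Response, error) {
		return callBackend(ctx, req) // hypothetical handler
	},
	gopipe.WithConcurrency[Request, Response](8),         // 8 parallel workers
	gopipe.WithBuffer[Request, Response](64),             // buffered output channel
	gopipe.WithTimeout[Request, Response](2*time.Second), // per-item timeout
	gopipe.WithRetryConfig[Request, Response](gopipe.RetryConfig{
		MaxAttempts: 5,
		Backoff:     gopipe.ExponentialBackoff(100*time.Millisecond, 2.0, 2*time.Second, 0.2),
	}),
	gopipe.WithRecover[Request, Response](), // convert panics to errors
)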

Installation

go get github.com/fxsml/gopipe

Getting Started

The main concepts are:

  • Processor: Implements the logic for processing and cancellation.
  • Pipe: Responsible for configuration and orchestration of a pipeline running a specific Processor.

For simple channel operations, see the channel package.
For robust orchestration, use gopipe.

Usage

Basic Channel Operations: Filter, Transform, Buffer, Sink
package main

import (
	"fmt"

	"github.com/fxsml/gopipe/channel"
)

func main() {
	// Create an input channel
	in := channel.FromRange(10)

	// Filter even numbers only
	filtered := channel.Filter(in, func(i int) bool {
		return i%2 == 0
	})

	// Transform values (int -> string)
	transformed := channel.Transform(filtered, func(i int) string {
		return fmt.Sprintf("Value: %d", i)
	})

	// Add buffering
	buffered := channel.Buffer(transformed, 10)

	// Consume values and wait for completion
	<-channel.Sink(buffered, func(s string) {
		fmt.Println(s)
	})
}
Advanced Channel Operations: Merge, Flatten, Process, Route
package main

import (
	"context"
	"fmt"

	"github.com/fxsml/gopipe/channel"
)

type Article struct {
	ID   string
	Name string
	Shop string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create input with two single articles
	ch1 := channel.FromSlice([]Article{
		{ID: "CH1.1", Name: "Laptop"},
		{ID: "CH1.2", Name: "Phone"},
	})

	// Create input with one slice of articles
	ch2 := channel.FromValues([]Article{
		{ID: "CH2.1", Name: "Tablet"},
		{ID: "CH2.2", Name: "Watch"},
		{ID: "CH2.3", Name: "Sensor"},
	})

	// Merge article channels and flatten slices from ch2
	articlesCh := channel.Merge(ch1, channel.Flatten(ch2))

	// Create a list of shops
	shops := []string{"ShopA", "ShopB"}

	// Add cancellation handling before further processing
	// to stop processing on context cancellation
	articlesCh = channel.Cancel(ctx, articlesCh, func(a Article, err error) {
		fmt.Printf("Processing article %s canceled: %v\n", a.ID, err)
	})

	// Expand articles to multiple shops
	articlesCh = channel.Process(articlesCh, func(a Article) []Article {
		articles := make([]Article, len(shops))
		for i, shop := range shops {
			articles[i] = Article{
				ID:   a.ID,
				Name: a.Name,
				Shop: shop,
			}
		}
		return articles
	})

	// Route shop articles based on shop name
	routed := channel.Route(articlesCh, func(a Article) int {
		switch a.Shop {
		case "ShopA":
			return 0
		case "ShopB":
			return 1
		default:
			return -1
		}
	}, len(shops))

	// Create sinks for each shop
	doneChans := make([]<-chan struct{}, len(shops))
	for i, r := range routed {
		doneChans[i] = channel.Sink(r, func(a Article) {
			fmt.Printf("%s: %s (%s)\n", a.Shop, a.Name, a.ID)
		})
	}

	// Wait for all sinks to complete
	<-channel.Merge(doneChans...)
}
Transform with Pipe
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/fxsml/gopipe"
	"github.com/fxsml/gopipe/channel"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create input channel with string representations of integers
	in := channel.Transform(channel.FromRange(20), func(i int) string {
		return strconv.Itoa(i)
	})

	// Create a transform pipe that converts strings to integers
	pipe := gopipe.NewTransformPipe(
		func(ctx context.Context, val string) (int, error) {
			time.Sleep(100 * time.Millisecond)
			return strconv.Atoi(val)
		},
		gopipe.WithConcurrency[string, int](5), // 5 workers
		gopipe.WithBuffer[string, int](10),     // Buffer up to 10 results
		gopipe.WithRecover[string, int](),      // Recover from panics
	)

	// Start the pipe
	processed := pipe.Start(ctx, in)

	// Consume processed values
	<-channel.Sink(processed, func(val int) {
		fmt.Printf("Processed: %d\n", val)
	})
}
Batch with Pipe
package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/fxsml/gopipe"
	"github.com/fxsml/gopipe/channel"
)

// User is a simple user struct for demonstration
type User struct {
	ID   int
	Name string
}

// UserResponse encapsulates the result of a user creation attempt
type UserResponse struct {
	User User
	Err  error
}

// NewCreateUserHandler simulates creating users in batches (e.g. database inserts).
func NewCreateUserHandler() func(context.Context, []string) ([]UserResponse, error) {
	currentID := 1000
	return func(ctx context.Context, names []string) ([]UserResponse, error) {
		// Simulate an error causing the whole batch to fail
		if currentID%3 == 0 {
			defer func() {
				currentID++
			}()
			return nil, fmt.Errorf("create user id '%d'", currentID)
		}

		users := make([]UserResponse, 0, len(names))
		for _, name := range names {
			currentID++
			// Simulate an error for individual name
			if strings.ContainsAny(name, "!@#$%^&*()+=[]{}|\\;:'\",.<>/?`~") {
				users = append(users, UserResponse{Err: fmt.Errorf("invalid name: %q", name)})
				continue
			}
			u := User{Name: name, ID: currentID}
			users = append(users, UserResponse{User: u})
		}
		return users, nil
	}
}

func main() {
	// Create an input channel
	in := make(chan string, 10)

	// Start a goroutine to send new user names - for simplicity just runes
	go func() {
		defer func() {
			close(in)
		}()
		for _, c := range "a+bcdefgh!ijkl?mn@op>qrs#tuvwxyz" {
			in <- string(c)
		}
	}()

	// Create a pipe
	pipe := gopipe.NewBatchPipe(
		NewCreateUserHandler(),
		5,                   // Max batch size
		10*time.Millisecond, // Max batch duration
		gopipe.WithBuffer[[]string, UserResponse](10), // Buffer up to 10 results
	)

	// Create new users in batches
	userResponses := pipe.Start(context.Background(), in)

	// Consume responses
	<-channel.Sink(userResponses, func(userResponse UserResponse) {
		if userResponse.Err != nil {
			fmt.Printf("Failed to create new user: %v\n", userResponse.Err)
			return
		}
		fmt.Printf("Created new user: %v\n", userResponse.User)
	})
}
Message Pub/Sub with CloudEvents
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/fxsml/gopipe/channel"
	"github.com/fxsml/gopipe/message"
	"github.com/fxsml/gopipe/message/broker"
)

func main() {
	ctx := context.Background()

	// Create in-memory broker
	brk := broker.NewChannelBroker()

	// Setup publisher with batching
	pub := message.NewPublisher(brk, message.PublisherConfig{
		MaxBatchSize: 100,
		MaxDuration:  time.Second,
	})

	// Setup subscriber
	sub := message.NewSubscriber(brk, message.SubscriberConfig{
		PollInterval: 100 * time.Millisecond,
	})

	// Subscribe to topic
	msgs := sub.Subscribe(ctx, "orders")

	// Publish messages
	pub.Publish(ctx, []*message.Message{
		{
			Data: []byte(`{"orderId": "123", "amount": 100}`),
			Attributes: message.Attributes{
				message.AttrID:      "evt-1",
				message.AttrType:    "order.created",
				message.AttrSubject: "orders",
			},
		},
	})

	// Consume messages
	timeout := time.After(2 * time.Second)
	select {
	case msg := <-msgs:
		fmt.Printf("Received: %s (type: %s)\n",
			string(msg.Data),
			msg.Attributes[message.AttrType])
	case <-timeout:
		fmt.Println("No messages received")
	}
}

See docs/features/03-message-pubsub.md for more pub/sub patterns.

Message Acknowledgment for Reliable Processing
package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/fxsml/gopipe"
	"github.com/fxsml/gopipe/channel"
	"github.com/fxsml/gopipe/message"
)

func main() {
	ctx := context.Background()

	// Simulate message broker integration with ack/nack callbacks
	ack := func() { fmt.Println("✓ Message acknowledged") }
	nack := func(err error) { fmt.Printf("✗ Message rejected: %v\n", err) }

	// Create messages using the new functional options API
	in := channel.FromValues(
		message.New(12,
			message.WithContext[int](ctx),
			message.WithAcking[int](ack, nack),
			message.WithID[int]("msg-001"),
			message.WithProperty[int]("source", "orders-queue"),
		),
		message.New(42,
			message.WithContext[int](ctx),
			message.WithAcking[int](ack, nack),
			message.WithID[int]("msg-002"),
			message.WithProperty[int]("source", "orders-queue"),
		),
	)

	// Create pipe with acknowledgment
	pipe := gopipe.NewTransformPipe(
		func(ctx context.Context, msg *message.Message[int]) (*message.Message[int], error) {
			defer msg.Properties().Set("processed_at", time.Now().Format(time.RFC3339))

			// Simulate processing error
			p := msg.Payload()
			if p == 12 {
				err := fmt.Errorf("cannot process payload 12")
				msg.Nack(err)
				return nil, err
			}

			// On success
			res := p * 2
			msg.Ack()
			return message.Copy(msg, res), nil
		},
	)

	// Process message
	results := pipe.Start(ctx, in)

	// Consume results
	<-channel.Sink(results, func(result *message.Message[int]) {
		var sb strings.Builder
		result.Properties().Range(func(key string, value any) bool {
			sb.WriteString(fmt.Sprintf("  %s: %v\n", key, value))
			return true
		})
		fmt.Printf("Payload: %d\nProperties:\n%s", result.Payload(), sb.String())
	})
}

Documentation

For Contributors
  • CONTRIBUTING.md - Human contributor guide
    • Development workflow
    • Documentation requirements
    • Testing guidelines
    • Code style standards
    • Pre-commit checklist
For AI Assistants
  • CLAUDE.md - AI assistant procedures
    • Feature branch documentation
    • Systematic integration procedures
    • Documentation requirements
    • Git workflow and best practices
Architecture & Features
  • docs/features/ - Feature documentation

    • Feature integration guide
    • 8 detailed feature documents
    • Implementation examples
    • Usage patterns
  • docs/adr/ - Architecture Decision Records

    • ADR organization by status
    • 18 architectural decisions
    • Reading order guide
    • Feature mapping
Changelog
  • CHANGELOG.md - Project changelog
    • All features from feat/pubsub branch
    • Breaking changes
    • Migration guide
    • Integration order

License

MIT License.

Contributing

Contributions welcome! See CONTRIBUTING.md for guidelines.

For questions or issues, please open an issue on GitHub.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrFailure indicates a processing failure.
	ErrFailure = errors.New("gopipe: processing failed")
	// ErrCancel indicates that processing was canceled.
	ErrCancel = errors.New("gopipe: processing canceled")
)
var (
	// ErrRetry is the base error for retry operations
	ErrRetry = errors.New("gopipe retry")

	// ErrRetryMaxAttempts is returned when all retry attempts fail
	ErrRetryMaxAttempts = fmt.Errorf("%w: max attempts reached", ErrRetry)

	// ErrRetryTimeout is returned when the overall retry operation times out
	ErrRetryTimeout = fmt.Errorf("%w: timeout reached", ErrRetry)

	// ErrRetryNotRetryable is returned when an error is not retryable
	ErrRetryNotRetryable = fmt.Errorf("%w: not retryable", ErrRetry)
)

Functions

func SetDefaultLogConfig added in v0.7.0

func SetDefaultLogConfig(config LogConfig)

SetDefaultLogConfig sets the default logger configuration for all pipes. May be overridden per-pipe using WithLogConfig.

func SetDefaultLogger added in v0.7.0

func SetDefaultLogger(l Logger)

SetDefaultLogger sets the default logger for all pipes. slog.Default() is used by default.

func StartProcessor added in v0.6.0

func StartProcessor[In, Out any](
	ctx context.Context,
	in <-chan In,
	proc Processor[In, Out],
	opts ...Option[In, Out],
) <-chan Out

StartProcessor processes items from the input channel using the provided processor and returns a channel that will receive the processed outputs.

Processing will continue until the input channel is closed or the context is canceled. The output channel is closed when processing is complete. Behavior can be customized with options.
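
A minimal sketch, assuming ctx and an input channel in of type <-chan int already exist:

proc := gopipe.NewProcessor(
	func(ctx context.Context, n int) ([]string, error) {
		return []string{strconv.Itoa(n)}, nil // one output per input
	},
	func(n int, err error) {
		log.Printf("dropped %d: %v", n, err) // called when processing fails
	},
)
out := gopipe.StartProcessor(ctx, in, proc,
	gopipe.WithConcurrency[int, string](4),
)
for s := range out {
	fmt.Println(s)
}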

Types

type BackoffFunc added in v0.8.0

type BackoffFunc func(attempt int) time.Duration

BackoffFunc returns the wait duration for a retry attempt. The attempt parameter is one-based (1 for first retry, 2 for second, etc.).

func ConstantBackoff added in v0.8.0

func ConstantBackoff(
	delay time.Duration,
	jitter float64,
) BackoffFunc

ConstantBackoff creates a backoff function that returns a constant duration with optional jitter. The delay parameter specifies the base wait time for all retry attempts. The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation. Jitter helps prevent thundering herd problems in distributed systems.

func ExponentialBackoff added in v0.8.0

func ExponentialBackoff(
	initialDelay time.Duration,
	factor float64,
	maxDelay time.Duration,
	jitter float64,
) BackoffFunc

ExponentialBackoff creates a backoff function with exponential growth and jitter. Each retry attempt waits initialDelay * factor^(attempt-1), with random jitter applied. The factor parameter controls the growth rate (e.g., 2.0 doubles the delay each attempt). The maxDelay parameter caps the maximum backoff duration (0 = no limit). The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation. Jitter is applied after the exponential calculation and max-delay capping.
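
For example, sampling the schedule with jitter disabled so the output is deterministic:

backoff := gopipe.ExponentialBackoff(100*time.Millisecond, 2.0, time.Second, 0)
for attempt := 1; attempt <= 5; attempt++ {
	fmt.Println(attempt, backoff(attempt))
}
// 100ms, 200ms, 400ms, 800ms, 1s (the last value is capped by maxDelay)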

type CancelFunc added in v0.3.0

type CancelFunc[In any] func(In, error)

CancelFunc is the function used by Processor.Cancel.

type FanIn added in v0.9.0

type FanIn[T any] struct {
	// contains filtered or unexported fields
}

FanIn merges multiple input channels into a single output channel. It safely handles concurrent Add() calls and provides graceful shutdown.

func NewFanIn added in v0.9.0

func NewFanIn[T any](config FanInConfig) *FanIn[T]

NewFanIn creates a new FanIn instance. Add input channels with Add(), then call Start() exactly once.

func (*FanIn[T]) Add added in v0.9.0

func (f *FanIn[T]) Add(ch <-chan T) (<-chan struct{}, error)

Add registers an input channel to be merged into the output. Safe to call concurrently. Returns a done channel that closes when all messages from the input channel have been processed, and an error if FanIn is already closed.

func (*FanIn[T]) Start added in v0.9.0

func (f *FanIn[T]) Start(ctx context.Context) <-chan T

Start begins merging input channels and returns the output channel. The output channel closes when the context is cancelled and all input channels have been drained (up to ShutdownDuration). Must be called exactly once. Panics if called multiple times.

type FanInConfig added in v0.9.0

type FanInConfig struct {
	// Buffer size for the output channel
	Buffer int
	// ShutdownTimeout is the max time to wait for input channels to drain.
	// If 0, waits indefinitely for clean shutdown.
	ShutdownTimeout time.Duration
}

FanInConfig configures FanIn behavior.
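
A minimal sketch of the Add/Start flow, assuming ch1 and ch2 are existing <-chan int values:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

fi := gopipe.NewFanIn[int](gopipe.FanInConfig{
	Buffer:          8,
	ShutdownTimeout: time.Second,
})
done1, _ := fi.Add(ch1) // done1 closes once ch1 is fully drained
done2, _ := fi.Add(ch2)
out := fi.Start(ctx) // call exactly once
for v := range out { // closes after ctx is canceled and inputs drain
	fmt.Println(v)
}
<-done1
<-done2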

type Generator added in v0.8.0

type Generator[Out any] interface {
	// Generate returns a channel that emits generated values until context cancellation or error.
	Generate(ctx context.Context) <-chan Out
}

Generator produces a stream of values using a provided function.

func NewGenerator added in v0.8.0

func NewGenerator[Out any](
	handle func(context.Context) ([]Out, error),
	opts ...Option[struct{}, Out],
) Generator[Out]

NewGenerator creates a Generator that produces values using the provided handle function. The handle function is called repeatedly until context cancellation.
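
A minimal sketch, assuming the handle function polls some source (here it just emits the current time):

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

gen := gopipe.NewGenerator(
	func(ctx context.Context) ([]time.Time, error) {
		time.Sleep(100 * time.Millisecond) // simulate a poll interval
		return []time.Time{time.Now()}, nil
	},
)
for t := range gen.Generate(ctx) {
	fmt.Println(t.Format(time.RFC3339))
}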

type LogConfig added in v0.7.0

type LogConfig struct {
	// Args are additional arguments to include in all log messages.
	Args []any

	// LevelSuccess is the log level used for successful processing.
	// Defaults to LogLevelDebug.
	LevelSuccess LogLevel
	// LevelCancel is the log level used when processing is canceled.
	// Defaults to LogLevelWarn.
	LevelCancel LogLevel
	// LevelFailure is the log level used when processing fails.
	// Defaults to LogLevelError.
	LevelFailure LogLevel
	// LevelRetry is the log level used when a retry is attempted.
	// Defaults to LogLevelWarn.
	LevelRetry LogLevel

	// MessageSuccess is the message logged on successful processing.
	// Defaults to "GOPIPE: Success".
	MessageSuccess string
	// MessageCancel is the message logged when processing is canceled.
	// Defaults to "GOPIPE: Cancel".
	MessageCancel string
	// MessageFailure is the message logged when processing fails.
	// Defaults to "GOPIPE: Failure".
	MessageFailure string
	// MessageRetry is the message logged when a retry is attempted.
	// Defaults to "GOPIPE: Retry".
	MessageRetry string

	// Disabled disables all logging when set to true.
	Disabled bool
}

LogConfig holds configuration for the logger middleware. All fields can be customized individually. Defaults from the global defaultLoggerConfig are used for any fields not set.
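
For example, raising the success level and attaching shared log arguments (values are illustrative):

gopipe.SetDefaultLogConfig(gopipe.LogConfig{
	Args:           []any{"service", "ingest"},
	LevelSuccess:   gopipe.LogLevelInfo,
	MessageSuccess: "item processed",
})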

type LogLevel added in v0.7.0

type LogLevel string

LogLevel represents the severity level for logging messages.

const (
	// LogLevelDebug is used for detailed information.
	LogLevelDebug LogLevel = "debug"
	// LogLevelInfo is used for general information messages.
	LogLevelInfo LogLevel = "info"
	// LogLevelWarn is used for warning conditions.
	LogLevelWarn LogLevel = "warn"
	// LogLevelError is used for error conditions.
	LogLevelError LogLevel = "error"
)

type Logger added in v0.7.0

type Logger interface {
	// Debug logs a message at debug level.
	Debug(msg string, args ...any)
	// Info logs a message at info level.
	Info(msg string, args ...any)
	// Warn logs a message at warning level.
	Warn(msg string, args ...any)
	// Error logs a message at error level.
	Error(msg string, args ...any)
}

Logger defines an interface for logging at different severity levels.

type Metadata added in v0.7.0

type Metadata map[string]any

Metadata is a key-value store for additional information about pipeline items.

func MetadataFromContext added in v0.7.0

func MetadataFromContext(ctx context.Context) Metadata

MetadataFromContext extracts metadata from a context. Returns nil if no metadata is present.

func MetadataFromError added in v0.7.0

func MetadataFromError(err error) Metadata

MetadataFromError extracts metadata from an error. Returns nil if no metadata is present.

type MetadataProvider added in v0.7.0

type MetadataProvider[In any] func(in In) Metadata

MetadataProvider is a function that provides Metadata for a processing context. It may extract information from the input value.

type Metrics added in v0.7.0

type Metrics struct {
	Start    time.Time
	Duration time.Duration
	Input    int
	Output   int
	InFlight int

	Metadata   Metadata
	RetryState *RetryState

	Error error
}

Metrics holds processing metrics for a single input.

func (*Metrics) Cancel added in v0.7.0

func (m *Metrics) Cancel() int

Cancel returns a numeric indicator of cancellation (1 for cancel, 0 otherwise).

func (*Metrics) Failure added in v0.7.0

func (m *Metrics) Failure() int

Failure returns a numeric indicator of failure (1 for failure, 0 otherwise).

func (*Metrics) Retry added in v0.8.0

func (m *Metrics) Retry() int

Retry returns a numeric indicator of retry (1 for retry, 0 otherwise).

func (*Metrics) Success added in v0.7.0

func (m *Metrics) Success() int

Success returns a numeric indicator of success (1 for success, 0 otherwise).

type MetricsCollector added in v0.7.0

type MetricsCollector func(metrics *Metrics)

MetricsCollector defines a function that collects single input metrics.
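
A sketch pairing a metadata provider with a metrics collector; Order, Receipt, and handle are hypothetical placeholders:

var failures atomic.Int64

pipe := gopipe.NewTransformPipe(handle, // func(context.Context, Order) (Receipt, error)
	gopipe.WithMetadataProvider[Order, Receipt](func(o Order) gopipe.Metadata {
		return gopipe.Metadata{"order_id": o.ID}
	}),
	gopipe.WithMetricsCollector[Order, Receipt](func(m *gopipe.Metrics) {
		failures.Add(int64(m.Failure()))
		if m.Error != nil {
			log.Printf("order %v failed after %s", m.Metadata["order_id"], m.Duration)
		}
	}),
)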

type MiddlewareFunc added in v0.6.0

type MiddlewareFunc[In, Out any] func(Processor[In, Out]) Processor[In, Out]

MiddlewareFunc wraps a Processor to add additional behavior to processing and cancellation.
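
A sketch of a timing middleware built on NewProcessor; the log call is illustrative:

func Timing[In, Out any]() gopipe.MiddlewareFunc[In, Out] {
	return func(next gopipe.Processor[In, Out]) gopipe.Processor[In, Out] {
		return gopipe.NewProcessor(
			func(ctx context.Context, in In) ([]Out, error) {
				start := time.Now()
				out, err := next.Process(ctx, in)
				log.Printf("processed in %s", time.Since(start))
				return out, err
			},
			next.Cancel, // delegate cancellation to the wrapped processor
		)
	}
}

Attach it to a pipe with gopipe.WithMiddleware.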

type Option

type Option[In, Out any] func(*config[In, Out])

Option configures behavior of a Pipe.

func WithBuffer

func WithBuffer[In, Out any](buffer int) Option[In, Out]

WithBuffer sets output channel buffer size.

func WithCancel added in v0.6.0

func WithCancel[In, Out any](cancel func(In, error)) Option[In, Out]

WithCancel provides an additional cancel function to the processor.

func WithCleanup added in v0.9.0

func WithCleanup[In, Out any](cleanup func(ctx context.Context), timeout time.Duration) Option[In, Out]

WithCleanup adds a cleanup function to be called when processing is complete. If timeout is greater than zero, context will be canceled after the timeout duration.

func WithConcurrency

func WithConcurrency[In, Out any](concurrency int) Option[In, Out]

WithConcurrency sets worker count for concurrent processing.

func WithLogConfig added in v0.7.0

func WithLogConfig[In, Out any](logConfig LogConfig) Option[In, Out]

WithLogConfig overrides the default logger configuration for the pipe.

func WithMetadataProvider added in v0.8.0

func WithMetadataProvider[In, Out any](provider MetadataProvider[In]) Option[In, Out]

WithMetadataProvider adds a metadata provider to enrich context with metadata for each input. Can be used multiple times to add multiple providers. Metadata is available via MetadataFromContext or MetadataFromError. Metadata is used in logging and metrics collection.

func WithMetricsCollector added in v0.8.0

func WithMetricsCollector[In, Out any](collector MetricsCollector) Option[In, Out]

WithMetricsCollector adds a metrics collector to the processing pipeline. Can be used multiple times to add multiple collectors.

func WithMiddleware added in v0.6.0

func WithMiddleware[In, Out any](middleware MiddlewareFunc[In, Out]) Option[In, Out]

WithMiddleware adds middleware to the processing pipeline. Can be used multiple times. Middleware is applied in reverse order: for middlewares A, B, C, the execution flow is A→B→C→process.

func WithRecover added in v0.7.0

func WithRecover[In, Out any]() Option[In, Out]

WithRecover enables panic recovery in process functions. When enabled, any panic that occurs during processing is caught and converted into a RecoveryError. The stack trace is captured and included in the RecoveryError. The stack trace is also printed to stderr in the CancelFunc.

func WithRetryConfig added in v0.8.0

func WithRetryConfig[In, Out any](retryConfig RetryConfig) Option[In, Out]

WithRetryConfig adds retry middleware to the processing pipeline. Failed operations are retried based on ShouldRetry logic, with Backoff between attempts, until MaxAttempts is reached or Timeout expires. Nil fields use default values.

func WithTimeout added in v0.3.0

func WithTimeout[In, Out any](timeout time.Duration) Option[In, Out]

WithTimeout sets maximum duration for each process operation. If the processing exceeds the timeout, the context will be cancelled.

func WithoutContextPropagation added in v0.3.0

func WithoutContextPropagation[In, Out any]() Option[In, Out]

WithoutContextPropagation disables passing parent context to process functions. Each process call will receive a background context instead.

type Pipe added in v0.6.0

type Pipe[Pre, Out any] interface {
	// Start begins processing items from the input channel and returns a channel for outputs.
	// Processing continues until the input channel is closed or the context is canceled.
	Start(ctx context.Context, pre <-chan Pre) <-chan Out
}

Pipe represents a complete processing pipeline that transforms input values to output values. It combines preprocessing with a Processor and optional configuration.

func ApplyPipe added in v0.10.0

func ApplyPipe[In, Inter, Out any](a Pipe[In, Inter], b Pipe[Inter, Out]) Pipe[In, Out]

ApplyPipe combines two Pipes into one, connecting the output of the first to the input of the second. The resulting Pipe takes inputs of type In and produces outputs of type Out.
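
For example, composing a parse stage with a filter stage (ctx and a <-chan string named in are assumed):

parse := gopipe.NewTransformPipe(
	func(ctx context.Context, s string) (int, error) {
		return strconv.Atoi(s)
	},
)
evens := gopipe.NewFilterPipe(
	func(ctx context.Context, n int) (bool, error) {
		return n%2 == 0, nil
	},
)
combined := gopipe.ApplyPipe(parse, evens) // Pipe[string, int]
out := combined.Start(ctx, in)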

func NewBatchPipe added in v0.6.0

func NewBatchPipe[In any, Out any](
	handle func(context.Context, []In) ([]Out, error),
	maxSize int,
	maxDuration time.Duration,
	opts ...Option[[]In, Out],
) Pipe[In, Out]

NewBatchPipe creates a Pipe that groups inputs into batches before processing. Each batch is processed as a whole by the handle function, which can return multiple outputs. Batches are created when either maxSize items are collected or maxDuration elapses since the first item.

func NewFilterPipe added in v0.6.0

func NewFilterPipe[In any](
	handle func(context.Context, In) (bool, error),
	opts ...Option[In, In],
) Pipe[In, In]

NewFilterPipe creates a Pipe that selectively passes through inputs based on a predicate function. If the handle function returns true, the input is passed through; if false, the input is discarded. If the handle function returns an error, processing for that item stops and the error is handled.

func NewProcessPipe added in v0.6.0

func NewProcessPipe[In, Out any](
	handle func(context.Context, In) ([]Out, error),
	opts ...Option[In, Out],
) Pipe[In, Out]

NewProcessPipe creates a Pipe that can transform each input into multiple outputs. Unlike NewTransformPipe, this can produce zero, one, or many outputs for each input. The handle function receives a context and input item, and returns a slice of outputs or an error.

func NewSinkPipe added in v0.7.0

func NewSinkPipe[In any](
	handle func(context.Context, In) error,
	opts ...Option[In, struct{}],
) Pipe[In, struct{}]

NewSinkPipe creates a Pipe that applies handle to each value from in. The returned channel is closed after in is closed and all values are processed.

func NewTransformPipe added in v0.6.0

func NewTransformPipe[In, Out any](
	handle func(context.Context, In) (Out, error),
	opts ...Option[In, Out],
) Pipe[In, Out]

NewTransformPipe creates a Pipe that transforms each input into exactly one output. Unlike NewProcessPipe, this always produces exactly one output for each successful input. The handle function receives a context and input item, and returns a single output or an error.

type ProcessFunc added in v0.3.0

type ProcessFunc[In, Out any] func(context.Context, In) ([]Out, error)

ProcessFunc is the function used by Processor.Process.

type Processor added in v0.5.0

type Processor[In, Out any] interface {
	// Process processes a single input item with context awareness.
	// It transforms the input into zero or more output items, or returns an error.
	Process(context.Context, In) ([]Out, error)
	// Cancel handles errors when processing fails.
	Cancel(In, error)
}

Processor combines processing and cancellation logic into a single abstraction. This abstraction allows controlling and manipulating the flow of data and errors.

func NewProcessor added in v0.5.0

func NewProcessor[In, Out any](
	process ProcessFunc[In, Out],
	cancel CancelFunc[In],
) Processor[In, Out]

NewProcessor creates a new Processor with the provided process and cancel functions.

Panics if process is nil. If cancel is nil, a no-op function is used.

type RecoveryError added in v0.7.0

type RecoveryError struct {
	// PanicValue is the original value that was passed to panic().
	PanicValue any
	// StackTrace contains the full stack trace at the point of panic.
	StackTrace string
}

RecoveryError wraps a panic value with the stack trace. This allows panics to be converted to regular errors and handled gracefully.

func (*RecoveryError) Error added in v0.7.0

func (e *RecoveryError) Error() string

type RetryConfig added in v0.8.0

type RetryConfig struct {
	// ShouldRetry determines which errors trigger retry attempts.
	// If nil, defaults to retrying all errors.
	ShouldRetry ShouldRetryFunc

	// Backoff produces the wait duration between retry attempts.
	// If nil, defaults to 1 second constant backoff with jitter ±20%.
	Backoff BackoffFunc

	// MaxAttempts limits the total number of processing attempts, including the initial attempt.
	// Default is 3 attempts. Negative values allow unlimited retries.
	MaxAttempts int

	// Timeout sets the overall time limit for all processing attempts combined.
	// Zero or negative value means no timeout. Default is 1 minute.
	Timeout time.Duration
}

RetryConfig configures retry behavior for failed operations.

type RetryState added in v0.8.0

type RetryState struct {
	// Timeout is the configured overall timeout for all attempts.
	Timeout time.Duration
	// MaxAttempts is the configured maximum number of attempts.
	MaxAttempts int

	// Start is the time when the first attempt started. Duration and
	// Attempts are measured relative to this timestamp.
	Start time.Time
	// Attempts is the total number of processing attempts made. It is
	// 1-based and includes the initial (first) attempt. For example,
	// Attempts==1 means the initial attempt has been made and no retries
	// occurred yet.
	Attempts int
	// Duration is the total elapsed time since Start. This includes the
	// time spent in each attempt as well as any backoff/wait time between
	// attempts.
	Duration time.Duration
	// Causes is a list of all errors encountered during attempts.
	Causes []error
	// Err is the error that caused the retry process to abort (final error).
	Err error
}

RetryState tracks the progress and history of retry attempts.

func RetryStateFromContext added in v0.8.0

func RetryStateFromContext(ctx context.Context) *RetryState

RetryStateFromContext extracts the RetryState from a context. Returns nil if no RetryState is present.

func RetryStateFromError added in v0.8.0

func RetryStateFromError(err error) *RetryState

RetryStateFromError extracts the RetryState from an error. Returns nil if no RetryState is present.

type ShouldRetryFunc added in v0.8.0

type ShouldRetryFunc func(error) bool

ShouldRetryFunc determines whether an error should trigger a retry attempt.

func ShouldNotRetry added in v0.8.0

func ShouldNotRetry(errs ...error) ShouldRetryFunc

ShouldNotRetry creates a function that skips retries on specific errors. If no errors are specified, no errors trigger retries. If errors are specified, matching errors (using errors.Is) skip retries.

func ShouldRetry added in v0.8.0

func ShouldRetry(errs ...error) ShouldRetryFunc

ShouldRetry creates a function that retries on specific errors. If no errors are specified, all errors trigger retries. If errors are specified, only matching errors (using errors.Is) trigger retries.
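
For example, retrying only a known transient error (the error value is illustrative):

var errTransient = errors.New("transient")

cfg := gopipe.RetryConfig{
	ShouldRetry: gopipe.ShouldRetry(errTransient), // matched via errors.Is
	MaxAttempts: 4,
}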

Directories

Path Synopsis
examples
batch-pipe command
fanin command
generator command
message-ack command
transform-pipe command
internal
cloudevents
Package cloudevents provides CloudEvents v1.0.2 serialization and deserialization for gopipe messages.
cqrs
Package cqrs provides Command Query Responsibility Segregation patterns for gopipe.
middleware
Package middleware provides reusable middleware for message processing pipelines.
pipe module
