flow

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

flow

A Go library for building complex, type-safe workflows from simple steps.

Installation

go get github.com/sam-fredrickson/flow

Quick Example

import "github.com/sam-fredrickson/flow"

type Config struct {
    DatabaseURL string
}

// Define steps that operate on *Config
setupDb := func(ctx context.Context, cfg *Config) error {
    return setupDatabase(ctx, cfg.DatabaseURL)
}

runMigrations := func(ctx context.Context, cfg *Config) error {
    return applyMigrations(ctx, cfg.DatabaseURL)
}

// Compose workflow
workflow := flow.Do(setupDb, runMigrations)

// Execute it
cfg := &Config{DatabaseURL: "postgres://..."}
if err := workflow(context.Background(), cfg); err != nil {
    log.Fatal(err)
}

Key Features

Automatic retry with exponential backoff and jitter:

flow.Retry(
    CallExternalAPI(),
    flow.UpTo(3),
    flow.ExponentialBackoff(100*time.Millisecond, flow.WithFullJitter()),
)
// Those are the default settings, so this can be simplified
flow.Retry(CallExternalAPI())

Parallel execution with automatic error handling and goroutine management:

flow.InParallel(flow.Steps(
    DeployService("web"),
    DeployService("api"),
    DeployService("worker"),
))

Dynamic workflows where steps are determined at runtime:

flow.InParallel(
    flow.ForEach(GetServices, DeployService),
)

For more sophisticated patterns, see:

Documentation

  • Feature Guide — Complete guide covering all features and patterns
  • Design Philosophy — Understanding the principles and motivation behind the library
  • Complete Examples — Runnable examples showing real-world usage
  • Package documentation: Run go doc github.com/sam-fredrickson/flow or visit pkg.go.dev

Documentation

Overview

Package flow provides a library for building complex, type-safe workflows from simple steps—with built-in retry, parallelism, and configuration-driven orchestration.

The Problem

As workflows grow from simple scripts to complex orchestration, the code becomes harder to maintain. Every workflow needs goroutines, WaitGroups, error channels, and retry logic with exponential backoff. Conditional logic, parallel execution, and error handling mix together, obscuring the business logic. What starts as a 10-line script becomes 100 lines of nested if-statements and goroutine coordination.

Flow addresses these problems by letting you compose workflows from simple, reusable steps, while handling the mechanics of execution, error handling, and concurrency.

Core Concepts

Step is the fundamental building block. A step is a function that accepts a context and state, returning an error:

type Step[T any] = func(context.Context, T) error

Steps are composable: you can combine them to build complex workflows while maintaining type safety. Each step can read from and modify the shared state.

Additional types support data transformation:

  • Extract produces a value from the state

  • Transform processes an input into an output

  • Consume processes a value without producing an output

The Pipeline function combines an Extract, Transform, and Consume into a complete Step.

type Extract[T any, U any] = func(context.Context, T) (U, error)
type Transform[T any, In any, Out any] = func(context.Context, T, In) (Out, error)
type Consume[T any, U any] = func(context.Context, T, U) error

func Pipeline[T, A, B any](
	extract Extract[T, A],
	transform Transform[T, A, B],
	consume Consume[T, B],
) Step[T]

Parallel Execution

InParallel runs multiple workflows concurrently. This example shows deploying multiple services in parallel:

type DeploymentConfig struct {
    Services []string
    Region   string
}

deployService := func(name string) flow.Step[*DeploymentConfig] {
    return func(ctx context.Context, cfg *DeploymentConfig) error {
        return deploy(ctx, name, cfg.Region)
    }
}

workflow := flow.InParallel(
    flow.Steps(
        deployService("web"),
        deployService("api"),
        deployService("worker"),
    ),
)

cfg := &DeploymentConfig{Region: "us-west-2"}
if err := workflow(context.Background(), cfg); err != nil {
    log.Fatal(err)
}

Error Handling

Flow provides several error handling strategies:

Retry with exponential backoff:

type Config struct{ APIEndpoint string }

fetchData := func(ctx context.Context, cfg *Config) error {
    return callAPI(ctx, cfg.APIEndpoint)
}

workflow := flow.Retry(
    fetchData,
    flow.UpTo(3),
    flow.ExponentialBackoff(100*time.Millisecond),
)

cfg := &Config{APIEndpoint: "https://api.example.com"}
if err := workflow(context.Background(), cfg); err != nil {
    log.Fatal(err)
}

Best-effort execution with IgnoreError:

workflow := flow.Do(
    criticalStep,
    flow.IgnoreError(sendMetrics), // Continue even if metrics fail
    finalStep,
)

Fallback logic with OnError:

workflow := flow.OnError(
    tryPrimaryDatabase,
    useReadReplica, // Fallback if primary fails
)

Debugging with Traces

Flow provides opt-in execution tracing to help debug workflows, understand performance bottlenecks, and visualize execution paths. Traced wraps a workflow and records timing and errors for all Named steps:

type Config struct{ DBPath string }

workflow := flow.Do(
    flow.Named("validate", ValidateConfig),
    flow.Named("connect", ConnectDB),
    flow.Named("migrate", RunMigrations),
)

// Trace the workflow and output results
err := flow.Spawn(
    flow.Traced(workflow),
    flow.WriteTextTo(os.Stderr),
)(ctx, cfg)

This produces output like:

validate (45ms)
connect (120ms)
migrate (2.3s)

Traces can be filtered to focus on specific issues:

trace, err := flow.Traced(workflow)(ctx, cfg)
slowSteps := trace.Filter(flow.MinDuration(time.Second))
slowSteps.WriteText(os.Stderr)

Tracing supports JSON output for integration with analysis tools:

trace.WriteTo(os.Stdout) // JSON format

All tracing is opt-in and has minimal overhead when not used. Only Named steps are traced—unnamed steps are ignored. See Traced, Named, and the examples/tracing/ directory for more details.

Naming Best Practices

Use Named or AutoNamed liberally throughout your workflow hierarchy. Step names optimize how the library manages context values internally. Workflows that use WithLogging, WithSlogging, or Traced without corresponding Named steps may incur additional context lookup cost.

Configuration-Driven Workflows

Flow excels at configuration-driven orchestration where different components declare their setup needs, and the orchestration layer gathers and optimizes execution. See the examples/config-driven/ directory for a complete example.

Additional Resources

For more information, see:

Requirements

Flow requires Go 1.24 or later and has minimal external dependencies.

Index

Constants

This section is empty.

Variables

View Source
var ErrExhausted = errors.New("extraction exhausted")

ErrExhausted is a sentinel error that signals extraction has been exhausted.

This is used with Collect to indicate that an iterative extraction has reached its end and no more items are available.

Example:

func NextItem(ctx context.Context, state *State) (Item, error) {
    if state.queue.IsEmpty() {
        return Item{}, flow.ErrExhausted
    }
    return state.queue.Pop(), nil
}

Functions

func Flatten

func Flatten[T, U any](_ context.Context, _ T, nested [][]U) ([]U, error)

Flatten flattens a nested slice structure.

This is commonly needed when using Render on a function that returns slices, which produces nested slices ([][]Out). Flatten collapses these into a single slice, making it easy to compose in transformation pipelines.

Example:

// Collect pages, extract records from each page (gives [][]Record), flatten
flow.From(
    flow.Collect(FetchPages),        // Extract[*State, []Page]
    flow.Chain(
        flow.Render(ExtractRecords), // []Page → [][]Record
        flow.Flatten,                // [][]Record → []Record
        flow.Render(ValidateRecord), // []Record → []ValidatedRecord
    ),
)

Common use case: multi-stage ETL pipelines where one transformation produces multiple items per input:

flow.Pipeline(
    GetBatches,                           // Extract[*State, []Batch]
    flow.Chain(
        flow.Render(SplitBatchIntoItems), // Batch → []Item, gives [][]Item
        flow.Flatten,                     // [][]Item → []Item
        flow.Render(ProcessItem),         // []Item → []ProcessedItem
    ),
    flow.Apply(SaveItem),
)

func Logger

func Logger(ctx context.Context) *log.Logger

Logger returns the log.Logger from the context, or log.Default if none is set.

This is useful for custom logging decorators that need to access the configured logger.

Example:

func CustomLogging[T any](step flow.Step[T]) flow.Step[T] {
    return func(ctx context.Context, t T) error {
        logger := flow.Logger(ctx)
        names := flow.Names(ctx)
        // Custom logging logic here
        logger.Printf("Custom: %v\n", names)
        return step(ctx, t)
    }
}

func Names added in v0.2.0

func Names(ctx context.Context) []string

Names returns a copy of the step name stack from the context. Returns nil if no names are present in the context.

This is useful for custom logging decorators or other functionality that needs to inspect the current step hierarchy.

func Slogger

func Slogger(ctx context.Context) *slog.Logger

Slogger returns the slog.Logger from the context, or slog.Default if none is set.

This is useful for custom logging decorators that need to access the configured structured logger.

Example:

func CustomSlogging[T any](step flow.Step[T]) flow.Step[T] {
    return func(ctx context.Context, t T) error {
        logger := flow.Slogger(ctx)
        names := flow.Names(ctx)
        // Custom logging logic here
        logger.Info("custom step", "names", names)
        return step(ctx, t)
    }
}

Types

type AutoNamedOption

type AutoNamedOption func(*autoNamedOptions)

An AutoNamedOption is a function option for AutoNamed and related functions.

func SkipCaller

func SkipCaller(delta int) AutoNamedOption

SkipCaller adds a delta to the number of skipped stack frames.

This is useful when wrapping decorators inside wrapper functions, allowing AutoNamed to skip intermediate layers and identify the original caller.

Example:

func SomeStep() flow.Step[*Task] {
    return TaskStep(func(ctx context.Context, task *Task) error {
        // Implementation
        return nil
    })
}

func TaskStep(step flow.Step[*Task]) flow.Step[*Task] {
    // Skip TaskStep so AutoNamed picks SomeStep instead
    return AutoNamed(Retry(step), flow.SkipCaller(1))
}

type BackoffOption

type BackoffOption func(*backoffConfig)

BackoffOption configures backoff behavior for retry predicates.

func WithFullJitter

func WithFullJitter() BackoffOption

WithFullJitter applies full jitter to backoff delays.

The actual delay will be a random value between 0 and the calculated delay. This is the AWS-recommended approach for maximum desynchronization and is effective at preventing thundering herd problems in distributed systems.

Cancels out any WithPercentageJitter option if both are provided. The last option in the list takes precedence.

Applies to both FixedBackoff and ExponentialBackoff.

func WithMaxDelay

func WithMaxDelay(max time.Duration) BackoffOption

WithMaxDelay caps the maximum backoff delay.

This is useful for preventing exponential backoff from growing unbounded. For example, WithMaxDelay(30*time.Second) ensures retries never wait longer than 30 seconds, even if the exponential calculation would produce a larger value.

The cap is applied after jitter is calculated, ensuring the final delay (including jitter) never exceeds the maximum.

Applies to both FixedBackoff and ExponentialBackoff.

func WithMultiplier

func WithMultiplier(m float64) BackoffOption

WithMultiplier sets the exponential growth rate for ExponentialBackoff.

The default multiplier is 2.0, which doubles the delay on each retry. For example, WithMultiplier(1.5) creates gentler growth (1.5×), while WithMultiplier(3.0) creates more aggressive growth (3×).

Only applies to ExponentialBackoff. This option is ignored by FixedBackoff.

func WithPercentageJitter

func WithPercentageJitter(percent float64) BackoffOption

WithPercentageJitter applies percentage-based jitter to backoff delays.

For example, WithPercentageJitter(0.2) adds ±20% randomness to the delay. The actual delay will be between (delay × (1-percent)) and (delay × (1+percent)).

This provides more predictable retry timing than full jitter while still helping to desynchronize retries across multiple clients.

Cancels out any WithFullJitter option if both are provided. The last option in the list takes precedence.

Applies to both FixedBackoff and ExponentialBackoff.

type Consume

type Consume[T any, U any] = func(context.Context, T, U) error

Consume is a failable, cancelable procedure that takes a parameter.

T is some kind of state upon which the procedure operates. U is the input value to be consumed/processed.

If using parallel steps, ensure that access to T is thread-safe.

func Apply

func Apply[T, U any](f Consume[T, U]) Consume[T, []U]

Apply consumes each element in a slice (serial, fail-fast).

This enables clean serial iteration without orchestrator wrappers. It's fully compositional with existing Feed and Pipeline.

Example:

// Simple: extract and apply
flow.With(
    GetUsers,                             // Extract[T, []User]
    Apply(SaveUser),                      // Consume[T, User]
)                                         // Step[T]

// Pipeline: extract, transform, apply
flow.Pipeline(
    GetIDs,                               // Extract[T, []int]
    Render(LoadEntity),                   // Transform to []Entity
    Apply(ProcessEntity),                 // Consume each
)

// Composed consumption
flow.With(
    GetRecords,
    Apply(flow.Feed(
        ValidateRecord,
        SaveRecord,
    )),
)

func AutoNamedConsume

func AutoNamedConsume[T, U any](consume Consume[T, U], opts ...AutoNamedOption) Consume[T, U]

AutoNamedConsume wraps a Consume with a name automatically derived from the calling function.

When tracing is enabled (via Traced), AutoNamedConsume automatically records execution events, just like NamedConsume.

See AutoNamed for further details.

func Feed

func Feed[T, A, B any](
	transform Transform[T, A, B],
	consume Consume[T, B],
) Consume[T, A]

Feed composes a Transform and a Consume to produce a new Consume.

This combines the "write side" of a data pipeline: transforming an input value and feeding it to a consumer. The result is a new Consume that accepts the original input type A.

Example:

saveConfig := Feed(
    ParseConfig,  // Transform[*State, []byte, Config]
    SaveToDB,     // Consume[*State, Config]
)                 // Consume[*State, []byte]

func NamedConsume

func NamedConsume[T any, U any](
	name string,
	consume Consume[T, U],
) Consume[T, U]

NamedConsume wraps a Consume with a name.

The name is prepended to the error message of the Consume, separated by a colon. For example, if the Consume returns an error "invalid config", the name is prepended to the error message: "example: invalid config".

When tracing is enabled (via Traced), NamedConsume automatically records execution events including timing and errors.

This is useful for debugging and logging.

type Extract

type Extract[T any, U any] = func(context.Context, T) (U, error)

Extract is a failable, cancelable procedure that returns a value.

T is some kind of state upon which the procedure operates. U is the returned value.

If using parallel steps, ensure that access to T is thread-safe.

func AutoNamedExtract

func AutoNamedExtract[T, U any](extract Extract[T, U], opts ...AutoNamedOption) Extract[T, U]

AutoNamedExtract wraps an Extract with a name automatically derived from the calling function.

When tracing is enabled (via Traced), AutoNamedExtract automatically records execution events, just like NamedExtract.

See AutoNamed for further details.

func Collect

func Collect[T, U any](f Extract[T, U]) Extract[T, []U]

Collect repeatedly extracts until ErrExhausted, collecting all results.

This enables pull-based iteration patterns while staying within the Extract algebra. The extractor should return ErrExhausted when no more items are available.

Example:

func NextQueueItem(ctx context.Context, state *State) (Item, error) {
    if state.queue.IsEmpty() {
        return Item{}, flow.ErrExhausted
    }
    return state.queue.Pop(), nil
}

// Collect all items from queue
allItems := Collect(NextQueueItem)  // Extract[*State, []Item]

// Use in pipeline
flow.Pipeline(
    Collect(NextQueueItem),
    Render(TransformItem),
    Apply(SaveItem),
)

func From

func From[T, A, B any](
	extract Extract[T, A],
	transform Transform[T, A, B],
) Extract[T, B]

From composes an Extract and a Transform to produce a new Extract.

This combines the "read side" of a data pipeline: extracting a value from state and transforming it into another value. The result is a new Extract that performs both operations.

Example:

validatedConfig := From(
    GetRawConfig,      // Extract[*State, []byte]
    ParseAndValidate,  // Transform[*State, []byte, Config]
)                      // Extract[*State, Config]

func NamedExtract

func NamedExtract[T any, U any](
	name string,
	extract Extract[T, U],
) Extract[T, U]

NamedExtract wraps an Extract with a name.

The name is prepended to the error message of the Extract, separated by a colon. For example, if the Extract returns an error "invalid config", the name is prepended to the error message: "example: invalid config".

When tracing is enabled (via Traced), NamedExtract automatically records execution events including timing and errors.

This is useful for debugging and logging.

func Traced added in v0.2.0

func Traced[T any](step Step[T], opts ...TraceOption) Extract[T, *Trace]

Traced wraps a workflow and returns an Extract that produces a Trace.

The trace records execution events for all Named steps within the workflow. This enables debugging, performance analysis, and understanding execution paths.

Events are recorded in approximate start order but may have minor variations in parallel workflows due to mutex contention. For precise chronological ordering, sort events by their Start time.

Options can be provided to configure streaming and context optimization. See WithStreamTo.

Example:

workflow := flow.Do(
    flow.Named("validate", ValidateConfig),
    flow.Named("connect", ConnectDB),
    flow.Named("migrate", RunMigrations),
)

// Basic tracing
err := flow.Spawn(
    flow.Traced(workflow),
    flow.WriteTextTo(os.Stdout),
)(ctx, state)

// Tracing with streaming to file
f, _ := os.Create("trace.jsonl")
defer f.Close()
trace, err := flow.Traced(workflow, flow.WithStreamTo(f))(ctx, state)

Tracing is opt-in and has minimal overhead when not used. All Named, NamedExtract, NamedTransform, NamedConsume, and AutoNamed variants automatically record events when a trace is present in the context.

func Value added in v0.2.0

func Value[T, U any](u U) Extract[T, U]

Value lifts a constant value into an Extract.

This creates an Extract that ignores state and context, always returning the given value. It's the monadic return/pure operation for Extract, useful for closing over values in compositions.

Example:

// Close over a slice in ForEach
flow.ForEach(
    flow.Value[*State](items),
    ProcessItem,
)

// Use in Pipeline
flow.Pipeline(
    flow.Value[*State](config),
    ValidateConfig,
    SaveConfig,
)

type IndexedError

type IndexedError struct {
	Index int
	Err   error
}

IndexedError wraps an error with the index at which it occurred in a collection.

This provides context when iterating over elements in functions like Render, Apply, or Collect fails, making it easier to debug which element caused the failure.

Example:

err := flow.Render(ProcessItem)(ctx, state, items)
if err != nil {
    var ie *flow.IndexedError
    if errors.As(err, &ie) {
        fmt.Printf("Failed at element %d: %v\n", ie.Index, ie.Err)
    }
}

func (*IndexedError) Error

func (e *IndexedError) Error() string

Error implements the error interface.

func (*IndexedError) Unwrap

func (e *IndexedError) Unwrap() error

Unwrap returns the underlying error for error inspection via errors.Is and errors.As.

type NamedError added in v0.2.0

type NamedError struct {
	// Name is the name of the step that failed.
	Name string
	// Err is the underlying error from the step.
	Err error
}

NamedError is an error returned by Named and related functions. It wraps the underlying error with the name of the step that failed. Users can use errors.As to detect and inspect NamedErrors.

func (NamedError) Error added in v0.2.0

func (e NamedError) Error() string

Error returns the formatted error message.

func (NamedError) Unwrap added in v0.2.0

func (e NamedError) Unwrap() error

Unwrap returns the underlying error.

type Options

type Options struct {
	// JoinErrors controls error handling.
	//
	// By default, when false, the first step that returns an error stops
	// execution and that error is returned immediately.
	//
	// If enabled, all steps are run to completion regardless of errors, and a
	// combined `errors.Join` error of all non-nil errors is returned.
	JoinErrors bool
}

Options specifies how steps are run sequentially.

type ParallelOptions

type ParallelOptions struct {
	// Limit controls how many goroutines may run.
	//
	// Numbers less than or equal to zero indicate no limit.
	Limit int

	// JoinErrors controls error handling.
	//
	// By default, when false, the first step that returns an error cancels
	// the rest, and this first error is returned. (This is the behavior of
	// the `errgroup` package.)
	//
	// If enabled, all steps are run to completion regardless of errors, and a
	// combined `errors.Join` error of all non-nil errors is returned.
	JoinErrors bool
}

ParallelOptions specifies how steps are run concurrently.

type Predicate

type Predicate[T any] = func(context.Context, T) (bool, error)

A Predicate is a failable boolean condition check.

It returns true or false to indicate whether the condition is met, and may return an error if the condition check itself fails.

T is some kind of state upon which the check operates.

If using parallel steps, ensure that access to T is thread-safe.

func And

func And[T any](predicates ...Predicate[T]) Predicate[T]

And combines multiple predicates with logical AND.

All predicates must return true for the result to be true. Evaluation short-circuits on the first false or error.

Example:

When(
    And(IsProduction(), IsHealthy()),
    DeployToProduction,
)

func Not

func Not[T any](predicate Predicate[T]) Predicate[T]

Not negates a predicate, returning true when the predicate returns false and vice versa.

Example:

While(Not(ServiceIsHealthy()), CheckAndWait())

func Or

func Or[T any](predicates ...Predicate[T]) Predicate[T]

Or combines multiple predicates with logical OR.

Returns true if any predicate returns true. Evaluation short-circuits on the first true or error.

Example:

When(
    Or(IsDevEnvironment(), HasFeatureFlag()),
    EnableExperimentalFeature,
)

type RecoveredPanic

type RecoveredPanic struct {
	Value any
}

RecoveredPanic is an error type that wraps a panic value.

func (*RecoveredPanic) Error

func (p *RecoveredPanic) Error() string

type RetryPredicate

type RetryPredicate = func(context.Context, int, error) bool

A RetryPredicate determines whether a failed step should be retried.

It receives the context, the number of attempts so far, and the error from the last attempt. It returns true to retry, false to stop.

func ExponentialBackoff

func ExponentialBackoff(base time.Duration, opts ...BackoffOption) RetryPredicate

ExponentialBackoff waits with exponentially increasing delays before each retry.

By default, the delay for attempt N is base × 2^(N-1). For example, with a base of 100ms:

  • Attempt 1: 100ms
  • Attempt 2: 200ms
  • Attempt 3: 400ms

If the number of attempts is less than 1, or the calculated delay overflows, the base delay is used instead.

If the context is cancelled during the wait, the predicate returns false.

Options:

func FixedBackoff

func FixedBackoff(delay time.Duration, opts ...BackoffOption) RetryPredicate

FixedBackoff waits for a fixed duration before each retry.

The delay is applied before each retry. If the context is cancelled during the wait, the predicate returns false and the retry is aborted.

Options:

func OnlyIf

func OnlyIf(check func(error) bool) RetryPredicate

OnlyIf conditionally retries based on the error.

The predicate returns true only if the provided check function returns true for the error. This is useful for retrying only transient errors while immediately failing on permanent errors like validation failures.

func UpTo

func UpTo(maxAttempts int) RetryPredicate

UpTo limits retries to a maximum number of attempts.

The predicate returns true if attempts < maxAttempts, allowing retries to continue until the limit is reached.

type Step

type Step[T any] = func(context.Context, T) error

A Step is a failable, cancelable procedure.

T is some kind of state upon which the procedure operates.

If using parallel steps, ensure that access to T is thread-safe.

func AutoNamed

func AutoNamed[T any](step Step[T], opts ...AutoNamedOption) Step[T]

AutoNamed wraps a Step with a name automatically derived from the calling function.

This uses reflection to extract the name of the function that calls AutoNamed, which is useful for reducing repetition when naming steps in step constructors.

When tracing is enabled (via Traced), AutoNamed steps automatically record execution events, just like Named steps.

Example:

func CreateDatabase() flow.Step[*Config] {
    return flow.AutoNamed(func(ctx context.Context, cfg *Config) error {
        // Implementation
        return nil
    })
}
// If this step fails, the error will be prefixed with "CreateDatabase: ..."
// When traced, the event will be named "CreateDatabase"

Note: AutoNamed only works when called directly from a named function. It will not work correctly when called from anonymous functions or closures.

func Do

func Do[T any](steps ...Step[T]) Step[T]

Do executes multiple steps in order, one at a time.

This is the simplest way to run a sequence of static steps. For dynamic step sequences (StepsProvider), use InSerial instead.

Do is the same as DoWith with the default Options.

Example:

Do(
    ValidateConfig(),
    StartDatabase(),
    StartServer(),
)

func DoWith

func DoWith[T any](opts Options, steps ...Step[T]) Step[T]

DoWith executes multiple steps in order, one at a time, with custom options.

This allows configuring behavior like error joining. Use Do for the common case with default options.

Example:

// Run all cleanup steps even if some fail, collect all errors
DoWith(
    Options{JoinErrors: true},
    CleanupTempFiles(),
    CloseConnections(),
    RemoveLockFiles(),
)

func IgnoreError

func IgnoreError[T any](step Step[T]) Step[T]

IgnoreError wraps a step to always return nil, even if the step fails.

This is useful for "best effort" operations where failures should not stop the overall flow.

func InParallel

func InParallel[T any](providers ...StepsProvider[T]) Step[T]

InParallel combines multiple step sequences and runs them concurrently.

InParallel is the same as InParallelWith with the default ParallelOptions.

func InParallelWith

func InParallelWith[T any](
	opts ParallelOptions,
	providers ...StepsProvider[T],
) Step[T]

InParallelWith combines multiple step sequences and runs them concurrently.

All step sequences are expanded sequentially first, then the resulting steps are run in separate goroutines.

func InSerial

func InSerial[T any](providers ...StepsProvider[T]) Step[T]

InSerial combines multiple step sequences and runs them in order, one at a time.

InSerial is the same as InSerialWith with the default Options.

Example:

InSerial(
    flow.ForEach(GetFoos, ProcessFoo),
    flow.ForEach(GetBars, ProcessBar),
)

func InSerialWith

func InSerialWith[T any](
	opts Options,
	providers ...StepsProvider[T],
) Step[T]

InSerialWith combines multiple step sequences and runs them in order, one at a time.

func Named

func Named[T any](name string, step Step[T]) Step[T]

Named wraps a Step with a name.

The name is prepended to the error message of the Step, separated by a colon. For example, if the Step returns an error "invalid config", the name is prepended to the error message: "example: invalid config".

Named also maintains a stack of step names in the context, which can be retrieved using Names. When Named decorators are nested, each appends its name to the stack, creating a hierarchical path (e.g., "process.parse.validate").

When tracing is enabled (via Traced), Named automatically records execution events including timing and errors.

This is useful for debugging and logging.

func OnError

func OnError[T any](step Step[T], onError Transform[T, error, Step[T]]) Step[T]

OnError provides dynamic error handling with fallback steps.

If the primary step fails, onError is called with the error and can return a fallback step to execute instead. The onError transform has access to the context, state, and the error, allowing for sophisticated error handling strategies.

If onError itself returns an error, that error is propagated and no fallback step is executed.

Example:

OnError(
    CallPrimaryAPI,
    func(ctx context.Context, state *State, err error) (Step[*State], error) {
        if errors.Is(err, ErrTimeout) {
            return CallBackupAPI, nil
        }
        if errors.Is(err, ErrRateLimit) {
            return RetryAfterDelay, nil
        }
        return nil, fmt.Errorf("unrecoverable: %w", err)
    },
)

func Pipeline

func Pipeline[T, A, B any](
	extract Extract[T, A],
	transform Transform[T, A, B],
	consume Consume[T, B],
) Step[T]

Pipeline composes an Extract, Transform, and Consume into a complete Step.

This creates a full data pipeline: extract a value from state, transform it, and consume the result. This is a common pattern for workflows that follow the read-process-write model.

Pipeline is equivalent to:

With(From(extract, transform), consume)
With(extract, Feed(transform, consume))

Example:

processConfig := Pipeline(
    GetRawConfig,      // Extract[*State, []byte]
    ParseAndValidate,  // Transform[*State, []byte, Config]
    SaveToDatabase,    // Consume[*State, Config]
)                      // Step[*State]

func RecoverPanics

func RecoverPanics[T any](step Step[T]) Step[T]

RecoverPanics wraps a step to recover from panics and convert them to errors.

If the step panics, the panic value is wrapped in a RecoveredPanic error. This is useful for defensive programming when calling code that may panic.

func Retry

func Retry[T any](
	step Step[T],
	predicates ...RetryPredicate,
) Step[T]

Retry executes a step and retries it on failure based on the given predicates.

All predicates must return true for a retry to occur. If any predicate returns false, the last error is returned immediately.

If no predicates are provided, this defaults to retrying up to 3 times with exponential backoff starting at 100ms and full jitter to prevent thundering herd problems.

func Sleep

func Sleep[T any](duration time.Duration) Step[T]

Sleep pauses execution for the specified duration.

The sleep respects context cancellation, returning context.Canceled if the context is cancelled before the duration elapses.

This is useful in polling loops or when adding delays between operations.

Example:

flow.While(
    ServiceNotReady(),
    flow.Do(
        CheckStatus(),
        flow.Sleep(5*time.Second),
    ),
)

func Spawn

func Spawn[Parent, Child any](
	derive Extract[Parent, Child],
	step Step[Child],
) Step[Parent]

Spawn executes a step with a different state type derived from the parent.

This is useful when you need to run a workflow in a narrower context. The derive function extracts or constructs child state from the parent, then the step runs with that child state.

Example:

// Run database setup with DB-specific state
Spawn(
    PrepareDbConnection,    // Extract[*EnvSetup, *DbSetup]
    RunDbMigrations,        // Step[*DbSetup]
)                           // Step[*EnvSetup]

func Unless

func Unless[T any](
	predicate Predicate[T],
	step Step[T],
) Step[T]

Unless runs the given step only if the predicate returns false.

If the predicate returns true, the step is skipped and nil is returned. If the predicate returns an error, that error is propagated.

func When

func When[T any](
	predicate Predicate[T],
	step Step[T],
) Step[T]

When runs the given step only if the predicate returns true.

If the predicate returns false, the step is skipped and nil is returned. If the predicate returns an error, that error is propagated.

func While

func While[T any](
	predicate Predicate[T],
	step Step[T],
) Step[T]

While repeatedly executes the step as long as the predicate returns true.

The predicate is evaluated before each iteration. If it returns false or an error, the loop terminates. If the step returns an error, that error is propagated immediately.

Example:

While(
    ServiceIsNotReady(),
    CheckStatus(),
)

To prevent infinite loops, combine with WithTimeout or use predicates that eventually become false.

func With

func With[T, U any](
	extract Extract[T, U],
	consume Consume[T, U],
) Step[T]

With combines an Extract and a Consume into a Step.

This creates a two-stage workflow: extract a value from state, then process/consume it. This is useful when you need to read some data and perform an action with it.

Example:

processUsers := With(
    GetUsers,      // Extract[*State, []User]
    SendEmails,    // Consume[*State, []User]
)                  // Step[*State]

func WithDeadline

func WithDeadline[T any](deadline time.Time, step Step[T]) Step[T]

WithDeadline wraps a step with an absolute deadline.

The step is executed with a derived context that will be cancelled at the specified time. If the step does not complete before the deadline, it will receive a cancelled context and should return context.DeadlineExceeded.

Example:

deadline := time.Now().Add(5 * time.Second)
flow.WithDeadline(deadline, ExpensiveOperation())

func WithLogger

func WithLogger[T any](logger *log.Logger, step Step[T]) Step[T]

WithLogger configures a Step to use a specific log.Logger for logging.

The logger is stored in the context and used by WithLogging. If no logger is configured, WithLogging will use log.Default.

This is typically applied once at the root of a workflow to configure logging for all nested steps. For optimal performance, use Named or AutoNamed liberally throughout your workflow—the flow library consolidates all context state into a single lookup via flowCtx, but this optimization only provides constant-time access when step names are present in the context chain. Using WithLogger without corresponding Named steps may incur linear context chain traversal cost.

Example:

myLogger := log.New(os.Stdout, "workflow: ", log.LstdFlags)
step := WithLogger(myLogger,
    Named("process",
        WithLogging(processStep)))

func WithLogging

func WithLogging[T any](step Step[T]) Step[T]

WithLogging wraps a Step with logging that prints messages when the step starts and finishes, including execution duration.

The log messages include the full dotted path of step names from the context (e.g., "process.parse.validate"). If no names are set, logs show "<unknown>".

The logger used is retrieved from the context (set by WithLogger). If no logger is configured, log.Default is used.

Log format:

[step.name] starting step
[step.name] finished step (took 123ms)

Example:

step := Named("process",
    WithLogging(
        Named("parse",
            WithLogging(parseStep))))

This would log:

[process] starting step
[process.parse] starting step
[process.parse] finished step (took 5ms)
[process] finished step (took 10ms)

For custom logging formats, use Names to retrieve the name stack and implement your own logging decorator.

func WithSlogger

func WithSlogger[T any](logger *slog.Logger, step Step[T]) Step[T]

WithSlogger configures a Step to use a specific slog.Logger for structured logging.

The logger is stored in the context and used by WithSlogging. If no logger is configured, WithSlogging will use slog.Default.

This is typically applied once at the root of a workflow to configure logging for all nested steps. For optimal performance, use Named or AutoNamed liberally throughout your workflow—the flow library consolidates all context state into a single lookup via flowCtx, but this optimization only provides constant-time access when step names are present in the context chain. Using WithSlogger without corresponding Named steps may incur linear context chain traversal cost.

Example:

myLogger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
step := WithSlogger(myLogger,
    Named("process",
        WithSlogging(slog.LevelInfo, processStep)))

func WithSlogging

func WithSlogging[T any](level slog.Level, step Step[T]) Step[T]

WithSlogging wraps a Step with structured logging that emits log records when the step starts and finishes, including execution duration.

The log records include the full dotted path of step names from the context as a "name" attribute (e.g., "process.parse.validate"). If no names are set, the name attribute will be "<unknown>". The finish log includes a "duration_ms" attribute with the execution time in milliseconds.

The logger used is retrieved from the context (set by WithSlogger). If no logger is configured, slog.Default is used.

Example:

step := Named("process",
    WithSlogging(slog.LevelInfo,
        Named("parse",
            WithSlogging(slog.LevelDebug, parseStep))))

This would emit structured log records similar to:

{"level":"info","msg":"starting step","name":"process"}
{"level":"debug","msg":"starting step","name":"process.parse"}
{"level":"debug","msg":"finished step","name":"process.parse","duration_ms":5}
{"level":"info","msg":"finished step","name":"process","duration_ms":10}

For custom logging formats, use Names to retrieve the name stack and implement your own logging decorator.

func WithTimeout

func WithTimeout[T any](timeout time.Duration, step Step[T]) Step[T]

WithTimeout wraps a step with a timeout.

The step is executed with a derived context that will be cancelled after the specified duration. If the step does not complete within the timeout, it will receive a cancelled context and should return context.DeadlineExceeded.

Example:

flow.WithTimeout(5*time.Second, ExpensiveOperation())

func WriteFlatTextTo added in v0.2.0

func WriteFlatTextTo(w io.Writer) Step[*Trace]

WriteFlatTextTo returns a Step that serializes a trace to flat text format.

This enables natural composition with Spawn. The flat format shows events in chronological order without tree indentation, making it more suitable for analyzing parallel workflows.

workflow := flow.Spawn(
    flow.Traced(myWorkflow),
    flow.WriteFlatTextTo(os.Stdout),
)

func WriteJSONTo added in v0.2.0

func WriteJSONTo(w io.Writer) Step[*Trace]

WriteJSONTo returns a Step that serializes a trace to JSON.

This enables natural composition with Spawn:

workflow := flow.Spawn(
    flow.Traced(myWorkflow),
    flow.WriteJSONTo(os.Stdout),
)

func WriteTextTo added in v0.2.0

func WriteTextTo(w io.Writer) Step[*Trace]

WriteTextTo returns a Step that serializes a trace to human-readable text.

This enables natural composition with Spawn:

workflow := flow.Spawn(
    flow.Traced(myWorkflow),
    flow.WriteTextTo(os.Stdout),
)

type StepsProvider

type StepsProvider[T any] = Extract[T, []Step[T]]

StepsProvider provides a sequence of steps.

This allows workflows to grow dynamically at runtime; see Steps, Do InParallel, From, and Map.

func ForEach

func ForEach[T, U any](
	extract Extract[T, []U],
	f func(U) Step[T],
) StepsProvider[T]

ForEach is a convenience helper that combines Extract and Map.

It extracts a slice and maps each element to a step, resulting in a StepsProvider. This is useful when you want the flexibility to choose serial or parallel execution at the orchestration level.

Example:

// Define once
processWorkflow := flow.ForEach(GetItems, ProcessItem)

// Use serially
flow.InSerial(processWorkflow)

// Use in parallel
flow.InParallel(processWorkflow)

func Steps

func Steps[T any](steps ...Step[T]) StepsProvider[T]

Steps creates a static step sequence from the provided steps.

This is useful when you need to convert static steps into a StepsProvider for use with InSerial or InParallel. For simple static sequences, prefer using Do directly.

Example:

// Prefer Do for simple static sequences
Do(ValidateConfig(), StartDatabase(), RunMigrations())

// Use Steps when you need a StepsProvider
InSerial(
    Steps(ValidateConfig(), StartDatabase()),
    flow.ForEach(GetServices, DeployService),
)

type Trace added in v0.2.0

type Trace struct {
	// Events is the list of all recorded trace events.
	Events []TraceEvent

	// Start is when the traced workflow began execution.
	Start time.Time

	// Duration is the total execution time of the workflow.
	// For filtered traces (from Filter), this is the sum of event durations.
	Duration time.Duration

	// TotalSteps is the total number of Named steps executed.
	// Only Named, NamedExtract, NamedTransform, NamedConsume, and AutoNamed
	// variants are counted. Unnamed steps are not included.
	// For filtered traces (from Filter), this equals len(Events).
	TotalSteps int

	// TotalErrors is the number of steps that failed with an error.
	// For filtered traces (from Filter), this is the count of events with errors.
	TotalErrors int
}

Trace is the public result type containing execution events and metadata. All fields are directly accessible for querying and analysis.

func (*Trace) Filter added in v0.2.0

func (t *Trace) Filter(filters ...TraceFilter) *Trace

Filter returns a new Trace containing only events matching all provided filters.

Multiple filters are AND'd together. The original trace is not modified.

The returned trace's TotalSteps equals the number of filtered events, and TotalErrors equals the number of filtered events with errors. The Duration field represents the sum of durations of all filtered events. The Start field is set to the earliest start time of the filtered events. If no events match, Start is set to the original trace's Start time.

Example:

// Find slow steps with errors
filtered := trace.Filter(
    flow.MinDuration(time.Second),
    flow.HasError(),
)

func (*Trace) FindEvent added in v0.2.0

func (t *Trace) FindEvent(filters ...TraceFilter) *TraceEvent

FindEvent returns the first event matching all provided filters, or nil if none match.

Multiple filters are AND'd together.

Example:

// Find the first slow database operation
event := trace.FindEvent(
    flow.PathMatches("database.*"),
    flow.MinDuration(time.Second),
)
if event != nil {
    log.Printf("slow DB op: %s took %v", event.Names, event.Duration)
}

func (*Trace) WriteFlatText added in v0.2.0

func (t *Trace) WriteFlatText(w io.Writer) (int64, error)

WriteFlatText outputs a human-readable flat list of events.

Unlike [WriteText], this method shows events in a chronological list with full paths (e.g., "parent > child") and no tree indentation. This format is recommended for analyzing parallel workflows where tree structure may be misleading due to event interleaving.

Example output:

validate (45ms)
connect (120ms)
migrate (2.3s)
migrate > create-tables (1.8s)
migrate > create-indexes (500ms)

Events appear in the order they were recorded (approximate start order). For precise chronological ordering, sort by Start time before formatting:

events := trace.Events
sort.Slice(events, func(i, j int) bool {
    return events[i].Start.Before(events[j].Start)
})

func (*Trace) WriteText added in v0.2.0

func (t *Trace) WriteText(w io.Writer) (int64, error)

WriteText outputs a human-readable tree view of the trace.

The tree view shows the hierarchical structure of steps with their durations. Indentation reflects nesting depth, and the displayed name is the last element of the step path.

Example output:

validate (45ms)
connect (120ms)
migrate (2.3s)
  create-tables (1.8s)
  create-indexes (500ms)

For parallel workflows where events may be interleaved, the tree view shows execution order, not logical nesting, which can be misleading. For parallel workflows, use [WriteFlatText] instead, which shows full paths and makes event relationships clearer.

func (*Trace) WriteTo added in v0.2.0

func (t *Trace) WriteTo(w io.Writer) (int64, error)

WriteTo serializes the trace as JSON to the given writer.

Returns the number of bytes written and any error. The JSON is formatted as a pretty-printed array of events.

This is different from streaming (via WithStreamTo) which outputs JSON Lines format (one event per line) during execution. WriteTo outputs a single JSON array after execution completes.

type TraceEvent added in v0.2.0

type TraceEvent struct {
	// Names is the full hierarchical path of step names.
	// For example: ["parent", "child", "grandchild"]
	Names []string `json:"step_names"`

	// Start is when the step began execution.
	Start time.Time `json:"start"`

	// Duration is how long the step took to execute.
	Duration time.Duration `json:"duration"`

	// Error is the error message if the step failed, empty otherwise.
	Error string `json:"error,omitempty"`
}

TraceEvent represents a single execution event in a traced workflow.

Each event captures the full path of step names, start time, duration, and any error that occurred during execution.

type TraceFilter added in v0.2.0

type TraceFilter func(TraceEvent) bool

TraceFilter is a predicate function for filtering trace events.

func DepthAtMost added in v0.2.0

func DepthAtMost(depth int) TraceFilter

DepthAtMost returns a filter that matches events at or above the given depth.

For example, DepthAtMost(2) matches steps at depth 1 or 2.

func DepthEquals added in v0.2.0

func DepthEquals(depth int) TraceFilter

DepthEquals returns a filter that matches events at the given depth.

Depth is defined as len(Names). For example:

  • depth 1: top-level steps
  • depth 2: first level of nesting
  • depth 3: second level of nesting

func ErrorMatches added in v0.2.0

func ErrorMatches(pattern string) TraceFilter

ErrorMatches returns a filter that matches events where the error message matches the glob pattern.

If the pattern is malformed, no events will match (returns false).

Example patterns:

  • "*timeout*" matches errors containing "timeout"
  • "connection *" matches errors starting with "connection "
  • "database error" matches exactly "database error"

func HasError added in v0.2.0

func HasError() TraceFilter

HasError returns a filter that matches events with errors.

func HasPathPrefix added in v0.2.0

func HasPathPrefix(prefix []string) TraceFilter

HasPathPrefix returns a filter that matches events where the full Names path has the given prefix.

Example:

// Match all steps under ["migrate", "tables"]
filter := flow.HasPathPrefix([]string{"migrate", "tables"})

func MaxDuration added in v0.2.0

func MaxDuration(d time.Duration) TraceFilter

MaxDuration returns a filter that matches events with duration <= d.

func MinDuration added in v0.2.0

func MinDuration(d time.Duration) TraceFilter

MinDuration returns a filter that matches events with duration >= d.

func NameMatches added in v0.2.0

func NameMatches(pattern string) TraceFilter

NameMatches returns a filter that matches events where the step name (last element of Names) matches the glob pattern.

Patterns use filepath.Match semantics—see its documentation for the full details of matching behavior (e.g., *, ?, and [...] for character sets).

If the pattern is malformed, no events will match (returns false).

func NamePrefix added in v0.2.0

func NamePrefix(prefix string) TraceFilter

NamePrefix returns a filter that matches events where the step name (last element of Names) has the given prefix.

func NoError added in v0.2.0

func NoError() TraceFilter

NoError returns a filter that matches events without errors.

func PathMatches added in v0.2.0

func PathMatches(pattern string) TraceFilter

PathMatches returns a filter that matches events where the full dotted path matches the glob pattern.

Patterns use filepath.Match semantics—see its documentation for the full details of matching behavior. The path is constructed by joining Names with "." separators.

If the pattern is malformed, no events will match (returns false).

func TimeRange added in v0.2.0

func TimeRange(start, end time.Time) TraceFilter

TimeRange returns a filter that matches events that started within the given time range.

Both start and end times are inclusive. Events are matched if their Start time falls between start and end.

Example:

// Find events that started in the first second
workflowStart := trace.Events()[0].Start
filter := flow.TimeRange(workflowStart, workflowStart.Add(time.Second))

type TraceOption added in v0.2.0

type TraceOption func(*traceOptions)

TraceOption configures trace behavior.

func WithStreamTo added in v0.2.0

func WithStreamTo(w io.Writer) TraceOption

WithStreamTo configures the trace to stream events as JSON Lines to the given writer.

Events are written in JSON Lines format (one event per line) as they complete, enabling real-time monitoring and ensuring traces are preserved even if the process crashes. This is different from WriteTo/WriteJSONTo which output a pretty-printed JSON array after execution completes.

All events are retained in memory for post-execution querying.

Write failures to the stream are best-effort and do not cause the workflow to fail. This ensures that tracing infrastructure never breaks the workflow itself.

Example:

f, _ := os.Create("trace.jsonl")
defer f.Close()
trace, err := flow.Traced(workflow, flow.WithStreamTo(f))(ctx, state)

// trace.jsonl contains one JSON object per line:
// {"step_names":["validate"],"start":"...","duration":45000000}
// {"step_names":["connect"],"start":"...","duration":120000000}
// ...

type Transform

type Transform[T any, In any, Out any] = func(context.Context, T, In) (Out, error)

A Transform is a failable, cancelable procedure that converts one input into an output.

The error type is used to indicate that the transform failed.

T is some kind of state upon which the procedure operates.

If using parallel steps, ensure that access to T is thread-safe.

func AutoNamedTransform

func AutoNamedTransform[T, A, B any](transform Transform[T, A, B], opts ...AutoNamedOption) Transform[T, A, B]

AutoNamedTransform wraps a Transform with a name automatically derived from the calling function.

When tracing is enabled (via Traced), AutoNamedTransform automatically records execution events, just like NamedTransform.

See AutoNamed for further details.

func Chain

func Chain[T, A, B, C any](
	first Transform[T, A, B],
	second Transform[T, B, C],
) Transform[T, A, C]

Chain composes two Transforms into a single Transform.

This creates a transformation pipeline where the output of the first transform becomes the input to the second. Both transforms have access to the same state T.

Example:

parseAndValidate := Chain(
    ParseJSON,    // Transform[*State, []byte, Config]
    ValidateConfig, // Transform[*State, Config, Config]
)                 // Transform[*State, []byte, Config]

func Chain3

func Chain3[T, A, B, C, D any](
	first Transform[T, A, B],
	second Transform[T, B, C],
	third Transform[T, C, D],
) Transform[T, A, D]

Chain3 composes three Transforms into a single Transform.

This extends Chain to handle three-stage transformation pipelines. Each transform has access to the same state T.

Example:

processData := Chain3(
    Parse,      // Transform[*State, []byte, RawData]
    Validate,   // Transform[*State, RawData, ValidData]
    Normalize,  // Transform[*State, ValidData, NormData]
)               // Transform[*State, []byte, NormData]

func Chain4

func Chain4[T, A, B, C, D, E any](
	first Transform[T, A, B],
	second Transform[T, B, C],
	third Transform[T, C, D],
	fourth Transform[T, D, E],
) Transform[T, A, E]

Chain4 composes four Transforms into a single Transform.

This extends Chain to handle four-stage transformation pipelines. Each transform has access to the same state T.

For longer pipelines, you can nest Chain functions:

Chain(
    Chain4(t1, t2, t3, t4),
    Chain4(t5, t6, t7, t8),
)

Example:

processRequest := Chain4(
    Decode,     // Transform[*State, []byte, Request]
    Validate,   // Transform[*State, Request, ValidReq]
    Enrich,     // Transform[*State, ValidReq, EnrichedReq]
    Sanitize,   // Transform[*State, EnrichedReq, SafeReq]
)               // Transform[*State, []byte, SafeReq]

func FallbackTo

func FallbackTo[T any](fallback Step[T]) Transform[T, error, Step[T]]

FallbackTo returns an error transform that always returns the provided fallback step.

This is useful with OnError when you unconditionally want to run a fallback step on error, regardless of the state or the original error.

func Map

func Map[T any, A any](
	f func(A) Step[T],
) Transform[T, []A, []Step[T]]

Map lifts a function from A to Step[T] into a Transform from []A to []Step[T].

Map is designed to compose with From for dynamic workflows:

flow.InSerial(
    flow.From(
        GetServices,                 // Extract[*Config, []string]
        flow.Map(DeployService), // Transform[*Config, []string, []Step[*Config]]
    ),                               // → StepsProvider[*Config]
)

The call to From results in a StepsProvider, the same type used by InSerial and InParallel. This allows one to switch such a loop from serial to concurrent processing simply by changing the function that wraps From.

Map can also be used with Chain to build more complex transformations:

flow.From(
    GetRawData,                      // Extract[*State, Data]
    flow.Chain(
        ParseItems,                  // Transform[*State, Data, []Item]
        flow.Map(ProcessItem),   // Transform[*State, []Item, []Step[*State]]
    ),
)

func NamedTransform

func NamedTransform[T any, In any, Out any](
	name string,
	transform Transform[T, In, Out],
) Transform[T, In, Out]

NamedTransform wraps a Transform with a name.

The name is prepended to the error message of the Transform, separated by a colon. For example, if the Transform returns an error "invalid config", the name is prepended to the error message: "example: invalid config".

When tracing is enabled (via Traced), NamedTransform automatically records execution events including timing and errors.

This is useful for debugging and logging.

func Render

func Render[T, In, Out any](f Transform[T, In, Out]) Transform[T, []In, []Out]

Render transforms each element in a slice (serial, fail-fast).

This completes the Transform algebra with collection support. It enables natural composition in From and Chain for processing slices.

Example:

// Multi-stage rendering pipeline
flow.Pipeline(
    GetRawData,                           // Extract[T, [][]byte]
    flow.Chain(
        Render(Parse),                    // []byte → []Record
        Render(Validate),                 // []Record → []Record
        Render(Enrich),                   // []Record → []EnrichedRecord
    ),
    Apply(Save),
)

// Simple transformation
flow.From(
    GetUserIDs,                           // Extract[T, []int]
    Render(LoadUser),                     // Transform[T, int, User]
)                                         // Extract[T, []User]

Directories

Path Synopsis
examples
config-driven command
data-pipeline command
static command
tracing command
Package main demonstrates basic tracing functionality in the flow library.
Package main demonstrates basic tracing functionality in the flow library.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL