combiner

package
v0.0.0-...-9915564 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 25, 2026 | License: MIT | Imports: 4 | Imported by: 1

Documentation

Overview

Package combiner implements a combiner queue: callers push individual jobs to a background worker, which combines multiple jobs into a single batch.

Types

type Combiner

type Combiner[Job any] struct {
	// contains filtered or unexported fields
}

Combiner combines multiple jobs for a single worker to process in batches.
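The combining idea can be illustrated with a small self-contained sketch. Everything in it (sketchCombiner, enqueue, drain, wait) is a hypothetical stand-in written for this illustration, not the package's implementation; it leaves out contexts, the Fail callback, and queue limits.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchCombiner is a hypothetical, simplified stand-in for the combining
// idea: callers enqueue single jobs and one background worker receives them
// in batches. It omits contexts, Fail, and queue limits.
type sketchCombiner[Job any] struct {
	mu      sync.Mutex
	pending []Job
	running bool
	workers sync.WaitGroup
	process func(batch []Job)
}

// enqueue records the job and starts a worker if none is draining the queue.
func (c *sketchCombiner[Job]) enqueue(job Job) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending = append(c.pending, job)
	if !c.running {
		c.running = true
		c.workers.Add(1)
		go c.drain()
	}
}

// drain processes whatever accumulated since the previous batch and exits
// once nothing is pending.
func (c *sketchCombiner[Job]) drain() {
	defer c.workers.Done()
	for {
		c.mu.Lock()
		batch := c.pending
		c.pending = nil
		if len(batch) == 0 {
			c.running = false
			c.mu.Unlock()
			return
		}
		c.mu.Unlock()
		c.process(batch) // runs outside the lock so enqueue never blocks on it
	}
}

// wait blocks until the active worker, if any, has finished.
func (c *sketchCombiner[Job]) wait() { c.workers.Wait() }

func main() {
	total, batches := 0, 0
	c := &sketchCombiner[int]{process: func(batch []int) {
		batches++
		for _, v := range batch {
			total += v
		}
	}}
	for i := 1; i <= 100; i++ {
		c.enqueue(i)
	}
	c.wait()
	fmt.Println("sum:", total, "batches:", batches)
}
```

The number of batches depends on scheduling: the slower the process function, the more jobs accumulate per batch, which is the point of combining.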

func New

func New[Job any](
	parent context.Context,
	options Options[Job],
) *Combiner[Job]

New creates a new combiner queue.

The parent context is passed to job processing as its context. Options.Process is used to process work items, and Options.Fail is called for jobs that need to be aborted due to context cancellation. Options.QueueSize is used to create new queues; QueueSize = -1 means the queue is unbounded.

func (*Combiner[Job]) Close

func (combiner *Combiner[Job]) Close()

Close shuts down all workers.

func (*Combiner[Job]) Enqueue

func (combiner *Combiner[Job]) Enqueue(ctx context.Context, job Job)

Enqueue adds a new job to the queue.

func (*Combiner[Job]) Stop

func (combiner *Combiner[Job]) Stop()

Stop prevents new workers from being started, without canceling existing jobs.

func (*Combiner[Job]) Wait

func (combiner *Combiner[Job]) Wait(ctx context.Context) error

Wait waits for the active workers to complete.

type Options

type Options[Job any] struct {
	Process   ProcessFunc[Job]
	Fail      ProcessFunc[Job]
	QueueSize int
}

Options is for configuring the combiner queue.

type ProcessFunc

type ProcessFunc[Job any] func(ctx context.Context, queue *Queue[Job])

ProcessFunc processes a queue of jobs.

type Queue

type Queue[Job any] struct {
	// contains filtered or unexported fields
}

Queue is a finalizable list of jobs with a limit on how many jobs it can hold.

func NewQueue

func NewQueue[Job any](maxJobsPerBatch int) *Queue[Job]

NewQueue returns a new limited job queue.

func (*Queue[Job]) Batches

func (jobs *Queue[Job]) Batches() iter.Seq[[]Job]

Batches iterates over batches of jobs until all are done.

The slice yielded by the iterator should not be used outside of the loop.

func (*Queue[Job]) Completed

func (jobs *Queue[Job]) Completed() bool

Completed returns true when the queue does not accept any new jobs and all the jobs have been completed.

func (*Queue[Job]) PopAll

func (jobs *Queue[Job]) PopAll() (_ []Job, ok bool)

PopAll returns all the jobs in this list.

When there are no more items to be pulled, the queue automatically closes.

func (*Queue[Job]) TryPush

func (jobs *Queue[Job]) TryPush(job Job) bool

TryPush tries to add a job to the queue and returns false when the queue does not accept new jobs.

maxJobsPerBatch < 0 means no limit.
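The TryPush and PopAll contract described above can be mimicked with a short stand-in. The type sketchQueue and its methods are hypothetical and written only for illustration; in particular, rejecting a push once the limit is reached is an assumption about how the limit applies, not something the documentation states.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchQueue is a hypothetical stand-in that mimics the documented contract
// of Queue; it is not the package's implementation.
type sketchQueue[Job any] struct {
	mu     sync.Mutex
	limit  int // limit < 0 means no limit
	jobs   []Job
	closed bool
}

func newSketchQueue[Job any](limit int) *sketchQueue[Job] {
	return &sketchQueue[Job]{limit: limit}
}

// tryPush appends job unless the queue is closed or, by assumption, already
// holds limit jobs.
func (q *sketchQueue[Job]) tryPush(job Job) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed || (q.limit >= 0 && len(q.jobs) >= q.limit) {
		return false
	}
	q.jobs = append(q.jobs, job)
	return true
}

// popAll hands over every pending job. When nothing is pending it closes the
// queue and reports ok == false.
func (q *sketchQueue[Job]) popAll() (_ []Job, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		q.closed = true
		return nil, false
	}
	jobs := q.jobs
	q.jobs = nil
	return jobs, true
}

func main() {
	q := newSketchQueue[string](2)
	fmt.Println(q.tryPush("a"), q.tryPush("b"), q.tryPush("c"))

	batch, ok := q.popAll()
	fmt.Println(batch, ok)

	_, ok = q.popAll() // nothing pending, so the queue closes
	fmt.Println(ok, q.tryPush("d"))
}
```

Closing on an empty PopAll gives the producer a clear signal: once TryPush returns false, the job has to go to a fresh queue, and a new worker may be needed.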
