package source

Version: v1.7.5

Note: This package is not in the latest version of its module.
Published: Feb 10, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

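Package source provides partition sources, which discover the partitions available for assignment. The package includes:

  • Static: Fixed list of partitions
  • NatsKV: Partition list stored under a key in a NATS KeyValue bucket and watched for updates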

Custom sources can be implemented by satisfying the types.PartitionSource interface.
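As a rough illustration of a custom source, the sketch below delegates discovery to a caller-supplied function. It is self-contained and does not import this module: Partition and PartitionSource are local stand-ins, and the interface shape is an assumption inferred from the Start, Stop, and List methods that Static and NatsKV share. The real types.PartitionSource may require more (or different) methods, and FuncSource, NewFuncSource, and runExample are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Partition is a local stand-in for types.Partition. The Keys and Weight
// fields mirror the NewStatic example; the int64 type is an assumption.
type Partition struct {
	Keys   []string
	Weight int64
}

// PartitionSource is an ASSUMED shape for types.PartitionSource, inferred
// from the methods that both Static and NatsKV expose.
type PartitionSource interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	List(ctx context.Context) ([]Partition, error)
}

// FuncSource is a hypothetical custom source that asks a caller-supplied
// function for the current partitions (e.g. a database query).
type FuncSource struct {
	discover func(ctx context.Context) ([]Partition, error)
}

// Compile-time check that FuncSource satisfies the assumed interface.
var _ PartitionSource = (*FuncSource)(nil)

func NewFuncSource(discover func(ctx context.Context) ([]Partition, error)) *FuncSource {
	return &FuncSource{discover: discover}
}

// Start rejects a source that has no discovery function.
func (s *FuncSource) Start(_ context.Context) error {
	if s.discover == nil {
		return errors.New("funcsource: discover function is nil")
	}
	return nil
}

// Stop has nothing to release in this sketch.
func (s *FuncSource) Stop(_ context.Context) error { return nil }

// List forwards to the discovery function on every call.
func (s *FuncSource) List(ctx context.Context) ([]Partition, error) {
	return s.discover(ctx)
}

// runExample starts the source, lists once, and stops it again.
func runExample() ([]Partition, error) {
	ctx := context.Background()
	src := NewFuncSource(func(_ context.Context) ([]Partition, error) {
		return []Partition{
			{Keys: []string{"tool001", "chamber1"}, Weight: 100},
			{Keys: []string{"tool001", "chamber2"}, Weight: 150},
		}, nil
	})
	if err := src.Start(ctx); err != nil {
		return nil, fmt.Errorf("start source: %w", err)
	}
	defer src.Stop(ctx)
	return src.List(ctx)
}

func main() {
	parts, err := runExample()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, p := range parts {
		fmt.Println(p.Keys, p.Weight)
	}
}
```

In real code the stand-in types would be replaced by the module's own types package, and the interface should be checked against the actual types.PartitionSource definition.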

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type NatsKV added in v1.1.0

type NatsKV struct {
	// contains filtered or unexported fields
}

NatsKV implements a partition source backed by a NATS KeyValue bucket.

It watches a specific key in the KV bucket for updates to the partition list. This allows dynamic partition updates without restarting workers.

func NewNatsKV added in v1.1.0

func NewNatsKV(kv jetstream.KeyValue, key string, logger types.Logger) *NatsKV

NewNatsKV creates a new NATS KV-based partition source.

Parameters:

  • kv: The NATS KeyValue bucket to use
  • key: The key where partitions are stored (JSON encoded)
  • logger: Optional logger

func (*NatsKV) List added in v1.1.0

func (s *NatsKV) List(_ context.Context) ([]types.Partition, error)

List returns the current list of partitions.

func (*NatsKV) Start added in v1.1.0

func (s *NatsKV) Start(ctx context.Context) error

Start initializes the source and starts watching for updates.

func (*NatsKV) Stop added in v1.1.0

func (s *NatsKV) Stop(_ context.Context) error

Stop stops the watcher.

func (*NatsKV) Update added in v1.1.0

func (s *NatsKV) Update(ctx context.Context, partitions []types.Partition) error

Update updates the partition list in the KV bucket.

Note: NATS KV buckets limit the size of a single value to 1MB by default (MaxMsgSize). To support large partition lists (e.g., 2000+ partitions), this method automatically compresses the JSON-encoded data with gzip before storing it.

If the compressed payload still exceeds the limit, the update fails and Update returns an error.
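The compress-then-check behavior described above can be sketched with the standard library alone. This is not the package's implementation: the JSON field names, the Partition stand-in, the maxValueSize constant, and the helpers encodePartitions, decodePartitions, and makePartitions are all assumptions made for illustration, and the wire format NatsKV actually uses may differ.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// Partition is a local stand-in for types.Partition; the JSON tags are
// assumptions, not the package's actual encoding.
type Partition struct {
	Keys   []string `json:"keys"`
	Weight int64    `json:"weight"`
}

// maxValueSize models the 1MB default value limit mentioned in the docs.
const maxValueSize = 1 << 20

// encodePartitions marshals to JSON, gzips the result, and rejects
// payloads that are still larger than the limit after compression.
func encodePartitions(parts []Partition) ([]byte, error) {
	raw, err := json.Marshal(parts)
	if err != nil {
		return nil, fmt.Errorf("marshal partitions: %w", err)
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, fmt.Errorf("gzip write: %w", err)
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("gzip close: %w", err)
	}
	if buf.Len() > maxValueSize {
		return nil, fmt.Errorf("compressed size %d exceeds limit %d", buf.Len(), maxValueSize)
	}
	return buf.Bytes(), nil
}

// decodePartitions reverses encodePartitions: gunzip, then unmarshal.
func decodePartitions(data []byte) ([]Partition, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("gzip reader: %w", err)
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return nil, fmt.Errorf("gzip read: %w", err)
	}
	var parts []Partition
	if err := json.Unmarshal(raw, &parts); err != nil {
		return nil, fmt.Errorf("unmarshal partitions: %w", err)
	}
	return parts, nil
}

// makePartitions builds n synthetic partitions for the demonstration.
func makePartitions(n int) []Partition {
	parts := make([]Partition, 0, n)
	for i := 0; i < n; i++ {
		parts = append(parts, Partition{
			Keys:   []string{fmt.Sprintf("tool%03d", i/4), fmt.Sprintf("chamber%d", i%4+1)},
			Weight: 100,
		})
	}
	return parts
}

func main() {
	parts := makePartitions(2000)
	data, err := encodePartitions(parts)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	back, err := decodePartitions(data)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%d partitions -> %d compressed bytes, decoded %d\n", len(parts), len(data), len(back))
}
```

Partition lists tend to compress well because key prefixes repeat, which is why gzip helps here. The size check still matters, since compression ratios depend on the data.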

func (*NatsKV) Watch added in v1.3.0

func (s *NatsKV) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that emits a signal when the partition list changes.
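Because the channel carries struct{} values, a signal says only that something changed; the consumer is expected to call List again to see the new state. The sketch below shows that pattern with a hypothetical in-memory source (memSource). The buffered channel and the coalescing of back-to-back updates into one pending signal are design assumptions of this sketch, not documented NatsKV behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Partition is a local stand-in for types.Partition.
type Partition struct {
	Keys   []string
	Weight int64
}

// memSource is a hypothetical in-memory source used only to demonstrate
// the Watch/List pattern.
type memSource struct {
	mu      sync.RWMutex
	parts   []Partition
	changed chan struct{}
}

func newMemSource() *memSource {
	// Capacity 1 lets one signal stay pending without blocking Update.
	return &memSource{changed: make(chan struct{}, 1)}
}

// Update replaces the list and signals watchers without blocking.
func (s *memSource) Update(_ context.Context, parts []Partition) error {
	s.mu.Lock()
	s.parts = append([]Partition(nil), parts...)
	s.mu.Unlock()
	select {
	case s.changed <- struct{}{}:
	default: // a signal is already pending; the reader will see the latest list
	}
	return nil
}

// List returns a copy so callers cannot mutate the source's state.
func (s *memSource) List(_ context.Context) ([]Partition, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]Partition(nil), s.parts...), nil
}

// Watch returns the change-notification channel.
func (s *memSource) Watch(_ context.Context) <-chan struct{} { return s.changed }

// waitAndList blocks until a change is signalled or ctx is cancelled,
// then fetches the current list.
func waitAndList(ctx context.Context, s *memSource) ([]Partition, error) {
	select {
	case <-s.Watch(ctx):
		return s.List(ctx)
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// demo applies two updates before the consumer wakes up; the consumer
// receives one signal and lists the latest state.
func demo() (int, error) {
	ctx := context.Background()
	s := newMemSource()
	one := []Partition{{Keys: []string{"tool001", "chamber1"}, Weight: 100}}
	two := append(one, Partition{Keys: []string{"tool001", "chamber2"}, Weight: 150})
	if err := s.Update(ctx, one); err != nil {
		return 0, err
	}
	if err := s.Update(ctx, two); err != nil {
		return 0, err
	}
	parts, err := waitAndList(ctx, s)
	return len(parts), err
}

func main() {
	n, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("partitions after signal:", n)
}
```

A long-running worker would typically call a function like waitAndList in a loop and pass a cancellable context so the loop exits on shutdown.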

type Static

type Static struct {
	// contains filtered or unexported fields
}

Static implements a partition source with a fixed list of partitions.

func NewStatic

func NewStatic(partitions []types.Partition) *Static

NewStatic creates a new static partition source.

The source returns a fixed list of partitions that never changes. Useful for testing and scenarios where partitions are known at startup.

Parameters:

  • partitions: Fixed list of partitions

Returns:

  • *Static: Initialized static source

Example:

partitions := []types.Partition{
    {Keys: []string{"tool001", "chamber1"}, Weight: 100},
    {Keys: []string{"tool001", "chamber2"}, Weight: 150},
}
src := source.NewStatic(partitions)
js, err := jetstream.New(conn) // conn is an established *nats.Conn
if err != nil { /* handle */ }
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash())
if err != nil { /* handle */ }

func (*Static) List added in v1.1.0

func (s *Static) List(_ context.Context) ([]types.Partition, error)

List returns the static list of partitions.

Returns:

  • []types.Partition: The fixed list of partitions
  • error: Always nil (never fails)

func (*Static) Start added in v1.1.0

func (s *Static) Start(_ context.Context) error

Start implements PartitionSource.Start. For Static source, it validates the static partitions.

func (*Static) Stop added in v1.1.0

func (s *Static) Stop(_ context.Context) error

Stop implements PartitionSource.Stop. For Static source, this is a no-op.

func (*Static) Update

func (s *Static) Update(_ context.Context, partitions []types.Partition) error

Update updates the partition list.

This allows the static source to simulate dynamic partition changes, which is useful for testing partition refresh scenarios.

Parameters:

  • ctx: Context for the operation (unused)
  • partitions: New list of partitions

Returns:

  • error: Always nil

Example:

src := source.NewStatic(initialPartitions)
// Later: add more partitions
src.Update(context.Background(), expandedPartitions)
