Documentation
¶
Overview ¶
Partition sources discover available partitions for assignment. The package includes:
- Static: Fixed list of partitions
Custom sources can be implemented by satisfying the types.PartitionSource interface.
Index ¶
- type NatsKV
- func (s *NatsKV) List(_ context.Context) ([]types.Partition, error)
- func (s *NatsKV) Start(ctx context.Context) error
- func (s *NatsKV) Stop(_ context.Context) error
- func (s *NatsKV) Update(ctx context.Context, partitions []types.Partition) error
- func (s *NatsKV) Watch(ctx context.Context) <-chan struct{}
- type Static
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type NatsKV ¶ added in v1.1.0
type NatsKV struct {
// contains filtered or unexported fields
}
NatsKV implements a partition source backed by a NATS KeyValue bucket.
It watches a specific key in the KV bucket for updates to the partition list. This allows dynamic partition updates without restarting workers.
func NewNatsKV ¶ added in v1.1.0
NewNatsKV creates a new NATS KV-based partition source.
Parameters:
- kv: The NATS KeyValue bucket to use
- key: The key where partitions are stored (JSON encoded)
- logger: Optional logger
func (*NatsKV) Start ¶ added in v1.1.0
Start initializes the source and starts watching for updates.
func (*NatsKV) Update ¶ added in v1.1.0
Update updates the partition list in the KV bucket.
Note: NATS KV buckets have a default value size limit of 1MB (MaxMsgSize). To support large partition lists (e.g., 2000+ partitions), this method automatically compresses the data using Gzip before storing it.
If the compressed size still exceeds the limit, the update will fail.
type Static ¶
type Static struct {
// contains filtered or unexported fields
}
Static implements a partition source with a fixed list of partitions.
func NewStatic ¶
NewStatic creates a new static partition source.
The source returns a fixed list of partitions that never changes. Useful for testing and scenarios where partitions are known at startup.
Parameters:
- partitions: Fixed list of partitions
Returns:
- *Static: Initialized static source
Example:
partitions := []types.Partition{
{Keys: []string{"tool001", "chamber1"}, Weight: 100},
{Keys: []string{"tool001", "chamber2"}, Weight: 150},
}
src := source.NewStatic(partitions)
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash())
if err != nil { /* handle */ }
func (*Static) List ¶ added in v1.1.0
List returns the static list of partitions.
Returns:
- []types.Partition: The fixed list of partitions
- error: Always nil (never fails)
func (*Static) Start ¶ added in v1.1.0
Start implements PartitionSource.Start. For Static source, it validates the static partitions.
func (*Static) Stop ¶ added in v1.1.0
Stop implements PartitionSource.Stop. For Static source, this is a no-op.
func (*Static) Update ¶
Update updates the partition list.
This allows the static source to simulate dynamic partition changes, which is useful for testing partition refresh scenarios.
Parameters:
- ctx: Context for the operation (unused)
- partitions: New list of partitions
Returns:
- error: Always nil
Example:
src := source.NewStatic(initialPartitions) // Later: add more partitions src.Update(context.Background(), expandedPartitions)