pool

package
v0.0.0-...-61396ba Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package pool provides generic gRPC connection pooling with optional cluster-aware routing for Raft-based services.

Index

Constants

View Source
const (
	// Default configuration values
	DefaultDialTimeout       = 5 * time.Second
	DefaultRequestTimeout    = 10 * time.Second
	DefaultHealthCheckPeriod = 30 * time.Second
	DefaultMaxRetries        = 3
	DefaultConnsPerHost      = 4

	// Retry backoff configuration
	DefaultInitialBackoff = 100 * time.Millisecond
	DefaultMaxBackoff     = 5 * time.Second
	DefaultBackoffJitter  = 0.2 // 20% jitter

	// Transport-level retry configuration (go-grpc-middleware)
	DefaultTransportRetries = 3
	DefaultPerRetryTimeout  = 5 * time.Second
	DefaultTransportBackoff = 100 * time.Millisecond

	// KeepAlive settings matching proto/grpc.go
	KeepAliveTime    = 60 * time.Second
	KeepAliveTimeout = 20 * time.Second

	// Max message size (1 GiB)
	MaxMessageSize = 1 << 30
)

Variables

DefaultRetryableCodes are gRPC status codes that should trigger a retry

Functions

func DefaultDialOpts

func DefaultDialOpts() []grpc.DialOption

DefaultDialOpts returns the default gRPC dial options with retry interceptor

func IsRetryableCode

func IsRetryableCode(code codes.Code) bool

IsRetryableCode returns true if the given gRPC code is retryable This is a utility function for checking specific codes

Types

type ClientFactory

type ClientFactory[T any] func(cc grpc.ClientConnInterface) T

ClientFactory creates a gRPC client from a connection

type ClusterOption

type ClusterOption func(*ClusterOptions)

ClusterOption is a functional option for configuring cluster pools

func WithBackoffJitter

func WithBackoffJitter(jitter float64) ClusterOption

WithBackoffJitter sets the jitter fraction (0.0 to 1.0) for backoff

func WithBaseOptions

func WithBaseOptions(opts ...Option) ClusterOption

WithBaseOptions applies base pool options to cluster options

func WithInitialBackoff

func WithInitialBackoff(d time.Duration) ClusterOption

WithInitialBackoff sets the initial backoff duration for cluster-level retries

func WithMaxBackoff

func WithMaxBackoff(d time.Duration) ClusterOption

WithMaxBackoff sets the maximum backoff duration for cluster-level retries

func WithRetryableCodes

func WithRetryableCodes(codes ...codes.Code) ClusterOption

WithRetryableCodes sets which gRPC codes trigger cluster-level retry

type ClusterOptions

type ClusterOptions struct {
	Options

	// SeedAddrs are initial addresses to connect to
	SeedAddrs []string

	// Cluster-level retry backoff configuration
	// (separate from transport-level retry in dial options)
	InitialBackoff time.Duration // Initial backoff between cluster-level retries
	MaxBackoff     time.Duration // Maximum backoff duration
	BackoffJitter  float64       // Jitter fraction (0.0 to 1.0)

	// RetryableCodes defines which gRPC codes trigger cluster-level retry
	RetryableCodes []codes.Code
}

ClusterOptions extends Options with cluster-aware settings

func DefaultClusterOptions

func DefaultClusterOptions(seedAddrs []string) ClusterOptions

DefaultClusterOptions returns ClusterOptions with sensible defaults

type ClusterPool

type ClusterPool[T any] struct {
	// contains filtered or unexported fields
}

ClusterPool manages connections to a cluster of servers with automatic failover and retry logic. It maintains a list of known nodes and tries them in order on failures.

This is a simplified design that relies on server-side forwarding (e.g., LeaderForwarder) to route writes to the correct node. The client doesn't need to track which node is the leader.

func NewClusterPool

func NewClusterPool[T any](
	factory ClientFactory[T],
	opts ClusterOptions,
) *ClusterPool[T]

NewClusterPool creates a new cluster-aware connection pool.

func (*ClusterPool[T]) AddNode

func (cp *ClusterPool[T]) AddNode(addr string)

AddNode adds a node to the known cluster nodes if not already present.

func (*ClusterPool[T]) Close

func (cp *ClusterPool[T]) Close() error

Close closes all connections in the pool

func (*ClusterPool[T]) ExecuteOnAny

func (cp *ClusterPool[T]) ExecuteOnAny(ctx context.Context, op func(client T) error) error

ExecuteOnAny executes an operation on any available node with retry and backoff. This is the primary method for both read and write operations. Server-side forwarding (LeaderForwarder) handles routing writes to the leader.

func (*ClusterPool[T]) GetAny

func (cp *ClusterPool[T]) GetAny(ctx context.Context) (T, string, error)

GetAny returns a client connected to any available node. Tries nodes in order until one succeeds.

func (*ClusterPool[T]) RemoveNode

func (cp *ClusterPool[T]) RemoveNode(addr string)

RemoveNode removes a node from the known cluster nodes.

func (*ClusterPool[T]) UpdateNodes

func (cp *ClusterPool[T]) UpdateNodes(addrs []string)

UpdateNodes updates the list of known cluster nodes. This can be called when receiving topology updates from the cluster.

type Option

type Option func(*Options)

Option is a functional option for configuring pools

func WithConnsPerHost

func WithConnsPerHost(n int) Option

WithConnsPerHost sets connections per host

func WithDialOpts

func WithDialOpts(opts ...grpc.DialOption) Option

WithDialOpts sets additional dial options

func WithDialTimeout

func WithDialTimeout(d time.Duration) Option

WithDialTimeout sets the dial timeout

func WithMaxRetries

func WithMaxRetries(n int) Option

WithMaxRetries sets the max retries

func WithRequestTimeout

func WithRequestTimeout(d time.Duration) Option

WithRequestTimeout sets the request timeout

type Options

type Options struct {
	// DialTimeout is the timeout for establishing new connections
	DialTimeout time.Duration

	// RequestTimeout is the default timeout for requests
	RequestTimeout time.Duration

	// HealthCheckPeriod is how often to check connection health
	HealthCheckPeriod time.Duration

	// MaxRetries is the number of times to retry failed requests
	MaxRetries int

	// ConnsPerHost is the number of connections to maintain per host
	ConnsPerHost int

	// DialOpts are additional gRPC dial options
	DialOpts []grpc.DialOption
}

Options configures a connection pool

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns Options with sensible defaults

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool manages gRPC connections to multiple hosts. It provides lazy connection creation, connection reuse, and automatic cleanup.

func NewPool

func NewPool[T any](factory ClientFactory[T], opts ...Option) *Pool[T]

NewPool creates a new connection pool with the given client factory

func (*Pool[T]) Addresses

func (p *Pool[T]) Addresses() []string

Addresses returns all addresses in the pool

func (*Pool[T]) Close

func (p *Pool[T]) Close() error

Close closes all connections in the pool

func (*Pool[T]) Get

func (p *Pool[T]) Get(ctx context.Context, address string) (T, error)

Get returns a client for the given address. Creates connections lazily if they don't exist.

func (*Pool[T]) Remove

func (p *Pool[T]) Remove(address string)

Remove removes all connections for an address

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL