Documentation ¶
Overview ¶
Package pool provides generic gRPC connection pooling with optional cluster-aware routing for Raft-based services.
Index ¶
- Constants
- Variables
- func DefaultDialOpts() []grpc.DialOption
- func IsRetryableCode(code codes.Code) bool
- type ClientFactory
- type ClusterOption
- type ClusterOptions
- type ClusterPool
- func (cp *ClusterPool[T]) AddNode(addr string)
- func (cp *ClusterPool[T]) Close() error
- func (cp *ClusterPool[T]) ExecuteOnAny(ctx context.Context, op func(client T) error) error
- func (cp *ClusterPool[T]) GetAny(ctx context.Context) (T, string, error)
- func (cp *ClusterPool[T]) RemoveNode(addr string)
- func (cp *ClusterPool[T]) UpdateNodes(addrs []string)
- type Option
- type Options
- type Pool
Constants ¶
const (
	// Default configuration values
	DefaultDialTimeout       = 5 * time.Second
	DefaultRequestTimeout    = 10 * time.Second
	DefaultHealthCheckPeriod = 30 * time.Second
	DefaultMaxRetries        = 3
	DefaultConnsPerHost      = 4

	// Retry backoff configuration
	DefaultInitialBackoff = 100 * time.Millisecond
	DefaultMaxBackoff     = 5 * time.Second
	DefaultBackoffJitter  = 0.2 // 20% jitter

	// Transport-level retry configuration (go-grpc-middleware)
	DefaultTransportRetries = 3
	DefaultPerRetryTimeout  = 5 * time.Second
	DefaultTransportBackoff = 100 * time.Millisecond

	// KeepAlive settings matching proto/grpc.go
	KeepAliveTime    = 60 * time.Second
	KeepAliveTimeout = 20 * time.Second

	// Max message size (1 GiB)
	MaxMessageSize = 1 << 30
)
Variables ¶
var DefaultRetryableCodes = []codes.Code{
	codes.Unavailable,
	codes.ResourceExhausted,
	codes.DeadlineExceeded,
	codes.Aborted,
}
DefaultRetryableCodes are gRPC status codes that should trigger a retry
Functions ¶
func DefaultDialOpts ¶
func DefaultDialOpts() []grpc.DialOption
DefaultDialOpts returns the default gRPC dial options with retry interceptor
func IsRetryableCode ¶
func IsRetryableCode(code codes.Code) bool
IsRetryableCode returns true if the given gRPC code is retryable. This is a utility function for checking specific codes.
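As a rough illustration of this check, the sketch below tests membership in a list of retryable codes. To stay self-contained it uses a stand-in `code` type carrying the numeric values gRPC assigns to these statuses instead of importing `google.golang.org/grpc/codes`. The names `code`, `retryable`, and `isRetryable` are hypothetical, and the package's actual implementation may differ.

```go
package main

import "fmt"

// code is a stand-in for codes.Code from google.golang.org/grpc/codes.
type code uint32

// Numeric values follow the gRPC status code numbering.
const (
	deadlineExceeded  code = 4
	notFound          code = 5
	resourceExhausted code = 8
	aborted           code = 10
	unavailable       code = 14
)

// retryable mirrors the contents of DefaultRetryableCodes.
var retryable = []code{unavailable, resourceExhausted, deadlineExceeded, aborted}

// isRetryable reports whether c appears in the retryable list.
func isRetryable(c code) bool {
	for _, r := range retryable {
		if r == c {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isRetryable(unavailable)) // true
	fmt.Println(isRetryable(notFound))    // false
}
```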
Types ¶
type ClientFactory ¶
type ClientFactory[T any] func(cc grpc.ClientConnInterface) T
ClientFactory creates a gRPC client from a connection
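The sketch below illustrates the factory shape with stand-in types, since it does not import gRPC. Here `conn` stands in for `grpc.ClientConnInterface`, and `fakeConn`, `greeterClient`, and `newGreeterClient` are hypothetical names invented for the example. Constructors generated by protoc-gen-go-grpc take a `grpc.ClientConnInterface` and return a client, so they would usually fit this shape without an adapter.

```go
package main

import "fmt"

// conn is a stand-in for grpc.ClientConnInterface.
type conn interface {
	Target() string
}

// fakeConn is a hypothetical connection used only in this sketch.
type fakeConn struct{ addr string }

func (c fakeConn) Target() string { return c.addr }

// clientFactory mirrors the shape of ClientFactory[T].
type clientFactory[T any] func(cc conn) T

// greeterClient is a hypothetical typed client wrapping a connection.
type greeterClient struct{ cc conn }

func (g greeterClient) Describe() string { return "greeter@" + g.cc.Target() }

// newGreeterClient has the same shape as a generated NewXxxClient constructor.
func newGreeterClient(cc conn) greeterClient { return greeterClient{cc: cc} }

func main() {
	// A plain constructor function is assignable to the factory type.
	var factory clientFactory[greeterClient] = newGreeterClient
	client := factory(fakeConn{addr: "10.0.0.1:9000"})
	fmt.Println(client.Describe())
}
```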
type ClusterOption ¶
type ClusterOption func(*ClusterOptions)
ClusterOption is a functional option for configuring cluster pools
func WithBackoffJitter ¶
func WithBackoffJitter(jitter float64) ClusterOption
WithBackoffJitter sets the jitter fraction (0.0 to 1.0) for backoff
func WithBaseOptions ¶
func WithBaseOptions(opts ...Option) ClusterOption
WithBaseOptions applies base pool options to cluster options
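One way such an option could work is by applying the base options to the embedded `Options` value, as sketched below with trimmed stand-in types. The `configure` and `example` helpers, the starting values, and the `withConnsPerHost(n int)` signature are assumptions made for the sketch. The source does not show how the package itself applies these options.

```go
package main

import (
	"fmt"
	"time"
)

// options and clusterOptions are trimmed stand-ins for Options and ClusterOptions.
type options struct {
	DialTimeout  time.Duration
	ConnsPerHost int
}

type clusterOptions struct {
	options    // embedded, as Options is embedded in ClusterOptions
	MaxBackoff time.Duration
}

type option func(*options)
type clusterOption func(*clusterOptions)

// withConnsPerHost is an assumed signature; the source omits it.
func withConnsPerHost(n int) option {
	return func(o *options) { o.ConnsPerHost = n }
}

func withMaxBackoff(d time.Duration) clusterOption {
	return func(co *clusterOptions) { co.MaxBackoff = d }
}

// withBaseOptions forwards base options to the embedded options struct.
func withBaseOptions(opts ...option) clusterOption {
	return func(co *clusterOptions) {
		for _, opt := range opts {
			opt(&co.options)
		}
	}
}

// configure is a hypothetical helper: start from defaults, then apply options in order.
func configure(opts ...clusterOption) clusterOptions {
	co := clusterOptions{
		options:    options{DialTimeout: 5 * time.Second, ConnsPerHost: 4},
		MaxBackoff: 5 * time.Second,
	}
	for _, opt := range opts {
		opt(&co)
	}
	return co
}

// example overrides one base setting and one cluster-level setting.
func example() clusterOptions {
	return configure(withBaseOptions(withConnsPerHost(8)), withMaxBackoff(2*time.Second))
}

func main() {
	co := example()
	fmt.Println(co.ConnsPerHost, co.DialTimeout, co.MaxBackoff)
}
```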
func WithInitialBackoff ¶
func WithInitialBackoff(d time.Duration) ClusterOption
WithInitialBackoff sets the initial backoff duration for cluster-level retries
func WithMaxBackoff ¶
func WithMaxBackoff(d time.Duration) ClusterOption
WithMaxBackoff sets the maximum backoff duration for cluster-level retries
func WithRetryableCodes ¶
func WithRetryableCodes(codes ...codes.Code) ClusterOption
WithRetryableCodes sets which gRPC codes trigger cluster-level retry
type ClusterOptions ¶
type ClusterOptions struct {
	Options

	// SeedAddrs are initial addresses to connect to
	SeedAddrs []string

	// Cluster-level retry backoff configuration
	// (separate from transport-level retry in dial options)
	InitialBackoff time.Duration // Initial backoff between cluster-level retries
	MaxBackoff     time.Duration // Maximum backoff duration
	BackoffJitter  float64       // Jitter fraction (0.0 to 1.0)

	// RetryableCodes defines which gRPC codes trigger cluster-level retry
	RetryableCodes []codes.Code
}
ClusterOptions extends Options with cluster-aware settings
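To make the three backoff fields concrete, here is a minimal sketch of capped exponential backoff with symmetric jitter, using the documented defaults (100 ms initial, 5 s maximum, 0.2 jitter). The doubling schedule and the plus-or-minus jitter are assumptions, because the source does not say how the package combines these values. The names `backoff`, `defaultBackoff`, and `noJitter` are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	initialBackoff = 100 * time.Millisecond
	maxBackoff     = 5 * time.Second
	backoffJitter  = 0.2
)

// backoff returns the delay before retry number attempt (0-based). The delay
// starts at initial, doubles per attempt (an assumption), is capped at
// maxDelay, and is then scaled by a random factor in [1-jitter, 1+jitter).
func backoff(attempt int, initial, maxDelay time.Duration, jitter float64, rnd func() float64) time.Duration {
	d := initial
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	// rnd returns a value in [0, 1); map it to [-jitter, +jitter).
	spread := (rnd()*2 - 1) * jitter
	return time.Duration(float64(d) * (1 + spread))
}

// defaultBackoff applies the documented default values.
func defaultBackoff(attempt int, rnd func() float64) time.Duration {
	return backoff(attempt, initialBackoff, maxBackoff, backoffJitter, rnd)
}

// noJitter is a fixed random source that yields zero spread.
func noJitter() float64 { return 0.5 }

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: base %v, jittered %v\n",
			attempt, defaultBackoff(attempt, noJitter), defaultBackoff(attempt, rand.Float64))
	}
}
```

Passing the random source as a parameter keeps the calculation deterministic under test.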
func DefaultClusterOptions ¶
func DefaultClusterOptions(seedAddrs []string) ClusterOptions
DefaultClusterOptions returns ClusterOptions with sensible defaults
type ClusterPool ¶
type ClusterPool[T any] struct {
	// contains filtered or unexported fields
}
ClusterPool manages connections to a cluster of servers with automatic failover and retry logic. It maintains a list of known nodes and tries them in order on failures.
This is a simplified design that relies on server-side forwarding (e.g., LeaderForwarder) to route writes to the correct node. The client doesn't need to track which node is the leader.
func NewClusterPool ¶
func NewClusterPool[T any](
	factory ClientFactory[T],
	opts ClusterOptions,
) *ClusterPool[T]
NewClusterPool creates a new cluster-aware connection pool.
func (*ClusterPool[T]) AddNode ¶
func (cp *ClusterPool[T]) AddNode(addr string)
AddNode adds a node to the known cluster nodes if not already present.
func (*ClusterPool[T]) Close ¶
func (cp *ClusterPool[T]) Close() error
Close closes all connections in the pool
func (*ClusterPool[T]) ExecuteOnAny ¶
func (cp *ClusterPool[T]) ExecuteOnAny(ctx context.Context, op func(client T) error) error
ExecuteOnAny executes an operation on any available node with retry and backoff. This is the primary method for both read and write operations. Server-side forwarding (LeaderForwarder) handles routing writes to the leader.
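The retry behaviour described here could look roughly like the loop below. This is a sketch under stated assumptions, not the package's code: it rotates through nodes by attempt number, uses a fixed wait where the real pool presumably applies its configured backoff, and treats a sentinel `errUnavailable` as the only retryable error. All names in the block are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errUnavailable stands in for a gRPC error carrying a retryable status code.
var errUnavailable = errors.New("unavailable")

// executeOnAny runs op against the nodes in rotation for up to maxAttempts
// attempts. It returns on success, on a non-retryable error, or when ctx ends.
func executeOnAny(ctx context.Context, nodes []string, maxAttempts int, wait time.Duration, op func(addr string) error) error {
	if len(nodes) == 0 {
		return errors.New("no nodes available")
	}
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			// Wait between attempts, but give up promptly if ctx is cancelled.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(wait):
			}
		}
		addr := nodes[attempt%len(nodes)] // move to the next node after a failure
		lastErr = op(addr)
		if lastErr == nil {
			return nil
		}
		if !errors.Is(lastErr, errUnavailable) {
			return lastErr // not retryable: surface it immediately
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, lastErr)
}

// runExample simulates a cluster in which node-a is down.
func runExample() (string, error) {
	nodes := []string{"node-a:9000", "node-b:9000", "node-c:9000"}
	var served string
	err := executeOnAny(context.Background(), nodes, 3, time.Millisecond, func(addr string) error {
		if addr == "node-a:9000" {
			return errUnavailable
		}
		served = addr
		return nil
	})
	return served, err
}

func main() {
	served, err := runExample()
	fmt.Println(served, err)
}
```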
func (*ClusterPool[T]) GetAny ¶
func (cp *ClusterPool[T]) GetAny(ctx context.Context) (T, string, error)
GetAny returns a client connected to any available node. Tries nodes in order until one succeeds.
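The "try nodes in order" behaviour and the three-value return can be sketched generically as follows. The `dial` callback and the `fakeDial` helper are hypothetical stand-ins for however the pool obtains a client for an address. Returning the joined per-node errors when every node fails is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// getAny walks addrs in order and returns the first client that dial
// produces, together with the address it came from.
func getAny[T any](addrs []string, dial func(addr string) (T, error)) (T, string, error) {
	var zero T
	var errs []error
	for _, addr := range addrs {
		client, err := dial(addr)
		if err == nil {
			return client, addr, nil
		}
		errs = append(errs, fmt.Errorf("%s: %w", addr, err))
	}
	if len(errs) == 0 {
		return zero, "", errors.New("no nodes configured")
	}
	return zero, "", errors.Join(errs...)
}

// fakeDial is a hypothetical dialer: node-a is down, every other node works.
func fakeDial(addr string) (string, error) {
	if addr == "node-a:9000" {
		return "", errors.New("connection refused")
	}
	return "client->" + addr, nil
}

func main() {
	client, addr, err := getAny([]string{"node-a:9000", "node-b:9000"}, fakeDial)
	fmt.Println(client, addr, err)
}
```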
func (*ClusterPool[T]) RemoveNode ¶
func (cp *ClusterPool[T]) RemoveNode(addr string)
RemoveNode removes a node from the known cluster nodes.
func (*ClusterPool[T]) UpdateNodes ¶
func (cp *ClusterPool[T]) UpdateNodes(addrs []string)
UpdateNodes updates the list of known cluster nodes. This can be called when receiving topology updates from the cluster.
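The node-list operations (add if absent, remove, replace wholesale) might be backed by something like the mutex-guarded slice below. The `nodeSet` type and its methods are hypothetical. The actual ClusterPool fields are unexported and not shown in the source.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeSet is an ordered, duplicate-free list of addresses, safe for concurrent use.
type nodeSet struct {
	mu    sync.RWMutex
	nodes []string
}

// add appends addr unless it is already present.
func (s *nodeSet) add(addr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, n := range s.nodes {
		if n == addr {
			return
		}
	}
	s.nodes = append(s.nodes, addr)
}

// remove deletes addr if present, preserving the order of the remaining nodes.
func (s *nodeSet) remove(addr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, n := range s.nodes {
		if n == addr {
			s.nodes = append(s.nodes[:i], s.nodes[i+1:]...)
			return
		}
	}
}

// update replaces the whole list, copying addrs so callers cannot mutate it later.
func (s *nodeSet) update(addrs []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nodes = append([]string(nil), addrs...)
}

// snapshot returns a copy that is safe to iterate without holding the lock.
func (s *nodeSet) snapshot() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.nodes...)
}

func main() {
	var s nodeSet
	s.add("node-a:9000")
	s.add("node-b:9000")
	s.add("node-a:9000") // duplicate, ignored
	s.remove("node-a:9000")
	fmt.Println(s.snapshot())
	s.update([]string{"node-x:9000", "node-y:9000", "node-z:9000"})
	fmt.Println(s.snapshot())
}
```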
type Option ¶
type Option func(*Options)
Option is a functional option for configuring pools
func WithConnsPerHost ¶
WithConnsPerHost sets connections per host
func WithDialOpts ¶
func WithDialOpts(opts ...grpc.DialOption) Option
WithDialOpts sets additional dial options
func WithDialTimeout ¶
WithDialTimeout sets the dial timeout
func WithRequestTimeout ¶
WithRequestTimeout sets the request timeout
type Options ¶
type Options struct {
	// DialTimeout is the timeout for establishing new connections
	DialTimeout time.Duration

	// RequestTimeout is the default timeout for requests
	RequestTimeout time.Duration

	// HealthCheckPeriod is how often to check connection health
	HealthCheckPeriod time.Duration

	// MaxRetries is the number of times to retry failed requests
	MaxRetries int

	// ConnsPerHost is the number of connections to maintain per host
	ConnsPerHost int

	// DialOpts are additional gRPC dial options
	DialOpts []grpc.DialOption
}
Options configures a connection pool
func DefaultOptions ¶
func DefaultOptions() Options
DefaultOptions returns Options with sensible defaults
type Pool ¶
type Pool[T any] struct {
	// contains filtered or unexported fields
}
Pool manages gRPC connections to multiple hosts. It provides lazy connection creation, connection reuse, and automatic cleanup.
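Lazy creation and reuse can be pictured as a map that is filled on first access. This is a minimal sketch under simplifying assumptions: it keeps one client per host (the real pool maintains ConnsPerHost connections), and `lazyPool`, `newLazyPool`, `get`, and `close` are hypothetical names with no claim to match the Pool methods.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyPool creates one client per host on first use and reuses it afterwards.
type lazyPool[T any] struct {
	mu      sync.Mutex
	clients map[string]T
	dial    func(host string) (T, error)
}

func newLazyPool[T any](dial func(host string) (T, error)) *lazyPool[T] {
	return &lazyPool[T]{clients: make(map[string]T), dial: dial}
}

// get returns the cached client for host, dialing only if none exists yet.
func (p *lazyPool[T]) get(host string) (T, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.clients[host]; ok {
		return c, nil
	}
	c, err := p.dial(host)
	if err != nil {
		var zero T
		return zero, err
	}
	p.clients[host] = c
	return c, nil
}

// close forgets every cached client; a real pool would also close the connections.
func (p *lazyPool[T]) close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.clients = make(map[string]T)
}

func main() {
	dials := 0
	p := newLazyPool(func(host string) (string, error) {
		dials++
		return "client->" + host, nil
	})
	p.get("node-a:9000")
	p.get("node-a:9000") // reused, no second dial
	p.get("node-b:9000")
	fmt.Println("dials:", dials)
	p.close()
}
```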
func NewPool ¶
func NewPool[T any](factory ClientFactory[T], opts ...Option) *Pool[T]
NewPool creates a new connection pool with the given client factory