Documentation
¶
Overview ¶
Package conn provides generic database session and connection management. It is designed to be reusable across multiple Go projects.
Index ¶
- Constants
- Variables
- func DefaultGormOpener(dialector gorm.Dialector, config *gorm.Config) (*gorm.DB, error)
- type Config
- type ConfigOption
- func WithConnectTimeout(seconds int) ConfigOption
- func WithConnectionPool(maxOpen, maxIdle, maxLifetime int) ConfigOption
- func WithCredentials(user, password string) ConfigOption
- func WithDatabase(dbName string) ConfigOption
- func WithEnvVars(role string) ConfigOption
- func WithEnvVarsWithPrefix(role, prefix string) ConfigOption
- func WithGormCallbackRegistrar(registrar GormCallbackRegistrar) ConfigOption
- func WithGormOpener(opener GormOpener) ConfigOption
- func WithGracefulCloseTimeout(seconds int) ConfigOption
- func WithHost(host string) ConfigOption
- func WithLogLevel(level LogLevel) ConfigOption
- func WithPort(port int) ConfigOption
- func WithRLS(enableRLS bool) ConfigOption
- func WithRLSSessionVars(vars []rlsconn.SessionVar) ConfigOption
- func WithRetry(enableRetry bool, maxRetries, retryBackoffMs int) ConfigOption
- func WithRole(role string) ConfigOption
- func WithSSLMode(sslMode string) ConfigOption
- func WithSilentGormLogger() ConfigOption
- func WithSpanStarter(starter SpanStarter) ConfigOption
- func WithoutTrace() ConfigOption
- type GormCallbackRegistrar
- type GormOpener
- type LogLevel
- type QueryDB
- func (q *QueryDB) BindNamed(query string, arg interface{}) (string, []interface{}, error)
- func (q *QueryDB) NamedExecContext(ctx context.Context, query string, arg interface{}) (result sql.Result, err error)
- func (q *QueryDB) NamedQueryContext(ctx context.Context, query string, arg interface{}) (rows *sqlx.Rows, err error)
- func (q *QueryDB) NamedQueryRowContext(ctx context.Context, query string, arg interface{}) (row *sql.Row, err error)
- func (q *QueryDB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
- func (q *QueryDB) Refresh(newExtContext sqlx.ExtContext)
- type ReaderSession
- func (s *ReaderSession) Close() error
- func (s *ReaderSession) DB() *gorm.DB
- func (s *ReaderSession) GetDBOrTx(ctx context.Context) *gorm.DB
- func (s *ReaderSession) GetPoolStats() map[string]any
- func (s *ReaderSession) Logger() *slog.Logger
- func (s *ReaderSession) QueryDB() *QueryDB
- func (s *ReaderSession) ResetConnection(ctx context.Context) error
- func (s *ReaderSession) String() string
- type Session
- func (s *Session) Close() error
- func (s *Session) Connect() (*Session, error)
- func (s *Session) DB() *gorm.DB
- func (s *Session) GetCurrentTenantID(ctx context.Context) (string, error)
- func (s *Session) GetCurrentUserID(ctx context.Context) (string, error)
- func (s *Session) GetDBOrTx(ctx context.Context) *gorm.DB
- func (s *Session) GetPoolStats() map[string]interface{}
- func (s *Session) GetTx(ctx context.Context) (*gorm.DB, error)
- func (s *Session) HasTx(ctx context.Context) bool
- func (s *Session) Logger() *slog.Logger
- func (s *Session) QueryDB() *QueryDB
- func (s *Session) ResetConnection(ctx context.Context) error
- func (s *Session) Role() string
- func (s *Session) String() string
- func (s *Session) Transaction(ctx context.Context, fn func(txCtx context.Context) error) error
- type SessionManager
- type Sessioner
- type SpanStarter
- type TestRLSSession
- func (ts *TestRLSSession) Cleanup()
- func (ts *TestRLSSession) ExecAsSuperuser(ctx context.Context, sql string, args ...any)
- func (ts *TestRLSSession) ExecAsSuperuserWithCleanup(ctx context.Context, sql, cleanupSQL string, args ...any)
- func (ts *TestRLSSession) RLSDB() *gorm.DB
- func (ts *TestRLSSession) RLSSession() *Session
- func (ts *TestRLSSession) RegisterCleanup(fn func())
- func (ts *TestRLSSession) SuperuserDB() *gorm.DB
- func (ts *TestRLSSession) SuperuserSession() *Session
- type TestSession
- func (s *TestSession) Cleanup()
- func (s *TestSession) Context() context.Context
- func (s *TestSession) DB() *gorm.DB
- func (s *TestSession) GetDBOrTx(ctx context.Context) *gorm.DB
- func (s *TestSession) GetQueryDBOrTx() (*QueryDB, error)
- func (s *TestSession) QueryDB() *QueryDB
- func (s *TestSession) Reader() *ReaderSession
- func (s *TestSession) Session() *Session
- func (s *TestSession) SetRLSContext(vars map[string]string, ...) context.Context
- func (s *TestSession) Writer() *WriterSession
- type TransactionManager
- type TransactionManagerOption
- type WriterSession
Examples ¶
Constants ¶
const ( LogLevelSilent = logger.Silent LogLevelError = logger.Error LogLevelWarn = logger.Warn LogLevelInfo = logger.Info )
Log level constants from gorm/logger
Variables ¶
var ( TypeConnection errorsx.ErrorType = "db.connection" TypeTransaction errorsx.ErrorType = "db.transaction" TypeInitialization errorsx.ErrorType = "db.initialization" ErrConnection = errorsx.New("database connection error", errorsx.WithType(TypeConnection), errorsx.WithHTTPStatus(500)) ErrTransaction = errorsx.New("database transaction error", errorsx.WithType(TypeTransaction), errorsx.WithHTTPStatus(500)) ErrInitialization = errorsx.New("database initialization error", errorsx.WithType(TypeInitialization), errorsx.WithHTTPStatus(500)) )
Error types for database operations
Functions ¶
Types ¶
type Config ¶
type Config struct {
Role string // "writer" or "reader"
Host string
Port int
User string
Password string
DBName string
MaxOpenConns int // Maximum open connections (0 = unlimited)
MaxIdleConns int // Maximum idle connections (default: 2)
ConnMaxLifetime int // Maximum connection lifetime in minutes (0 = unlimited)
// SSL settings
SSLMode string // SSL mode (default: "disable")
// Retry settings
EnableRetry bool // Enable retry on transient errors
MaxRetries int // Maximum number of retries
RetryBackoff int // Retry backoff interval in milliseconds
// Connection timeout settings
ConnectTimeout int // TCP connection timeout in seconds (default: 10)
// Connection reset settings
GracefulCloseTimeout int // Graceful close timeout in seconds (0 = default 10s)
// Log level settings
LogLevel *LogLevel // GORM log level (nil defaults to Info)
// RLS settings
EnableRLS bool // Enable Row-Level Security (default: true)
// RLSSessionVars is a list of session variables to set for RLS.
// Example: []rlsconn.SessionVar{{Name: "app.current_tenant_id", ValueGetter: GetTenantIDFromContext}}
RLSSessionVars []rlsconn.SessionVar
// Logger is the slog logger for database operations.
Logger *slog.Logger
// GormOpener is an optional custom GORM opener (e.g., for DataDog APM tracing).
// If nil, DefaultGormOpener is used.
GormOpener GormOpener
// GormCallbackRegistrar is an optional callback registrar (e.g., for DataDog APM resource naming).
// If nil, no callbacks are registered.
GormCallbackRegistrar GormCallbackRegistrar
// SpanStarter is an optional span starter for QueryDB operations.
// If nil, no spans are created.
SpanStarter SpanStarter
// contains filtered or unexported fields
}
Config holds database connection settings.
func NewConfig ¶
func NewConfig(opts ...ConfigOption) *Config
NewConfig creates a new Config by applying the given options to default values.
Examples:
// Basic usage
cfg := NewConfig(
WithHost("localhost"),
WithPort(5432),
WithCredentials("user", "password"),
WithDatabase("mydb"),
)
// Load from environment variables
cfg := NewConfig(WithEnvVars("writer"))
Example ¶
package main
import (
"github.com/hacomono-lib/go-pgkit/conn"
)
func main() {
// Basic usage
_ = conn.NewConfig(
conn.WithHost("localhost"),
conn.WithPort(5432),
conn.WithCredentials("user", "password"),
conn.WithDatabase("mydb"),
)
// Load from environment variables (writer role) + silent logging for production
_ = conn.NewConfig(
conn.WithEnvVars("writer"),
conn.WithSilentGormLogger(),
)
// Test configuration
_ = conn.NewConfig(
conn.WithHost("localhost"),
conn.WithPort(5432),
conn.WithDatabase("test_db"),
conn.WithConnectionPool(10, 2, 30),
)
// Disable tracing for high-frequency SQL operations
_ = conn.NewConfig(
conn.WithEnvVars("writer"),
conn.WithSilentGormLogger(),
conn.WithoutTrace(),
)
}
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a function type for configuring Config.
func WithConnectTimeout ¶
func WithConnectTimeout(seconds int) ConfigOption
WithConnectTimeout sets the TCP connection timeout in seconds. Default: 10 seconds.
func WithConnectionPool ¶
func WithConnectionPool(maxOpen, maxIdle, maxLifetime int) ConfigOption
WithConnectionPool configures connection pool settings. maxLifetime is in minutes.
func WithCredentials ¶
func WithCredentials(user, password string) ConfigOption
WithCredentials sets the database user and password.
func WithDatabase ¶
func WithDatabase(dbName string) ConfigOption
WithDatabase sets the database name.
func WithEnvVars ¶
func WithEnvVars(role string) ConfigOption
WithEnvVars loads configuration from environment variables with the default "DB_" prefix. When a role is specified, role-specific variables (e.g., DB_WRITER_HOST) take precedence. Only non-empty environment variables override defaults; unset variables are ignored.
func WithEnvVarsWithPrefix ¶
func WithEnvVarsWithPrefix(role, prefix string) ConfigOption
WithEnvVarsWithPrefix loads configuration from environment variables with a custom prefix. For example, WithEnvVarsWithPrefix("writer", "MYAPP_") reads MYAPP_HOST, MYAPP_WRITER_HOST, etc.
func WithGormCallbackRegistrar ¶
func WithGormCallbackRegistrar(registrar GormCallbackRegistrar) ConfigOption
WithGormCallbackRegistrar sets a GORM callback registration function.
func WithGormOpener ¶
func WithGormOpener(opener GormOpener) ConfigOption
WithGormOpener sets a custom GORM opener function.
func WithGracefulCloseTimeout ¶
func WithGracefulCloseTimeout(seconds int) ConfigOption
WithGracefulCloseTimeout sets the timeout for graceful connection close on reset.
func WithLogLevel ¶
func WithLogLevel(level LogLevel) ConfigOption
WithLogLevel sets the GORM log level. Valid values: LogLevelSilent, LogLevelError, LogLevelWarn, LogLevelInfo.
func WithRLS ¶
func WithRLS(enableRLS bool) ConfigOption
WithRLS enables or disables Row-Level Security.
func WithRLSSessionVars ¶
func WithRLSSessionVars(vars []rlsconn.SessionVar) ConfigOption
WithRLSSessionVars sets the RLS session variables.
func WithRetry ¶
func WithRetry(enableRetry bool, maxRetries, retryBackoffMs int) ConfigOption
WithRetry configures retry behavior.
func WithRole ¶
func WithRole(role string) ConfigOption
WithRole sets the connection role ("writer" or "reader").
func WithSSLMode ¶
func WithSSLMode(sslMode string) ConfigOption
WithSSLMode sets the SSL mode for the database connection. Valid values: "disable", "require", "verify-ca", "verify-full", "prefer", "allow". Default: "disable".
func WithSilentGormLogger ¶
func WithSilentGormLogger() ConfigOption
WithSilentGormLogger disables GORM log output. Useful for production environments to suppress log noise.
func WithSpanStarter ¶
func WithSpanStarter(starter SpanStarter) ConfigOption
WithSpanStarter sets the span starter for QueryDB tracing.
func WithoutTrace ¶
func WithoutTrace() ConfigOption
WithoutTrace disables all tracing hooks (GormOpener, GormCallbackRegistrar, SpanStarter).
type GormCallbackRegistrar ¶
GormCallbackRegistrar is a function that registers GORM callbacks. This allows for custom callback registration (e.g., DataDog APM resource naming).
type GormOpener ¶
GormOpener is a function that opens a GORM database connection. This allows for custom initialization logic (e.g., DataDog APM tracing).
type LogLevel ¶
LogLevel represents the log level for the database logger. Re-exported from gorm/logger for convenience.
type QueryDB ¶
type QueryDB struct {
// SpanStarter is called to start a span for SQL operations.
// If nil, no spans are created.
SpanStarter SpanStarter
// contains filtered or unexported fields
}
QueryDB wraps sqlx query operations and provides named parameter methods.
func NewQueryDB ¶
func NewQueryDB(extContext sqlx.ExtContext, role string, spanStarter SpanStarter) *QueryDB
NewQueryDB creates a new QueryDB with the given parameters. This is useful for testing.
func (*QueryDB) NamedExecContext ¶
func (q *QueryDB) NamedExecContext(ctx context.Context, query string, arg interface{}) (result sql.Result, err error)
NamedExecContext executes a named query that does not return rows.
func (*QueryDB) NamedQueryContext ¶
func (q *QueryDB) NamedQueryContext(ctx context.Context, query string, arg interface{}) (rows *sqlx.Rows, err error)
NamedQueryContext executes a named query that returns rows.
func (*QueryDB) NamedQueryRowContext ¶
func (q *QueryDB) NamedQueryRowContext(ctx context.Context, query string, arg interface{}) (row *sql.Row, err error)
NamedQueryRowContext executes a named query that returns a single row. Like QueryRowContext, the result set is automatically closed, so it is safe to use within transactions.
func (*QueryDB) QueryRowContext ¶
QueryRowContext executes a query that returns a single row. Note: sql.Row is lazily evaluated — errors are not known until Scan() is called, so the span is finished immediately (error tracking is not possible here).
func (*QueryDB) Refresh ¶
func (q *QueryDB) Refresh(newExtContext sqlx.ExtContext)
Refresh replaces the internal ExtContext with a new one.
type ReaderSession ¶
type ReaderSession struct {
// contains filtered or unexported fields
}
ReaderSession is a read-only session.
func NewReaderSession ¶
func NewReaderSession(config *Config) (*ReaderSession, error)
NewReaderSession creates a new ReaderSession.
func (*ReaderSession) Close ¶
func (s *ReaderSession) Close() error
Close closes the reader session.
func (*ReaderSession) DB ¶
func (s *ReaderSession) DB() *gorm.DB
DB returns the GORM DB for read operations.
func (*ReaderSession) GetDBOrTx ¶
func (s *ReaderSession) GetDBOrTx(ctx context.Context) *gorm.DB
GetDBOrTx returns a context-aware DB connection. ReaderSession does not support transactions, but needs context for RLS session variables.
func (*ReaderSession) GetPoolStats ¶
func (s *ReaderSession) GetPoolStats() map[string]any
GetPoolStats returns connection pool statistics.
func (*ReaderSession) Logger ¶
func (s *ReaderSession) Logger() *slog.Logger
Logger returns the session logger.
func (*ReaderSession) QueryDB ¶
func (s *ReaderSession) QueryDB() *QueryDB
QueryDB returns the QueryDB for read operations.
func (*ReaderSession) ResetConnection ¶
func (s *ReaderSession) ResetConnection(ctx context.Context) error
ResetConnection resets the database connection.
func (*ReaderSession) String ¶
func (s *ReaderSession) String() string
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session provides database connection and transaction functionality.
func (*Session) DB ¶
DB returns the GORM DB instance for use outside transactions. Note: the returned instance may reference a stale connection after ResetConnection.
func (*Session) GetCurrentTenantID ¶
GetCurrentTenantID retrieves the tenant ID from the PostgreSQL session variable (for debugging). Returns an empty string if not set.
func (*Session) GetCurrentUserID ¶
GetCurrentUserID retrieves the user ID from the PostgreSQL session variable (for debugging). Returns an empty string if not set.
func (*Session) GetDBOrTx ¶
GetDBOrTx returns the transaction from the context if one is set, otherwise the base DB connection. In both cases, WithContext(ctx) is applied so that the latest context (carrying RLS session variables) is propagated.
func (*Session) GetPoolStats ¶
GetPoolStats returns connection pool statistics for monitoring/debugging.
func (*Session) GetTx ¶
GetTx retrieves the transaction from the context. Returns an error if no transaction is set.
func (*Session) ResetConnection ¶
ResetConnection closes and recreates the database connection. Uses a write lock to prevent concurrent resets and a cooldown to avoid rapid successive resets.
func (*Session) Transaction ¶
Transaction starts a transaction and executes fn within it. On success, the transaction is committed. On error, it is rolled back. On panic, the transaction is rolled back and the panic is re-raised.
This method wraps GORM's transaction management to provide:
- Stack traces via errorsx
- Detailed error information (including rollback failures)
- Application-specific context management
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager manages Writer and Reader sessions.
func NewSessionManager ¶
func NewSessionManager(writerConfig, readerConfig *Config) (*SessionManager, error)
NewSessionManager creates a SessionManager from writer and reader configs.
func (*SessionManager) Reader ¶
func (sm *SessionManager) Reader() *ReaderSession
Reader returns the ReaderSession.
func (*SessionManager) String ¶
func (sm *SessionManager) String() string
func (*SessionManager) Writer ¶
func (sm *SessionManager) Writer() *WriterSession
Writer returns the WriterSession.
type Sessioner ¶
Sessioner is the common interface for ReaderSession and WriterSession. Used as a type parameter in generic reader implementations.
type SpanStarter ¶
type SpanStarter func(ctx context.Context, operationType, query string, role string) (finish func(err error), newCtx context.Context)
SpanStarter is a function that starts a span for SQL operations. It returns a finish function that should be called when the operation completes.
type TestRLSSession ¶
type TestRLSSession struct {
// contains filtered or unexported fields
}
TestRLSSession provides a test session for verifying the RLS implementation itself.
Manages two sessions:
- superuserSession: for data setup/cleanup (bypasses RLS)
- rlsSession: for verifying RLS behavior (RLS enforced, same flow as production)
Usage:
func TestRLSImplementation(t *testing.T) {
ts := conn.NewTestRLSSession(t, superuserConfig, rlsConfig)
defer ts.Cleanup()
// Setup test data (as superuser, bypasses RLS)
ts.ExecAsSuperuser(ctx, "INSERT INTO tenants ...")
// Verify RLS (production-equivalent is_local=false flow)
var count int64
err := ts.RLSSession().DB().WithContext(ctxWithTenant).
Table("products").Count(&count).Error
}
func NewTestRLSSession ¶
func NewTestRLSSession(t *testing.T, superuserConfig, rlsConfig *Config) *TestRLSSession
NewTestRLSSession creates a new RLS test session. Accepts both a superuser config (for RLS bypass) and an RLS config (for RLS enforcement).
func (*TestRLSSession) Cleanup ¶
func (ts *TestRLSSession) Cleanup()
Cleanup runs registered cleanup functions and closes both sessions.
func (*TestRLSSession) ExecAsSuperuser ¶
func (ts *TestRLSSession) ExecAsSuperuser(ctx context.Context, sql string, args ...any)
ExecAsSuperuser executes arbitrary SQL using the superuser session.
func (*TestRLSSession) ExecAsSuperuserWithCleanup ¶
func (ts *TestRLSSession) ExecAsSuperuserWithCleanup(ctx context.Context, sql, cleanupSQL string, args ...any)
ExecAsSuperuserWithCleanup executes SQL using the superuser session and registers a cleanup SQL statement to be executed on test teardown.
func (*TestRLSSession) RLSDB ¶
func (ts *TestRLSSession) RLSDB() *gorm.DB
RLSDB returns the GORM DB for the RLS-enforced session.
func (*TestRLSSession) RLSSession ¶
func (ts *TestRLSSession) RLSSession() *Session
RLSSession returns the session with RLS enforced.
func (*TestRLSSession) RegisterCleanup ¶
func (ts *TestRLSSession) RegisterCleanup(fn func())
RegisterCleanup registers a cleanup function. Registered functions are executed in reverse order (LIFO).
func (*TestRLSSession) SuperuserDB ¶
func (ts *TestRLSSession) SuperuserDB() *gorm.DB
SuperuserDB returns the GORM DB for the superuser session.
func (*TestRLSSession) SuperuserSession ¶
func (ts *TestRLSSession) SuperuserSession() *Session
SuperuserSession returns the session that bypasses RLS.
type TestSession ¶
type TestSession struct {
// contains filtered or unexported fields
}
TestSession provides a test-scoped session with transaction isolation. All operations run inside a transaction that is rolled back on cleanup, leaving the database unchanged.
Usage:
func TestMyFeature(t *testing.T) {
ts := conn.NewTestSession(t, config)
defer ts.Cleanup()
ctx := ts.Context()
err := ts.DB().WithContext(ctx).Table("products").Find(&products).Error
}
func NewTestSession ¶
func NewTestSession(t *testing.T, config *Config) *TestSession
NewTestSession creates a new test session with the given config. Starts a transaction and registers automatic rollback via t.Cleanup().
func (*TestSession) Cleanup ¶
func (s *TestSession) Cleanup()
Cleanup rolls back the transaction and closes the session.
func (*TestSession) Context ¶
func (s *TestSession) Context() context.Context
Context returns the context with the test transaction embedded.
func (*TestSession) DB ¶
func (s *TestSession) DB() *gorm.DB
DB returns the GORM DB bound to the test transaction.
func (*TestSession) GetDBOrTx ¶
func (s *TestSession) GetDBOrTx(ctx context.Context) *gorm.DB
GetDBOrTx returns the DB with transaction context.
func (*TestSession) GetQueryDBOrTx ¶
func (s *TestSession) GetQueryDBOrTx() (*QueryDB, error)
GetQueryDBOrTx returns a transaction-bound QueryDB if a transaction is set in the context, otherwise the default QueryDB.
func (*TestSession) QueryDB ¶
func (s *TestSession) QueryDB() *QueryDB
QueryDB returns the QueryDB for this test session.
func (*TestSession) Reader ¶
func (s *TestSession) Reader() *ReaderSession
Reader returns a ReaderSession wrapper for testing.
func (*TestSession) Session ¶
func (s *TestSession) Session() *Session
Session returns the underlying Session.
func (*TestSession) SetRLSContext ¶
func (s *TestSession) SetRLSContext(vars map[string]string, ctxSetter func(ctx context.Context, name, value string) context.Context) context.Context
SetRLSContext sets RLS session variables within the test transaction and returns an updated context.
Uses SET LOCAL (is_local=true) so variables are scoped to the transaction and automatically cleared on rollback.
vars maps variable names to values. An empty string value resets the variable to NULL. ctxSetter is an optional callback to also set values in the Go context (nil to skip).
func (*TestSession) Writer ¶
func (s *TestSession) Writer() *WriterSession
Writer returns a WriterSession wrapper for testing.
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager manages transaction execution with automatic retry for retryable errors (pgxretry.ErrRetryableInTx).
It is the application-level counterpart to pgxretry's driver-level retry: pgxretry retries individual queries outside transactions, while TransactionManager retries entire transactions when the driver signals ErrRetryableInTx.
func NewTransactionManager ¶
func NewTransactionManager(session *WriterSession, opts ...TransactionManagerOption) *TransactionManager
NewTransactionManager creates a new TransactionManager for the given WriterSession.
type TransactionManagerOption ¶
type TransactionManagerOption func(*TransactionManager)
TransactionManagerOption configures a TransactionManager.
func WithLogger ¶
func WithLogger(logger *slog.Logger) TransactionManagerOption
WithLogger sets the logger for the TransactionManager. Default is the WriterSession's logger.
func WithMaxRetries ¶
func WithMaxRetries(n int) TransactionManagerOption
WithMaxRetries sets the maximum number of retry attempts. Default is 3.
func WithRetryDelay ¶
func WithRetryDelay(d time.Duration) TransactionManagerOption
WithRetryDelay sets the delay between retry attempts. Default is 1 second.
type WriterSession ¶
type WriterSession struct {
*Session
}
WriterSession is a session for write operations.
func NewWriterSession ¶
func NewWriterSession(config *Config) (*WriterSession, error)
NewWriterSession creates a new WriterSession.
func (*WriterSession) String ¶
func (s *WriterSession) String() string