conn

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: MIT Imports: 23 Imported by: 0

README

conn

Session management for PostgreSQL with GORM integration, Writer/Reader separation, transactions, and connection pooling.

Part of go-pgkit.

Overview

The conn package provides the main application-facing API for database access. It depends on GORM (gorm.io/gorm) and sqlx (github.com/jmoiron/sqlx) — Session.DB() returns *gorm.DB and Session.QueryDB() provides sqlx-based named queries. It also integrates pgxretry for automatic retry and rlsconn for Row-Level Security.

Creating a Session

cfg := conn.NewConfig(
    conn.WithHost("localhost"),
    conn.WithPort(5432),
    conn.WithCredentials("app_user", "app_password"),
    conn.WithDatabase("mydb"),
    conn.WithConnectionPool(25, 5, 60),
)

session, err := cfg.Connect()
if err != nil {
    log.Fatal(err)
}
defer session.Close()

Or from environment variables:

cfg := conn.NewConfig(conn.WithEnvVars("writer"))

session, err := cfg.Connect()

See docs/configuration.md for the full options reference.

SessionManager

SessionManager manages separate writer and reader sessions:

writerCfg := conn.NewConfig(conn.WithEnvVars("writer"))
readerCfg := conn.NewConfig(conn.WithEnvVars("reader"))

sm, err := conn.NewSessionManager(writerCfg, readerCfg)
if err != nil {
    log.Fatal(err)
}
defer sm.Close()

// Use writer for mutations
sm.Writer().DB()         // *gorm.DB
sm.Writer().QueryDB()    // *QueryDB

// Use reader for queries
sm.Reader().DB()         // *gorm.DB
sm.Reader().QueryDB()    // *QueryDB
  • WriterSession exposes all Session methods including Transaction()
  • ReaderSession exposes a read-safe subset: DB(), QueryDB(), GetDBOrTx(), ResetConnection(), GetPoolStats(), Close()

Transactions

Use WriterSession.Transaction to execute code within a database transaction:

err := sm.Writer().Transaction(ctx, func(txCtx context.Context) error {
    // All operations using txCtx share the same transaction
    if err := sm.Writer().GetDBOrTx(txCtx).Create(&order).Error; err != nil {
        return err  // triggers rollback
    }
    if err := sm.Writer().GetDBOrTx(txCtx).Create(&orderItem).Error; err != nil {
        return err  // triggers rollback
    }
    return nil  // triggers commit
})
  • Returning nil commits the transaction
  • Returning an error rolls back and propagates the error
  • Panics are caught, the transaction is rolled back, and the panic is re-raised
  • GetDBOrTx(ctx) returns the transaction if one exists in the context, otherwise the base DB
TransactionManager

TransactionManager wraps WriterSession.Transaction with automatic retry for pgxretry.ErrRetryableInTx errors. It is the application-level counterpart to pgxretry's driver-level retry: pgxretry retries individual queries outside transactions, while TransactionManager retries entire transactions.

tm := conn.NewTransactionManager(sm.Writer())

err := tm.Do(ctx, func(txCtx context.Context) error {
    if err := sm.Writer().GetDBOrTx(txCtx).Create(&order).Error; err != nil {
        return err
    }
    if err := sm.Writer().GetDBOrTx(txCtx).Create(&orderItem).Error; err != nil {
        return err
    }
    return nil
})

Customize retry behavior with functional options:

tm := conn.NewTransactionManager(sm.Writer(),
    conn.WithMaxRetries(5),
    conn.WithRetryDelay(500 * time.Millisecond),
    conn.WithLogger(customLogger),
)
  • Retries only on pgxretry.ErrRetryableInTx (connection errors that occurred inside a transaction)
  • Calls ResetConnection before each retry to recover the underlying connection
  • Non-retryable errors are returned immediately without retry
  • Respects context cancellation between retries
  • Defaults: 3 retries, 1 second delay

Raw SQL with QueryDB

QueryDB wraps sqlx for named parameter queries:

qdb := session.QueryDB()

// Named query (struct or map args)
rows, err := qdb.NamedQueryContext(ctx, "SELECT * FROM products WHERE tenant_id = :tenant_id", map[string]any{
    "tenant_id": tenantID,
})
defer rows.Close()

// Named exec
result, err := qdb.NamedExecContext(ctx, "UPDATE products SET price = :price WHERE id = :id", product)

// Single row
row, err := qdb.NamedQueryRowContext(ctx, "SELECT COUNT(*) FROM products WHERE tenant_id = :tenant_id", map[string]any{
    "tenant_id": tenantID,
})

// Positional query
row := qdb.QueryRowContext(ctx, "SELECT COUNT(*) FROM products WHERE tenant_id = $1", tenantID)

Connection Pool Management

Pool Statistics
stats := session.GetPoolStats()
// Returns: max_open_connections, open_connections, in_use, idle,
//          wait_count, wait_duration, max_idle_closed, max_idle_time_closed, max_lifetime_closed
Connection Reset

ResetConnection replaces the current connection with a new one, useful for recovering from persistent connection issues:

err := session.ResetConnection(ctx)
  • Creates a new connection and pings it before swapping
  • Closes the old connection gracefully in a background goroutine
  • Has a 1-second cooldown to prevent rapid successive resets
  • Thread-safe with minimal lock duration

See docs/architecture.md for the full reset flow.

Caveats

  • Driver registration is process-global: The first Config.Connect() call registers the pgxretry driver with its retry and RLS settings. All subsequent connections in the process use the same driver configuration. If RLS is enabled, the first connection must have RLS configured.
  • sslmode defaults to disable. Override with WithSSLMode or DB_SSLMODE. See docs/configuration.md — Caveats.
  • GORM prepared statements are disabled to ensure per-request context propagation for RLS session variables.

Further Reading

  • Configuration — All config options, environment variables, defaults
  • RLS Guide — Row-Level Security setup and integration
  • Testing — TestSession, TestRLSSession, Docker setup
  • Architecture — Driver layering, connection lifecycle

Documentation

Overview

Package conn provides generic database session and connection management. It is designed to be reusable across multiple Go projects.

Index

Examples

Constants

View Source
const (
	LogLevelSilent = logger.Silent
	LogLevelError  = logger.Error
	LogLevelWarn   = logger.Warn
	LogLevelInfo   = logger.Info
)

Log level constants from gorm/logger

Variables

View Source
var (
	TypeConnection     errorsx.ErrorType = "db.connection"
	TypeTransaction    errorsx.ErrorType = "db.transaction"
	TypeInitialization errorsx.ErrorType = "db.initialization"

	ErrConnection     = errorsx.New("database connection error", errorsx.WithType(TypeConnection), errorsx.WithHTTPStatus(500))
	ErrTransaction    = errorsx.New("database transaction error", errorsx.WithType(TypeTransaction), errorsx.WithHTTPStatus(500))
	ErrInitialization = errorsx.New("database initialization error", errorsx.WithType(TypeInitialization), errorsx.WithHTTPStatus(500))
)

Error types for database operations

Functions

func DefaultGormOpener

func DefaultGormOpener(dialector gorm.Dialector, config *gorm.Config) (*gorm.DB, error)

DefaultGormOpener is the default GORM opener that uses gorm.Open directly.

Types

type Config

type Config struct {
	Role            string // "writer" or "reader"
	Host            string
	Port            int
	User            string
	Password        string
	DBName          string
	MaxOpenConns    int // Maximum open connections (0 = unlimited)
	MaxIdleConns    int // Maximum idle connections (default: 2)
	ConnMaxLifetime int // Maximum connection lifetime in minutes (0 = unlimited)

	// SSL settings
	SSLMode string // SSL mode (default: "disable")

	// Retry settings
	EnableRetry  bool // Enable retry on transient errors
	MaxRetries   int  // Maximum number of retries
	RetryBackoff int  // Retry backoff interval in milliseconds

	// Connection timeout settings
	ConnectTimeout int // TCP connection timeout in seconds (default: 10)

	// Connection reset settings
	GracefulCloseTimeout int // Graceful close timeout in seconds (0 = default 10s)

	// Log level settings
	LogLevel *LogLevel // GORM log level (nil defaults to Info)

	// RLS settings
	EnableRLS bool // Enable Row-Level Security (default: true)

	// RLSSessionVars is a list of session variables to set for RLS.
	// Example: []rlsconn.SessionVar{{Name: "app.current_tenant_id", ValueGetter: GetTenantIDFromContext}}
	RLSSessionVars []rlsconn.SessionVar

	// Logger is the slog logger for database operations.
	Logger *slog.Logger

	// GormOpener is an optional custom GORM opener (e.g., for DataDog APM tracing).
	// If nil, DefaultGormOpener is used.
	GormOpener GormOpener

	// GormCallbackRegistrar is an optional callback registrar (e.g., for DataDog APM resource naming).
	// If nil, no callbacks are registered.
	GormCallbackRegistrar GormCallbackRegistrar

	// SpanStarter is an optional span starter for QueryDB operations.
	// If nil, no spans are created.
	SpanStarter SpanStarter
	// contains filtered or unexported fields
}

Config holds database connection settings.

func NewConfig

func NewConfig(opts ...ConfigOption) *Config

NewConfig creates a new Config by applying the given options to default values.

Examples:

// Basic usage
cfg := NewConfig(
    WithHost("localhost"),
    WithPort(5432),
    WithCredentials("user", "password"),
    WithDatabase("mydb"),
)

// Load from environment variables
cfg := NewConfig(WithEnvVars("writer"))
Example
package main

import (
	"github.com/hacomono-lib/go-pgkit/conn"
)

func main() {
	// Basic usage
	_ = conn.NewConfig(
		conn.WithHost("localhost"),
		conn.WithPort(5432),
		conn.WithCredentials("user", "password"),
		conn.WithDatabase("mydb"),
	)

	// Load from environment variables (writer role) + silent logging for production
	_ = conn.NewConfig(
		conn.WithEnvVars("writer"),
		conn.WithSilentGormLogger(),
	)

	// Test configuration
	_ = conn.NewConfig(
		conn.WithHost("localhost"),
		conn.WithPort(5432),
		conn.WithDatabase("test_db"),
		conn.WithConnectionPool(10, 2, 30),
	)

	// Disable tracing for high-frequency SQL operations
	_ = conn.NewConfig(
		conn.WithEnvVars("writer"),
		conn.WithSilentGormLogger(),
		conn.WithoutTrace(),
	)
}

func (*Config) Connect

func (c *Config) Connect() (*Session, error)

func (*Config) String

func (c *Config) String() string

String returns a string representation of Config (password is hidden).

func (*Config) WithLogger

func (c *Config) WithLogger(logger *slog.Logger) *Config

WithLogger sets the slog logger for database operations.

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a function type for configuring Config.

func WithConnectTimeout

func WithConnectTimeout(seconds int) ConfigOption

WithConnectTimeout sets the TCP connection timeout in seconds. Default: 10 seconds.

func WithConnectionPool

func WithConnectionPool(maxOpen, maxIdle, maxLifetime int) ConfigOption

WithConnectionPool configures connection pool settings. maxLifetime is in minutes.

func WithCredentials

func WithCredentials(user, password string) ConfigOption

WithCredentials sets the database user and password.

func WithDatabase

func WithDatabase(dbName string) ConfigOption

WithDatabase sets the database name.

func WithEnvVars

func WithEnvVars(role string) ConfigOption

WithEnvVars loads configuration from environment variables with the default "DB_" prefix. When a role is specified, role-specific variables (e.g., DB_WRITER_HOST) take precedence. Only non-empty environment variables override defaults; unset variables are ignored.

func WithEnvVarsWithPrefix

func WithEnvVarsWithPrefix(role, prefix string) ConfigOption

WithEnvVarsWithPrefix loads configuration from environment variables with a custom prefix. For example, WithEnvVarsWithPrefix("writer", "MYAPP_") reads MYAPP_HOST, MYAPP_WRITER_HOST, etc.

func WithGormCallbackRegistrar

func WithGormCallbackRegistrar(registrar GormCallbackRegistrar) ConfigOption

WithGormCallbackRegistrar sets a GORM callback registration function.

func WithGormOpener

func WithGormOpener(opener GormOpener) ConfigOption

WithGormOpener sets a custom GORM opener function.

func WithGracefulCloseTimeout

func WithGracefulCloseTimeout(seconds int) ConfigOption

WithGracefulCloseTimeout sets the timeout for graceful connection close on reset.

func WithHost

func WithHost(host string) ConfigOption

WithHost sets the database host.

func WithLogLevel

func WithLogLevel(level LogLevel) ConfigOption

WithLogLevel sets the GORM log level. Valid values: LogLevelSilent, LogLevelError, LogLevelWarn, LogLevelInfo.

func WithPort

func WithPort(port int) ConfigOption

WithPort sets the database port.

func WithRLS

func WithRLS(enableRLS bool) ConfigOption

WithRLS enables or disables Row-Level Security.

func WithRLSSessionVars

func WithRLSSessionVars(vars []rlsconn.SessionVar) ConfigOption

WithRLSSessionVars sets the RLS session variables.

func WithRetry

func WithRetry(enableRetry bool, maxRetries, retryBackoffMs int) ConfigOption

WithRetry configures retry behavior.

func WithRole

func WithRole(role string) ConfigOption

WithRole sets the connection role ("writer" or "reader").

func WithSSLMode

func WithSSLMode(sslMode string) ConfigOption

WithSSLMode sets the SSL mode for the database connection. Valid values: "disable", "require", "verify-ca", "verify-full", "prefer", "allow". Default: "disable".

func WithSilentGormLogger

func WithSilentGormLogger() ConfigOption

WithSilentGormLogger disables GORM log output. Useful for production environments to suppress log noise.

func WithSpanStarter

func WithSpanStarter(starter SpanStarter) ConfigOption

WithSpanStarter sets the span starter for QueryDB tracing.

func WithoutTrace

func WithoutTrace() ConfigOption

WithoutTrace disables all tracing hooks (GormOpener, GormCallbackRegistrar, SpanStarter).

type GormCallbackRegistrar

type GormCallbackRegistrar func(db *gorm.DB, logger *slog.Logger)

GormCallbackRegistrar is a function that registers GORM callbacks. This allows for custom callback registration (e.g., DataDog APM resource naming).

type GormOpener

type GormOpener func(dialector gorm.Dialector, config *gorm.Config) (*gorm.DB, error)

GormOpener is a function that opens a GORM database connection. This allows for custom initialization logic (e.g., DataDog APM tracing).

type LogLevel

type LogLevel = logger.LogLevel

LogLevel represents the log level for the database logger. Re-exported from gorm/logger for convenience.

type QueryDB

type QueryDB struct {

	// SpanStarter is called to start a span for SQL operations.
	// If nil, no spans are created.
	SpanStarter SpanStarter
	// contains filtered or unexported fields
}

QueryDB wraps sqlx query operations and provides named parameter methods.

func NewQueryDB

func NewQueryDB(extContext sqlx.ExtContext, role string, spanStarter SpanStarter) *QueryDB

NewQueryDB creates a new QueryDB with the given parameters. This is useful for testing.

func (*QueryDB) BindNamed

func (q *QueryDB) BindNamed(query string, arg interface{}) (string, []interface{}, error)

BindNamed binds named parameters in the query to positional parameters.

func (*QueryDB) NamedExecContext

func (q *QueryDB) NamedExecContext(ctx context.Context, query string, arg interface{}) (result sql.Result, err error)

NamedExecContext executes a named query that does not return rows.

func (*QueryDB) NamedQueryContext

func (q *QueryDB) NamedQueryContext(ctx context.Context, query string, arg interface{}) (rows *sqlx.Rows, err error)

NamedQueryContext executes a named query that returns rows.

func (*QueryDB) NamedQueryRowContext

func (q *QueryDB) NamedQueryRowContext(ctx context.Context, query string, arg interface{}) (row *sql.Row, err error)

NamedQueryRowContext executes a named query that returns a single row. Like QueryRowContext, the result set is automatically closed, so it is safe to use within transactions.

func (*QueryDB) QueryRowContext

func (q *QueryDB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext executes a query that returns a single row. Note: sql.Row is lazily evaluated — errors are not known until Scan() is called, so the span is finished immediately (error tracking is not possible here).

func (*QueryDB) Refresh

func (q *QueryDB) Refresh(newExtContext sqlx.ExtContext)

Refresh replaces the internal ExtContext with a new one.

type ReaderSession

type ReaderSession struct {
	// contains filtered or unexported fields
}

ReaderSession is a read-only session.

func NewReaderSession

func NewReaderSession(config *Config) (*ReaderSession, error)

NewReaderSession creates a new ReaderSession.

func (*ReaderSession) Close

func (s *ReaderSession) Close() error

Close closes the reader session.

func (*ReaderSession) DB

func (s *ReaderSession) DB() *gorm.DB

DB returns the GORM DB for read operations.

func (*ReaderSession) GetDBOrTx

func (s *ReaderSession) GetDBOrTx(ctx context.Context) *gorm.DB

GetDBOrTx returns a context-aware DB connection. ReaderSession does not support transactions, but needs context for RLS session variables.

func (*ReaderSession) GetPoolStats

func (s *ReaderSession) GetPoolStats() map[string]any

GetPoolStats returns connection pool statistics.

func (*ReaderSession) Logger

func (s *ReaderSession) Logger() *slog.Logger

Logger returns the session logger.

func (*ReaderSession) QueryDB

func (s *ReaderSession) QueryDB() *QueryDB

QueryDB returns the QueryDB for read operations.

func (*ReaderSession) ResetConnection

func (s *ReaderSession) ResetConnection(ctx context.Context) error

ResetConnection resets the database connection.

func (*ReaderSession) String

func (s *ReaderSession) String() string

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session provides database connection and transaction functionality.

func (*Session) Close

func (s *Session) Close() error

Close closes the database connection.

func (*Session) Connect

func (s *Session) Connect() (*Session, error)

func (*Session) DB

func (s *Session) DB() *gorm.DB

DB returns the GORM DB instance for use outside transactions. Note: the returned instance may reference a stale connection after ResetConnection.

func (*Session) GetCurrentTenantID

func (s *Session) GetCurrentTenantID(ctx context.Context) (string, error)

GetCurrentTenantID retrieves the tenant ID from the PostgreSQL session variable (for debugging). Returns an empty string if not set.

func (*Session) GetCurrentUserID

func (s *Session) GetCurrentUserID(ctx context.Context) (string, error)

GetCurrentUserID retrieves the user ID from the PostgreSQL session variable (for debugging). Returns an empty string if not set.

func (*Session) GetDBOrTx

func (s *Session) GetDBOrTx(ctx context.Context) *gorm.DB

GetDBOrTx returns the transaction from the context if one is set, otherwise the base DB connection. In both cases, WithContext(ctx) is applied so that the latest context (carrying RLS session variables) is propagated.

func (*Session) GetPoolStats

func (s *Session) GetPoolStats() map[string]interface{}

GetPoolStats returns connection pool statistics for monitoring/debugging.

func (*Session) GetTx

func (s *Session) GetTx(ctx context.Context) (*gorm.DB, error)

GetTx retrieves the transaction from the context. Returns an error if no transaction is set.

func (*Session) HasTx

func (s *Session) HasTx(ctx context.Context) bool

HasTx checks whether a transaction is set in the context.

func (*Session) Logger

func (s *Session) Logger() *slog.Logger

Logger returns the logger for database operations.

func (*Session) QueryDB

func (s *Session) QueryDB() *QueryDB

QueryDB returns the QueryDB instance for raw SQL operations.

func (*Session) ResetConnection

func (s *Session) ResetConnection(ctx context.Context) error

ResetConnection closes and recreates the database connection. Uses a write lock to prevent concurrent resets and a cooldown to avoid rapid successive resets.

func (*Session) Role

func (s *Session) Role() string

Role returns the session role.

func (*Session) String

func (s *Session) String() string

String returns a string representation of the session.

func (*Session) Transaction

func (s *Session) Transaction(ctx context.Context, fn func(txCtx context.Context) error) error

Transaction starts a transaction and executes fn within it. On success, the transaction is committed. On error, it is rolled back. On panic, the transaction is rolled back and the panic is re-raised.

This method wraps GORM's transaction management to provide:

  1. Stack traces via errorsx
  2. Detailed error information (including rollback failures)
  3. Application-specific context management

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages Writer and Reader sessions.

func NewSessionManager

func NewSessionManager(writerConfig, readerConfig *Config) (*SessionManager, error)

NewSessionManager creates a SessionManager from writer and reader configs.

func (*SessionManager) Close

func (sm *SessionManager) Close() error

Close closes both sessions.

func (*SessionManager) Reader

func (sm *SessionManager) Reader() *ReaderSession

Reader returns the ReaderSession.

func (*SessionManager) String

func (sm *SessionManager) String() string

func (*SessionManager) Writer

func (sm *SessionManager) Writer() *WriterSession

Writer returns the WriterSession.

type Sessioner

type Sessioner interface {
	GetDBOrTx(ctx context.Context) *gorm.DB
}

Sessioner is the common interface for ReaderSession and WriterSession. Used as a type parameter in generic reader implementations.

type SpanStarter

type SpanStarter func(ctx context.Context, operationType, query string, role string) (finish func(err error), newCtx context.Context)

SpanStarter is a function that starts a span for SQL operations. It returns a finish function that should be called when the operation completes.

type TestRLSSession

type TestRLSSession struct {
	// contains filtered or unexported fields
}

TestRLSSession provides a test session for verifying the RLS implementation itself.

Manages two sessions:

  • superuserSession: for data setup/cleanup (bypasses RLS)
  • rlsSession: for verifying RLS behavior (RLS enforced, same flow as production)

Usage:

func TestRLSImplementation(t *testing.T) {
    ts := conn.NewTestRLSSession(t, superuserConfig, rlsConfig)
    defer ts.Cleanup()

    // Setup test data (as superuser, bypasses RLS)
    ts.ExecAsSuperuser(ctx, "INSERT INTO tenants ...")

    // Verify RLS (production-equivalent is_local=false flow)
    var count int64
    err := ts.RLSSession().DB().WithContext(ctxWithTenant).
        Table("products").Count(&count).Error
}

func NewTestRLSSession

func NewTestRLSSession(t *testing.T, superuserConfig, rlsConfig *Config) *TestRLSSession

NewTestRLSSession creates a new RLS test session. Accepts both a superuser config (for RLS bypass) and an RLS config (for RLS enforcement).

func (*TestRLSSession) Cleanup

func (ts *TestRLSSession) Cleanup()

Cleanup runs registered cleanup functions and closes both sessions.

func (*TestRLSSession) ExecAsSuperuser

func (ts *TestRLSSession) ExecAsSuperuser(ctx context.Context, sql string, args ...any)

ExecAsSuperuser executes arbitrary SQL using the superuser session.

func (*TestRLSSession) ExecAsSuperuserWithCleanup

func (ts *TestRLSSession) ExecAsSuperuserWithCleanup(ctx context.Context, sql, cleanupSQL string, args ...any)

ExecAsSuperuserWithCleanup executes SQL using the superuser session and registers a cleanup SQL statement to be executed on test teardown.

func (*TestRLSSession) RLSDB

func (ts *TestRLSSession) RLSDB() *gorm.DB

RLSDB returns the GORM DB for the RLS-enforced session.

func (*TestRLSSession) RLSSession

func (ts *TestRLSSession) RLSSession() *Session

RLSSession returns the session with RLS enforced.

func (*TestRLSSession) RegisterCleanup

func (ts *TestRLSSession) RegisterCleanup(fn func())

RegisterCleanup registers a cleanup function. Registered functions are executed in reverse order (LIFO).

func (*TestRLSSession) SuperuserDB

func (ts *TestRLSSession) SuperuserDB() *gorm.DB

SuperuserDB returns the GORM DB for the superuser session.

func (*TestRLSSession) SuperuserSession

func (ts *TestRLSSession) SuperuserSession() *Session

SuperuserSession returns the session that bypasses RLS.

type TestSession

type TestSession struct {
	// contains filtered or unexported fields
}

TestSession provides a test-scoped session with transaction isolation. All operations run inside a transaction that is rolled back on cleanup, leaving the database unchanged.

Usage:

func TestMyFeature(t *testing.T) {
    ts := conn.NewTestSession(t, config)
    defer ts.Cleanup()

    ctx := ts.Context()
    err := ts.DB().WithContext(ctx).Table("products").Find(&products).Error
}

func NewTestSession

func NewTestSession(t *testing.T, config *Config) *TestSession

NewTestSession creates a new test session with the given config. Starts a transaction and registers automatic rollback via t.Cleanup().

func (*TestSession) Cleanup

func (s *TestSession) Cleanup()

Cleanup rolls back the transaction and closes the session.

func (*TestSession) Context

func (s *TestSession) Context() context.Context

Context returns the context with the test transaction embedded.

func (*TestSession) DB

func (s *TestSession) DB() *gorm.DB

DB returns the GORM DB bound to the test transaction.

func (*TestSession) GetDBOrTx

func (s *TestSession) GetDBOrTx(ctx context.Context) *gorm.DB

GetDBOrTx returns the DB with transaction context.

func (*TestSession) GetQueryDBOrTx

func (s *TestSession) GetQueryDBOrTx() (*QueryDB, error)

GetQueryDBOrTx returns a transaction-bound QueryDB if a transaction is set in the context, otherwise the default QueryDB.

func (*TestSession) QueryDB

func (s *TestSession) QueryDB() *QueryDB

QueryDB returns the QueryDB for this test session.

func (*TestSession) Reader

func (s *TestSession) Reader() *ReaderSession

Reader returns a ReaderSession wrapper for testing.

func (*TestSession) Session

func (s *TestSession) Session() *Session

Session returns the underlying Session.

func (*TestSession) SetRLSContext

func (s *TestSession) SetRLSContext(vars map[string]string, ctxSetter func(ctx context.Context, name, value string) context.Context) context.Context

SetRLSContext sets RLS session variables within the test transaction and returns an updated context.

Uses SET LOCAL (is_local=true) so variables are scoped to the transaction and automatically cleared on rollback.

vars maps variable names to values. An empty string value resets the variable to NULL. ctxSetter is an optional callback to also set values in the Go context (nil to skip).

func (*TestSession) Writer

func (s *TestSession) Writer() *WriterSession

Writer returns a WriterSession wrapper for testing.

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager manages transaction execution with automatic retry for retryable errors (pgxretry.ErrRetryableInTx).

It is the application-level counterpart to pgxretry's driver-level retry: pgxretry retries individual queries outside transactions, while TransactionManager retries entire transactions when the driver signals ErrRetryableInTx.

func NewTransactionManager

func NewTransactionManager(session *WriterSession, opts ...TransactionManagerOption) *TransactionManager

NewTransactionManager creates a new TransactionManager for the given WriterSession.

func (*TransactionManager) Do

func (tm *TransactionManager) Do(ctx context.Context, fn func(txCtx context.Context) error) error

Do executes fn within a transaction. If the transaction fails with pgxretry.ErrRetryableInTx, it resets the connection and retries up to maxRetries times. Non-retryable errors are returned immediately.

type TransactionManagerOption

type TransactionManagerOption func(*TransactionManager)

TransactionManagerOption configures a TransactionManager.

func WithLogger

func WithLogger(logger *slog.Logger) TransactionManagerOption

WithLogger sets the logger for the TransactionManager. Default is the WriterSession's logger.

func WithMaxRetries

func WithMaxRetries(n int) TransactionManagerOption

WithMaxRetries sets the maximum number of retry attempts. Default is 3.

func WithRetryDelay

func WithRetryDelay(d time.Duration) TransactionManagerOption

WithRetryDelay sets the delay between retry attempts. Default is 1 second.

type WriterSession

type WriterSession struct {
	*Session
}

WriterSession is a session for write operations.

func NewWriterSession

func NewWriterSession(config *Config) (*WriterSession, error)

NewWriterSession creates a new WriterSession.

func (*WriterSession) String

func (s *WriterSession) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL