internal

package
v0.0.0-...-69105db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: AGPL-3.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BinaryFormatInvalid = iota
	BinaryFormatRaw
	BinaryFormatBase64
	BinaryFormatHex
)
View Source
const (
	DefaultPeerDBS3PartSize int64 = 64 * 1024 * 1024 // 64MiB
)
View Source
const (
	KmsKeyIDEnvVar = "PEERDB_KMS_KEY_ID"
)

Variables

View Source
var DynamicIndex = func() map[string]int {
	defaults := make(map[string]int, len(DynamicSettings))
	for i, setting := range DynamicSettings {
		defaults[setting.Name] = i
	}
	return defaults
}()
View Source
var DynamicSettings = [...]*protos.DynamicSetting{
	{
		Name:             "PEERDB_CDC_CHANNEL_BUFFER_SIZE",
		Description:      "Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC",
		DefaultValue:     "131072",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
		Description:      "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalization",
		DefaultValue:     "128",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_GROUP_NORMALIZE",
		Description:      "Controls whether normalize applies to one batch at a time, or all pending batches",
		DefaultValue:     "4",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS",
		Description:      "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only",
		DefaultValue:     "10",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_QUEUES,
	},
	{
		Name:             "PEERDB_QUEUE_PARALLELISM",
		Description:      "Parallelism for Lua script processing data, applicable for CDC mirrors to Kakfa and PubSub",
		DefaultValue:     "4",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_QUEUES,
	},
	{
		Name:             "PEERDB_CDC_STORE_ENABLED",
		Description:      "Controls whether to enable the store for recovering unchanged Postgres TOAST values within a CDC batch",
		DefaultValue:     "true",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD",
		Description:      "CDC: number of records beyond which records are written to disk instead",
		DefaultValue:     "1000000",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD",
		Description:      "CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables",
		DefaultValue:     "-1",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_ENABLE_WAL_HEARTBEAT",
		Description:      "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity",
		DefaultValue:     "true",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_WAL_HEARTBEAT_QUERY",
		DefaultValue:     "SELECT pg_logical_emit_message(false,'peerdb_heartbeat','')",
		ValueType:        protos.DynconfValueType_STRING,
		Description:      "SQL to run during each WAL heartbeat",
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_RECONNECT_AFTER_BATCHES",
		Description:      "Force peerdb to reconnect connection to source after N batches",
		DefaultValue:     "0",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:         "PEERDB_FULL_REFRESH_OVERWRITE_MODE",
		Description:  "Enables full refresh mode for query replication mirrors of overwrite type",
		DefaultValue: "false",
		ValueType:    protos.DynconfValueType_BOOL,
		ApplyMode:    protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
	},
	{
		Name:             "PEERDB_NULLABLE",
		Description:      "Propagate nullability in schema",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_AVRO_NULLABLE_LAX",
		Description:      "Make all columns nullable in initial load Avro schema (disables strict nullable checking)",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
		Description:      "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
		DefaultValue:     "8",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
	},
	{
		Name:             "PEERDB_SNOWFLAKE_AUTO_COMPRESS",
		Description:      "AUTO_COMPRESS option when uploading to Snowflake",
		DefaultValue:     "true",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
	},
	{
		Name:             "PEERDB_SNOWFLAKE_SKIP_COMPRESSION",
		Description:      "Use `NULL` compression when creating Avro files to be uploaded to Snowflake directly",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_BINARY_FORMAT",
		Description:      "Binary field encoding on clickhouse destination; either raw, hex, or base64",
		DefaultValue:     "raw",
		ValueType:        protos.DynconfValueType_STRING,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME",
		Description:      "S3 buckets to store Avro files for mirrors with ClickHouse target",
		DefaultValue:     "",
		ValueType:        protos.DynconfValueType_STRING,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_S3_UUID_PREFIX",
		Description:      "Use random UUID as prefix instead of flow name, can help partitioning on non-AWS based s3 providers",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name: "PEERDB_S3_PART_SIZE",
		Description: "S3 upload part size in bytes, may need to increase for large batches. " +
			"https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html",
		DefaultValue:     strconv.FormatInt(DefaultPeerDBS3PartSize, 10),
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:         "PEERDB_S3_BYTES_PER_AVRO_FILE",
		Description:  "S3 upload chunk size in bytes, needed for large unpartitioned initial loads.",
		DefaultValue: "1000000000",
		ValueType:    protos.DynconfValueType_INT,
		ApplyMode:    protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
	},
	{
		Name:             "PEERDB_QUEUE_FORCE_TOPIC_CREATION",
		Description:      "Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_QUEUES,
	},
	{
		Name:             "PEERDB_ALERTING_GAP_MINUTES",
		Description:      "Duration in minutes before reraising alerts, 0 disables all alerting entirely",
		DefaultValue:     "15",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD",
		Description:      "Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely",
		DefaultValue:     "5000",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD",
		Description:      "Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely",
		DefaultValue:     "5",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS",
		Description:      "BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_BIGQUERY,
	},
	{
		Name: "PEERDB_BIGQUERY_TOAST_MERGE_CHUNKING",
		Description: "BigQuery only: controls number of unchanged toast columns merged per statement in normalization. " +
			"Avoids statements growing too large",
		DefaultValue:     "8",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_BIGQUERY,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE",
		Description:      "Enable generating deletion records for updates in ClickHouse, avoids stale records when primary key updated",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS",
		Description:      "Configures max_insert_threads setting on clickhouse for inserting into destination table. Setting left unset when 0",
		DefaultValue:     "0",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE",
		Description:      "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes",
		DefaultValue:     "0",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING",
		Description:      "Map unbounded numerics in Postgres to String in ClickHouse to preserve precision and scale",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_ENABLE_JSON",
		Description:      "Map JSON datatype from source to JSON in ClickHouse instead of String",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_CLIENT_NAME",
		Description:      "Client name to pass to ClickHouse Client",
		DefaultValue:     "peerdb",
		ValueType:        protos.DynconfValueType_STRING,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
		Description:      "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
		DefaultValue:     "240",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME",
		Description:      "Set Postgres application_name to have mirror name as suffix for each mirror",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_MAINTENANCE_MODE_ENABLED",
		Description:      "Whether PeerDB is in maintenance mode, which disables any modifications to mirrors",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name: "PEERDB_PKM_EMPTY_BATCH_THROTTLE_THRESHOLD_SECONDS",
		Description: "Throttle threshold seconds for always sending KeepAlive response when no records are processed, " +
			"-1 disables always sending responses when no records are processed",
		DefaultValue:     "60",
		ValueType:        protos.DynconfValueType_INT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_INITIAL_LOAD_PARTS_PER_PARTITION",
		Description:      "Chunk partitions in initial load into N queries, can help mitigate OOM issues on ClickHouse",
		DefaultValue:     "1",
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_CLICKHOUSE_INITIAL_LOAD_ALLOW_NON_EMPTY_TABLES",
		Description:      "Disables validation raising error if destination table of initial load is not empty",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
	},
	{
		Name:             "PEERDB_SKIP_SNAPSHOT_EXPORT",
		Description:      "This avoids initial load failing due to connectivity drops, but risks data consistency unless precautions are taken",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name: "PEERDB_SOURCE_SCHEMA_AS_DESTINATION_COLUMN",
		Description: "Ingest source schema as column to destination. " +
			"Useful when multiple tables from source ingest into single table on destination",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_ORIGIN_METADATA_AS_DESTINATION_COLUMN",
		Description:      "Ingest additional metadata fields",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_QUEUES,
	},
	{
		Name: "PEERDB_POSTGRES_CDC_HANDLE_INHERITANCE_FOR_NON_PARTITIONED_TABLES",
		Description: "For Postgres CDC: attempt to fetch/remap child tables for tables that aren't partitioned by Postgres." +
			"Useful for tables that are partitioned by extensions or table inheritance",
		DefaultValue:     "true",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_FORCE_INTERNAL_VERSION",
		Description:      "Forces mirrors to be created with a different internal version than the latest peerdb internal version.",
		DefaultValue:     strconv.FormatUint(uint64(shared.InternalVersion_Latest), 10),
		ValueType:        protos.DynconfValueType_UINT,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_UI_MAINTENANCE_TAB_ENABLED",
		Description:      "Enable/disable the maintenance tab in the PeerDB UI",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_POSTGRES_ENABLE_FAILOVER_SLOTS",
		Description:      "Create slots with failover enabled when possible",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_METRICS_RECORD_AGGREGATES_ENABLED",
		Description:      "Enable/disable recording of aggregate metrics",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_POSTGRES_WAL_SENDER_TIMEOUT",
		Description:      "wal_sender_timeout value passed for Postgres CDC. \"NONE\" means no override, leaving it up to the source DB",
		DefaultValue:     "120s",
		ValueType:        protos.DynconfValueType_STRING,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
	{
		Name:             "PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG",
		Description:      "Apply schema deltas to catalog instead of fetching latest schema from source",
		DefaultValue:     "false",
		ValueType:        protos.DynconfValueType_BOOL,
		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
		TargetForSetting: protos.DynconfTarget_ALL,
	},
}

Functions

func AdditionalTablesHasOverlap

func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
	additionalTableMappings []*protos.TableMapping,
) bool

func BuildProcessedSchemaMapping

func BuildProcessedSchemaMapping(
	tableMappings []*protos.TableMapping,
	tableNameSchemaMapping map[string]*protos.TableSchema,
	logger log.Logger,
) map[string]*protos.TableSchema

given the output of GetTableSchema, processes it to be used by CDCFlow 1) changes the map key to be the destination table name instead of the source table name 2) performs column exclusion using protos.TableMapping as input.

func CheckMigrationCompleted

func CheckMigrationCompleted(
	ctx context.Context,
	pool shared.CatalogPool,
	flowName string,
	migrationName string,
) (bool, error)

CheckMigrationCompleted checks if a migration has been completed for a given flow

func Decrypt

func Decrypt(ctx context.Context, encKeyID string, payload []byte) ([]byte, error)

func FetchConfigFromDB

func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error)

func GetCatalogConnectionPoolFromEnv

func GetCatalogConnectionPoolFromEnv(ctx context.Context) (shared.CatalogPool, error)

func GetCatalogConnectionStringFromEnv

func GetCatalogConnectionStringFromEnv(ctx context.Context) string

func GetCatalogPostgresConfigFromEnv

func GetCatalogPostgresConfigFromEnv(ctx context.Context) *protos.PostgresConfig

func GetEnvBool

func GetEnvBool(name string, defaultValue bool) bool

func GetEnvString

func GetEnvString(name string, defaultValue string) string

GetEnvString returns the value of the environment variable with the given name or defaultValue if the environment variable is not set.

func GetFlowMetadata

func GetFlowMetadata(ctx context.Context) *protos.FlowContextMetadata

func GetKmsDecryptedEnvBase64EncodedBytes

func GetKmsDecryptedEnvBase64EncodedBytes(ctx context.Context, name string, defaultValue []byte) ([]byte, error)

func GetKmsDecryptedEnvString

func GetKmsDecryptedEnvString(ctx context.Context, name string, defaultValue string) (string, error)

func GetPGConnectionString

func GetPGConnectionString(pgConfig *protos.PostgresConfig, flowName string) string

func GetPeerDBOtelMetricsNamespace

func GetPeerDBOtelMetricsNamespace() string

func GetPeerDBOtelTemporalMetricsExportListEnv

func GetPeerDBOtelTemporalMetricsExportListEnv() string

func GetWorkflowStatus

func GetWorkflowStatus(ctx context.Context, pool shared.CatalogPool,
	workflowID string,
) (protos.FlowStatus, error)

func GetWorkflowStatusByName

func GetWorkflowStatusByName(ctx context.Context, pool shared.CatalogPool,
	flowName string,
) (protos.FlowStatus, error)

func LoadTableSchemaFromCatalog

func LoadTableSchemaFromCatalog(
	ctx context.Context,
	pool shared.CatalogPool,
	flowName string,
	tableName string,
) (*protos.TableSchema, error)

func LoadTableSchemasFromCatalog

func LoadTableSchemasFromCatalog(
	ctx context.Context,
	querier CatalogQuerier,
	flowName string,
	tableNames []string,
) (map[string]*protos.TableSchema, error)

func LoggerFromCtx

func LoggerFromCtx(ctx context.Context) log.Logger

func MarkMigrationCompleted

func MarkMigrationCompleted(
	ctx context.Context,
	pool shared.CatalogPool,
	logger log.Logger,
	flowName string,
	migrationName string,
) error

MarkMigrationCompleted marks a migration as completed for a given flow

func NewContextPropagator

func NewContextPropagator[V any](key TemporalContextKey) workflow.ContextPropagator

func PeerDBAlertingEmailSenderConfigurationSet

func PeerDBAlertingEmailSenderConfigurationSet() string

func PeerDBAlertingEmailSenderRegion

func PeerDBAlertingEmailSenderRegion() string

func PeerDBAlertingEmailSenderReplyToAddresses

func PeerDBAlertingEmailSenderReplyToAddresses() string

Comma-separated reply-to addresses

func PeerDBAlertingEmailSenderSourceEmail

func PeerDBAlertingEmailSenderSourceEmail() string

func PeerDBAlertingGapMinutesAsDuration

func PeerDBAlertingGapMinutesAsDuration(ctx context.Context, env map[string]string) (time.Duration, error)

PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely

func PeerDBAllowedTargets

func PeerDBAllowedTargets() string

func PeerDBApplicationNamePerMirrorName

func PeerDBApplicationNamePerMirrorName(ctx context.Context, env map[string]string) (bool, error)

func PeerDBApplySchemaDeltaToCatalogEnabled

func PeerDBApplySchemaDeltaToCatalogEnabled(ctx context.Context, env map[string]string) (bool, error)

func PeerDBAvroNullableLax

func PeerDBAvroNullableLax(ctx context.Context, env map[string]string) (bool, error)

func PeerDBBigQueryEnableSyncedAtPartitioning

func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[string]string) (bool, error)

PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS, for creating target tables with partitioning by _PEERDB_SYNCED_AT column If true, the target tables will be partitioned by _PEERDB_SYNCED_AT column If false, the target tables will not be partitioned

func PeerDBBigQueryToastMergeChunking

func PeerDBBigQueryToastMergeChunking(ctx context.Context, env map[string]string) (uint32, error)

func PeerDBCDCChannelBufferSize

func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int, error)

func PeerDBCDCDiskSpillMemPercentThreshold

func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context, env map[string]string) (int64, error)

func PeerDBCDCDiskSpillRecordsThreshold

func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context, env map[string]string) (int64, error)

func PeerDBCDCIdleTimeoutSeconds

func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration

env variable doesn't exist anymore, but tests appear to depend on this in lieu of an actual value of IdleTimeoutSeconds

func PeerDBCDCStoreEnabled

func PeerDBCDCStoreEnabled(ctx context.Context, env map[string]string) (bool, error)

func PeerDBCatalogDatabase

func PeerDBCatalogDatabase() string

PEERDB_CATALOG_DATABASE

func PeerDBCatalogHost

func PeerDBCatalogHost() string

PEERDB_CATALOG_HOST

func PeerDBCatalogPassword

func PeerDBCatalogPassword(ctx context.Context) string

PEERDB_CATALOG_PASSWORD

func PeerDBCatalogPort

func PeerDBCatalogPort() uint16

PEERDB_CATALOG_PORT

func PeerDBCatalogRequireTls

func PeerDBCatalogRequireTls() bool

func PeerDBCatalogUser

func PeerDBCatalogUser() string

PEERDB_CATALOG_USER

func PeerDBClickHouseAWSS3BucketName

func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string) (string, error)

func PeerDBClickHouseAllowedDomains

func PeerDBClickHouseAllowedDomains() string

func PeerDBClickHouseClientName

func PeerDBClickHouseClientName(ctx context.Context, env map[string]string) (string, error)

func PeerDBClickHouseInitialLoadAllowNonEmptyTables

func PeerDBClickHouseInitialLoadAllowNonEmptyTables(ctx context.Context, env map[string]string) (bool, error)

func PeerDBClickHouseInitialLoadPartsPerPartition

func PeerDBClickHouseInitialLoadPartsPerPartition(ctx context.Context, env map[string]string) (uint64, error)

func PeerDBClickHouseMaxInsertThreads

func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error)

func PeerDBClickHouseParallelNormalize

func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error)

func PeerDBCurrentEncKey

func PeerDBCurrentEncKey(ctx context.Context) (shared.PeerDBEncKey, error)

func PeerDBCurrentEncKeyID

func PeerDBCurrentEncKeyID() string

func PeerDBDeploymentUID

func PeerDBDeploymentUID() string

PEERDB_DEPLOYMENT_UID

func PeerDBDeploymentVersion

func PeerDBDeploymentVersion() string

func PeerDBEnableClickHouseJSON

func PeerDBEnableClickHouseJSON(ctx context.Context, env map[string]string) (bool, error)

func PeerDBEnableClickHouseNumericAsString

func PeerDBEnableClickHouseNumericAsString(ctx context.Context, env map[string]string) (bool, error)

func PeerDBEnableClickHousePrimaryUpdate

func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error)

func PeerDBEnableWALHeartbeat

func PeerDBEnableWALHeartbeat(ctx context.Context, env map[string]string) (bool, error)

func PeerDBEncKeys

func PeerDBEncKeys(ctx context.Context) shared.PeerDBEncKeys

func PeerDBFlowWorkerMaxMemBytes

func PeerDBFlowWorkerMaxMemBytes() uint64

GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum

func PeerDBForceInternalVersion

func PeerDBForceInternalVersion(ctx context.Context, env map[string]string) (uint32, error)

func PeerDBFullRefreshOverwriteMode

func PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error)

func PeerDBGetIncidentIoToken

func PeerDBGetIncidentIoToken() string

func PeerDBGetIncidentIoUrl

func PeerDBGetIncidentIoUrl() string

func PeerDBGroupNormalize

func PeerDBGroupNormalize(ctx context.Context, env map[string]string) (int64, error)

func PeerDBIntervalSinceLastNormalizeThresholdMinutes

func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env map[string]string) (uint32, error)

PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES, 0 disables normalize gap alerting entirely

func PeerDBMaintenanceModeEnabled

func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (bool, error)

func PeerDBMaintenanceModeWaitAlertSeconds

func PeerDBMaintenanceModeWaitAlertSeconds() int

PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS is how long to wait before alerting that peerdb's been stuck in maintenance mode too long

func PeerDBMetricsRecordAggregatesEnabled

func PeerDBMetricsRecordAggregatesEnabled(ctx context.Context, env map[string]string) (bool, error)

func PeerDBNormalizeBufferSize

func PeerDBNormalizeBufferSize(ctx context.Context, env map[string]string) (int64, error)

func PeerDBNullable

func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error)

func PeerDBOnlyClickHouseAllowed

func PeerDBOnlyClickHouseAllowed() bool

func PeerDBOpenConnectionsAlertThreshold

func PeerDBOpenConnectionsAlertThreshold(ctx context.Context, env map[string]string) (uint32, error)

PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely

func PeerDBOriginMetaAsDestinationColumn

func PeerDBOriginMetaAsDestinationColumn(ctx context.Context, env map[string]string) (bool, error)

func PeerDBPKMEmptyBatchThrottleThresholdSeconds

func PeerDBPKMEmptyBatchThrottleThresholdSeconds(ctx context.Context, env map[string]string) (int64, error)

func PeerDBPostgresCDCHandleInheritanceForNonPartitionedTables

func PeerDBPostgresCDCHandleInheritanceForNonPartitionedTables(ctx context.Context, env map[string]string) (bool, error)

func PeerDBPostgresEnableFailoverSlots

func PeerDBPostgresEnableFailoverSlots(ctx context.Context, env map[string]string) (bool, error)

func PeerDBPostgresWalSenderTimeout

func PeerDBPostgresWalSenderTimeout(ctx context.Context, env map[string]string) (string, error)

func PeerDBQueueFlushTimeoutSeconds

func PeerDBQueueFlushTimeoutSeconds(ctx context.Context, env map[string]string) (time.Duration, error)

func PeerDBQueueForceTopicCreation

func PeerDBQueueForceTopicCreation(ctx context.Context, env map[string]string) (bool, error)

Kafka has topic auto create as an option, auto.create.topics.enable But non-dedicated cluster maybe can't set config, may want peerdb to create topic. Similar for PubSub

func PeerDBQueueParallelism

func PeerDBQueueParallelism(ctx context.Context, env map[string]string) (int64, error)

func PeerDBRAPIRequestLoggingEnabled

func PeerDBRAPIRequestLoggingEnabled(ctx context.Context) bool

func PeerDBReconnectAfterBatches

func PeerDBReconnectAfterBatches(ctx context.Context, env map[string]string) (int32, error)

func PeerDBS3BytesPerAvroFile

func PeerDBS3BytesPerAvroFile(ctx context.Context, env map[string]string) (int64, error)

func PeerDBS3PartSize

func PeerDBS3PartSize(ctx context.Context, env map[string]string) (int64, error)

func PeerDBS3UuidPrefix

func PeerDBS3UuidPrefix(ctx context.Context, env map[string]string) (bool, error)

func PeerDBSkipSnapshotExport

func PeerDBSkipSnapshotExport(ctx context.Context, env map[string]string) (bool, error)

func PeerDBSlotLagMBAlertThreshold

func PeerDBSlotLagMBAlertThreshold(ctx context.Context, env map[string]string) (uint32, error)

PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely

func PeerDBSnowflakeAutoCompress

func PeerDBSnowflakeAutoCompress(ctx context.Context, env map[string]string) (bool, error)

func PeerDBSnowflakeMergeParallelism

func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error)

func PeerDBSnowflakeSkipCompression

func PeerDBSnowflakeSkipCompression(ctx context.Context, env map[string]string) (bool, error)

func PeerDBSourceSchemaAsDestinationColumn

func PeerDBSourceSchemaAsDestinationColumn(ctx context.Context, env map[string]string) (bool, error)

func PeerDBTelemetryAWSSNSTopicArn

func PeerDBTelemetryAWSSNSTopicArn() string

PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN

func PeerDBTelemetrySenderSendErrorAlertsEnabled

func PeerDBTelemetrySenderSendErrorAlertsEnabled(ctx context.Context) bool

PEERDB_TELEMETRY_SENDER_SEND_ERROR_ALERTS_ENABLED is whether to send error alerts to the telemetry sender

func PeerDBTemporalClientCert

func PeerDBTemporalClientCert(ctx context.Context) ([]byte, error)

func PeerDBTemporalClientCertPath

func PeerDBTemporalClientCertPath() string

func PeerDBTemporalClientKey

func PeerDBTemporalClientKey(ctx context.Context) ([]byte, error)

func PeerDBTemporalClientKeyPath

func PeerDBTemporalClientKeyPath() string

func PeerDBTemporalEnableCertAuth

func PeerDBTemporalEnableCertAuth() bool

func PeerDBVersionShaShort

func PeerDBVersionShaShort() string

PEERDB_VERSION_SHA_SHORT

func PeerDBWALHeartbeatQuery

func PeerDBWALHeartbeatQuery(ctx context.Context, env map[string]string) (string, error)

func PeerFlowTaskQueueName

func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string

func ReadModifyWriteTableSchemasToCatalog

func ReadModifyWriteTableSchemasToCatalog(
	ctx context.Context,
	catalogPool shared.CatalogPool,
	logger log.Logger,
	flowName string,
	tableNames []string,
	modifyFn func(map[string]*protos.TableSchema) (map[string]*protos.TableSchema, error),
) error

func RunMigrationOnce

func RunMigrationOnce(
	ctx context.Context,
	pool shared.CatalogPool,
	logger log.Logger,
	flowName string,
	migrationName string,
	migrationFunc func(ctx context.Context) error,
) error

RunMigrationOnce runs a migration function only if it hasn't been completed for the given flow

func SlogLoggerFromCtx

func SlogLoggerFromCtx(ctx context.Context) *slog.Logger

func UpdateCDCConfigInCatalog

func UpdateCDCConfigInCatalog(ctx context.Context, pool shared.CatalogPool,
	logger log.Logger, cfg *protos.FlowConnectionConfigsCore,
) error

func UpdateDynamicSetting

func UpdateDynamicSetting(ctx context.Context, pool shared.CatalogPool, name string, value *string) error

func UpdateFlowStatusInCatalog

func UpdateFlowStatusInCatalog(ctx context.Context, pool shared.CatalogPool,
	workflowID string, status protos.FlowStatus,
) (protos.FlowStatus, error)

func UpdateFlowStatusWithNameInCatalog

func UpdateFlowStatusWithNameInCatalog(ctx context.Context, pool shared.CatalogPool,
	flowName string, status protos.FlowStatus,
) (protos.FlowStatus, error)

func UpdatePeerDBMaintenanceModeEnabled

func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool shared.CatalogPool, enabled bool) error

func UpdateTableOIDsInTableSchemaInCatalog

func UpdateTableOIDsInTableSchemaInCatalog(
	ctx context.Context,
	pool shared.CatalogPool,
	logger log.Logger,
	flowName string,
	tableOIDs map[string]uint32,
) error

TODO: use ReadModifyWriteTableSchemasToCatalog to guarantee transactionality

func WithOperationContext

func WithOperationContext(ctx context.Context, operation protos.FlowOperation) context.Context

Types

type AdditionalContextMetadata

type AdditionalContextMetadata struct {
	Operation protos.FlowOperation
}

AdditionalContextMetadata has contextual information of a flow and is specific to a child context and is passed by copy

func GetAdditionalMetadata

func GetAdditionalMetadata(ctx context.Context) AdditionalContextMetadata

type BinaryFormat

type BinaryFormat int

func PeerDBBinaryFormat

func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error)

type CatalogQuerier

type CatalogQuerier interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}

support both catalog pool and pgx.Tx

type ContextPropagator

type ContextPropagator[V any] struct {
	Key TemporalContextKey
}

func (*ContextPropagator[V]) Extract

func (*ContextPropagator[V]) ExtractToWorkflow

func (c *ContextPropagator[V]) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error)

func (*ContextPropagator[V]) Inject

func (c *ContextPropagator[V]) Inject(ctx context.Context, writer workflow.HeaderWriter) error

func (*ContextPropagator[V]) InjectFromWorkflow

func (c *ContextPropagator[V]) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error

type PeerDBOAuthConfig

type PeerDBOAuthConfig struct {
	// there can be more complex use cases where domain != issuer, but we handle them later if required
	OAuthIssuerUrl string `json:"oauth_issuer_url"`
	KeySetJson     string `json:"key_set_json"`
	// This is a custom claim we may wish to validate (if needed)
	OAuthJwtClaimKey string `json:"oauth_jwt_claim_key"`
	OAuthClaimValue  string `json:"oauth_jwt_claim_value"`
	// Enabling uses /.well-known/ OpenID discovery endpoints, thus key-set etc. don't need to be specified
	OAuthDiscoveryEnabled bool `json:"oauth_discovery_enabled"`
}

func GetPeerDBOAuthConfig

func GetPeerDBOAuthConfig(ctx context.Context) PeerDBOAuthConfig

type TemporalContextKey

type TemporalContextKey string
const (
	FlowMetadataKey       TemporalContextKey = "x-peerdb-flow-metadata"
	AdditionalMetadataKey TemporalContextKey = "x-peerdb-additional-metadata"
)

func (TemporalContextKey) HeaderKey

func (k TemporalContextKey) HeaderKey() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL