Documentation
¶
Index ¶
- Constants
- Variables
- func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping, ...) bool
- func BuildProcessedSchemaMapping(tableMappings []*protos.TableMapping, ...) map[string]*protos.TableSchema
- func CheckMigrationCompleted(ctx context.Context, pool shared.CatalogPool, flowName string, ...) (bool, error)
- func Decrypt(ctx context.Context, encKeyID string, payload []byte) ([]byte, error)
- func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error)
- func GetCatalogConnectionPoolFromEnv(ctx context.Context) (shared.CatalogPool, error)
- func GetCatalogConnectionStringFromEnv(ctx context.Context) string
- func GetCatalogPostgresConfigFromEnv(ctx context.Context) *protos.PostgresConfig
- func GetEnvBool(name string, defaultValue bool) bool
- func GetEnvString(name string, defaultValue string) string
- func GetFlowMetadata(ctx context.Context) *protos.FlowContextMetadata
- func GetKmsDecryptedEnvBase64EncodedBytes(ctx context.Context, name string, defaultValue []byte) ([]byte, error)
- func GetKmsDecryptedEnvString(ctx context.Context, name string, defaultValue string) (string, error)
- func GetPGConnectionString(pgConfig *protos.PostgresConfig, flowName string) string
- func GetPeerDBOtelMetricsNamespace() string
- func GetPeerDBOtelTemporalMetricsExportListEnv() string
- func GetWorkflowStatus(ctx context.Context, pool shared.CatalogPool, workflowID string) (protos.FlowStatus, error)
- func GetWorkflowStatusByName(ctx context.Context, pool shared.CatalogPool, flowName string) (protos.FlowStatus, error)
- func LoadTableSchemaFromCatalog(ctx context.Context, pool shared.CatalogPool, flowName string, ...) (*protos.TableSchema, error)
- func LoadTableSchemasFromCatalog(ctx context.Context, querier CatalogQuerier, flowName string, ...) (map[string]*protos.TableSchema, error)
- func LoggerFromCtx(ctx context.Context) log.Logger
- func MarkMigrationCompleted(ctx context.Context, pool shared.CatalogPool, logger log.Logger, ...) error
- func NewContextPropagator[V any](key TemporalContextKey) workflow.ContextPropagator
- func PeerDBAlertingEmailSenderConfigurationSet() string
- func PeerDBAlertingEmailSenderRegion() string
- func PeerDBAlertingEmailSenderReplyToAddresses() string
- func PeerDBAlertingEmailSenderSourceEmail() string
- func PeerDBAlertingGapMinutesAsDuration(ctx context.Context, env map[string]string) (time.Duration, error)
- func PeerDBAllowedTargets() string
- func PeerDBApplicationNamePerMirrorName(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBApplySchemaDeltaToCatalogEnabled(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBAvroNullableLax(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBBigQueryToastMergeChunking(ctx context.Context, env map[string]string) (uint32, error)
- func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int, error)
- func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration
- func PeerDBCDCStoreEnabled(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBCatalogDatabase() string
- func PeerDBCatalogHost() string
- func PeerDBCatalogPassword(ctx context.Context) string
- func PeerDBCatalogPort() uint16
- func PeerDBCatalogRequireTls() bool
- func PeerDBCatalogUser() string
- func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string) (string, error)
- func PeerDBClickHouseAllowedDomains() string
- func PeerDBClickHouseClientName(ctx context.Context, env map[string]string) (string, error)
- func PeerDBClickHouseInitialLoadAllowNonEmptyTables(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBClickHouseInitialLoadPartsPerPartition(ctx context.Context, env map[string]string) (uint64, error)
- func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error)
- func PeerDBCurrentEncKey(ctx context.Context) (shared.PeerDBEncKey, error)
- func PeerDBCurrentEncKeyID() string
- func PeerDBDeploymentUID() string
- func PeerDBDeploymentVersion() string
- func PeerDBEnableClickHouseJSON(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBEnableClickHouseNumericAsString(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBEnableWALHeartbeat(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBEncKeys(ctx context.Context) shared.PeerDBEncKeys
- func PeerDBFlowWorkerMaxMemBytes() uint64
- func PeerDBForceInternalVersion(ctx context.Context, env map[string]string) (uint32, error)
- func PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBGetIncidentIoToken() string
- func PeerDBGetIncidentIoUrl() string
- func PeerDBGroupNormalize(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env map[string]string) (uint32, error)
- func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBMaintenanceModeWaitAlertSeconds() int
- func PeerDBMetricsRecordAggregatesEnabled(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBNormalizeBufferSize(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBOnlyClickHouseAllowed() bool
- func PeerDBOpenConnectionsAlertThreshold(ctx context.Context, env map[string]string) (uint32, error)
- func PeerDBOriginMetaAsDestinationColumn(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBPKMEmptyBatchThrottleThresholdSeconds(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBPostgresCDCHandleInheritanceForNonPartitionedTables(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBPostgresEnableFailoverSlots(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBPostgresWalSenderTimeout(ctx context.Context, env map[string]string) (string, error)
- func PeerDBQueueFlushTimeoutSeconds(ctx context.Context, env map[string]string) (time.Duration, error)
- func PeerDBQueueForceTopicCreation(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBQueueParallelism(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBRAPIRequestLoggingEnabled(ctx context.Context) bool
- func PeerDBReconnectAfterBatches(ctx context.Context, env map[string]string) (int32, error)
- func PeerDBS3BytesPerAvroFile(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBS3PartSize(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBS3UuidPrefix(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBSkipSnapshotExport(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBSlotLagMBAlertThreshold(ctx context.Context, env map[string]string) (uint32, error)
- func PeerDBSnowflakeAutoCompress(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error)
- func PeerDBSnowflakeSkipCompression(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBSourceSchemaAsDestinationColumn(ctx context.Context, env map[string]string) (bool, error)
- func PeerDBTelemetryAWSSNSTopicArn() string
- func PeerDBTelemetrySenderSendErrorAlertsEnabled(ctx context.Context) bool
- func PeerDBTemporalClientCert(ctx context.Context) ([]byte, error)
- func PeerDBTemporalClientCertPath() string
- func PeerDBTemporalClientKey(ctx context.Context) ([]byte, error)
- func PeerDBTemporalClientKeyPath() string
- func PeerDBTemporalEnableCertAuth() bool
- func PeerDBVersionShaShort() string
- func PeerDBWALHeartbeatQuery(ctx context.Context, env map[string]string) (string, error)
- func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string
- func ReadModifyWriteTableSchemasToCatalog(ctx context.Context, catalogPool shared.CatalogPool, logger log.Logger, ...) error
- func RunMigrationOnce(ctx context.Context, pool shared.CatalogPool, logger log.Logger, ...) error
- func SlogLoggerFromCtx(ctx context.Context) *slog.Logger
- func UpdateCDCConfigInCatalog(ctx context.Context, pool shared.CatalogPool, logger log.Logger, ...) error
- func UpdateDynamicSetting(ctx context.Context, pool shared.CatalogPool, name string, value *string) error
- func UpdateFlowStatusInCatalog(ctx context.Context, pool shared.CatalogPool, workflowID string, ...) (protos.FlowStatus, error)
- func UpdateFlowStatusWithNameInCatalog(ctx context.Context, pool shared.CatalogPool, flowName string, ...) (protos.FlowStatus, error)
- func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool shared.CatalogPool, enabled bool) error
- func UpdateTableOIDsInTableSchemaInCatalog(ctx context.Context, pool shared.CatalogPool, logger log.Logger, ...) error
- func WithOperationContext(ctx context.Context, operation protos.FlowOperation) context.Context
- type AdditionalContextMetadata
- type BinaryFormat
- type CatalogQuerier
- type ContextPropagator
- func (c *ContextPropagator[V]) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error)
- func (c *ContextPropagator[V]) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error)
- func (c *ContextPropagator[V]) Inject(ctx context.Context, writer workflow.HeaderWriter) error
- func (c *ContextPropagator[V]) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error
- type PeerDBOAuthConfig
- type TemporalContextKey
Constants ¶
const ( BinaryFormatInvalid = iota BinaryFormatRaw BinaryFormatBase64 BinaryFormatHex )
const (
DefaultPeerDBS3PartSize int64 = 64 * 1024 * 1024 // 64MiB
)
const (
KmsKeyIDEnvVar = "PEERDB_KMS_KEY_ID"
)
Variables ¶
var DynamicIndex = func() map[string]int { defaults := make(map[string]int, len(DynamicSettings)) for i, setting := range DynamicSettings { defaults[setting.Name] = i } return defaults }()
var DynamicSettings = [...]*protos.DynamicSetting{ { Name: "PEERDB_CDC_CHANNEL_BUFFER_SIZE", Description: "Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC", DefaultValue: "131072", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE", Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalization", DefaultValue: "128", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_GROUP_NORMALIZE", Description: "Controls whether normalize applies to one batch at a time, or all pending batches", DefaultValue: "4", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS", Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only", DefaultValue: "10", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_QUEUES, }, { Name: "PEERDB_QUEUE_PARALLELISM", Description: "Parallelism for Lua script processing data, applicable for CDC mirrors to Kakfa and PubSub", DefaultValue: "4", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_QUEUES, }, { Name: "PEERDB_CDC_STORE_ENABLED", Description: "Controls whether to enable the store for recovering unchanged Postgres TOAST values within a CDC batch", DefaultValue: "true", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", Description: "CDC: number of records beyond which records are written to disk instead", DefaultValue: "1000000", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", Description: "CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables", DefaultValue: "-1", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_ENABLE_WAL_HEARTBEAT", Description: "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity", DefaultValue: "true", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_WAL_HEARTBEAT_QUERY", DefaultValue: "SELECT pg_logical_emit_message(false,'peerdb_heartbeat','')", ValueType: protos.DynconfValueType_STRING, Description: "SQL to run during each WAL heartbeat", ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_RECONNECT_AFTER_BATCHES", Description: "Force peerdb to reconnect connection to source after N batches", DefaultValue: "0", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_FULL_REFRESH_OVERWRITE_MODE", Description: "Enables full refresh mode for query replication mirrors of overwrite type", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, }, { Name: "PEERDB_NULLABLE", Description: "Propagate nullability in schema", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_AVRO_NULLABLE_LAX", Description: "Make all columns nullable in initial load Avro schema (disables strict nullable checking)", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit", DefaultValue: "8", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_SNOWFLAKE, }, { Name: "PEERDB_SNOWFLAKE_AUTO_COMPRESS", Description: "AUTO_COMPRESS option when uploading to Snowflake", DefaultValue: "true", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_SNOWFLAKE, }, { Name: "PEERDB_SNOWFLAKE_SKIP_COMPRESSION", Description: "Use `NULL` compression when creating Avro files to be uploaded to Snowflake directly", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_SNOWFLAKE, }, { Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT", Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64", DefaultValue: "raw", ValueType: protos.DynconfValueType_STRING, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME", Description: "S3 buckets to store Avro files for mirrors with ClickHouse target", DefaultValue: "", ValueType: protos.DynconfValueType_STRING, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_S3_UUID_PREFIX", Description: "Use random UUID as prefix instead of flow name, can help partitioning on non-AWS based s3 providers", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_S3_PART_SIZE", Description: "S3 upload part size in bytes, may need to increase for large batches. " + "https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html", DefaultValue: strconv.FormatInt(DefaultPeerDBS3PartSize, 10), ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_S3_BYTES_PER_AVRO_FILE", Description: "S3 upload chunk size in bytes, needed for large unpartitioned initial loads.", DefaultValue: "1000000000", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, }, { Name: "PEERDB_QUEUE_FORCE_TOPIC_CREATION", Description: "Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_QUEUES, }, { Name: "PEERDB_ALERTING_GAP_MINUTES", Description: "Duration in minutes before reraising alerts, 0 disables all alerting entirely", DefaultValue: "15", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", Description: "Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely", DefaultValue: "5000", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", Description: "Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely", DefaultValue: "5", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS", Description: "BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_BIGQUERY, }, { Name: "PEERDB_BIGQUERY_TOAST_MERGE_CHUNKING", Description: "BigQuery only: controls number of unchanged toast columns merged per statement in normalization. " + "Avoids statements growing too large", DefaultValue: "8", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_BIGQUERY, }, { Name: "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE", Description: "Enable generating deletion records for updates in ClickHouse, avoids stale records when primary key updated", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS", Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table. Setting left unset when 0", DefaultValue: "0", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE", Description: "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes", DefaultValue: "0", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING", Description: "Map unbounded numerics in Postgres to String in ClickHouse to preserve precision and scale", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_ENABLE_JSON", Description: "Map JSON datatype from source to JSON in ClickHouse instead of String", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_CLIENT_NAME", Description: "Client name to pass to ClickHouse Client", DefaultValue: "peerdb", ValueType: protos.DynconfValueType_STRING, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES", Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely", DefaultValue: "240", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME", Description: "Set Postgres application_name to have mirror name as suffix for each mirror", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_MAINTENANCE_MODE_ENABLED", Description: "Whether PeerDB is in maintenance mode, which disables any modifications to mirrors", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_PKM_EMPTY_BATCH_THROTTLE_THRESHOLD_SECONDS", Description: "Throttle threshold seconds for always sending KeepAlive response when no records are processed, " + "-1 disables always sending responses when no records are processed", DefaultValue: "60", ValueType: protos.DynconfValueType_INT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_CLICKHOUSE_INITIAL_LOAD_PARTS_PER_PARTITION", Description: "Chunk partitions in initial load into N queries, can help mitigate OOM issues on ClickHouse", DefaultValue: "1", ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_CLICKHOUSE_INITIAL_LOAD_ALLOW_NON_EMPTY_TABLES", Description: "Disables validation raising error if destination table of initial load is not empty", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, { Name: "PEERDB_SKIP_SNAPSHOT_EXPORT", Description: "This avoids initial load failing due to connectivity drops, but risks data consistency unless precautions are taken", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_SOURCE_SCHEMA_AS_DESTINATION_COLUMN", Description: "Ingest source schema as column to destination. " + "Useful when multiple tables from source ingest into single table on destination", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_ORIGIN_METADATA_AS_DESTINATION_COLUMN", Description: "Ingest additional metadata fields", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_QUEUES, }, { Name: "PEERDB_POSTGRES_CDC_HANDLE_INHERITANCE_FOR_NON_PARTITIONED_TABLES", Description: "For Postgres CDC: attempt to fetch/remap child tables for tables that aren't partitioned by Postgres." + "Useful for tables that are partitioned by extensions or table inheritance", DefaultValue: "true", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_FORCE_INTERNAL_VERSION", Description: "Forces mirrors to be created with a different internal version than the latest peerdb internal version.", DefaultValue: strconv.FormatUint(uint64(shared.InternalVersion_Latest), 10), ValueType: protos.DynconfValueType_UINT, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_UI_MAINTENANCE_TAB_ENABLED", Description: "Enable/disable the maintenance tab in the PeerDB UI", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_POSTGRES_ENABLE_FAILOVER_SLOTS", Description: "Create slots with failover enabled when possible", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_METRICS_RECORD_AGGREGATES_ENABLED", Description: "Enable/disable recording of aggregate metrics", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_POSTGRES_WAL_SENDER_TIMEOUT", Description: "wal_sender_timeout value passed for Postgres CDC. \"NONE\" means no override, leaving it up to the source DB", DefaultValue: "120s", ValueType: protos.DynconfValueType_STRING, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, { Name: "PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG", Description: "Apply schema deltas to catalog instead of fetching latest schema from source", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, }
Functions ¶
func AdditionalTablesHasOverlap ¶
func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping, additionalTableMappings []*protos.TableMapping, ) bool
func BuildProcessedSchemaMapping ¶
func BuildProcessedSchemaMapping( tableMappings []*protos.TableMapping, tableNameSchemaMapping map[string]*protos.TableSchema, logger log.Logger, ) map[string]*protos.TableSchema
given the output of GetTableSchema, processes it to be used by CDCFlow 1) changes the map key to be the destination table name instead of the source table name 2) performs column exclusion using protos.TableMapping as input.
func CheckMigrationCompleted ¶
func CheckMigrationCompleted( ctx context.Context, pool shared.CatalogPool, flowName string, migrationName string, ) (bool, error)
CheckMigrationCompleted checks if a migration has been completed for a given flow
func FetchConfigFromDB ¶
func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error)
func GetCatalogConnectionPoolFromEnv ¶
func GetCatalogConnectionPoolFromEnv(ctx context.Context) (shared.CatalogPool, error)
func GetCatalogPostgresConfigFromEnv ¶
func GetCatalogPostgresConfigFromEnv(ctx context.Context) *protos.PostgresConfig
func GetEnvBool ¶
func GetEnvString ¶
GetEnvString returns the value of the environment variable with the given name or defaultValue if the environment variable is not set.
func GetFlowMetadata ¶
func GetFlowMetadata(ctx context.Context) *protos.FlowContextMetadata
func GetPGConnectionString ¶
func GetPGConnectionString(pgConfig *protos.PostgresConfig, flowName string) string
func GetPeerDBOtelMetricsNamespace ¶
func GetPeerDBOtelMetricsNamespace() string
func GetPeerDBOtelTemporalMetricsExportListEnv ¶
func GetPeerDBOtelTemporalMetricsExportListEnv() string
func GetWorkflowStatus ¶
func GetWorkflowStatus(ctx context.Context, pool shared.CatalogPool, workflowID string, ) (protos.FlowStatus, error)
func GetWorkflowStatusByName ¶
func GetWorkflowStatusByName(ctx context.Context, pool shared.CatalogPool, flowName string, ) (protos.FlowStatus, error)
func LoadTableSchemaFromCatalog ¶
func LoadTableSchemaFromCatalog( ctx context.Context, pool shared.CatalogPool, flowName string, tableName string, ) (*protos.TableSchema, error)
func LoadTableSchemasFromCatalog ¶
func LoadTableSchemasFromCatalog( ctx context.Context, querier CatalogQuerier, flowName string, tableNames []string, ) (map[string]*protos.TableSchema, error)
func MarkMigrationCompleted ¶
func MarkMigrationCompleted( ctx context.Context, pool shared.CatalogPool, logger log.Logger, flowName string, migrationName string, ) error
MarkMigrationCompleted marks a migration as completed for a given flow
func NewContextPropagator ¶
func NewContextPropagator[V any](key TemporalContextKey) workflow.ContextPropagator
func PeerDBAlertingEmailSenderConfigurationSet ¶
func PeerDBAlertingEmailSenderConfigurationSet() string
func PeerDBAlertingEmailSenderRegion ¶
func PeerDBAlertingEmailSenderRegion() string
func PeerDBAlertingEmailSenderReplyToAddresses ¶
func PeerDBAlertingEmailSenderReplyToAddresses() string
Comma-separated reply-to addresses
func PeerDBAlertingEmailSenderSourceEmail ¶
func PeerDBAlertingEmailSenderSourceEmail() string
func PeerDBAlertingGapMinutesAsDuration ¶
func PeerDBAlertingGapMinutesAsDuration(ctx context.Context, env map[string]string) (time.Duration, error)
PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func PeerDBAllowedTargets ¶
func PeerDBAllowedTargets() string
func PeerDBAvroNullableLax ¶
func PeerDBBigQueryEnableSyncedAtPartitioning ¶
func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[string]string) (bool, error)
PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS, for creating target tables with partitioning by _PEERDB_SYNCED_AT column If true, the target tables will be partitioned by _PEERDB_SYNCED_AT column If false, the target tables will not be partitioned
func PeerDBCDCIdleTimeoutSeconds ¶
env variable doesn't exist anymore, but tests appear to depend on this in lieu of an actual value of IdleTimeoutSeconds
func PeerDBCDCStoreEnabled ¶
func PeerDBCatalogPassword ¶
PEERDB_CATALOG_PASSWORD
func PeerDBCatalogRequireTls ¶
func PeerDBCatalogRequireTls() bool
func PeerDBClickHouseAllowedDomains ¶
func PeerDBClickHouseAllowedDomains() string
func PeerDBCurrentEncKey ¶
func PeerDBCurrentEncKey(ctx context.Context) (shared.PeerDBEncKey, error)
func PeerDBCurrentEncKeyID ¶
func PeerDBCurrentEncKeyID() string
func PeerDBDeploymentVersion ¶
func PeerDBDeploymentVersion() string
func PeerDBEncKeys ¶
func PeerDBEncKeys(ctx context.Context) shared.PeerDBEncKeys
func PeerDBFlowWorkerMaxMemBytes ¶
func PeerDBFlowWorkerMaxMemBytes() uint64
GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBGetIncidentIoToken ¶
func PeerDBGetIncidentIoToken() string
func PeerDBGetIncidentIoUrl ¶
func PeerDBGetIncidentIoUrl() string
func PeerDBGroupNormalize ¶
func PeerDBIntervalSinceLastNormalizeThresholdMinutes ¶
func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env map[string]string) (uint32, error)
PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES, 0 disables normalize gap alerting entirely
func PeerDBMaintenanceModeWaitAlertSeconds ¶
func PeerDBMaintenanceModeWaitAlertSeconds() int
PEERDB_MAINTENANCE_MODE_WAIT_ALERT_SECONDS is how long to wait before alerting that peerdb's been stuck in maintenance mode too long
func PeerDBOnlyClickHouseAllowed ¶
func PeerDBOnlyClickHouseAllowed() bool
func PeerDBOpenConnectionsAlertThreshold ¶
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context, env map[string]string) (uint32, error)
PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func PeerDBQueueForceTopicCreation ¶
Kafka has topic auto create as an option, auto.create.topics.enable But non-dedicated cluster maybe can't set config, may want peerdb to create topic. Similar for PubSub
func PeerDBQueueParallelism ¶
func PeerDBS3PartSize ¶
func PeerDBS3UuidPrefix ¶
func PeerDBSlotLagMBAlertThreshold ¶
PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func PeerDBTelemetryAWSSNSTopicArn ¶
func PeerDBTelemetryAWSSNSTopicArn() string
PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetrySenderSendErrorAlertsEnabled ¶
PEERDB_TELEMETRY_SENDER_SEND_ERROR_ALERTS_ENABLED is whether to send error alerts to the telemetry sender
func PeerDBTemporalClientCertPath ¶
func PeerDBTemporalClientCertPath() string
func PeerDBTemporalClientKeyPath ¶
func PeerDBTemporalClientKeyPath() string
func PeerDBTemporalEnableCertAuth ¶
func PeerDBTemporalEnableCertAuth() bool
func PeerDBWALHeartbeatQuery ¶
func PeerFlowTaskQueueName ¶
func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string
func RunMigrationOnce ¶
func RunMigrationOnce( ctx context.Context, pool shared.CatalogPool, logger log.Logger, flowName string, migrationName string, migrationFunc func(ctx context.Context) error, ) error
RunMigrationOnce runs a migration function only if it hasn't been completed for the given flow
func UpdateCDCConfigInCatalog ¶
func UpdateCDCConfigInCatalog(ctx context.Context, pool shared.CatalogPool, logger log.Logger, cfg *protos.FlowConnectionConfigsCore, ) error
func UpdateDynamicSetting ¶
func UpdateFlowStatusInCatalog ¶
func UpdateFlowStatusInCatalog(ctx context.Context, pool shared.CatalogPool, workflowID string, status protos.FlowStatus, ) (protos.FlowStatus, error)
func UpdateFlowStatusWithNameInCatalog ¶
func UpdateFlowStatusWithNameInCatalog(ctx context.Context, pool shared.CatalogPool, flowName string, status protos.FlowStatus, ) (protos.FlowStatus, error)
func UpdateTableOIDsInTableSchemaInCatalog ¶
func UpdateTableOIDsInTableSchemaInCatalog( ctx context.Context, pool shared.CatalogPool, logger log.Logger, flowName string, tableOIDs map[string]uint32, ) error
TODO: use ReadModifyWriteTableSchemasToCatalog to guarantee transactionality
func WithOperationContext ¶
Types ¶
type AdditionalContextMetadata ¶
type AdditionalContextMetadata struct {
Operation protos.FlowOperation
}
AdditionalContextMetadata has contextual information of a flow and is specific to a child context and is passed by copy
func GetAdditionalMetadata ¶
func GetAdditionalMetadata(ctx context.Context) AdditionalContextMetadata
type CatalogQuerier ¶
type CatalogQuerier interface {
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}
support both catalog pool and pgx.Tx
type ContextPropagator ¶
type ContextPropagator[V any] struct { Key TemporalContextKey }
func (*ContextPropagator[V]) Extract ¶
func (c *ContextPropagator[V]) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error)
func (*ContextPropagator[V]) ExtractToWorkflow ¶
func (c *ContextPropagator[V]) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error)
func (*ContextPropagator[V]) Inject ¶
func (c *ContextPropagator[V]) Inject(ctx context.Context, writer workflow.HeaderWriter) error
func (*ContextPropagator[V]) InjectFromWorkflow ¶
func (c *ContextPropagator[V]) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error
type PeerDBOAuthConfig ¶
type PeerDBOAuthConfig struct {
// there can be more complex use cases where domain != issuer, but we handle them later if required
OAuthIssuerUrl string `json:"oauth_issuer_url"`
KeySetJson string `json:"key_set_json"`
// This is a custom claim we may wish to validate (if needed)
OAuthJwtClaimKey string `json:"oauth_jwt_claim_key"`
OAuthClaimValue string `json:"oauth_jwt_claim_value"`
// Enabling uses /.well-known/ OpenID discovery endpoints, thus key-set etc. don't need to be specified
OAuthDiscoveryEnabled bool `json:"oauth_discovery_enabled"`
}
func GetPeerDBOAuthConfig ¶
func GetPeerDBOAuthConfig(ctx context.Context) PeerDBOAuthConfig
type TemporalContextKey ¶
type TemporalContextKey string
const ( FlowMetadataKey TemporalContextKey = "x-peerdb-flow-metadata" AdditionalMetadataKey TemporalContextKey = "x-peerdb-additional-metadata" )
func (TemporalContextKey) HeaderKey ¶
func (k TemporalContextKey) HeaderKey() string