copy

package
v1.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	COPY_SUCCED = iota
	COPY_SKIPED
	COPY_FAILED
)
View Source
const (
	CopySuccedFileName = "cbcopy_succeed"
	FailedFileName     = "cbcopy_failed"
	SkippedFileName    = "cbcopy_skipped"
)
View Source
const (
	CbcopyTestTable = "public.cbcopy_test"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Application

type Application struct {
	// contains filtered or unexported fields
}

func NewApplication

func NewApplication() *Application

func (*Application) Initialize

func (app *Application) Initialize(cmd *cobra.Command)

func (*Application) Run

func (app *Application) Run(cmd *cobra.Command)

Run is the main entry point for the cbcopy application.

func (*Application) SetFlagDefaults

func (app *Application) SetFlagDefaults(flagSet *pflag.FlagSet)

SetFlagDefaults sets the default values for the command-line flags.

type BaseValidator

type BaseValidator struct {
	// contains filtered or unexported fields
}

BaseValidator provides common validation functionality

func NewBaseValidator

func NewBaseValidator(flags *pflag.FlagSet) *BaseValidator

NewBaseValidator creates a new base validator instance

type ConnectionModeValidator

type ConnectionModeValidator struct {
	*BaseValidator
}

func NewConnectionModeValidator

func NewConnectionModeValidator(flags *pflag.FlagSet) *ConnectionModeValidator

func (*ConnectionModeValidator) Validate

func (v *ConnectionModeValidator) Validate() error

Validate implements the Validator interface

type CopyBase

type CopyBase struct {
	WorkerId             int
	SrcSegmentsHostInfo  []utils.SegmentHostInfo
	DestSegmentsHostInfo []utils.SegmentHostInfo
	ConnectionMode       string
	CompArg              string
}

func (*CopyBase) CommitBegin

func (cc *CopyBase) CommitBegin(conn *dbconn.DBConn) error

func (*CopyBase) FormAllSegsHelperAddress

func (cc *CopyBase) FormAllSegsHelperAddress(ports []HelperPortInfo) (string, string)

func (*CopyBase) FormAllSegsIds

func (cc *CopyBase) FormAllSegsIds() string

func (*CopyBase) FormDestSegsHelperAddress

func (cc *CopyBase) FormDestSegsHelperAddress(ports []HelperPortInfo) (string, string)

func (*CopyBase) FormMasterHelperAddress

func (cc *CopyBase) FormMasterHelperAddress(ports []HelperPortInfo) (string, string)

func (*CopyBase) FormSrcSegsHelperAddress

func (cc *CopyBase) FormSrcSegsHelperAddress(ports []HelperPortInfo) (string, string)

type CopyCommand

type CopyCommand interface {
	CopyTo(conn *dbconn.DBConn, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)
	CopyFrom(conn *dbconn.DBConn, ctx context.Context, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)
	IsCopyFromStarted(rows int64) bool
	IsMasterCopy() bool
}

func CreateCopyStrategy

func CreateCopyStrategy(isReplicated bool,
	numTuples int64,
	workerId int,
	srcSegs []utils.SegmentHostInfo,
	destSegs []utils.SegmentHostInfo,
	srcConn, destConn *dbconn.DBConn,
	connectionMode string) CopyCommand

CreateCopyStrategy creates the appropriate copy strategy based on various factors: - Number of tuples to copy (numTuples) - Number of segments in source and destination clusters (srcSegs, destSegs) - Database versions of source and destination (srcConn.Version, destConn.Version) It returns an instance of a struct that implements the CopyCommand interface.

type CopyManager

type CopyManager struct {
	// contains filtered or unexported fields
}

func NewCopyManager

func NewCopyManager(src, dest, destManageConn, srcManageConn *dbconn.DBConn,
	srcSegmentsHostInfo []utils.SegmentHostInfo,
	destSegmentsIpInfo []utils.SegmentHostInfo,
	timestamp string,
	appName string,
	encodingGuc *SessionGUCs,
	progressBar utils.ProgressBar) *CopyManager

func (*CopyManager) Close

func (m *CopyManager) Close()

func (*CopyManager) Copy

func (m *CopyManager) Copy(tables chan option.TablePair)

Copy manages the concurrent copying of tables from the source to the destination database. It initializes a pool of workers, each responsible for copying a table, and waits for all workers to complete. The function ensures that the table channel is closed when appropriate.

type CopyOnMaster

type CopyOnMaster struct {
	CopyBase
}

func (*CopyOnMaster) CopyFrom

func (com *CopyOnMaster) CopyFrom(conn *dbconn.DBConn, ctx context.Context, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyFrom is part of the CopyCommand interface. It executes the COPY FROM command to receive data into the destination database. The specific implementation varies based on the copy strategy.

func (*CopyOnMaster) CopyTo

func (com *CopyOnMaster) CopyTo(conn *dbconn.DBConn, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyTo is part of the CopyCommand interface. It executes the COPY TO command to send data from the source database. The specific implementation varies based on the copy strategy.

func (*CopyOnMaster) IsCopyFromStarted

func (com *CopyOnMaster) IsCopyFromStarted(rows int64) bool

func (*CopyOnMaster) IsMasterCopy

func (com *CopyOnMaster) IsMasterCopy() bool

type CopyOnSegment

type CopyOnSegment struct {
	CopyBase
}

func (*CopyOnSegment) CopyFrom

func (cos *CopyOnSegment) CopyFrom(conn *dbconn.DBConn, ctx context.Context, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyFrom is the CopyOnSegment strategy's implementation of receiving data. It uses ON SEGMENT clause to execute COPY on each segment.

func (*CopyOnSegment) CopyTo

func (cos *CopyOnSegment) CopyTo(conn *dbconn.DBConn, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyTo is the CopyOnSegment strategy's implementation of sending data. It uses ON SEGMENT clause to execute COPY on each segment.

func (*CopyOnSegment) IsCopyFromStarted

func (cos *CopyOnSegment) IsCopyFromStarted(rows int64) bool

func (*CopyOnSegment) IsMasterCopy

func (cos *CopyOnSegment) IsMasterCopy() bool

type CopyOperation

type CopyOperation struct {
	// contains filtered or unexported fields
}

CopyOperation encapsulates the state for a copy operation

func NewCopyOperation

func NewCopyOperation(command CopyCommand, srcConn, destConn, destManageConn, srcManageConn *dbconn.DBConn,
	srcTable, destTable option.Table, connNum int, connectionMode string) *CopyOperation

NewCopyOperation creates a new CopyOperation instance

func (*CopyOperation) Execute

func (op *CopyOperation) Execute(timestamp string) error

Execute performs the copy operation

type DatabaseValidator

type DatabaseValidator struct {
	*BaseValidator
	// contains filtered or unexported fields
}

DatabaseValidator handles database-related validations

func NewDatabaseValidator

func NewDatabaseValidator(flags *pflag.FlagSet) *DatabaseValidator

NewDatabaseValidator creates a new database validator with default excluded databases

func (*DatabaseValidator) Validate

func (v *DatabaseValidator) Validate() error

Validate implements the Validator interface

type ExtDestGeCopy

type ExtDestGeCopy struct {
	CopyBase
}

func (*ExtDestGeCopy) CopyFrom

func (edgc *ExtDestGeCopy) CopyFrom(conn *dbconn.DBConn, ctx context.Context, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyFrom is the ExtDestGeCopy strategy's implementation of receiving data. It creates an external web table and uses it to load data in parallel.

func (*ExtDestGeCopy) CopyTo

func (edgc *ExtDestGeCopy) CopyTo(conn *dbconn.DBConn, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyTo is the ExtDestGeCopy strategy's implementation of sending data. Used when destination cluster has more segments than source.

func (*ExtDestGeCopy) IsCopyFromStarted

func (edgc *ExtDestGeCopy) IsCopyFromStarted(rows int64) bool

func (*ExtDestGeCopy) IsMasterCopy

func (edgc *ExtDestGeCopy) IsMasterCopy() bool

type ExtDestLtCopy

type ExtDestLtCopy struct {
	CopyBase
}

func (*ExtDestLtCopy) CopyFrom

func (edlc *ExtDestLtCopy) CopyFrom(conn *dbconn.DBConn, ctx context.Context, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyFrom is the ExtDestLtCopy strategy's implementation of receiving data. It creates an external web table and uses it to load data in parallel, specifying the number of source clients for each destination segment.

func (*ExtDestLtCopy) CopyTo

func (edlc *ExtDestLtCopy) CopyTo(conn *dbconn.DBConn, table option.Table, ports []HelperPortInfo, cmdId string) (int64, error)

CopyTo is the ExtDestLtCopy strategy's implementation of sending data. Used when destination cluster has fewer segments than source.

func (*ExtDestLtCopy) IsCopyFromStarted

func (edlc *ExtDestLtCopy) IsCopyFromStarted(rows int64) bool

func (*ExtDestLtCopy) IsMasterCopy

func (edlc *ExtDestLtCopy) IsMasterCopy() bool

type FlagCombinationValidator

type FlagCombinationValidator struct {
	*BaseValidator
	// contains filtered or unexported fields
}

FlagCombinationValidator handles validation of flag combinations

func NewFlagCombinationValidator

func NewFlagCombinationValidator(flags *pflag.FlagSet) *FlagCombinationValidator

NewFlagCombinationValidator creates a new flag combination validator

func (*FlagCombinationValidator) Validate

func (v *FlagCombinationValidator) Validate() error

Validate implements the Validator interface

type HelperPortInfo

type HelperPortInfo struct {
	Content int
	Port    int
}

type MetadataManager

type MetadataManager struct {
	// contains filtered or unexported fields
}

MetadataManager handles all metadata related operations during copy process

func NewMetadataManager

func NewMetadataManager(srcConn, destConn *dbconn.DBConn,
	qm *QueryManager,
	qw *QueryWrapper,
	withGlobal, metaOnly bool,
	timestamp string,

	partNameMap map[string][]string,
	tableMap map[string]string,
	ownerMap map[string]string,
	tablespaceMap map[string]string) *MetadataManager

NewMetadataManager creates a new MetadataManager instance

func (*MetadataManager) Close

func (m *MetadataManager) Close()

func (*MetadataManager) MigrateMetadata

func (m *MetadataManager) MigrateMetadata(srcTables, destTables, nonPhysicalRels []option.Table) (chan option.TablePair, utils.ProgressBar)

MigrateMetadata manages all pre-data operations

func (*MetadataManager) Open

func (m *MetadataManager) Open()

func (*MetadataManager) RestorePostMetadata

func (m *MetadataManager) RestorePostMetadata(dbname, timestamp string)

RestorePostMetadata manages all post-data operations

func (*MetadataManager) Wait

func (m *MetadataManager) Wait()

Wait blocks until metadata migration is complete

type ModeValidator

type ModeValidator struct {
	*BaseValidator
}

ModeValidator handles mode-related validations

func NewModeValidator

func NewModeValidator(flags *pflag.FlagSet) *ModeValidator

NewModeValidator creates a new mode validator instance

func (*ModeValidator) Validate

func (v *ModeValidator) Validate() error

Validate implements the Validator interface

type OwnerMappingValidator

type OwnerMappingValidator struct {
	*BaseValidator
}

OwnerMappingValidator handles owner mapping file validations

func NewOwnerMappingValidator

func NewOwnerMappingValidator(flags *pflag.FlagSet) *OwnerMappingValidator

NewOwnerMappingValidator creates a new owner mapping validator instance

func (*OwnerMappingValidator) Validate

func (v *OwnerMappingValidator) Validate() error

Validate implements the Validator interface

type PartLeafTable

type PartLeafTable struct {
	RootName  string
	LeafName  string
	RelTuples int64
}

PartLeafTable represents a partition leaf table

type PortHelper

type PortHelper struct {
	// contains filtered or unexported fields
}

PortHelper handles port-related operations

func NewPortHelper

func NewPortHelper(conn *dbconn.DBConn) *PortHelper

func (*PortHelper) CreateHelperPortTable

func (ph *PortHelper) CreateHelperPortTable(timestamp string) error

CreateHelperPortTable creates temporary tables for storing helper program ports

func (*PortHelper) GetHelperPortList

func (ph *PortHelper) GetHelperPortList(timestamp string, cmdId string,
	workerId int, isMasterCopy bool) ([]HelperPortInfo, error)

GetHelperPortList retrieves the list of helper program ports

type PortValidator

type PortValidator struct {
	*BaseValidator
}

PortValidator handles port-related validations

func NewPortValidator

func NewPortValidator(flags *pflag.FlagSet) *PortValidator

NewPortValidator creates a new port validator instance

func (*PortValidator) Validate

func (v *PortValidator) Validate() error

Validate implements the Validator interface

type QueryManager

type QueryManager struct {
}

TableManager handles all table-related operations

func NewQueryManager

func NewQueryManager() *QueryManager

func (*QueryManager) CreateDatabaseIfNotExists

func (qm *QueryManager) CreateDatabaseIfNotExists(conn *dbconn.DBConn, dbname string) error

CreateDatabaseIfNotExists creates a database if it doesn't exist

func (*QueryManager) CreateTestTable

func (qm *QueryManager) CreateTestTable(conn *dbconn.DBConn, tableName string) error

CreateTestTable creates an test table with a single integer column

func (*QueryManager) GetAllDatabases

func (qm *QueryManager) GetAllDatabases(conn *dbconn.DBConn) ([]string, error)

GetAllDatabases retrieves all database names from the cluster

func (*QueryManager) GetNonPhysicalRelations

func (qm *QueryManager) GetNonPhysicalRelations(conn *dbconn.DBConn) (map[string]bool, error)

getNonPhysicalRelations returns a map of database relations that don't have direct physical storage (except materialized views), including: - Views (v) - Sequences (S) - Foreign Tables (f) - Materialized Views (m) The returned map uses "schema.name" as key and true as value for all entries.

func (*QueryManager) GetPartitionLeafTables

func (qm *QueryManager) GetPartitionLeafTables(conn *dbconn.DBConn) ([]PartLeafTable, error)

GetPartitionLeafTables retrieves partition leaf tables from the database example output: [{"RootName":"public.t1","LeafName":"public.t1","RelTuples":1000},{"RootName":"public.t2","LeafName":"public.t2","RelTuples":2000}]

func (*QueryManager) GetSessionSetupQuery

func (qm *QueryManager) GetSessionSetupQuery(conn *dbconn.DBConn, applicationName string) string

GetSessionSetupQuery returns the SQL query for setting up database session parameters

func (*QueryManager) GetUserTables

func (tm *QueryManager) GetUserTables(conn *dbconn.DBConn) (map[string]option.TableStatistics, error)

GetUserTables retrieves user tables from the database example output: map[public.t1:{"Partition":0,"RelTuples":1000,"IsReplicated":false} public.t2:{"Partition":0,"RelTuples":2000,"IsReplicated":true}]

func (*QueryManager) IsEmptyTable

func (tm *QueryManager) IsEmptyTable(conn *dbconn.DBConn, schema, table string, workerId int) (bool, error)

IsEmptyTable checks if a table is empty

func (*QueryManager) SchemaExists

func (qm *QueryManager) SchemaExists(conn *dbconn.DBConn, schema string) (bool, error)

SchemaExists checks if a schema exists in the database

type QueryWrapper

type QueryWrapper struct {
	// contains filtered or unexported fields
}

QueryWrapper wraps QueryManager and provides additional data transformation functionality

func NewQueryWrapper

func NewQueryWrapper(qm *QueryManager) *QueryWrapper

NewQueryWrapper creates a new QueryWrapper instance

func (*QueryWrapper) FormUserTableMap

func (qw *QueryWrapper) FormUserTableMap(srcTables, destTables []option.Table) map[string]string

FormUserTableMap creates a map of user tables with their statistics

func (*QueryWrapper) GetDbNameMap

func (qw *QueryWrapper) GetDbNameMap(conn *dbconn.DBConn) map[string]string

GetDbNameMap returns a map of database names based on copy mode

func (*QueryWrapper) GetNonPhysicalRelations

func (qw *QueryWrapper) GetNonPhysicalRelations(conn *dbconn.DBConn, pendingCheckRels []option.Table) []option.Table

func (*QueryWrapper) GetPartitionLeafTables

func (qw *QueryWrapper) GetPartitionLeafTables(conn *dbconn.DBConn, isDest bool) ([]PartLeafTable, error)

GetPartitionLeafTables retrieves partition leaf tables with caching support example output: [{"RootName":"public.t1","LeafName":"public.t1","RelTuples":1000},{"RootName":"public.t2","LeafName":"public.t2","RelTuples":2000}]

func (*QueryWrapper) GetRootPartTables

func (qw *QueryWrapper) GetRootPartTables(conn *dbconn.DBConn, isDest bool) (map[string]bool, error)

GetRootPartTables returns a map of root partition table names example output: map[public.t1:true public.t2:true]

func (*QueryWrapper) GetUserDatabases

func (qw *QueryWrapper) GetUserDatabases(conn *dbconn.DBConn) ([]string, error)

GetUserDatabases returns all user databases excluding system databases

func (*QueryWrapper) GetUserTables

func (qw *QueryWrapper) GetUserTables(srcConn, destConn *dbconn.DBConn) ([]option.Table, []option.Table, []option.Table, map[string][]string)

GetUserTables retrieves and processes user tables based on copy mode

func (*QueryWrapper) ResetCache

func (qw *QueryWrapper) ResetCache()

ResetCache resets the partition leaf table cache

type SchemaValidator

type SchemaValidator struct {
	*BaseValidator
}

SchemaValidator handles schema-related validations

func NewSchemaValidator

func NewSchemaValidator(flags *pflag.FlagSet) *SchemaValidator

NewSchemaValidator creates a new schema validator instance

func (*SchemaValidator) Validate

func (v *SchemaValidator) Validate() error

Validate implements the Validator interface

type SessionGUCs

type SessionGUCs struct {
	ClientEncoding string `db:"client_encoding"`
}

type TableCopier

type TableCopier struct {
	// contains filtered or unexported fields
}

func NewTableCopier

func NewTableCopier(manager *CopyManager,
	srcTable, destTable option.Table,
	workerID int,
	copiedMap map[string]int) *TableCopier

func (*TableCopier) Copy

func (tc *TableCopier) Copy()

Copy performs the data copy operation for a single table. It checks if the copy should be skipped, prepares the destination, executes the data copy, and commits the transaction if successful. Cleanup is handled automatically at the end of the operation.

type TableValidator

type TableValidator struct {
	*BaseValidator
}

TableValidator handles table-related validations

func NewTableValidator

func NewTableValidator(flags *pflag.FlagSet) *TableValidator

NewTableValidator creates a new table validator instance

func (*TableValidator) Validate

func (v *TableValidator) Validate() error

Validate implements the Validator interface

type ValidationError

type ValidationError struct {
	Message string
}

ValidationError defines a custom error type for validation failures

func (*ValidationError) Error

func (e *ValidationError) Error() string

type Validator

type Validator interface {
	Validate() error
}

Validator interface defines the contract for all validators

type ValidatorManager

type ValidatorManager struct {
	// contains filtered or unexported fields
}

ValidatorManager manages all validators

func NewValidatorManager

func NewValidatorManager(flags *pflag.FlagSet) *ValidatorManager

NewValidatorManager creates a new validator manager with all required validators

func (*ValidatorManager) ValidateAll

func (vm *ValidatorManager) ValidateAll() error

ValidateAll runs all validators

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL