Documentation ¶
Overview ¶
Package proc contains the types and functions that process script nodes and write the results to tables or files.
Index ¶
- Constants
- Variables
- func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
- func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef, ...) string
- func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext) error
- type BatchStats
- func RunCreateDistinctTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateFile(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableRelForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- type CustomProcessorRunner
- type DataIdxSeqModeType
- type FileInserter
- type FileRecordHeap
- type FileRecordHeapItem
- type IndexKeyItem
- type InserterDrainStrategy
- type PreparedQuery
- type Rowset
- func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)
- func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)
- func (rs *Rowset) ArrangeByRowid(rowids []int64) error
- func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
- func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error
- func (rs *Rowset) GetFieldNames() *[]string
- func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]any, error)
- func (rs *Rowset) InitRows(capacity int) error
- func (rs *Rowset) ToString() string
- type TableInserter
- type TableRecord
- type TableRecordBatch
- type TableRecordItem
- type TableRecordPtr
- type WriteChannelItem
- type WriteFileBatch
Constants ¶
View Source
const DefaultFileInserterBatchCapacity int = 1000
View Source
const HarvestForDeleteRowsetSize = 1000 // Do not let users tweak this; it may be too sensitive
View Source
const MaxAmazonKeyspacesBatchLen int = 30
View Source
const MaxAmazonKeyspacesInElements int = 100
View Source
const ReadFileTableInserterBatchSize int = 500 // 1000 still may cause OOM on c7g.large daemon boxes when loading portfolio bigtest parquet files
View Source
const TableToTableInserterBatchSize int = 500 // Same as above, although no OOMs have been reported yet
View Source
const MaxInserterErrors int = 5
Variables ¶
View Source
var ErrDuplicateKey = errors.New("duplicate key")
View Source
var ErrDuplicateRowid = errors.New("duplicate rowid")
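Both are plain sentinel errors, so callers can detect them with errors.Is even after wrapping. A minimal, self-contained sketch (the insertRow helper below is hypothetical, not part of this package):

package main

import (
	"errors"
	"fmt"
)

var ErrDuplicateKey = errors.New("duplicate key")

// insertRow is a hypothetical stand-in for an operation that may
// surface ErrDuplicateKey, possibly wrapped with extra context.
func insertRow() error {
	return fmt.Errorf("writing batch: %w", ErrDuplicateKey)
}

func main() {
	if err := insertRow(); errors.Is(err, ErrDuplicateKey) {
		// Duplicates are expected on idempotent retries; treat as non-fatal.
		fmt.Println("row already written, skipping:", err)
	}
}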
Functions ¶
func CreateDataTableCql ¶
func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
func CreateIdxTableCql ¶
func DeleteDataAndUniqueIndexesByBatchIdx ¶
func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext) error
To test it, see the comments at the end of RunCreateTableRelForBatch.
Types ¶
type BatchStats ¶ added in v1.1.1
func RunCreateDistinctTableForBatch ¶ added in v1.1.18
func RunCreateDistinctTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateFile ¶
func RunCreateFile(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startToken int64, endToken int64) (BatchStats, error)
func RunCreateTableForBatch ¶
func RunCreateTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateTableForCustomProcessorForBatch ¶
func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateTableRelForBatch ¶
func RunCreateTableRelForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, lookupNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunReadFileForBatch ¶
func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) (BatchStats, error)
func (*BatchStats) ToString ¶ added in v1.1.1
func (bs *BatchStats) ToString() string
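All the Run*ForBatch entry points above share the same shape: they take an environment config, a logger, and a message processing context, and return BatchStats for reporting. A hedged sketch of driving one of them, assuming envConfig, logger, pCtx, and srcFileIdx have already been prepared by the caller (normally the Capillaries daemon):

// Sketch only: envConfig, logger, and pCtx are assumed to be set up
// elsewhere; srcFileIdx points at one of the node's source files.
batchStats, err := proc.RunReadFileForBatch(envConfig, logger, pCtx, srcFileIdx)
if err != nil {
	return fmt.Errorf("cannot process file batch: %w", err)
}
fmt.Println(batchStats.ToString()) // human-readable batch summary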
type CustomProcessorRunner ¶
type CustomProcessorRunner interface {
Run(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArrayCallback func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}
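Any custom processor plugs in through this single-method interface. A minimal sketch of a pass-through implementation that exports every incoming row and flushes them in one call (the type name is illustrative; imports of the package's l, ctx, and eval dependencies are assumed):

// PassThroughProcessor is a hypothetical CustomProcessorRunner that
// copies every incoming row into a vars map and flushes them all at once.
type PassThroughProcessor struct{}

func (p *PassThroughProcessor) Run(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArrayCallback func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error {
	varsArray := make([]*eval.VarValuesMap, rsIn.RowCount)
	for rowIdx := 0; rowIdx < rsIn.RowCount; rowIdx++ {
		vars := eval.VarValuesMap{}
		if err := rsIn.ExportToVars(rowIdx, &vars); err != nil {
			return err
		}
		varsArray[rowIdx] = &vars
	}
	// Hand the accumulated rows back to the framework for writing.
	return flushVarsArrayCallback(varsArray, len(varsArray))
}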
type DataIdxSeqModeType ¶ added in v1.1.18
type DataIdxSeqModeType int
const (
	DataIdxSeqModeDataFirst DataIdxSeqModeType = iota
	DataIdxSeqModeDistinctIdxFirst // Tells us to use the idx as a uniqueness vehicle for the Distinct processor
)
type FileInserter ¶
type FileInserter struct {
PCtx *ctx.MessageProcessingContext
FileCreator *sc.FileCreatorDef
CurrentBatch *WriteFileBatch
BatchCapacity int
BatchesIn chan *WriteFileBatch
RecordWrittenStatuses chan error
BatchesSent int
FinalFileUrl string
TempFilePath string
}
type FileRecordHeap ¶
type FileRecordHeap []*FileRecordHeapItem
func (FileRecordHeap) Len ¶
func (h FileRecordHeap) Len() int
func (FileRecordHeap) Less ¶
func (h FileRecordHeap) Less(i, j int) bool
func (*FileRecordHeap) Pop ¶
func (h *FileRecordHeap) Pop() any
func (*FileRecordHeap) Push ¶
func (h *FileRecordHeap) Push(x any)
func (FileRecordHeap) Swap ¶
func (h FileRecordHeap) Swap(i, j int)
type FileRecordHeapItem ¶
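FileRecordHeap implements the standard library's heap.Interface (Len, Less, and Swap on the value receiver; Push and Pop on the pointer receiver), so it is meant to be driven through container/heap rather than called directly. A sketch of the usual pattern; FileRecordHeapItem's fields are not shown in these docs, so the literal is left empty:

// Requires: import "container/heap"
h := FileRecordHeap{}
heap.Init(&h)                        // establish the heap invariant
heap.Push(&h, &FileRecordHeapItem{}) // fields omitted: not shown in these docs
for h.Len() > 0 {
	item := heap.Pop(&h).(*FileRecordHeapItem) // items come out in Less() order
	_ = item
}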
type IndexKeyItem ¶ added in v1.1.26
type InserterDrainStrategy ¶ added in v1.1.26
type InserterDrainStrategy int
const (
	InserterDrainCompletely InserterDrainStrategy = iota
	InserterDrainSome
)
type PreparedQuery ¶ added in v1.1.15
type PreparedQuery struct {
Qb *cql.QueryBuilder
Query string
}
type Rowset ¶
type Rowset struct {
Fields []sc.FieldRef
FieldsByFullAliasName map[string]int
FieldsByFieldName map[string]int
Rows []*[]any
RowCount int
}
func NewRowsetFromFieldRefs ¶
func (*Rowset) AppendFieldRefs ¶
func (*Rowset) AppendFieldRefsWithFilter ¶
func (*Rowset) ArrangeByRowid ¶
func (*Rowset) ExportToVars ¶
func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
func (*Rowset) ExportToVarsWithAlias ¶
func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error
TODO: consider passing just vars eval.VarValuesMap; maps are passed by reference in Go anyway
func (*Rowset) GetFieldNames ¶
func (*Rowset) GetTableRecord ¶
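Taken together, these methods support the typical read loop: walk the populated rows and export each one into a vars map for expression evaluation. A sketch, assuming rs is a *Rowset that one of the package's readers has already filled (InitRows is normally called by that reader):

// Sketch only: rs is an already-populated *Rowset.
vars := eval.VarValuesMap{}
for rowIdx := 0; rowIdx < rs.RowCount; rowIdx++ {
	// ExportToVars overwrites vars with this row's field values.
	if err := rs.ExportToVars(rowIdx, &vars); err != nil {
		return err
	}
	// ... evaluate node expressions against vars here ...
}
fmt.Println(rs.ToString()) // debug dump of the whole rowset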
type TableInserter ¶
type TableInserter struct {
PCtx *ctx.MessageProcessingContext
TableCreator *sc.TableCreatorDef
RecordsIn chan WriteChannelItem // Channel to pass records from the main function (e.g. RunCreateTableForBatch), using add(), to TableInserter
RecordWrittenStatuses chan error
RecordWrittenStatusesMutex sync.Mutex // Only to report on draining, otherwise useless
MachineHash int64
NumWorkers int
DrainerCapacity int
WorkerWaitGroup sync.WaitGroup
RecordsSent int // Records sent to RecordsIn
RecordsProcessed int // Number of items received in RecordWrittenStatuses
DoesNotExistPauseMillis int64 // millis
OperationTimedOutPauseMillis int64 // millis
ExpBackoffFactorMultiplier int64 // 2
MaxDbProblemRetries int // 5
MaxDuplicateRetries int // 5
DataIdxSeqMode DataIdxSeqModeType
MaxAllowedRowInsertionTimeMs int64
DrainerCancelSignal chan error
DrainerCompleteSignal chan error
DrainerDoneSignal chan error
}
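The struct exposes the producer side of a channel handshake: records go in through RecordsIn, and one status per written record eventually comes back through RecordWrittenStatuses. A hedged sketch of that flow, with construction and worker startup omitted (instance and item are assumed to exist; field names follow the struct above):

// Send one prepared item to the insert workers.
instance.RecordsIn <- item
instance.RecordsSent++

// Later, drain one status per record sent; a non-nil status means the
// insert ultimately failed even after the inserter's own retries.
for instance.RecordsProcessed < instance.RecordsSent {
	if err := <-instance.RecordWrittenStatuses; err != nil {
		return err
	}
	instance.RecordsProcessed++
}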
type TableRecord ¶
type TableRecordBatch ¶
type TableRecordBatch []TableRecordPtr
type TableRecordItem ¶ added in v1.1.26
type TableRecordPtr ¶
type WriteChannelItem ¶
type WriteChannelItem struct {
TableRecordItems []TableRecordItem
IndexKeyItems []IndexKeyItem
}
type WriteFileBatch ¶