Documentation
¶
Index ¶
- func BuildRegisteredFromOutput(output pkg.ModuleOutput, changed bool) interface{}
- func CalculateExpectedResults(nodesInLevel []pkg.GraphNode, hostContexts map[string]*pkg.HostContext, ...) (int, error)
- func CreateRunOnceResultsForAllHosts(originalResult pkg.TaskResult, hostContexts map[string]*pkg.HostContext, ...) []pkg.TaskResult
- func GetDelegatedHostContext(task pkg.GraphNode, hostContexts map[string]*pkg.HostContext, ...) (*pkg.HostContext, error)
- func GetFirstAvailableHost(hostContexts map[string]*pkg.HostContext) (*pkg.HostContext, string)
- func GetTaskClosures(task pkg.GraphNode, c *pkg.HostContext, cfg *config.Config) ([]*pkg.Closure, error)
- func InitializeRecapStats(hostContexts map[string]*pkg.HostContext) map[string]map[string]int
- func PPrintOutput(output pkg.ModuleOutput, err error)
- func ParseLoop(task pkg.GraphNode, c *pkg.HostContext, cfg *config.Config) ([]interface{}, error)
- func PrepareLevelHistoryAndGetCount(nodesInLevel []pkg.GraphNode, hostContexts map[string]*pkg.HostContext, ...) (map[string]chan pkg.GraphNode, int, error)
- func PropagateRegisteredToAllHosts(task pkg.GraphNode, registeredValue interface{}, ...)
- func RunSpageTemporalWorkerAndWorkflow(opts RunSpageTemporalWorkerAndWorkflowOptions) error
- func SharedLoadLevelTasks(env DispatchEnv, runner RunnerAdapter, tasksInLevel []pkg.GraphNode, ...)
- func SharedProcessLevelResults(resultsCh ResultChannel, errCh ErrorChannel, logger Logger, executionLevel int, ...) (bool, []pkg.TaskResult, error)
- func SpageTemporalWorkflow(ctx workflow.Context, graphInput *pkg.Graph, inventoryFile string, ...) error
- type ContextAwareErrorChannel
- type ContextAwareResultChannel
- type DispatchEnv
- type ErrorChannel
- type FormattedGenericOutput
- type GenericOutput
- type GraphNodeDTO
- type LocalDispatchEnv
- type LocalErrorChannel
- type LocalGraphExecutor
- type LocalLogger
- type LocalResultChannel
- type LocalRunnerAdapter
- type LocalTaskRunner
- type Logger
- type ResultChannel
- type ResultProcessor
- type RunSpageTemporalWorkerAndWorkflowOptions
- type RunnerAdapter
- type SpageActivityInput
- type SpageActivityResult
- type SpageMetaWorkflowInput
- type SpageMetaWorkflowResult
- type SpageRunOnceLoopActivityInput
- type SpageRunOnceLoopActivityResult
- type TaskCollectionDTO
- type TemporalDispatchEnv
- func (e *TemporalDispatchEnv) Done()
- func (e *TemporalDispatchEnv) Go(fn func())
- func (e *TemporalDispatchEnv) Inc()
- func (e *TemporalDispatchEnv) IsParallel() bool
- func (e *TemporalDispatchEnv) SendError(err error)
- func (e *TemporalDispatchEnv) SendResult(result pkg.TaskResult)
- func (e *TemporalDispatchEnv) Wait()
- type TemporalErrorChannel
- type TemporalGraphExecutor
- type TemporalLogger
- type TemporalResultChannel
- type TemporalRunnerAdapter
- type TemporalTaskRunner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildRegisteredFromOutput ¶ added in v1.0.0
func BuildRegisteredFromOutput(output pkg.ModuleOutput, changed bool) interface{}
BuildRegisteredFromOutput converts a ModuleOutput into a map suitable for `register:` and ensures consistent fields like "failed" and "changed" are present.
func CalculateExpectedResults ¶ added in v1.0.0
func CreateRunOnceResultsForAllHosts ¶ added in v1.0.0
func CreateRunOnceResultsForAllHosts(originalResult pkg.TaskResult, hostContexts map[string]*pkg.HostContext, executionHostName string) []pkg.TaskResult
CreateRunOnceResultsForAllHosts creates TaskResult copies for all hosts when run_once is used This ensures that facts and results are propagated to all hosts in the batch
func GetDelegatedHostContext ¶ added in v1.0.0
func GetFirstAvailableHost ¶ added in v1.0.0
func GetFirstAvailableHost(hostContexts map[string]*pkg.HostContext) (*pkg.HostContext, string)
GetFirstAvailableHost returns the first host from the hostContexts map Used for run_once tasks to select the execution host. It sorts the hosts by name to ensure deterministic execution.
func GetTaskClosures ¶ added in v1.0.0
func GetTaskClosures(task pkg.GraphNode, c *pkg.HostContext, cfg *config.Config) ([]*pkg.Closure, error)
GetTaskClosures generates one or more Closures for a task, handling loops.
func InitializeRecapStats ¶ added in v1.0.0
func PPrintOutput ¶ added in v1.0.0
func PPrintOutput(output pkg.ModuleOutput, err error)
PPrintOutput prints the output of a module or an error in a readable format.
func ParseLoop ¶ added in v1.0.0
ParseLoop parses the loop directive of a task, evaluating expressions and returning items to iterate over.
func PrepareLevelHistoryAndGetCount ¶ added in v1.0.0
func PropagateRegisteredToAllHosts ¶ added in v1.0.0
func PropagateRegisteredToAllHosts( task pkg.GraphNode, registeredValue interface{}, hostContexts map[string]*pkg.HostContext, workflowHostFacts map[string]map[string]interface{}, )
PropagateRegisteredToAllHosts stores the provided registered value into all host contexts, and also mirrors it into the workflow-wide facts map if provided (Temporal executor path).
func RunSpageTemporalWorkerAndWorkflow ¶
func RunSpageTemporalWorkerAndWorkflow(opts RunSpageTemporalWorkerAndWorkflowOptions) error
RunSpageTemporalWorkerAndWorkflow sets up and runs a Temporal worker for Spage tasks, and can optionally trigger a workflow execution.
func SharedLoadLevelTasks ¶ added in v1.0.0
func SharedLoadLevelTasks( env DispatchEnv, runner RunnerAdapter, tasksInLevel []pkg.GraphNode, hostContexts map[string]*pkg.HostContext, cfg *config.Config, workflowHostFacts map[string]map[string]interface{}, )
SharedLoadLevelTasks contains the unified logic for dispatching tasks for a level
func SharedProcessLevelResults ¶ added in v1.0.0
func SharedProcessLevelResults( resultsCh ResultChannel, errCh ErrorChannel, logger Logger, executionLevel int, cfg *config.Config, numExpectedResultsOnLevel int, recapStats map[string]map[string]int, executionHistoryLevel map[string]chan pkg.GraphNode, onResult func(pkg.TaskResult) error, ) (bool, []pkg.TaskResult, error)
SharedProcessLevelResults contains the common logic for processing level results that can be used by both local and temporal executors
Types ¶
type ContextAwareErrorChannel ¶ added in v1.0.0
type ContextAwareErrorChannel struct {
// contains filtered or unexported fields
}
ContextAwareErrorChannel wraps a LocalErrorChannel to handle context cancellation
func NewContextAwareErrorChannel ¶ added in v1.0.0
func NewContextAwareErrorChannel(ctx context.Context, ch *LocalErrorChannel) *ContextAwareErrorChannel
func (*ContextAwareErrorChannel) IsClosed ¶ added in v1.0.0
func (c *ContextAwareErrorChannel) IsClosed() bool
func (*ContextAwareErrorChannel) ReceiveError ¶ added in v1.0.0
func (c *ContextAwareErrorChannel) ReceiveError() (error, bool, error)
type ContextAwareResultChannel ¶ added in v1.0.0
type ContextAwareResultChannel struct {
// contains filtered or unexported fields
}
ContextAwareResultChannel wraps a LocalResultChannel to handle context cancellation
func NewContextAwareResultChannel ¶ added in v1.0.0
func NewContextAwareResultChannel(ctx context.Context, ch *LocalResultChannel) *ContextAwareResultChannel
func (*ContextAwareResultChannel) IsClosed ¶ added in v1.0.0
func (c *ContextAwareResultChannel) IsClosed() bool
func (*ContextAwareResultChannel) ReceiveResult ¶ added in v1.0.0
func (c *ContextAwareResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)
type DispatchEnv ¶ added in v1.0.0
type DispatchEnv interface {
IsParallel() bool
Go(fn func())
SendResult(result pkg.TaskResult)
SendError(err error)
Inc()
Done()
Wait()
}
DispatchEnv abstracts concurrency and channel operations for shared load-level logic
type ErrorChannel ¶ added in v1.0.0
ErrorChannel abstracts over different error channel types
type FormattedGenericOutput ¶ added in v1.0.0
type FormattedGenericOutput struct {
// contains filtered or unexported fields
}
FormattedGenericOutput preserves the formatted string output from modules while still implementing the ModuleOutput interface and providing map access
func NewFormattedGenericOutput ¶ added in v1.0.0
func NewFormattedGenericOutput(output string, moduleMap map[string]interface{}, changed bool) FormattedGenericOutput
NewFormattedGenericOutput creates a FormattedGenericOutput from activity result
func (FormattedGenericOutput) Changed ¶ added in v1.0.0
func (f FormattedGenericOutput) Changed() bool
Changed returns the changed status from the original module
func (FormattedGenericOutput) Facts ¶ added in v1.0.0
func (f FormattedGenericOutput) Facts() map[string]interface{}
Facts returns the module output map for fact registration
func (FormattedGenericOutput) String ¶ added in v1.0.0
func (f FormattedGenericOutput) String() string
String returns the already formatted string from the original module
type GenericOutput ¶ added in v1.0.0
type GenericOutput map[string]interface{}
GenericOutput is a flexible map-based implementation of pkg.ModuleOutput.
func (GenericOutput) Changed ¶ added in v1.0.0
func (g GenericOutput) Changed() bool
Changed checks for a "changed" key in the map.
func (GenericOutput) Facts ¶ added in v1.0.0
func (g GenericOutput) Facts() map[string]interface{}
Facts returns the output map itself, so all keys become facts.
func (GenericOutput) String ¶ added in v1.0.0
func (g GenericOutput) String() string
String provides a simple string representation of the map.
type GraphNodeDTO ¶ added in v1.0.0
type GraphNodeDTO struct {
Kind string `json:"kind"` // "task" | "collection"
Task *pkg.Task `json:"task,omitempty"`
Collection *TaskCollectionDTO `json:"collection,omitempty"`
}
GraphNodeDTO provides a serializable wrapper for pkg.GraphNode (Task or TaskCollection)
type LocalDispatchEnv ¶ added in v1.0.0
type LocalDispatchEnv struct {
// contains filtered or unexported fields
}
LocalDispatchEnv implements DispatchEnv for the local executor
func NewLocalDispatchEnv ¶ added in v1.0.0
func NewLocalDispatchEnv(ctx context.Context, wg *sync.WaitGroup, resultsCh chan pkg.TaskResult, errCh chan error, parallel bool) *LocalDispatchEnv
func (*LocalDispatchEnv) Done ¶ added in v1.0.0
func (e *LocalDispatchEnv) Done()
func (*LocalDispatchEnv) Go ¶ added in v1.0.0
func (e *LocalDispatchEnv) Go(fn func())
func (*LocalDispatchEnv) Inc ¶ added in v1.0.0
func (e *LocalDispatchEnv) Inc()
func (*LocalDispatchEnv) IsParallel ¶ added in v1.0.0
func (e *LocalDispatchEnv) IsParallel() bool
func (*LocalDispatchEnv) SendError ¶ added in v1.0.0
func (e *LocalDispatchEnv) SendError(err error)
func (*LocalDispatchEnv) SendResult ¶ added in v1.0.0
func (e *LocalDispatchEnv) SendResult(result pkg.TaskResult)
func (*LocalDispatchEnv) Wait ¶ added in v1.0.0
func (e *LocalDispatchEnv) Wait()
type LocalErrorChannel ¶ added in v1.0.0
type LocalErrorChannel struct {
// contains filtered or unexported fields
}
LocalErrorChannel implements ErrorChannel for standard Go channels
func NewLocalErrorChannel ¶ added in v1.0.0
func NewLocalErrorChannel(ch chan error) *LocalErrorChannel
func (*LocalErrorChannel) IsClosed ¶ added in v1.0.0
func (c *LocalErrorChannel) IsClosed() bool
func (*LocalErrorChannel) ReceiveError ¶ added in v1.0.0
func (c *LocalErrorChannel) ReceiveError() (error, bool, error)
type LocalGraphExecutor ¶ added in v1.0.0
type LocalGraphExecutor struct {
Runner pkg.TaskRunner
}
LocalGraphExecutor provides common logic for executing a Spage graph. It relies on a TaskRunner to perform the actual execution of individual tasks.
func NewLocalGraphExecutor ¶ added in v1.0.0
func NewLocalGraphExecutor(runner pkg.TaskRunner) *LocalGraphExecutor
NewLocalGraphExecutor creates a new LocalGraphExecutor with the given TaskRunner.
type LocalLogger ¶ added in v1.0.0
type LocalLogger struct{}
LocalLogger implements Logger for standard logging
func NewLocalLogger ¶ added in v1.0.0
func NewLocalLogger() *LocalLogger
func (*LocalLogger) Debug ¶ added in v1.0.0
func (l *LocalLogger) Debug(msg string, args ...interface{})
func (*LocalLogger) Error ¶ added in v1.0.0
func (l *LocalLogger) Error(msg string, args ...interface{})
func (*LocalLogger) Info ¶ added in v1.0.0
func (l *LocalLogger) Info(msg string, args ...interface{})
func (*LocalLogger) Warn ¶ added in v1.0.0
func (l *LocalLogger) Warn(msg string, args ...interface{})
type LocalResultChannel ¶ added in v1.0.0
type LocalResultChannel struct {
// contains filtered or unexported fields
}
LocalResultChannel implements ResultChannel for standard Go channels
func NewLocalResultChannel ¶ added in v1.0.0
func NewLocalResultChannel(ch chan pkg.TaskResult) *LocalResultChannel
func (*LocalResultChannel) IsClosed ¶ added in v1.0.0
func (c *LocalResultChannel) IsClosed() bool
func (*LocalResultChannel) ReceiveResult ¶ added in v1.0.0
func (c *LocalResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)
type LocalRunnerAdapter ¶ added in v1.0.0
LocalRunnerAdapter adapts pkg.TaskRunner to RunnerAdapter
func NewLocalRunnerAdapter ¶ added in v1.0.0
func NewLocalRunnerAdapter(ctx context.Context, runner pkg.TaskRunner, cfg *config.Config) *LocalRunnerAdapter
func (*LocalRunnerAdapter) Execute ¶ added in v1.0.0
func (a *LocalRunnerAdapter) Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))
type LocalTaskRunner ¶
type LocalTaskRunner struct{}
LocalTaskRunner implements the TaskRunner interface for local execution. It directly calls the task's ExecuteModule method.
func (*LocalTaskRunner) ExecuteTask ¶
func (r *LocalTaskRunner) ExecuteTask(ctx context.Context, task pkg.GraphNode, closure *pkg.Closure, cfg *config.Config) chan pkg.TaskResult
RunTask executes a task locally. It directly calls task.ExecuteModule and returns its result. The TaskResult from ExecuteModule is expected to be populated by handleResult (called within ExecuteModule).
func (*LocalTaskRunner) RevertTask ¶
type Logger ¶ added in v1.0.0
type Logger interface {
Error(msg string, args ...interface{})
Warn(msg string, args ...interface{})
Info(msg string, args ...interface{})
Debug(msg string, args ...interface{})
}
Logger abstracts over different logging implementations
type ResultChannel ¶ added in v1.0.0
type ResultChannel interface {
ReceiveResult() (pkg.TaskResult, bool, error)
IsClosed() bool
}
ResultChannel abstracts over different channel types used by local and temporal executors
type ResultProcessor ¶ added in v1.0.0
ResultProcessor handles the common logic for processing individual task results
func (*ResultProcessor) ProcessHandlerResult ¶ added in v1.0.0
func (rp *ResultProcessor) ProcessHandlerResult( result pkg.TaskResult, recapStats map[string]map[string]int, ) bool
ProcessHandlerResult handles the common logic for processing a single handler result Returns whether the result represents a hard error (always false for handlers - they continue on error)
func (*ResultProcessor) ProcessTaskResult ¶ added in v1.0.0
func (rp *ResultProcessor) ProcessTaskResult( result pkg.TaskResult, recapStats map[string]map[string]int, executionHistoryLevel map[string]chan pkg.GraphNode, ) bool
ProcessTaskResult handles the common logic for processing a single task result Returns whether the result represents a hard error that should stop execution
type RunSpageTemporalWorkerAndWorkflowOptions ¶
type RunSpageTemporalWorkerAndWorkflowOptions struct {
Graph *pkg.Graph
InventoryPath string
LoadedConfig *config.Config // Changed from ConfigPath to break import cycle with cmd
WorkflowIDPrefix string
LimitPattern string
}
RunSpageTemporalWorkerAndWorkflowOptions defines options for RunSpageTemporalWorkerAndWorkflow.
type RunnerAdapter ¶ added in v1.0.0
type RunnerAdapter interface {
Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))
}
RunnerAdapter abstracts task execution for different executors
type SpageActivityInput ¶
type SpageActivityInput struct {
TaskDefinition pkg.Task
TargetHost pkg.Host
LoopItem interface{} // nil if not a loop task or for the main item
CurrentHostFacts map[string]interface{}
SpageCoreConfig *config.Config // Pass necessary config parts
TaskHistory map[string]interface{}
Handlers []GraphNodeDTO // Handlers from the graph (DTO for serialization)
}
SpageActivityInput defines the input for our generic Spage task activity.
type SpageActivityResult ¶
type SpageActivityResult struct {
HostName string
TaskName string
Output string
Changed bool
Error string // Store error message if any
Skipped bool
Ignored bool
RegisteredVars map[string]interface{}
HostFactsSnapshot map[string]interface{}
ModuleOutputMap map[string]interface{}
NotifiedHandlers []string // Handler names that were notified during this activity
}
SpageActivityResult defines the output from our generic Spage task activity.
func ExecuteSpageTaskActivity ¶
func ExecuteSpageTaskActivity(ctx context.Context, input SpageActivityInput) (*SpageActivityResult, error)
ExecuteSpageTaskActivity is the generic activity that runs a Spage task.
func RevertSpageTaskActivity ¶
func RevertSpageTaskActivity(ctx context.Context, input SpageActivityInput) (*SpageActivityResult, error)
RevertSpageTaskActivity is the generic activity that runs a Spage task's revert action.
type SpageMetaWorkflowInput ¶ added in v1.0.0
type SpageMetaWorkflowInput struct {
Meta GraphNodeDTO
TargetHost pkg.Host
CurrentHostFacts map[string]interface{}
SpageCoreConfig *config.Config
Handlers []GraphNodeDTO
}
SpageMetaWorkflowInput defines the input for executing a MetaTask as a child workflow
type SpageMetaWorkflowResult ¶ added in v1.0.0
type SpageMetaWorkflowResult struct {
Changed bool
Error string
HostFactsSnapshot map[string]interface{}
NotifiedHandlers []string
}
SpageMetaWorkflowResult summarizes the child workflow execution
func SpageMetaWorkflow ¶ added in v1.0.0
func SpageMetaWorkflow(ctx workflow.Context, input SpageMetaWorkflowInput) (SpageMetaWorkflowResult, error)
SpageMetaWorkflow executes a MetaTask's children sequentially for a single host
type SpageRunOnceLoopActivityInput ¶ added in v1.0.0
type SpageRunOnceLoopActivityInput struct {
TaskDefinition pkg.Task
TargetHost pkg.Host
LoopItems []interface{} // All loop items to execute
CurrentHostFacts map[string]interface{}
SpageCoreConfig *config.Config
Handlers []GraphNodeDTO // Handlers from the graph (DTO)
}
SpageRunOnceLoopActivityInput defines the input for run_once tasks with loops.
type SpageRunOnceLoopActivityResult ¶ added in v1.0.0
type SpageRunOnceLoopActivityResult struct {
HostName string
TaskName string
LoopResults []SpageActivityResult // Results for each loop iteration
HostFactsSnapshot map[string]interface{}
}
SpageRunOnceLoopActivityResult defines the output from run_once loop activity.
func ExecuteSpageRunOnceLoopActivity ¶ added in v1.0.0
func ExecuteSpageRunOnceLoopActivity(ctx context.Context, input SpageRunOnceLoopActivityInput) (*SpageRunOnceLoopActivityResult, error)
type TaskCollectionDTO ¶ added in v1.0.0
type TaskCollectionDTO struct {
Id int `json:"id"`
Name string `json:"name"`
Tasks []GraphNodeDTO `json:"tasks"`
Rescue []GraphNodeDTO `json:"rescue,omitempty"`
Always []GraphNodeDTO `json:"always,omitempty"`
}
TaskCollectionDTO serializes TaskCollection
type TemporalDispatchEnv ¶ added in v1.0.0
type TemporalDispatchEnv struct {
// contains filtered or unexported fields
}
TemporalDispatchEnv implements DispatchEnv for the Temporal executor
func NewTemporalDispatchEnv ¶ added in v1.0.0
func NewTemporalDispatchEnv(ctx workflow.Context, resultsCh, errCh, completionCh workflow.Channel, parallel bool, adapter *TemporalRunnerAdapter) *TemporalDispatchEnv
func (*TemporalDispatchEnv) Done ¶ added in v1.0.0
func (e *TemporalDispatchEnv) Done()
func (*TemporalDispatchEnv) Go ¶ added in v1.0.0
func (e *TemporalDispatchEnv) Go(fn func())
func (*TemporalDispatchEnv) Inc ¶ added in v1.0.0
func (e *TemporalDispatchEnv) Inc()
func (*TemporalDispatchEnv) IsParallel ¶ added in v1.0.0
func (e *TemporalDispatchEnv) IsParallel() bool
func (*TemporalDispatchEnv) SendError ¶ added in v1.0.0
func (e *TemporalDispatchEnv) SendError(err error)
func (*TemporalDispatchEnv) SendResult ¶ added in v1.0.0
func (e *TemporalDispatchEnv) SendResult(result pkg.TaskResult)
func (*TemporalDispatchEnv) Wait ¶ added in v1.0.0
func (e *TemporalDispatchEnv) Wait()
type TemporalErrorChannel ¶ added in v1.0.0
type TemporalErrorChannel struct {
// contains filtered or unexported fields
}
TemporalErrorChannel implements ErrorChannel for Temporal workflow channels
func NewTemporalErrorChannel ¶ added in v1.0.0
func NewTemporalErrorChannel(ctx workflow.Context, ch workflow.ReceiveChannel) *TemporalErrorChannel
func (*TemporalErrorChannel) IsClosed ¶ added in v1.0.0
func (c *TemporalErrorChannel) IsClosed() bool
func (*TemporalErrorChannel) ReceiveError ¶ added in v1.0.0
func (c *TemporalErrorChannel) ReceiveError() (error, bool, error)
type TemporalGraphExecutor ¶
type TemporalGraphExecutor struct {
Runner TemporalTaskRunner
}
func NewTemporalGraphExecutor ¶
func NewTemporalGraphExecutor(runner TemporalTaskRunner) *TemporalGraphExecutor
func (*TemporalGraphExecutor) Execute ¶
func (e *TemporalGraphExecutor) Execute( hostContexts map[string]*pkg.HostContext, orderedGraph [][]pkg.GraphNode, cfg *config.Config, ) error
func (*TemporalGraphExecutor) Revert ¶ added in v1.0.0
func (e *TemporalGraphExecutor) Revert( ctx context.Context, executedTasksHistory []map[string]chan pkg.GraphNode, hostContexts map[string]*pkg.HostContext, cfg *config.Config, ) error
Revert implements the GraphExecutor interface for Temporal. It requires a workflow context, which must be passed in via the context.Context argument.
type TemporalLogger ¶ added in v1.0.0
type TemporalLogger struct {
// contains filtered or unexported fields
}
TemporalLogger implements Logger for Temporal workflow logging
func NewTemporalLogger ¶ added in v1.0.0
func NewTemporalLogger(ctx workflow.Context) *TemporalLogger
func (*TemporalLogger) Debug ¶ added in v1.0.0
func (l *TemporalLogger) Debug(msg string, args ...interface{})
func (*TemporalLogger) Error ¶ added in v1.0.0
func (l *TemporalLogger) Error(msg string, args ...interface{})
func (*TemporalLogger) Info ¶ added in v1.0.0
func (l *TemporalLogger) Info(msg string, args ...interface{})
func (*TemporalLogger) Warn ¶ added in v1.0.0
func (l *TemporalLogger) Warn(msg string, args ...interface{})
type TemporalResultChannel ¶ added in v1.0.0
type TemporalResultChannel struct {
// contains filtered or unexported fields
}
TemporalResultChannel implements ResultChannel for Temporal workflow channels
func NewTemporalResultChannel ¶ added in v1.0.0
func NewTemporalResultChannel(ctx workflow.Context, ch workflow.ReceiveChannel) *TemporalResultChannel
func (*TemporalResultChannel) IsClosed ¶ added in v1.0.0
func (c *TemporalResultChannel) IsClosed() bool
func (*TemporalResultChannel) ReceiveResult ¶ added in v1.0.0
func (c *TemporalResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)
type TemporalRunnerAdapter ¶ added in v1.0.0
type TemporalRunnerAdapter struct {
WorkflowCtx workflow.Context
CurrentCtx workflow.Context
Runner interface {
ExecuteTask(workflow.Context, pkg.GraphNode, *pkg.Closure, *config.Config) pkg.TaskResult
}
Config *config.Config
}
TemporalRunnerAdapter adapts TemporalTaskRunner-like runners to RunnerAdapter
func NewTemporalRunnerAdapter ¶ added in v1.0.0
func (*TemporalRunnerAdapter) Execute ¶ added in v1.0.0
func (a *TemporalRunnerAdapter) Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))
type TemporalTaskRunner ¶
TemporalTaskRunner implements the TaskRunner interface for Temporal activity execution. It requires access to the workflow.Context to execute activities. Note: This runner is conceptual for showing how Temporal fits the TaskRunner pattern. The SpageTemporalWorkflow will still manage its own execution loop due to differences in fact/state management compared to LocalGraphExecutor's assumptions.
func NewTemporalTaskRunner ¶
func NewTemporalTaskRunner(workflowCtx workflow.Context) *TemporalTaskRunner
NewTemporalTaskRunner creates a new TemporalTaskRunner.
func (*TemporalTaskRunner) ExecuteTask ¶
func (r *TemporalTaskRunner) ExecuteTask(execCtx workflow.Context, task pkg.GraphNode, closure *pkg.Closure, cfg *config.Config) pkg.TaskResult
RunTask for Temporal dispatches the task as a Temporal activity. It converts the SpageActivityResult from the activity into a TaskResult. The original SpageActivityResult is stored in TaskResult.ExecutionSpecificOutput.