executor

package
v1.0.0
Published: Sep 20, 2025 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildRegisteredFromOutput added in v1.0.0

func BuildRegisteredFromOutput(output pkg.ModuleOutput, changed bool) interface{}

BuildRegisteredFromOutput converts a ModuleOutput into a map suitable for `register:` and ensures consistent fields like "failed" and "changed" are present.
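
A minimal sketch of the kind of normalization described above, assuming the module output can be viewed as a plain map. The function `buildRegistered` and its default for a missing "failed" key are hypothetical stand-ins for illustration and are not taken from the package source.

```go
package main

import "fmt"

// buildRegistered is a hypothetical stand-in for BuildRegisteredFromOutput.
// It copies the module's facts and guarantees "failed" and "changed" keys.
func buildRegistered(facts map[string]interface{}, changed bool) map[string]interface{} {
	registered := make(map[string]interface{}, len(facts)+2)
	for k, v := range facts {
		registered[k] = v
	}
	// Assumption: a missing "failed" key defaults to false.
	if _, ok := registered["failed"]; !ok {
		registered["failed"] = false
	}
	// The explicit changed flag wins over anything in the facts map.
	registered["changed"] = changed
	return registered
}

func main() {
	reg := buildRegistered(map[string]interface{}{"stdout": "ok"}, true)
	fmt.Println(reg["stdout"], reg["failed"], reg["changed"])
}
```

With both keys always present, a later condition on the registered variable does not need to check whether "failed" or "changed" exists first.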

func CalculateExpectedResults added in v1.0.0

func CalculateExpectedResults(
	nodesInLevel []pkg.GraphNode,
	hostContexts map[string]*pkg.HostContext,
	cfg *config.Config,
) (int, error)

func CreateRunOnceResultsForAllHosts added in v1.0.0

func CreateRunOnceResultsForAllHosts(originalResult pkg.TaskResult, hostContexts map[string]*pkg.HostContext, executionHostName string) []pkg.TaskResult

CreateRunOnceResultsForAllHosts creates TaskResult copies for all hosts when run_once is used. This ensures that facts and results are propagated to all hosts in the batch.
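
The fan-out can be pictured roughly as follows. This is a sketch under stated assumptions: `taskResult` is a trimmed-down, hypothetical stand-in for pkg.TaskResult, and `fanOutRunOnce` takes host names as a slice rather than the real hostContexts map.

```go
package main

import (
	"fmt"
	"sort"
)

// taskResult is a hypothetical, trimmed-down stand-in for pkg.TaskResult.
type taskResult struct {
	Host    string
	Changed bool
	Output  string
}

// fanOutRunOnce copies the result produced on the execution host to every
// host in the batch, so each host sees the same outcome.
func fanOutRunOnce(original taskResult, hosts []string) []taskResult {
	sorted := append([]string(nil), hosts...)
	sort.Strings(sorted) // stable order so output does not depend on input order
	results := make([]taskResult, 0, len(sorted))
	for _, h := range sorted {
		copied := original // struct copy; only the host field differs
		copied.Host = h
		results = append(results, copied)
	}
	return results
}

func main() {
	original := taskResult{Host: "web1", Changed: true, Output: "done"}
	for _, r := range fanOutRunOnce(original, []string{"web2", "db1", "web1"}) {
		fmt.Println(r.Host, r.Changed, r.Output)
	}
}
```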

func GetDelegatedHostContext added in v1.0.0

func GetDelegatedHostContext(task pkg.GraphNode, hostContexts map[string]*pkg.HostContext, closure *pkg.Closure, cfg *config.Config) (*pkg.HostContext, error)

func GetFirstAvailableHost added in v1.0.0

func GetFirstAvailableHost(hostContexts map[string]*pkg.HostContext) (*pkg.HostContext, string)

GetFirstAvailableHost returns the first host from the hostContexts map. It is used for run_once tasks to select the execution host, and it sorts the hosts by name to ensure deterministic execution.
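
Go map iteration order is unspecified, which is why sorting matters here. The sketch below shows one way to pick a deterministic first entry; `hostContext` and `firstAvailableHost` are hypothetical stand-ins, and the behavior for an empty map is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// hostContext is a hypothetical stand-in for pkg.HostContext.
type hostContext struct{ Name string }

// firstAvailableHost returns the entry whose key sorts first, so the same
// host is chosen on every run regardless of map iteration order.
func firstAvailableHost(hostContexts map[string]*hostContext) (*hostContext, string) {
	if len(hostContexts) == 0 {
		return nil, "" // assumption: an empty map yields no host
	}
	names := make([]string, 0, len(hostContexts))
	for name := range hostContexts {
		names = append(names, name)
	}
	sort.Strings(names)
	return hostContexts[names[0]], names[0]
}

func main() {
	hosts := map[string]*hostContext{
		"web2": {Name: "web2"},
		"db1":  {Name: "db1"},
		"web1": {Name: "web1"},
	}
	_, name := firstAvailableHost(hosts)
	fmt.Println(name) // prints "db1"
}
```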

func GetTaskClosures added in v1.0.0

func GetTaskClosures(task pkg.GraphNode, c *pkg.HostContext, cfg *config.Config) ([]*pkg.Closure, error)

GetTaskClosures generates one or more Closures for a task, handling loops.
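
As a rough illustration of "one or more closures", the sketch below expands loop items into one variable scope each. The `closure` type, the `closuresFor` helper, and the loop variable name "item" are all assumptions made for this example; the package's actual Closure type and loop variable handling may differ.

```go
package main

import "fmt"

// closure is a hypothetical stand-in for pkg.Closure: the variables visible
// to one execution of a task.
type closure struct {
	Vars map[string]interface{}
}

// copyVars returns a shallow copy so closures do not share a map.
func copyVars(src map[string]interface{}) map[string]interface{} {
	dst := make(map[string]interface{}, len(src)+1)
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

// closuresFor returns a single closure when loopItems is nil (no loop),
// otherwise one closure per item with the item bound to "item" (assumed name).
func closuresFor(base map[string]interface{}, loopItems []interface{}) []*closure {
	if loopItems == nil {
		return []*closure{{Vars: copyVars(base)}}
	}
	closures := make([]*closure, 0, len(loopItems))
	for _, it := range loopItems {
		vars := copyVars(base)
		vars["item"] = it
		closures = append(closures, &closure{Vars: vars})
	}
	return closures
}

func main() {
	base := map[string]interface{}{"env": "prod"}
	for _, c := range closuresFor(base, []interface{}{"nginx", "redis"}) {
		fmt.Println(c.Vars["env"], c.Vars["item"])
	}
	fmt.Println(len(closuresFor(base, nil)))
}
```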

func InitializeRecapStats added in v1.0.0

func InitializeRecapStats(hostContexts map[string]*pkg.HostContext) map[string]map[string]int

func PPrintOutput added in v1.0.0

func PPrintOutput(output pkg.ModuleOutput, err error)

PPrintOutput prints the output of a module or an error in a readable format.

func ParseLoop added in v1.0.0

func ParseLoop(task pkg.GraphNode, c *pkg.HostContext, cfg *config.Config) ([]interface{}, error)

ParseLoop parses the loop directive of a task, evaluating expressions and returning items to iterate over.

func PrepareLevelHistoryAndGetCount added in v1.0.0

func PrepareLevelHistoryAndGetCount(
	nodesInLevel []pkg.GraphNode,
	hostContexts map[string]*pkg.HostContext,
	executionLevel int,
	cfg *config.Config,
) (map[string]chan pkg.GraphNode, int, error)

func PropagateRegisteredToAllHosts added in v1.0.0

func PropagateRegisteredToAllHosts(
	task pkg.GraphNode,
	registeredValue interface{},
	hostContexts map[string]*pkg.HostContext,
	workflowHostFacts map[string]map[string]interface{},
)

PropagateRegisteredToAllHosts stores the provided registered value into all host contexts, and also mirrors it into the workflow-wide facts map if provided (Temporal executor path).
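
A minimal sketch of this two-target write, assuming each host context holds a facts map. `hostContext`, `propagate`, and the use of a plain string as the variable name are hypothetical; the real function derives the name from the task.

```go
package main

import "fmt"

// hostContext is a hypothetical stand-in for pkg.HostContext.
type hostContext struct {
	Facts map[string]interface{}
}

// propagate stores value under name in every host context and, when
// workflowFacts is non-nil, mirrors it into the workflow-wide map as well.
func propagate(name string, value interface{}, hostContexts map[string]*hostContext, workflowFacts map[string]map[string]interface{}) {
	for host, hc := range hostContexts {
		if hc.Facts == nil {
			hc.Facts = make(map[string]interface{})
		}
		hc.Facts[name] = value
		if workflowFacts == nil {
			continue // local executor path: nothing to mirror
		}
		if workflowFacts[host] == nil {
			workflowFacts[host] = make(map[string]interface{})
		}
		workflowFacts[host][name] = value
	}
}

func main() {
	hosts := map[string]*hostContext{"web1": {}, "db1": {}}
	workflowFacts := map[string]map[string]interface{}{}
	propagate("deploy_result", "v2", hosts, workflowFacts)
	fmt.Println(hosts["web1"].Facts["deploy_result"], workflowFacts["db1"]["deploy_result"])
}
```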

func RunSpageTemporalWorkerAndWorkflow

func RunSpageTemporalWorkerAndWorkflow(opts RunSpageTemporalWorkerAndWorkflowOptions) error

RunSpageTemporalWorkerAndWorkflow sets up and runs a Temporal worker for Spage tasks, and can optionally trigger a workflow execution.

func SharedLoadLevelTasks added in v1.0.0

func SharedLoadLevelTasks(
	env DispatchEnv,
	runner RunnerAdapter,
	tasksInLevel []pkg.GraphNode,
	hostContexts map[string]*pkg.HostContext,
	cfg *config.Config,
	workflowHostFacts map[string]map[string]interface{},
)

SharedLoadLevelTasks contains the unified logic for dispatching the tasks of a level.

func SharedProcessLevelResults added in v1.0.0

func SharedProcessLevelResults(
	resultsCh ResultChannel,
	errCh ErrorChannel,
	logger Logger,
	executionLevel int,
	cfg *config.Config,
	numExpectedResultsOnLevel int,
	recapStats map[string]map[string]int,
	executionHistoryLevel map[string]chan pkg.GraphNode,
	onResult func(pkg.TaskResult) error,
) (bool, []pkg.TaskResult, error)

SharedProcessLevelResults contains the common logic for processing level results, which can be used by both the local and Temporal executors.

func SpageTemporalWorkflow

func SpageTemporalWorkflow(ctx workflow.Context, graphInput *pkg.Graph, inventoryFile string, spageConfigInput *config.Config, limitPattern string) error

SpageTemporalWorkflow defines the main workflow logic.

Types

type ContextAwareErrorChannel added in v1.0.0

type ContextAwareErrorChannel struct {
	// contains filtered or unexported fields
}

ContextAwareErrorChannel wraps a LocalErrorChannel to handle context cancellation

func NewContextAwareErrorChannel added in v1.0.0

func NewContextAwareErrorChannel(ctx context.Context, ch *LocalErrorChannel) *ContextAwareErrorChannel

func (*ContextAwareErrorChannel) IsClosed added in v1.0.0

func (c *ContextAwareErrorChannel) IsClosed() bool

func (*ContextAwareErrorChannel) ReceiveError added in v1.0.0

func (c *ContextAwareErrorChannel) ReceiveError() (error, bool, error)

type ContextAwareResultChannel added in v1.0.0

type ContextAwareResultChannel struct {
	// contains filtered or unexported fields
}

ContextAwareResultChannel wraps a LocalResultChannel to handle context cancellation

func NewContextAwareResultChannel added in v1.0.0

func NewContextAwareResultChannel(ctx context.Context, ch *LocalResultChannel) *ContextAwareResultChannel

func (*ContextAwareResultChannel) IsClosed added in v1.0.0

func (c *ContextAwareResultChannel) IsClosed() bool

func (*ContextAwareResultChannel) ReceiveResult added in v1.0.0

func (c *ContextAwareResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)

type DispatchEnv added in v1.0.0

type DispatchEnv interface {
	IsParallel() bool
	Go(fn func())
	SendResult(result pkg.TaskResult)
	SendError(err error)
	Inc()
	Done()
	Wait()
}

DispatchEnv abstracts concurrency and channel operations for shared load-level logic
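
To show how such an interface lets the same dispatch loop run either in parallel or sequentially, here is a simplified sketch built on sync.WaitGroup. `waitGroupEnv`, `newWaitGroupEnv`, and `dispatchLevel` are hypothetical, results are plain strings instead of pkg.TaskResult, and SendError is omitted; none of this is taken from LocalDispatchEnv's source.

```go
package main

import (
	"fmt"
	"sync"
)

// waitGroupEnv is a hypothetical, simplified dispatch environment.
type waitGroupEnv struct {
	wg       sync.WaitGroup
	parallel bool
	results  chan string
}

// newWaitGroupEnv buffers the results channel so sequential sends never block.
func newWaitGroupEnv(parallel bool, capacity int) *waitGroupEnv {
	return &waitGroupEnv{parallel: parallel, results: make(chan string, capacity)}
}

func (e *waitGroupEnv) IsParallel() bool    { return e.parallel }
func (e *waitGroupEnv) Inc()                { e.wg.Add(1) }
func (e *waitGroupEnv) Done()               { e.wg.Done() }
func (e *waitGroupEnv) Wait()               { e.wg.Wait() }
func (e *waitGroupEnv) SendResult(r string) { e.results <- r }

// Go runs fn on a new goroutine in parallel mode and inline otherwise.
func (e *waitGroupEnv) Go(fn func()) {
	if e.parallel {
		go fn()
		return
	}
	fn()
}

// dispatchLevel schedules one unit of work per host and counts the results.
func dispatchLevel(env *waitGroupEnv, hosts []string) int {
	for _, host := range hosts {
		host := host // capture per-iteration value for the closure
		env.Inc()
		env.Go(func() {
			defer env.Done()
			env.SendResult("ok:" + host)
		})
	}
	env.Wait()
	close(env.results)
	count := 0
	for range env.results {
		count++
	}
	return count
}

func main() {
	hosts := []string{"web1", "web2", "db1"}
	fmt.Println(dispatchLevel(newWaitGroupEnv(true, len(hosts)), hosts))
	fmt.Println(dispatchLevel(newWaitGroupEnv(false, len(hosts)), hosts))
}
```

The dispatch loop never mentions goroutines or channels directly, which is the property that would allow a Temporal-backed environment to substitute workflow primitives behind the same methods.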

type ErrorChannel added in v1.0.0

type ErrorChannel interface {
	ReceiveError() (error, bool, error)
	IsClosed() bool
}

ErrorChannel abstracts over different error channel types

type FormattedGenericOutput added in v1.0.0

type FormattedGenericOutput struct {
	// contains filtered or unexported fields
}

FormattedGenericOutput preserves the formatted string output from modules while still implementing the ModuleOutput interface and providing map access

func NewFormattedGenericOutput added in v1.0.0

func NewFormattedGenericOutput(output string, moduleMap map[string]interface{}, changed bool) FormattedGenericOutput

NewFormattedGenericOutput creates a FormattedGenericOutput from activity result

func (FormattedGenericOutput) Changed added in v1.0.0

func (f FormattedGenericOutput) Changed() bool

Changed returns the changed status from the original module

func (FormattedGenericOutput) Facts added in v1.0.0

func (f FormattedGenericOutput) Facts() map[string]interface{}

Facts returns the module output map for fact registration

func (FormattedGenericOutput) String added in v1.0.0

func (f FormattedGenericOutput) String() string

String returns the already formatted string from the original module

type GenericOutput added in v1.0.0

type GenericOutput map[string]interface{}

GenericOutput is a flexible map-based implementation of pkg.ModuleOutput.

func (GenericOutput) Changed added in v1.0.0

func (g GenericOutput) Changed() bool

Changed checks for a "changed" key in the map.

func (GenericOutput) Facts added in v1.0.0

func (g GenericOutput) Facts() map[string]interface{}

Facts returns the output map itself, so all keys become facts.

func (GenericOutput) String added in v1.0.0

func (g GenericOutput) String() string

String provides a simple string representation of the map.
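
The three methods above can be sketched on a map type as follows. `genericOutput` is a hypothetical local copy written for illustration; in particular, treating a missing or non-boolean "changed" value as false is an assumption.

```go
package main

import "fmt"

// genericOutput is a hypothetical local copy of a map-based module output.
type genericOutput map[string]interface{}

// Changed reports the "changed" key; missing or non-bool values count as false.
func (g genericOutput) Changed() bool {
	changed, ok := g["changed"].(bool)
	return ok && changed
}

// Facts returns the map itself, so every key becomes a fact.
func (g genericOutput) Facts() map[string]interface{} { return g }

// String formats the underlying map. The conversion to a plain map avoids
// fmt calling String again on the same value.
func (g genericOutput) String() string {
	return fmt.Sprintf("%v", map[string]interface{}(g))
}

func main() {
	out := genericOutput{"changed": true, "rc": 0}
	fmt.Println(out.Changed(), len(out.Facts()))
	fmt.Println(out)
}
```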

type GraphNodeDTO added in v1.0.0

type GraphNodeDTO struct {
	Kind       string             `json:"kind"` // "task" | "collection"
	Task       *pkg.Task          `json:"task,omitempty"`
	Collection *TaskCollectionDTO `json:"collection,omitempty"`
}

GraphNodeDTO provides a serializable wrapper for pkg.GraphNode (Task or TaskCollection)
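
The `kind` discriminator makes it possible to serialize an interface-typed graph node as JSON and rebuild it afterwards. The sketch below mirrors the field layout with hypothetical local types (`task`, `collectionDTO`, `nodeDTO`); the `task` struct is a placeholder for pkg.Task, and `roundTrip` and `countTasks` are helpers invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// task is a hypothetical placeholder for pkg.Task.
type task struct {
	Name string `json:"name"`
}

// collectionDTO mirrors part of the TaskCollectionDTO layout.
type collectionDTO struct {
	Id    int       `json:"id"`
	Name  string    `json:"name"`
	Tasks []nodeDTO `json:"tasks"`
}

// nodeDTO is a tagged union: Kind says which pointer field is populated.
type nodeDTO struct {
	Kind       string         `json:"kind"` // "task" | "collection"
	Task       *task          `json:"task,omitempty"`
	Collection *collectionDTO `json:"collection,omitempty"`
}

// roundTrip encodes a node to JSON and decodes it back.
func roundTrip(n nodeDTO) (nodeDTO, error) {
	var decoded nodeDTO
	data, err := json.Marshal(n)
	if err != nil {
		return decoded, err
	}
	err = json.Unmarshal(data, &decoded)
	return decoded, err
}

// countTasks walks the union recursively and counts leaf tasks.
func countTasks(n nodeDTO) int {
	switch n.Kind {
	case "task":
		return 1
	case "collection":
		if n.Collection == nil {
			return 0
		}
		total := 0
		for _, child := range n.Collection.Tasks {
			total += countTasks(child)
		}
		return total
	}
	return 0
}

func main() {
	node := nodeDTO{Kind: "collection", Collection: &collectionDTO{Id: 1, Name: "setup", Tasks: []nodeDTO{
		{Kind: "task", Task: &task{Name: "install"}},
		{Kind: "task", Task: &task{Name: "configure"}},
	}}}
	decoded, err := roundTrip(node)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.Kind, countTasks(decoded))
}
```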

type LocalDispatchEnv added in v1.0.0

type LocalDispatchEnv struct {
	// contains filtered or unexported fields
}

LocalDispatchEnv implements DispatchEnv for the local executor

func NewLocalDispatchEnv added in v1.0.0

func NewLocalDispatchEnv(ctx context.Context, wg *sync.WaitGroup, resultsCh chan pkg.TaskResult, errCh chan error, parallel bool) *LocalDispatchEnv

func (*LocalDispatchEnv) Done added in v1.0.0

func (e *LocalDispatchEnv) Done()

func (*LocalDispatchEnv) Go added in v1.0.0

func (e *LocalDispatchEnv) Go(fn func())

func (*LocalDispatchEnv) Inc added in v1.0.0

func (e *LocalDispatchEnv) Inc()

func (*LocalDispatchEnv) IsParallel added in v1.0.0

func (e *LocalDispatchEnv) IsParallel() bool

func (*LocalDispatchEnv) SendError added in v1.0.0

func (e *LocalDispatchEnv) SendError(err error)

func (*LocalDispatchEnv) SendResult added in v1.0.0

func (e *LocalDispatchEnv) SendResult(result pkg.TaskResult)

func (*LocalDispatchEnv) Wait added in v1.0.0

func (e *LocalDispatchEnv) Wait()

type LocalErrorChannel added in v1.0.0

type LocalErrorChannel struct {
	// contains filtered or unexported fields
}

LocalErrorChannel implements ErrorChannel for standard Go channels

func NewLocalErrorChannel added in v1.0.0

func NewLocalErrorChannel(ch chan error) *LocalErrorChannel

func (*LocalErrorChannel) IsClosed added in v1.0.0

func (c *LocalErrorChannel) IsClosed() bool

func (*LocalErrorChannel) ReceiveError added in v1.0.0

func (c *LocalErrorChannel) ReceiveError() (error, bool, error)

type LocalGraphExecutor added in v1.0.0

type LocalGraphExecutor struct {
	Runner pkg.TaskRunner
}

LocalGraphExecutor provides common logic for executing a Spage graph. It relies on a TaskRunner to perform the actual execution of individual tasks.

func NewLocalGraphExecutor added in v1.0.0

func NewLocalGraphExecutor(runner pkg.TaskRunner) *LocalGraphExecutor

NewLocalGraphExecutor creates a new LocalGraphExecutor with the given TaskRunner.

func (*LocalGraphExecutor) Execute added in v1.0.0

func (e *LocalGraphExecutor) Execute(hostContexts map[string]*pkg.HostContext, orderedGraph [][]pkg.GraphNode, cfg *config.Config) error

func (*LocalGraphExecutor) Revert added in v1.0.0

func (e *LocalGraphExecutor) Revert(ctx context.Context, executedTasks []map[string]chan pkg.GraphNode, hostContexts map[string]*pkg.HostContext, cfg *config.Config) error

type LocalLogger added in v1.0.0

type LocalLogger struct{}

LocalLogger implements Logger for standard logging

func NewLocalLogger added in v1.0.0

func NewLocalLogger() *LocalLogger

func (*LocalLogger) Debug added in v1.0.0

func (l *LocalLogger) Debug(msg string, args ...interface{})

func (*LocalLogger) Error added in v1.0.0

func (l *LocalLogger) Error(msg string, args ...interface{})

func (*LocalLogger) Info added in v1.0.0

func (l *LocalLogger) Info(msg string, args ...interface{})

func (*LocalLogger) Warn added in v1.0.0

func (l *LocalLogger) Warn(msg string, args ...interface{})

type LocalResultChannel added in v1.0.0

type LocalResultChannel struct {
	// contains filtered or unexported fields
}

LocalResultChannel implements ResultChannel for standard Go channels

func NewLocalResultChannel added in v1.0.0

func NewLocalResultChannel(ch chan pkg.TaskResult) *LocalResultChannel

func (*LocalResultChannel) IsClosed added in v1.0.0

func (c *LocalResultChannel) IsClosed() bool

func (*LocalResultChannel) ReceiveResult added in v1.0.0

func (c *LocalResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)

type LocalRunnerAdapter added in v1.0.0

type LocalRunnerAdapter struct {
	Ctx    context.Context
	Runner pkg.TaskRunner
	Config *config.Config
}

LocalRunnerAdapter adapts pkg.TaskRunner to RunnerAdapter

func NewLocalRunnerAdapter added in v1.0.0

func NewLocalRunnerAdapter(ctx context.Context, runner pkg.TaskRunner, cfg *config.Config) *LocalRunnerAdapter

func (*LocalRunnerAdapter) Execute added in v1.0.0

func (a *LocalRunnerAdapter) Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))

type LocalTaskRunner

type LocalTaskRunner struct{}

LocalTaskRunner implements the TaskRunner interface for local execution. It directly calls the task's ExecuteModule method.

func (*LocalTaskRunner) ExecuteTask

func (r *LocalTaskRunner) ExecuteTask(ctx context.Context, task pkg.GraphNode, closure *pkg.Closure, cfg *config.Config) chan pkg.TaskResult

ExecuteTask executes a task locally. It calls task.ExecuteModule directly and returns its result. The TaskResult from ExecuteModule is expected to be populated by handleResult (called within ExecuteModule).

func (*LocalTaskRunner) RevertTask

func (r *LocalTaskRunner) RevertTask(ctx context.Context, task pkg.GraphNode, closure *pkg.Closure, cfg *config.Config) chan pkg.TaskResult

type Logger added in v1.0.0

type Logger interface {
	Error(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Debug(msg string, args ...interface{})
}

Logger abstracts over different logging implementations

type ResultChannel added in v1.0.0

type ResultChannel interface {
	ReceiveResult() (pkg.TaskResult, bool, error)
	IsClosed() bool
}

ResultChannel abstracts over different channel types used by local and temporal executors

type ResultProcessor added in v1.0.0

type ResultProcessor struct {
	ExecutionLevel int
	Logger         Logger
	Config         *config.Config
}

ResultProcessor handles the common logic for processing individual task results

func (*ResultProcessor) ProcessHandlerResult added in v1.0.0

func (rp *ResultProcessor) ProcessHandlerResult(
	result pkg.TaskResult,
	recapStats map[string]map[string]int,
) bool

ProcessHandlerResult handles the common logic for processing a single handler result. It returns whether the result represents a hard error, which is always false for handlers because they continue on error.

func (*ResultProcessor) ProcessTaskResult added in v1.0.0

func (rp *ResultProcessor) ProcessTaskResult(
	result pkg.TaskResult,
	recapStats map[string]map[string]int,
	executionHistoryLevel map[string]chan pkg.GraphNode,
) bool

ProcessTaskResult handles the common logic for processing a single task result. It returns whether the result represents a hard error that should stop execution.
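
The recap map has the shape host name to counter name to count. As a loose illustration of updating it while deciding whether to stop, consider the sketch below. The `taskResult` fields, the counter names ("ok", "changed", "failed", "ignored"), and the `tally` function are all hypothetical; the page does not list the keys the package actually uses.

```go
package main

import "fmt"

// taskResult is a hypothetical, trimmed-down stand-in for pkg.TaskResult.
type taskResult struct {
	Host    string
	Failed  bool
	Changed bool
	Ignored bool
}

// tally updates the per-host counters and reports whether the result is a
// hard error, meaning a failure that was not marked as ignored.
func tally(result taskResult, recap map[string]map[string]int) bool {
	stats, ok := recap[result.Host]
	if !ok {
		stats = make(map[string]int)
		recap[result.Host] = stats
	}
	switch {
	case result.Failed && result.Ignored:
		stats["ignored"]++ // assumed counter name
		return false
	case result.Failed:
		stats["failed"]++
		return true
	case result.Changed:
		stats["changed"]++
	default:
		stats["ok"]++
	}
	return false
}

func main() {
	recap := map[string]map[string]int{}
	fmt.Println(tally(taskResult{Host: "web1", Changed: true}, recap))
	fmt.Println(tally(taskResult{Host: "web1", Failed: true, Ignored: true}, recap))
	fmt.Println(tally(taskResult{Host: "db1", Failed: true}, recap))
	fmt.Println(recap["web1"]["changed"], recap["web1"]["ignored"], recap["db1"]["failed"])
}
```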

type RunSpageTemporalWorkerAndWorkflowOptions

type RunSpageTemporalWorkerAndWorkflowOptions struct {
	Graph            *pkg.Graph
	InventoryPath    string
	LoadedConfig     *config.Config // Changed from ConfigPath to break import cycle with cmd
	WorkflowIDPrefix string
	LimitPattern     string
}

RunSpageTemporalWorkerAndWorkflowOptions defines options for RunSpageTemporalWorkerAndWorkflow.

type RunnerAdapter added in v1.0.0

type RunnerAdapter interface {
	Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))
}

RunnerAdapter abstracts task execution for different executors

type SpageActivityInput

type SpageActivityInput struct {
	TaskDefinition   pkg.Task
	TargetHost       pkg.Host
	LoopItem         interface{} // nil if not a loop task or for the main item
	CurrentHostFacts map[string]interface{}
	SpageCoreConfig  *config.Config // Pass necessary config parts
	TaskHistory      map[string]interface{}
	Handlers         []GraphNodeDTO // Handlers from the graph (DTO for serialization)
}

SpageActivityInput defines the input for our generic Spage task activity.

type SpageActivityResult

type SpageActivityResult struct {
	HostName          string
	TaskName          string
	Output            string
	Changed           bool
	Error             string // Store error message if any
	Skipped           bool
	Ignored           bool
	RegisteredVars    map[string]interface{}
	HostFactsSnapshot map[string]interface{}
	ModuleOutputMap   map[string]interface{}
	NotifiedHandlers  []string // Handler names that were notified during this activity
}

SpageActivityResult defines the output from our generic Spage task activity.

func ExecuteSpageTaskActivity

func ExecuteSpageTaskActivity(ctx context.Context, input SpageActivityInput) (*SpageActivityResult, error)

ExecuteSpageTaskActivity is the generic activity that runs a Spage task.

func RevertSpageTaskActivity

func RevertSpageTaskActivity(ctx context.Context, input SpageActivityInput) (*SpageActivityResult, error)

RevertSpageTaskActivity is the generic activity that runs a Spage task's revert action.

type SpageMetaWorkflowInput added in v1.0.0

type SpageMetaWorkflowInput struct {
	Meta             GraphNodeDTO
	TargetHost       pkg.Host
	CurrentHostFacts map[string]interface{}
	SpageCoreConfig  *config.Config
	Handlers         []GraphNodeDTO
}

SpageMetaWorkflowInput defines the input for executing a MetaTask as a child workflow

type SpageMetaWorkflowResult added in v1.0.0

type SpageMetaWorkflowResult struct {
	Changed           bool
	Error             string
	HostFactsSnapshot map[string]interface{}
	NotifiedHandlers  []string
}

SpageMetaWorkflowResult summarizes the child workflow execution

func SpageMetaWorkflow added in v1.0.0

func SpageMetaWorkflow(ctx workflow.Context, input SpageMetaWorkflowInput) (SpageMetaWorkflowResult, error)

SpageMetaWorkflow executes a MetaTask's children sequentially for a single host

type SpageRunOnceLoopActivityInput added in v1.0.0

type SpageRunOnceLoopActivityInput struct {
	TaskDefinition   pkg.Task
	TargetHost       pkg.Host
	LoopItems        []interface{} // All loop items to execute
	CurrentHostFacts map[string]interface{}
	SpageCoreConfig  *config.Config
	Handlers         []GraphNodeDTO // Handlers from the graph (DTO)
}

SpageRunOnceLoopActivityInput defines the input for run_once tasks with loops.

type SpageRunOnceLoopActivityResult added in v1.0.0

type SpageRunOnceLoopActivityResult struct {
	HostName          string
	TaskName          string
	LoopResults       []SpageActivityResult // Results for each loop iteration
	HostFactsSnapshot map[string]interface{}
}

SpageRunOnceLoopActivityResult defines the output from run_once loop activity.

func ExecuteSpageRunOnceLoopActivity added in v1.0.0

func ExecuteSpageRunOnceLoopActivity(ctx context.Context, input SpageRunOnceLoopActivityInput) (*SpageRunOnceLoopActivityResult, error)

type TaskCollectionDTO added in v1.0.0

type TaskCollectionDTO struct {
	Id     int            `json:"id"`
	Name   string         `json:"name"`
	Tasks  []GraphNodeDTO `json:"tasks"`
	Rescue []GraphNodeDTO `json:"rescue,omitempty"`
	Always []GraphNodeDTO `json:"always,omitempty"`
}

TaskCollectionDTO serializes TaskCollection

type TemporalDispatchEnv added in v1.0.0

type TemporalDispatchEnv struct {
	// contains filtered or unexported fields
}

TemporalDispatchEnv implements DispatchEnv for the Temporal executor

func NewTemporalDispatchEnv added in v1.0.0

func NewTemporalDispatchEnv(ctx workflow.Context, resultsCh, errCh, completionCh workflow.Channel, parallel bool, adapter *TemporalRunnerAdapter) *TemporalDispatchEnv

func (*TemporalDispatchEnv) Done added in v1.0.0

func (e *TemporalDispatchEnv) Done()

func (*TemporalDispatchEnv) Go added in v1.0.0

func (e *TemporalDispatchEnv) Go(fn func())

func (*TemporalDispatchEnv) Inc added in v1.0.0

func (e *TemporalDispatchEnv) Inc()

func (*TemporalDispatchEnv) IsParallel added in v1.0.0

func (e *TemporalDispatchEnv) IsParallel() bool

func (*TemporalDispatchEnv) SendError added in v1.0.0

func (e *TemporalDispatchEnv) SendError(err error)

func (*TemporalDispatchEnv) SendResult added in v1.0.0

func (e *TemporalDispatchEnv) SendResult(result pkg.TaskResult)

func (*TemporalDispatchEnv) Wait added in v1.0.0

func (e *TemporalDispatchEnv) Wait()

type TemporalErrorChannel added in v1.0.0

type TemporalErrorChannel struct {
	// contains filtered or unexported fields
}

TemporalErrorChannel implements ErrorChannel for Temporal workflow channels

func NewTemporalErrorChannel added in v1.0.0

func NewTemporalErrorChannel(ctx workflow.Context, ch workflow.ReceiveChannel) *TemporalErrorChannel

func (*TemporalErrorChannel) IsClosed added in v1.0.0

func (c *TemporalErrorChannel) IsClosed() bool

func (*TemporalErrorChannel) ReceiveError added in v1.0.0

func (c *TemporalErrorChannel) ReceiveError() (error, bool, error)

type TemporalGraphExecutor

type TemporalGraphExecutor struct {
	Runner TemporalTaskRunner
}

func NewTemporalGraphExecutor

func NewTemporalGraphExecutor(runner TemporalTaskRunner) *TemporalGraphExecutor

func (*TemporalGraphExecutor) Execute

func (e *TemporalGraphExecutor) Execute(
	hostContexts map[string]*pkg.HostContext,
	orderedGraph [][]pkg.GraphNode,
	cfg *config.Config,
) error

func (*TemporalGraphExecutor) Revert added in v1.0.0

func (e *TemporalGraphExecutor) Revert(
	ctx context.Context,
	executedTasksHistory []map[string]chan pkg.GraphNode,
	hostContexts map[string]*pkg.HostContext,
	cfg *config.Config,
) error

Revert implements the GraphExecutor interface for Temporal. It requires a workflow context, which must be passed in via the context.Context argument.

type TemporalLogger added in v1.0.0

type TemporalLogger struct {
	// contains filtered or unexported fields
}

TemporalLogger implements Logger for Temporal workflow logging

func NewTemporalLogger added in v1.0.0

func NewTemporalLogger(ctx workflow.Context) *TemporalLogger

func (*TemporalLogger) Debug added in v1.0.0

func (l *TemporalLogger) Debug(msg string, args ...interface{})

func (*TemporalLogger) Error added in v1.0.0

func (l *TemporalLogger) Error(msg string, args ...interface{})

func (*TemporalLogger) Info added in v1.0.0

func (l *TemporalLogger) Info(msg string, args ...interface{})

func (*TemporalLogger) Warn added in v1.0.0

func (l *TemporalLogger) Warn(msg string, args ...interface{})

type TemporalResultChannel added in v1.0.0

type TemporalResultChannel struct {
	// contains filtered or unexported fields
}

TemporalResultChannel implements ResultChannel for Temporal workflow channels

func NewTemporalResultChannel added in v1.0.0

func NewTemporalResultChannel(ctx workflow.Context, ch workflow.ReceiveChannel) *TemporalResultChannel

func (*TemporalResultChannel) IsClosed added in v1.0.0

func (c *TemporalResultChannel) IsClosed() bool

func (*TemporalResultChannel) ReceiveResult added in v1.0.0

func (c *TemporalResultChannel) ReceiveResult() (pkg.TaskResult, bool, error)

type TemporalRunnerAdapter added in v1.0.0

type TemporalRunnerAdapter struct {
	WorkflowCtx workflow.Context
	CurrentCtx  workflow.Context
	Runner      interface {
		ExecuteTask(workflow.Context, pkg.GraphNode, *pkg.Closure, *config.Config) pkg.TaskResult
	}
	Config *config.Config
}

TemporalRunnerAdapter adapts TemporalTaskRunner-like runners to RunnerAdapter

func NewTemporalRunnerAdapter added in v1.0.0

func NewTemporalRunnerAdapter(ctx workflow.Context, runner interface {
	ExecuteTask(workflow.Context, pkg.GraphNode, *pkg.Closure, *config.Config) pkg.TaskResult
}, cfg *config.Config) *TemporalRunnerAdapter

func (*TemporalRunnerAdapter) Execute added in v1.0.0

func (a *TemporalRunnerAdapter) Execute(task pkg.GraphNode, closure *pkg.Closure, handle func(pkg.TaskResult))

type TemporalTaskRunner

type TemporalTaskRunner struct {
	WorkflowCtx workflow.Context // The Temporal workflow context
}

TemporalTaskRunner implements the TaskRunner interface for Temporal activity execution. It requires access to the workflow.Context to execute activities. Note: This runner is conceptual for showing how Temporal fits the TaskRunner pattern. The SpageTemporalWorkflow will still manage its own execution loop due to differences in fact/state management compared to LocalGraphExecutor's assumptions.

func NewTemporalTaskRunner

func NewTemporalTaskRunner(workflowCtx workflow.Context) *TemporalTaskRunner

NewTemporalTaskRunner creates a new TemporalTaskRunner.

func (*TemporalTaskRunner) ExecuteTask

func (r *TemporalTaskRunner) ExecuteTask(execCtx workflow.Context, task pkg.GraphNode, closure *pkg.Closure, cfg *config.Config) pkg.TaskResult

ExecuteTask dispatches the task as a Temporal activity. It converts the SpageActivityResult from the activity into a TaskResult. The original SpageActivityResult is stored in TaskResult.ExecutionSpecificOutput.

func (*TemporalTaskRunner) RevertTask

func (r *TemporalTaskRunner) RevertTask(execCtx workflow.Context, task pkg.Task, closure *pkg.Closure, cfg *config.Config) pkg.TaskResult
