Files
MuyueWorkspace/internal/workflow/engine.go
Augustin 2e50366cd8
All checks were successful
Beta Release / beta (push) Successful in 2m24s
feat: add Cobra CLI, LSP/MCP registries, workflow engine, and enriched dashboard
Major changes:
- Refactor CLI entry point to Cobra commands (root, setup, scan, doctor, install, update, lsp, mcp, skills, config, version)
- Add LSP registry with health checks, auto-install, and editor config generation
- Add MCP registry with editor detection, status tracking, and per-editor configuration
- Add workflow engine with planner and step execution for automated task chains
- Add conversation search, export (Markdown/JSON), and detailed token counting
- Add streaming shell chat handler with tool call/result events
- Add skill validation, dry-run testing, and export endpoints
- Enrich dashboard with Tools/Activity/Status tabs and tool cards grid
- Add PRD documentation
- Complete i18n for both EN and FR

💘 Generated with Crush

Assisted-by: GLM-5.1 via Crush <crush@charm.land>
2026-04-22 22:22:05 +02:00

362 lines
7.6 KiB
Go

package workflow
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/muyue/muyue/internal/agent"
"github.com/muyue/muyue/internal/config"
)
type Status string
const (
StatusPending Status = "pending"
StatusRunning Status = "running"
StatusDone Status = "done"
StatusFailed Status = "failed"
StatusSkipped Status = "skipped"
StatusAwaiting Status = "awaiting_approval"
)
type StepType string
const (
TypeToolCall StepType = "tool_call"
TypeCondition StepType = "condition"
TypeParallel StepType = "parallel"
TypeApproval StepType = "approval"
)
type Step struct {
ID string `json:"id"`
Name string `json:"name"`
Type StepType `json:"type"`
Tool string `json:"tool,omitempty"`
Args json.RawMessage `json:"args,omitempty"`
Status Status `json:"status"`
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Condition string `json:"condition,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
ApproveRole string `json:"approve_role,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
EndedAt *time.Time `json:"ended_at,omitempty"`
}
type Workflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type string `json:"type"`
Steps []Step `json:"steps"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Engine struct {
mu sync.RWMutex
workflows map[string]*Workflow
agentRegistry *agent.Registry
storePath string
}
func NewEngine(registry *agent.Registry) (*Engine, error) {
dir, err := config.ConfigDir()
if err != nil {
dir = "/tmp/muyue"
}
storePath := filepath.Join(dir, "workflows.json")
engine := &Engine{
workflows: make(map[string]*Workflow),
agentRegistry: registry,
storePath: storePath,
}
engine.load()
return engine, nil
}
func (e *Engine) load() {
data, err := os.ReadFile(e.storePath)
if err != nil {
return
}
var workflows []*Workflow
if err := json.Unmarshal(data, &workflows); err != nil {
return
}
for _, w := range workflows {
e.workflows[w.ID] = w
}
}
func (e *Engine) save() error {
dir := filepath.Dir(e.storePath)
os.MkdirAll(dir, 0755)
e.mu.RLock()
workflows := make([]*Workflow, 0, len(e.workflows))
for _, w := range e.workflows {
workflows = append(workflows, w)
}
e.mu.RUnlock()
data, err := json.MarshalIndent(workflows, "", " ")
if err != nil {
return err
}
return os.WriteFile(e.storePath, data, 0600)
}
func (e *Engine) Create(name, description, wfType string, steps []Step) *Workflow {
wf := &Workflow{
ID: fmt.Sprintf("wf-%d", time.Now().UnixNano()),
Name: name,
Description: description,
Type: wfType,
Steps: steps,
Status: StatusPending,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
for i := range wf.Steps {
if wf.Steps[i].ID == "" {
wf.Steps[i].ID = fmt.Sprintf("step-%d", i)
}
if wf.Steps[i].Status == "" {
wf.Steps[i].Status = StatusPending
}
}
e.mu.Lock()
e.workflows[wf.ID] = wf
e.mu.Unlock()
e.save()
return wf
}
func (e *Engine) Get(id string) (*Workflow, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
wf, ok := e.workflows[id]
return wf, ok
}
func (e *Engine) List() []*Workflow {
e.mu.RLock()
defer e.mu.RUnlock()
result := make([]*Workflow, 0, len(e.workflows))
for _, w := range e.workflows {
result = append(result, w)
}
return result
}
func (e *Engine) Delete(id string) error {
e.mu.Lock()
defer e.mu.Unlock()
if _, ok := e.workflows[id]; !ok {
return fmt.Errorf("workflow not found: %s", id)
}
delete(e.workflows, id)
return e.save()
}
func (e *Engine) UpdateStep(workflowID, stepID string, update func(*Step)) error {
e.mu.Lock()
defer e.mu.Unlock()
wf, ok := e.workflows[workflowID]
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
for i := range wf.Steps {
if wf.Steps[i].ID == stepID {
update(&wf.Steps[i])
wf.UpdatedAt = time.Now()
e.save()
return nil
}
}
return fmt.Errorf("step not found: %s", stepID)
}
func (e *Engine) UpdateWorkflowStatus(workflowID string, status Status) error {
e.mu.Lock()
defer e.mu.Unlock()
wf, ok := e.workflows[workflowID]
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
wf.Status = status
wf.UpdatedAt = time.Now()
return e.save()
}
func (e *Engine) Execute(ctx context.Context, workflowID string, onStep func(step *Step, event string)) error {
wf, ok := e.Get(workflowID)
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
if err := e.UpdateWorkflowStatus(workflowID, StatusRunning); err != nil {
return err
}
stepStatuses := make(map[string]Status)
for _, step := range wf.Steps {
stepStatuses[step.ID] = StatusPending
}
resolveDeps := func(stepID string) bool {
step := wf.findStep(stepID)
if step == nil {
return false
}
for _, dep := range step.DependsOn {
if stepStatuses[dep] != StatusDone {
return false
}
}
return true
}
executeStep := func(step *Step) error {
now := time.Now()
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusRunning
s.StartedAt = &now
})
if onStep != nil {
onStep(step, "started")
}
var result string
var stepErr error
switch step.Type {
case TypeToolCall:
if step.Tool == "" {
stepErr = fmt.Errorf("tool not specified for step %s", step.ID)
} else {
call := agent.ToolCall{
ID: step.ID,
Name: step.Tool,
Arguments: step.Args,
}
resp, err := e.agentRegistry.Execute(ctx, call)
if err != nil {
stepErr = err
} else {
result = resp.Content
if resp.IsError {
stepErr = fmt.Errorf("%s", result)
}
}
}
case TypeApproval:
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusAwaiting
})
if onStep != nil {
onStep(step, "awaiting_approval")
}
return nil
case TypeCondition:
result = fmt.Sprintf("condition '%s' evaluated", step.Condition)
default:
stepErr = fmt.Errorf("unknown step type: %s", step.Type)
}
endTime := time.Now()
if stepErr != nil {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusFailed
s.Error = stepErr.Error()
s.EndedAt = &endTime
})
if onStep != nil {
onStep(step, "failed")
}
} else {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusDone
s.Result = result
s.EndedAt = &endTime
})
stepStatuses[step.ID] = StatusDone
if onStep != nil {
onStep(step, "done")
}
}
return stepErr
}
hasFailures := false
for _, step := range wf.Steps {
if step.Type == TypeParallel {
continue
}
for !resolveDeps(step.ID) {
time.Sleep(100 * time.Millisecond)
}
if err := executeStep(&step); err != nil {
hasFailures = true
break
}
}
if hasFailures {
e.UpdateWorkflowStatus(workflowID, StatusFailed)
} else {
e.UpdateWorkflowStatus(workflowID, StatusDone)
}
return nil
}
func (w *Workflow) findStep(id string) *Step {
for i := range w.Steps {
if w.Steps[i].ID == id {
return &w.Steps[i]
}
}
return nil
}
func (e *Engine) ApproveStep(workflowID, stepID string) error {
return e.UpdateStep(workflowID, stepID, func(s *Step) {
s.Status = StatusDone
})
}
func (e *Engine) SkipStep(workflowID, stepID string) error {
return e.UpdateStep(workflowID, stepID, func(s *Step) {
s.Status = StatusSkipped
})
}