package workflow import ( "context" "encoding/json" "fmt" "os" "path/filepath" "sync" "time" "github.com/muyue/muyue/internal/agent" "github.com/muyue/muyue/internal/config" ) type Status string const ( StatusPending Status = "pending" StatusRunning Status = "running" StatusDone Status = "done" StatusFailed Status = "failed" StatusSkipped Status = "skipped" StatusAwaiting Status = "awaiting_approval" ) type StepType string const ( TypeToolCall StepType = "tool_call" TypeCondition StepType = "condition" TypeParallel StepType = "parallel" TypeApproval StepType = "approval" ) type Step struct { ID string `json:"id"` Name string `json:"name"` Type StepType `json:"type"` Tool string `json:"tool,omitempty"` Args json.RawMessage `json:"args,omitempty"` Status Status `json:"status"` Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` Condition string `json:"condition,omitempty"` DependsOn []string `json:"depends_on,omitempty"` ApproveRole string `json:"approve_role,omitempty"` StartedAt *time.Time `json:"started_at,omitempty"` EndedAt *time.Time `json:"ended_at,omitempty"` } type Workflow struct { ID string `json:"id"` Name string `json:"name"` Description string `json:"description"` Type string `json:"type"` Steps []Step `json:"steps"` Status Status `json:"status"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } type Engine struct { mu sync.RWMutex workflows map[string]*Workflow agentRegistry *agent.Registry storePath string } func NewEngine(registry *agent.Registry) (*Engine, error) { dir, err := config.ConfigDir() if err != nil { dir = "/tmp/muyue" } storePath := filepath.Join(dir, "workflows.json") engine := &Engine{ workflows: make(map[string]*Workflow), agentRegistry: registry, storePath: storePath, } engine.load() return engine, nil } func (e *Engine) load() { data, err := os.ReadFile(e.storePath) if err != nil { return } var workflows []*Workflow if err := json.Unmarshal(data, &workflows); err != nil { return } for _, w := range workflows { e.workflows[w.ID] = w } } func (e *Engine) save() error { dir := filepath.Dir(e.storePath) os.MkdirAll(dir, 0755) e.mu.RLock() workflows := make([]*Workflow, 0, len(e.workflows)) for _, w := range e.workflows { workflows = append(workflows, w) } e.mu.RUnlock() data, err := json.MarshalIndent(workflows, "", " ") if err != nil { return err } return os.WriteFile(e.storePath, data, 0600) } func (e *Engine) Create(name, description, wfType string, steps []Step) *Workflow { wf := &Workflow{ ID: fmt.Sprintf("wf-%d", time.Now().UnixNano()), Name: name, Description: description, Type: wfType, Steps: steps, Status: StatusPending, CreatedAt: time.Now(), UpdatedAt: time.Now(), } for i := range wf.Steps { if wf.Steps[i].ID == "" { wf.Steps[i].ID = fmt.Sprintf("step-%d", i) } if wf.Steps[i].Status == "" { wf.Steps[i].Status = StatusPending } } e.mu.Lock() e.workflows[wf.ID] = wf e.mu.Unlock() e.save() return wf } func (e *Engine) Get(id string) (*Workflow, bool) { e.mu.RLock() defer e.mu.RUnlock() wf, ok := e.workflows[id] return wf, ok } func (e *Engine) List() []*Workflow { e.mu.RLock() defer e.mu.RUnlock() result := make([]*Workflow, 0, len(e.workflows)) for _, w := range e.workflows { result = append(result, w) } return result } func (e *Engine) Delete(id string) error { e.mu.Lock() defer e.mu.Unlock() if _, ok := e.workflows[id]; !ok { return fmt.Errorf("workflow not found: %s", id) } delete(e.workflows, id) return e.save() } func (e *Engine) UpdateStep(workflowID, stepID string, update func(*Step)) error { e.mu.Lock() defer e.mu.Unlock() wf, ok := e.workflows[workflowID] if !ok { return fmt.Errorf("workflow not found: %s", workflowID) } for i := range wf.Steps { if wf.Steps[i].ID == stepID { update(&wf.Steps[i]) wf.UpdatedAt = time.Now() e.save() return nil } } return fmt.Errorf("step not found: %s", stepID) } func (e *Engine) UpdateWorkflowStatus(workflowID string, status Status) error { e.mu.Lock() defer e.mu.Unlock() wf, ok := e.workflows[workflowID] if !ok { return fmt.Errorf("workflow not found: %s", workflowID) } wf.Status = status wf.UpdatedAt = time.Now() return e.save() } func (e *Engine) Execute(ctx context.Context, workflowID string, onStep func(step *Step, event string)) error { wf, ok := e.Get(workflowID) if !ok { return fmt.Errorf("workflow not found: %s", workflowID) } if err := e.UpdateWorkflowStatus(workflowID, StatusRunning); err != nil { return err } stepStatuses := make(map[string]Status) for _, step := range wf.Steps { stepStatuses[step.ID] = StatusPending } resolveDeps := func(stepID string) bool { step := wf.findStep(stepID) if step == nil { return false } for _, dep := range step.DependsOn { if stepStatuses[dep] != StatusDone { return false } } return true } executeStep := func(step *Step) error { now := time.Now() e.UpdateStep(workflowID, step.ID, func(s *Step) { s.Status = StatusRunning s.StartedAt = &now }) if onStep != nil { onStep(step, "started") } var result string var stepErr error switch step.Type { case TypeToolCall: if step.Tool == "" { stepErr = fmt.Errorf("tool not specified for step %s", step.ID) } else { call := agent.ToolCall{ ID: step.ID, Name: step.Tool, Arguments: step.Args, } resp, err := e.agentRegistry.Execute(ctx, call) if err != nil { stepErr = err } else { result = resp.Content if resp.IsError { stepErr = fmt.Errorf("%s", result) } } } case TypeApproval: e.UpdateStep(workflowID, step.ID, func(s *Step) { s.Status = StatusAwaiting }) if onStep != nil { onStep(step, "awaiting_approval") } return nil case TypeCondition: result = fmt.Sprintf("condition '%s' evaluated", step.Condition) default: stepErr = fmt.Errorf("unknown step type: %s", step.Type) } endTime := time.Now() if stepErr != nil { e.UpdateStep(workflowID, step.ID, func(s *Step) { s.Status = StatusFailed s.Error = stepErr.Error() s.EndedAt = &endTime }) if onStep != nil { onStep(step, "failed") } } else { e.UpdateStep(workflowID, step.ID, func(s *Step) { s.Status = StatusDone s.Result = result s.EndedAt = &endTime }) stepStatuses[step.ID] = StatusDone if onStep != nil { onStep(step, "done") } } return stepErr } hasFailures := false for _, step := range wf.Steps { if step.Type == TypeParallel { continue } for !resolveDeps(step.ID) { time.Sleep(100 * time.Millisecond) } if err := executeStep(&step); err != nil { hasFailures = true break } } if hasFailures { e.UpdateWorkflowStatus(workflowID, StatusFailed) } else { e.UpdateWorkflowStatus(workflowID, StatusDone) } return nil } func (w *Workflow) findStep(id string) *Step { for i := range w.Steps { if w.Steps[i].ID == id { return &w.Steps[i] } } return nil } func (e *Engine) ApproveStep(workflowID, stepID string) error { return e.UpdateStep(workflowID, stepID, func(s *Step) { s.Status = StatusDone }) } func (e *Engine) SkipStep(workflowID, stepID string) error { return e.UpdateStep(workflowID, stepID, func(s *Step) { s.Status = StatusSkipped }) }