Files
MuyueWorkspace/internal/workflow/engine.go
Muyue 6a7b4d8001
All checks were successful
PR Check / check (pull_request) Successful in 57s
release: v0.6.0 — security audit fixes + 7 new features
Audit corrections (security, concurrency, stability):
- chat_engine: bound resp.Choices[0] access, release tool slot per-iteration
- conversation_multi: synchronous save under existing lock (was racy fire-and-forget)
- workflow/engine: short-circuit on failed deps (no more infinite busy-wait); track failed/skipped status
- handlers_workflow: rune-aware truncate for plan goal (UTF-8 safe)
- server: CORS limited to localhost origins (was wildcard)
- handlers_info / terminal: mask API keys and SSH passwords as "***" in GET responses; preserve stored secret if "***" sent on update
- terminal: sshpass uses -e + SSHPASS env var (was both -p and -e)
- handlers_chat: MaxBytesReader 50 MB on /api/chat
- image_cache: 10 MB cap per image
- handlers_config: font size <= 72; profile-save unmarshal errors propagated
- handlers_info: /lsp/auto-install ProjectDir restricted to user home
- Shell.jsx: parenthesized resize-condition (operator precedence)
- orchestrator_test: CleanAIResponse capitalization (fixes failing vet)

New features:
- platform: detect OS name (Debian, Ubuntu, Windows 11, macOS X.Y) and inject in Studio system prompt next to the date
- agents: default timeout 30 min for crush_run/claude_run (cap also 30 min)
- agents: new cwd, wsl_distro, wsl_user params; on Windows hosts launch via "wsl -d <distro> -u <user> --cd <cwd> --"
- agents: new claude_run tool (mirror of crush_run for Claude Code CLI)
- terminal: list installed WSL distros individually in new-tab menu (Windows only)
- studio: system prompt rewritten around BMAD-METHOD personas + mandatory delegation template
- studio: "Réflexion avancée" toggle — inactive provider produces a preliminary report injected as [RAPPORT PRÉALABLE] context for the active provider
- studio: "Historique compressé" toggle — collapses past tool calls to last action only, with "Tout afficher" expansion
2026-04-27 10:12:11 +02:00

386 lines
8.2 KiB
Go

package workflow
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/muyue/muyue/internal/agent"
"github.com/muyue/muyue/internal/config"
)
type Status string
const (
StatusPending Status = "pending"
StatusRunning Status = "running"
StatusDone Status = "done"
StatusFailed Status = "failed"
StatusSkipped Status = "skipped"
StatusAwaiting Status = "awaiting_approval"
)
type StepType string
const (
TypeToolCall StepType = "tool_call"
TypeCondition StepType = "condition"
TypeParallel StepType = "parallel"
TypeApproval StepType = "approval"
)
type Step struct {
ID string `json:"id"`
Name string `json:"name"`
Type StepType `json:"type"`
Tool string `json:"tool,omitempty"`
Args json.RawMessage `json:"args,omitempty"`
Status Status `json:"status"`
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Condition string `json:"condition,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
ApproveRole string `json:"approve_role,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
EndedAt *time.Time `json:"ended_at,omitempty"`
}
type Workflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type string `json:"type"`
Steps []Step `json:"steps"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Engine struct {
mu sync.RWMutex
workflows map[string]*Workflow
agentRegistry *agent.Registry
storePath string
}
func NewEngine(registry *agent.Registry) (*Engine, error) {
dir, err := config.ConfigDir()
if err != nil {
dir = "/tmp/muyue"
}
storePath := filepath.Join(dir, "workflows.json")
engine := &Engine{
workflows: make(map[string]*Workflow),
agentRegistry: registry,
storePath: storePath,
}
engine.load()
return engine, nil
}
func (e *Engine) load() {
data, err := os.ReadFile(e.storePath)
if err != nil {
return
}
var workflows []*Workflow
if err := json.Unmarshal(data, &workflows); err != nil {
return
}
for _, w := range workflows {
e.workflows[w.ID] = w
}
}
func (e *Engine) save() error {
dir := filepath.Dir(e.storePath)
os.MkdirAll(dir, 0755)
e.mu.RLock()
workflows := make([]*Workflow, 0, len(e.workflows))
for _, w := range e.workflows {
workflows = append(workflows, w)
}
e.mu.RUnlock()
data, err := json.MarshalIndent(workflows, "", " ")
if err != nil {
return err
}
return os.WriteFile(e.storePath, data, 0600)
}
func (e *Engine) Create(name, description, wfType string, steps []Step) *Workflow {
wf := &Workflow{
ID: fmt.Sprintf("wf-%d", time.Now().UnixNano()),
Name: name,
Description: description,
Type: wfType,
Steps: steps,
Status: StatusPending,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
for i := range wf.Steps {
if wf.Steps[i].ID == "" {
wf.Steps[i].ID = fmt.Sprintf("step-%d", i)
}
if wf.Steps[i].Status == "" {
wf.Steps[i].Status = StatusPending
}
}
e.mu.Lock()
e.workflows[wf.ID] = wf
e.mu.Unlock()
e.save()
return wf
}
func (e *Engine) Get(id string) (*Workflow, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
wf, ok := e.workflows[id]
return wf, ok
}
func (e *Engine) List() []*Workflow {
e.mu.RLock()
defer e.mu.RUnlock()
result := make([]*Workflow, 0, len(e.workflows))
for _, w := range e.workflows {
result = append(result, w)
}
return result
}
func (e *Engine) Delete(id string) error {
e.mu.Lock()
defer e.mu.Unlock()
if _, ok := e.workflows[id]; !ok {
return fmt.Errorf("workflow not found: %s", id)
}
delete(e.workflows, id)
return e.save()
}
func (e *Engine) UpdateStep(workflowID, stepID string, update func(*Step)) error {
e.mu.Lock()
defer e.mu.Unlock()
wf, ok := e.workflows[workflowID]
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
for i := range wf.Steps {
if wf.Steps[i].ID == stepID {
update(&wf.Steps[i])
wf.UpdatedAt = time.Now()
e.save()
return nil
}
}
return fmt.Errorf("step not found: %s", stepID)
}
func (e *Engine) UpdateWorkflowStatus(workflowID string, status Status) error {
e.mu.Lock()
defer e.mu.Unlock()
wf, ok := e.workflows[workflowID]
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
wf.Status = status
wf.UpdatedAt = time.Now()
return e.save()
}
func (e *Engine) Execute(ctx context.Context, workflowID string, onStep func(step *Step, event string)) error {
wf, ok := e.Get(workflowID)
if !ok {
return fmt.Errorf("workflow not found: %s", workflowID)
}
if err := e.UpdateWorkflowStatus(workflowID, StatusRunning); err != nil {
return err
}
stepStatuses := make(map[string]Status)
for _, step := range wf.Steps {
stepStatuses[step.ID] = StatusPending
}
resolveDeps := func(stepID string) (ready bool, blocked bool) {
step := wf.findStep(stepID)
if step == nil {
return false, true
}
for _, dep := range step.DependsOn {
depStatus := stepStatuses[dep]
if depStatus == StatusFailed || depStatus == StatusSkipped {
return false, true
}
if depStatus != StatusDone {
return false, false
}
}
return true, false
}
executeStep := func(step *Step) error {
now := time.Now()
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusRunning
s.StartedAt = &now
})
if onStep != nil {
onStep(step, "started")
}
var result string
var stepErr error
switch step.Type {
case TypeToolCall:
if step.Tool == "" {
stepErr = fmt.Errorf("tool not specified for step %s", step.ID)
} else {
call := agent.ToolCall{
ID: step.ID,
Name: step.Tool,
Arguments: step.Args,
}
resp, err := e.agentRegistry.Execute(ctx, call)
if err != nil {
stepErr = err
} else {
result = resp.Content
if resp.IsError {
stepErr = fmt.Errorf("%s", result)
}
}
}
case TypeApproval:
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusAwaiting
})
if onStep != nil {
onStep(step, "awaiting_approval")
}
return nil
case TypeCondition:
result = fmt.Sprintf("condition '%s' evaluated", step.Condition)
default:
stepErr = fmt.Errorf("unknown step type: %s", step.Type)
}
endTime := time.Now()
if stepErr != nil {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusFailed
s.Error = stepErr.Error()
s.EndedAt = &endTime
})
stepStatuses[step.ID] = StatusFailed
if onStep != nil {
onStep(step, "failed")
}
} else {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusDone
s.Result = result
s.EndedAt = &endTime
})
stepStatuses[step.ID] = StatusDone
if onStep != nil {
onStep(step, "done")
}
}
return stepErr
}
hasFailures := false
for _, step := range wf.Steps {
if step.Type == TypeParallel {
continue
}
ready, blocked := resolveDeps(step.ID)
if blocked {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusSkipped
})
stepStatuses[step.ID] = StatusSkipped
if onStep != nil {
onStep(&step, "skipped")
}
continue
}
if !ready {
e.UpdateStep(workflowID, step.ID, func(s *Step) {
s.Status = StatusSkipped
s.Error = "dependency not satisfied at execution time"
})
stepStatuses[step.ID] = StatusSkipped
if onStep != nil {
onStep(&step, "skipped")
}
continue
}
if err := executeStep(&step); err != nil {
hasFailures = true
break
}
}
if hasFailures {
e.UpdateWorkflowStatus(workflowID, StatusFailed)
} else {
e.UpdateWorkflowStatus(workflowID, StatusDone)
}
return nil
}
func (w *Workflow) findStep(id string) *Step {
for i := range w.Steps {
if w.Steps[i].ID == id {
return &w.Steps[i]
}
}
return nil
}
func (e *Engine) ApproveStep(workflowID, stepID string) error {
return e.UpdateStep(workflowID, stepID, func(s *Step) {
s.Status = StatusDone
})
}
func (e *Engine) SkipStep(workflowID, stepID string) error {
return e.UpdateStep(workflowID, stepID, func(s *Step) {
s.Status = StatusSkipped
})
}