package api import ( "context" "encoding/json" "net/http" "strings" "github.com/muyue/muyue/internal/workflow" ) func (s *Server) handleWorkflowCreate(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeError(w, "POST only", http.StatusMethodNotAllowed) return } var body struct { Name string `json:"name"` Description string `json:"description"` Type string `json:"type"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { writeError(w, err.Error(), http.StatusBadRequest) return } if body.Name == "" { writeError(w, "name is required", http.StatusBadRequest) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } wf := engine.Create(body.Name, body.Description, body.Type, []workflow.Step{}) writeJSON(w, wf) } func (s *Server) handleWorkflowList(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { writeError(w, "GET only", http.StatusMethodNotAllowed) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } workflows := engine.List() writeJSON(w, map[string]interface{}{ "workflows": workflows, "count": len(workflows), }) } func (s *Server) handleWorkflowGet(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { writeError(w, "GET only", http.StatusMethodNotAllowed) return } id := strings.TrimPrefix(r.URL.Path, "/api/workflow/") if id == "" { writeError(w, "workflow id required", http.StatusBadRequest) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } wf, ok := engine.Get(id) if !ok { writeError(w, "workflow not found", http.StatusNotFound) return } writeJSON(w, wf) } func (s *Server) handleWorkflowDelete(w http.ResponseWriter, r *http.Request) { if r.Method != "DELETE" { writeError(w, "DELETE only", http.StatusMethodNotAllowed) return } id := strings.TrimPrefix(r.URL.Path, "/api/workflow/") if id == "" { writeError(w, "workflow id required", http.StatusBadRequest) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } if err := engine.Delete(id); err != nil { writeError(w, err.Error(), http.StatusNotFound) return } writeJSON(w, map[string]string{"status": "deleted"}) } func (s *Server) handleWorkflowPlan(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeError(w, "POST only", http.StatusMethodNotAllowed) return } var body struct { Goal string `json:"goal"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { writeError(w, err.Error(), http.StatusBadRequest) return } if body.Goal == "" { writeError(w, "goal is required", http.StatusBadRequest) return } planner, err := workflow.NewPlanner(s.config) if err != nil { writeError(w, err.Error(), http.StatusServiceUnavailable) return } steps, err := planner.GeneratePlan(context.Background(), body.Goal) if err != nil { writeError(w, err.Error(), http.StatusInternalServerError) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } wf := engine.Create("Plan: "+truncateString(body.Goal, 30), body.Goal, "plan_execute", steps) writeJSON(w, wf) } func (s *Server) handleWorkflowExecute(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeError(w, "POST only", http.StatusMethodNotAllowed) return } id := strings.TrimPrefix(r.URL.Path, "/api/workflow/execute/") if id == "" { writeError(w, "workflow id required", http.StatusBadRequest) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } wf, ok := engine.Get(id) if !ok { writeError(w, "workflow not found", http.StatusNotFound) return } if r.URL.Query().Get("stream") == "true" { s.handleWorkflowExecuteStream(w, engine, wf) } else { err := engine.Execute(context.Background(), id, nil) if err != nil { writeError(w, err.Error(), http.StatusInternalServerError) return } wf, _ = engine.Get(id) writeJSON(w, wf) } } func (s *Server) handleWorkflowExecuteStream(w http.ResponseWriter, engine *workflow.Engine, wf *workflow.Workflow) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.WriteHeader(http.StatusOK) flusher, canFlush := w.(http.Flusher) writeSSE := func(data map[string]interface{}) { b, _ := json.Marshal(data) w.Write([]byte("data: " + string(b) + "\n\n")) if canFlush { flusher.Flush() } } go func() { engine.Execute(context.Background(), wf.ID, func(step *workflow.Step, event string) { writeSSE(map[string]interface{}{ "event": event, "step": step, }) }) wf, _ = engine.Get(wf.ID) writeSSE(map[string]interface{}{ "event": "workflow_done", "status": wf.Status, "workflow": wf, }) }() } func (s *Server) handleWorkflowApprove(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { writeError(w, "POST only", http.StatusMethodNotAllowed) return } id := strings.TrimPrefix(r.URL.Path, "/api/workflow/approve/") if id == "" { writeError(w, "workflow id required", http.StatusBadRequest) return } var body struct { StepID string `json:"step_id"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { writeError(w, err.Error(), http.StatusBadRequest) return } engine := s.workflowEngine if engine == nil { engine, _ = workflow.NewEngine(s.agentRegistry) } if err := engine.ApproveStep(id, body.StepID); err != nil { writeError(w, err.Error(), http.StatusInternalServerError) return } writeJSON(w, map[string]string{"status": "approved"}) } func truncateString(s string, max int) string { runes := []rune(s) if len(runes) <= max { return s } return string(runes[:max]) }