package orchestrator import ( "bytes" "encoding/json" "fmt" "io" "net/http" "regexp" "strings" "sync" "time" "github.com/muyue/muyue/internal/config" "github.com/muyue/muyue/internal/workflow" ) var thinkRegex = regexp.MustCompile(`(?s)<[Tt]hink[^>]*>.*?`) const maxHistorySize = 100 type Message struct { Role string `json:"role"` Content string `json:"content"` } type ChatRequest struct { Model string `json:"model"` Messages []Message `json:"messages"` Stream bool `json:"stream"` } type ChatResponse struct { Choices []struct { Message struct { Content string `json:"content"` } `json:"message"` } `json:"choices"` Usage struct { TotalTokens int `json:"total_tokens"` } `json:"usage"` } type Orchestrator struct { config *config.MuyueConfig provider *config.AIProvider client *http.Client history []Message histMu sync.Mutex Workflow *workflow.Workflow } var sharedHTTPClient = &http.Client{ Timeout: 120 * time.Second, } func New(cfg *config.MuyueConfig) (*Orchestrator, error) { var provider *config.AIProvider for i := range cfg.AI.Providers { if cfg.AI.Providers[i].Active { provider = &cfg.AI.Providers[i] break } } if provider == nil { return nil, fmt.Errorf("no active AI provider configured") } if provider.APIKey == "" { return nil, fmt.Errorf("API key not set for %s", provider.Name) } return &Orchestrator{ config: cfg, provider: provider, client: sharedHTTPClient, history: []Message{}, Workflow: workflow.New(), }, nil } func (o *Orchestrator) Send(userMessage string) (string, error) { o.histMu.Lock() o.history = append(o.history, Message{ Role: "user", Content: userMessage, }) if len(o.history) > maxHistorySize { o.history = o.history[len(o.history)-maxHistorySize:] } reqBody := ChatRequest{ Model: o.provider.Model, Messages: o.history, Stream: false, } o.histMu.Unlock() body, err := json.Marshal(reqBody) if err != nil { return "", fmt.Errorf("marshal request: %w", err) } baseURL := o.provider.BaseURL if baseURL == "" { baseURL = getProviderBaseURL(o.provider.Name) } url := strings.TrimRight(baseURL, "/") + "/chat/completions" req, err := http.NewRequest("POST", url, bytes.NewReader(body)) if err != nil { return "", fmt.Errorf("create request: %w", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+o.provider.APIKey) resp, err := o.client.Do(req) if err != nil { return "", fmt.Errorf("send request: %w", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return "", fmt.Errorf("read response: %w", err) } if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("API error (%d): %s", resp.StatusCode, string(respBody)) } var chatResp ChatResponse if err := json.Unmarshal(respBody, &chatResp); err != nil { return "", fmt.Errorf("parse response: %w", err) } if len(chatResp.Choices) == 0 { return "", fmt.Errorf("no response from AI") } content := cleanAIResponse(chatResp.Choices[0].Message.Content) o.histMu.Lock() o.history = append(o.history, Message{ Role: "assistant", Content: content, }) o.histMu.Unlock() return content, nil } func (o *Orchestrator) StartWorkflow(goal string) (string, error) { o.Workflow.Start(goal) prompt := fmt.Sprintf("I want to: %s\nWhat questions do you need to ask me to fully understand this requirement? Ask ALL questions at once.", goal) o.history = []Message{ {Role: "system", Content: workflow.BuildSystemPrompt(workflow.PhaseGathering, o.Workflow.Plan)}, {Role: "user", Content: prompt}, } reqBody := ChatRequest{ Model: o.provider.Model, Messages: o.history, Stream: false, } body, err := json.Marshal(reqBody) if err != nil { return "", fmt.Errorf("marshal request: %w", err) } baseURL := o.provider.BaseURL if baseURL == "" { baseURL = getProviderBaseURL(o.provider.Name) } url := strings.TrimRight(baseURL, "/") + "/chat/completions" req, err := http.NewRequest("POST", url, bytes.NewReader(body)) if err != nil { return "", fmt.Errorf("create request: %w", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+o.provider.APIKey) resp, err := o.client.Do(req) if err != nil { return "", fmt.Errorf("send request: %w", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return "", fmt.Errorf("read response: %w", err) } if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("API error (%d): %s", resp.StatusCode, string(respBody)) } var chatResp ChatResponse if err := json.Unmarshal(respBody, &chatResp); err != nil { return "", fmt.Errorf("parse response: %w", err) } if len(chatResp.Choices) == 0 { return "", fmt.Errorf("no response from AI") } content := cleanAIResponse(chatResp.Choices[0].Message.Content) o.history = append(o.history, Message{ Role: "assistant", Content: content, }) return content, nil } func (o *Orchestrator) AnswerQuestion(answer string) (string, error) { o.Workflow.AddAnswer(answer) return o.Send(answer) } func (o *Orchestrator) GeneratePlan() (string, error) { o.Workflow.Phase = workflow.PhasePlanning o.history = append(o.history, Message{ Role: "system", Content: workflow.BuildSystemPrompt(workflow.PhasePlanning, o.Workflow.Plan), }) prompt := "All questions have been answered. Now create a detailed step-by-step execution plan as a JSON array. Each step should have: id, title, description, agent (crush/claude/muyue)." if len(o.Workflow.Plan.PreviewFiles) > 0 { prompt += "\nInclude visual previews where helpful using the PREVIEW_JSON format." } resp, err := o.Send(prompt) if err != nil { return "", err } steps, parseErr := workflow.ParsePlanResponse(resp) if parseErr == nil { o.Workflow.SetPlan("") o.Workflow.Plan.Steps = steps o.Workflow.Phase = workflow.PhaseReviewing } previewFiles := workflow.ParsePreviewFiles(resp) if len(previewFiles) > 0 { o.Workflow.SetPreviewFiles(previewFiles) } return resp, nil } func (o *Orchestrator) ReviewPlan(approved bool, feedback string) (string, error) { if approved { o.Workflow.Approve() return o.executeNextStep() } o.Workflow.Reject(feedback) return o.Send(fmt.Sprintf("The plan was rejected. Reason: %s. Please revise the plan.", feedback)) } func (o *Orchestrator) executeNextStep() (string, error) { step := o.Workflow.CurrentStep() if step == nil { return "All steps completed!", nil } o.history = append(o.history, Message{ Role: "system", Content: workflow.BuildSystemPrompt(workflow.PhaseExecuting, o.Workflow.Plan), }) return o.Send(fmt.Sprintf("Execute step %s: %s\n%s", step.ID, step.Title, step.Description)) } func (o *Orchestrator) ContinueExecution(output string) (string, error) { o.Workflow.AdvanceStep(output) if o.Workflow.Phase == workflow.PhaseDone { return "Workflow completed! All steps have been executed.", nil } return o.executeNextStep() } func (o *Orchestrator) History() []Message { o.histMu.Lock() defer o.histMu.Unlock() cp := make([]Message, len(o.history)) copy(cp, o.history) return cp } func (o *Orchestrator) ClearHistory() { o.histMu.Lock() o.history = []Message{} o.histMu.Unlock() o.Workflow.Reset() } func cleanAIResponse(content string) string { content = thinkRegex.ReplaceAllString(content, "") lines := strings.Split(content, "\n") var clean []string inBlock := false for _, line := range lines { trimmed := strings.TrimSpace(line) if trimmed == "<<" || trimmed == "<<<" { inBlock = true continue } if trimmed == ">>" || trimmed == ">>>" { inBlock = false continue } if inBlock { continue } clean = append(clean, line) } result := strings.TrimSpace(strings.Join(clean, "\n")) return result } func getProviderBaseURL(name string) string { switch name { case "minimax": return "https://api.minimax.io/v1" case "anthropic": return "https://api.anthropic.com/v1" case "openai": return "https://api.openai.com/v1" case "zai": return "https://api.z.ai/v1" default: return "" } }