All checks were successful
CI / build (push) Successful in 2m41s
Split monolithic app.go into focused modules (dashboard, chat, workflow, config, agents, terminal, commands, handlers). Add proper error handling for installer commands, proxy pipes, and MCP config parsing. Fix daemon channel buffer, cap orchestrator history, compile think regex once, and set HTTP timeouts on preview server. Improve CI with Go module caching, dependency download step, and test stage with race detection. 😘 Generated with Crush Assisted-by: GLM-5-Turbo via Crush <crush@charm.land>
330 lines
7.8 KiB
Go
330 lines
7.8 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/muyue/muyue/internal/config"
|
|
"github.com/muyue/muyue/internal/workflow"
|
|
)
|
|
|
|
var thinkRegex = regexp.MustCompile(`(?s)<[Tt]hink[^>]*>.*?</[Tt]hink>`)
|
|
|
|
const maxHistorySize = 100
|
|
|
|
type Message struct {
|
|
Role string `json:"role"`
|
|
Content string `json:"content"`
|
|
}
|
|
|
|
type ChatRequest struct {
|
|
Model string `json:"model"`
|
|
Messages []Message `json:"messages"`
|
|
Stream bool `json:"stream"`
|
|
}
|
|
|
|
type ChatResponse struct {
|
|
Choices []struct {
|
|
Message struct {
|
|
Content string `json:"content"`
|
|
} `json:"message"`
|
|
} `json:"choices"`
|
|
Usage struct {
|
|
TotalTokens int `json:"total_tokens"`
|
|
} `json:"usage"`
|
|
}
|
|
|
|
type Orchestrator struct {
|
|
config *config.MuyueConfig
|
|
provider *config.AIProvider
|
|
client *http.Client
|
|
history []Message
|
|
Workflow *workflow.Workflow
|
|
}
|
|
|
|
func New(cfg *config.MuyueConfig) (*Orchestrator, error) {
|
|
var provider *config.AIProvider
|
|
for i := range cfg.AI.Providers {
|
|
if cfg.AI.Providers[i].Active {
|
|
provider = &cfg.AI.Providers[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
if provider == nil {
|
|
return nil, fmt.Errorf("no active AI provider configured")
|
|
}
|
|
|
|
if provider.APIKey == "" {
|
|
return nil, fmt.Errorf("API key not set for %s", provider.Name)
|
|
}
|
|
|
|
return &Orchestrator{
|
|
config: cfg,
|
|
provider: provider,
|
|
client: &http.Client{
|
|
Timeout: 120 * time.Second,
|
|
},
|
|
history: []Message{},
|
|
Workflow: workflow.New(),
|
|
}, nil
|
|
}
|
|
|
|
func (o *Orchestrator) Send(userMessage string) (string, error) {
|
|
o.history = append(o.history, Message{
|
|
Role: "user",
|
|
Content: userMessage,
|
|
})
|
|
|
|
if len(o.history) > maxHistorySize {
|
|
o.history = o.history[len(o.history)-maxHistorySize:]
|
|
}
|
|
|
|
reqBody := ChatRequest{
|
|
Model: o.provider.Model,
|
|
Messages: o.history,
|
|
Stream: false,
|
|
}
|
|
|
|
body, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
baseURL := o.provider.BaseURL
|
|
if baseURL == "" {
|
|
baseURL = getProviderBaseURL(o.provider.Name)
|
|
}
|
|
|
|
url := strings.TrimRight(baseURL, "/") + "/chat/completions"
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return "", fmt.Errorf("create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Authorization", "Bearer "+o.provider.APIKey)
|
|
|
|
resp, err := o.client.Do(req)
|
|
if err != nil {
|
|
return "", fmt.Errorf("send request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read response: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return "", fmt.Errorf("API error (%d): %s", resp.StatusCode, string(respBody))
|
|
}
|
|
|
|
var chatResp ChatResponse
|
|
if err := json.Unmarshal(respBody, &chatResp); err != nil {
|
|
return "", fmt.Errorf("parse response: %w", err)
|
|
}
|
|
|
|
if len(chatResp.Choices) == 0 {
|
|
return "", fmt.Errorf("no response from AI")
|
|
}
|
|
|
|
content := cleanAIResponse(chatResp.Choices[0].Message.Content)
|
|
o.history = append(o.history, Message{
|
|
Role: "assistant",
|
|
Content: content,
|
|
})
|
|
|
|
return content, nil
|
|
}
|
|
|
|
func (o *Orchestrator) StartWorkflow(goal string) (string, error) {
|
|
o.Workflow.Start(goal)
|
|
prompt := fmt.Sprintf("I want to: %s\nWhat questions do you need to ask me to fully understand this requirement? Ask ALL questions at once.", goal)
|
|
o.history = []Message{
|
|
{Role: "system", Content: workflow.BuildSystemPrompt(workflow.PhaseGathering, o.Workflow.Plan)},
|
|
{Role: "user", Content: prompt},
|
|
}
|
|
|
|
reqBody := ChatRequest{
|
|
Model: o.provider.Model,
|
|
Messages: o.history,
|
|
Stream: false,
|
|
}
|
|
|
|
body, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
baseURL := o.provider.BaseURL
|
|
if baseURL == "" {
|
|
baseURL = getProviderBaseURL(o.provider.Name)
|
|
}
|
|
|
|
url := strings.TrimRight(baseURL, "/") + "/chat/completions"
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return "", fmt.Errorf("create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Authorization", "Bearer "+o.provider.APIKey)
|
|
|
|
resp, err := o.client.Do(req)
|
|
if err != nil {
|
|
return "", fmt.Errorf("send request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read response: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return "", fmt.Errorf("API error (%d): %s", resp.StatusCode, string(respBody))
|
|
}
|
|
|
|
var chatResp ChatResponse
|
|
if err := json.Unmarshal(respBody, &chatResp); err != nil {
|
|
return "", fmt.Errorf("parse response: %w", err)
|
|
}
|
|
|
|
if len(chatResp.Choices) == 0 {
|
|
return "", fmt.Errorf("no response from AI")
|
|
}
|
|
|
|
content := cleanAIResponse(chatResp.Choices[0].Message.Content)
|
|
o.history = append(o.history, Message{
|
|
Role: "assistant",
|
|
Content: content,
|
|
})
|
|
|
|
return content, nil
|
|
}
|
|
|
|
func (o *Orchestrator) AnswerQuestion(answer string) (string, error) {
|
|
o.Workflow.AddAnswer(answer)
|
|
return o.Send(answer)
|
|
}
|
|
|
|
func (o *Orchestrator) GeneratePlan() (string, error) {
|
|
o.Workflow.Phase = workflow.PhasePlanning
|
|
o.history = append(o.history, Message{
|
|
Role: "system",
|
|
Content: workflow.BuildSystemPrompt(workflow.PhasePlanning, o.Workflow.Plan),
|
|
})
|
|
|
|
prompt := "All questions have been answered. Now create a detailed step-by-step execution plan as a JSON array. Each step should have: id, title, description, agent (crush/claude/muyue)."
|
|
if len(o.Workflow.Plan.PreviewFiles) > 0 {
|
|
prompt += "\nInclude visual previews where helpful using the PREVIEW_JSON format."
|
|
}
|
|
|
|
resp, err := o.Send(prompt)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
steps, parseErr := workflow.ParsePlanResponse(resp)
|
|
if parseErr == nil {
|
|
o.Workflow.SetPlan("")
|
|
o.Workflow.Plan.Steps = steps
|
|
o.Workflow.Phase = workflow.PhaseReviewing
|
|
}
|
|
|
|
previewFiles := workflow.ParsePreviewFiles(resp)
|
|
if len(previewFiles) > 0 {
|
|
o.Workflow.SetPreviewFiles(previewFiles)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (o *Orchestrator) ReviewPlan(approved bool, feedback string) (string, error) {
|
|
if approved {
|
|
o.Workflow.Approve()
|
|
return o.executeNextStep()
|
|
}
|
|
o.Workflow.Reject(feedback)
|
|
return o.Send(fmt.Sprintf("The plan was rejected. Reason: %s. Please revise the plan.", feedback))
|
|
}
|
|
|
|
func (o *Orchestrator) executeNextStep() (string, error) {
|
|
step := o.Workflow.CurrentStep()
|
|
if step == nil {
|
|
return "All steps completed!", nil
|
|
}
|
|
|
|
o.history = append(o.history, Message{
|
|
Role: "system",
|
|
Content: workflow.BuildSystemPrompt(workflow.PhaseExecuting, o.Workflow.Plan),
|
|
})
|
|
|
|
return o.Send(fmt.Sprintf("Execute step %s: %s\n%s", step.ID, step.Title, step.Description))
|
|
}
|
|
|
|
func (o *Orchestrator) ContinueExecution(output string) (string, error) {
|
|
o.Workflow.AdvanceStep(output)
|
|
if o.Workflow.Phase == workflow.PhaseDone {
|
|
return "Workflow completed! All steps have been executed.", nil
|
|
}
|
|
return o.executeNextStep()
|
|
}
|
|
|
|
func (o *Orchestrator) History() []Message {
|
|
return o.history
|
|
}
|
|
|
|
func (o *Orchestrator) ClearHistory() {
|
|
o.history = []Message{}
|
|
o.Workflow.Reset()
|
|
}
|
|
|
|
func cleanAIResponse(content string) string {
|
|
content = thinkRegex.ReplaceAllString(content, "")
|
|
lines := strings.Split(content, "\n")
|
|
var clean []string
|
|
inBlock := false
|
|
for _, line := range lines {
|
|
trimmed := strings.TrimSpace(line)
|
|
if trimmed == "<<" || trimmed == "<<<" {
|
|
inBlock = true
|
|
continue
|
|
}
|
|
if trimmed == ">>" || trimmed == ">>>" {
|
|
inBlock = false
|
|
continue
|
|
}
|
|
if inBlock {
|
|
continue
|
|
}
|
|
clean = append(clean, line)
|
|
}
|
|
result := strings.TrimSpace(strings.Join(clean, "\n"))
|
|
return result
|
|
}
|
|
|
|
func getProviderBaseURL(name string) string {
|
|
switch name {
|
|
case "minimax":
|
|
return "https://api.minimax.io/v1"
|
|
case "anthropic":
|
|
return "https://api.anthropic.com/v1"
|
|
case "openai":
|
|
return "https://api.openai.com/v1"
|
|
case "zai":
|
|
return "https://api.z.ai/v1"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|