feat: RAG, memory, plugins, lessons, file editor, split panes, Markdown rendering, PWA + UI overhaul
All checks were successful
Beta Release / beta (push) Successful in 5m9s
All checks were successful
Beta Release / beta (push) Successful in 5m9s
Major additions: - RAG pipeline (indexing, chunking, search) with sidebar upload button - Memory system with CRUD API - Plugins and lessons modules - MCP discovery and MCP server - Advanced skills (auto-create, conditional, improver) - Agent browser/image support, delegate, sessions - File editor with CodeMirror in split panes - Markdown rendering via react-markdown + KaTeX + highlight.js - Raw markdown toggle - PWA manifest + service worker - Extension UI redesign with new design tokens and studio-style chat - Pipeline API for chat streaming - Mobile responsive layout 💘 Generated with Crush Assisted-by: GLM-5.1 via Crush <crush@charm.land>
This commit is contained in:
283
internal/api/pipeline.go
Normal file
283
internal/api/pipeline.go
Normal file
@@ -0,0 +1,283 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Filter interface {
|
||||
Name() string
|
||||
Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error)
|
||||
}
|
||||
|
||||
type FilterRequest struct {
|
||||
UserMessage string `json:"user_message"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type FilterResponse struct {
|
||||
Allowed bool `json:"allowed"`
|
||||
Modified string `json:"modified,omitempty"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
TokenCount int `json:"token_count,omitempty"`
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type Pipeline struct {
|
||||
mu sync.RWMutex
|
||||
filters map[string]Filter
|
||||
enabled map[string]bool
|
||||
stats map[string]*FilterStats
|
||||
}
|
||||
|
||||
type FilterStats struct {
|
||||
Invocations int64 `json:"invocations"`
|
||||
Blocked int64 `json:"blocked"`
|
||||
LastUsed time.Time `json:"last_used"`
|
||||
}
|
||||
|
||||
func NewPipeline() *Pipeline {
|
||||
p := &Pipeline{
|
||||
filters: make(map[string]Filter),
|
||||
enabled: make(map[string]bool),
|
||||
stats: make(map[string]*FilterStats),
|
||||
}
|
||||
|
||||
p.Register(&RateLimitFilter{})
|
||||
p.Register(&TokenCountFilter{})
|
||||
p.Register(&LoggingFilter{})
|
||||
p.Register(&ToxicityFilter{})
|
||||
|
||||
for name := range p.filters {
|
||||
p.enabled[name] = true
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Pipeline) Register(f Filter) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.filters[f.Name()] = f
|
||||
p.stats[f.Name()] = &FilterStats{}
|
||||
}
|
||||
|
||||
func (p *Pipeline) Run(ctx context.Context, req *FilterRequest) (string, error) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
for name, filter := range p.filters {
|
||||
if !p.enabled[name] {
|
||||
continue
|
||||
}
|
||||
|
||||
resp, err := filter.Process(ctx, req)
|
||||
if p.stats[name] != nil {
|
||||
p.stats[name].Invocations++
|
||||
p.stats[name].LastUsed = time.Now()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !resp.Allowed {
|
||||
if p.stats[name] != nil {
|
||||
p.stats[name].Blocked++
|
||||
}
|
||||
return "", fmt.Errorf("blocked by filter %s: %s", name, resp.Reason)
|
||||
}
|
||||
|
||||
if resp.Modified != "" {
|
||||
req.UserMessage = resp.Modified
|
||||
}
|
||||
}
|
||||
|
||||
return req.UserMessage, nil
|
||||
}
|
||||
|
||||
func (p *Pipeline) Toggle(name string, enabled bool) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if _, ok := p.filters[name]; !ok {
|
||||
return fmt.Errorf("filter not found: %s", name)
|
||||
}
|
||||
p.enabled[name] = enabled
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pipeline) IsEnabled(name string) bool {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.enabled[name]
|
||||
}
|
||||
|
||||
func (p *Pipeline) ListFilters() []map[string]interface{} {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
var result []map[string]interface{}
|
||||
for name, filter := range p.filters {
|
||||
entry := map[string]interface{}{
|
||||
"name": name,
|
||||
"enabled": p.enabled[name],
|
||||
}
|
||||
if stats, ok := p.stats[name]; ok {
|
||||
entry["invocations"] = stats.Invocations
|
||||
entry["blocked"] = stats.Blocked
|
||||
entry["last_used"] = stats.LastUsed
|
||||
}
|
||||
_ = filter
|
||||
result = append(result, entry)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ── Built-in Filters ──
|
||||
|
||||
type RateLimitFilter struct {
|
||||
mu sync.Mutex
|
||||
counters map[string][]time.Time
|
||||
}
|
||||
|
||||
func (f *RateLimitFilter) Name() string { return "rate_limit" }
|
||||
|
||||
func (f *RateLimitFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if f.counters == nil {
|
||||
f.counters = make(map[string][]time.Time)
|
||||
}
|
||||
|
||||
key := req.Provider
|
||||
now := time.Now()
|
||||
cutoff := now.Add(-time.Minute)
|
||||
|
||||
var recent []time.Time
|
||||
for _, t := range f.counters[key] {
|
||||
if t.After(cutoff) {
|
||||
recent = append(recent, t)
|
||||
}
|
||||
}
|
||||
recent = append(recent, now)
|
||||
f.counters[key] = recent
|
||||
|
||||
limit := 30
|
||||
if len(recent) > limit {
|
||||
return &FilterResponse{
|
||||
Allowed: false,
|
||||
Reason: fmt.Sprintf("rate limit exceeded: %d requests/minute (limit: %d)", len(recent), limit),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &FilterResponse{Allowed: true}, nil
|
||||
}
|
||||
|
||||
type TokenCountFilter struct{}
|
||||
|
||||
func (f *TokenCountFilter) Name() string { return "token_count" }
|
||||
|
||||
func (f *TokenCountFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
|
||||
count := len(req.UserMessage) / 4
|
||||
if count > 50000 {
|
||||
return &FilterResponse{
|
||||
Allowed: true,
|
||||
TokenCount: count,
|
||||
Reason: fmt.Sprintf("large message: ~%d tokens", count),
|
||||
}, nil
|
||||
}
|
||||
return &FilterResponse{Allowed: true, TokenCount: count}, nil
|
||||
}
|
||||
|
||||
type LoggingFilter struct{}
|
||||
|
||||
func (f *LoggingFilter) Name() string { return "logging" }
|
||||
|
||||
func (f *LoggingFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
|
||||
return &FilterResponse{Allowed: true, Metadata: map[string]string{
|
||||
"provider": req.Provider,
|
||||
"model": req.Model,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
type ToxicityFilter struct{}
|
||||
|
||||
func (f *ToxicityFilter) Name() string { return "toxicity" }
|
||||
|
||||
func (f *ToxicityFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
|
||||
return &FilterResponse{Allowed: true}, nil
|
||||
}
|
||||
|
||||
// ── Pipeline HTTP handlers ──
|
||||
|
||||
func (s *Server) handlePipelineFilters(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
filters := s.pipeline.ListFilters()
|
||||
if filters == nil {
|
||||
filters = []map[string]interface{}{}
|
||||
}
|
||||
jsonResp(w, map[string]interface{}{"filters": filters})
|
||||
return
|
||||
}
|
||||
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
func (s *Server) handlePipelineToggle(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
name := ""
|
||||
if parts := splitPath(r.URL.Path); len(parts) > 0 {
|
||||
name = parts[len(parts)-1]
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "/toggle") {
|
||||
name = strings.TrimSuffix(name, "/toggle")
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
jsonError(w, "invalid request")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.pipeline.Toggle(name, req.Enabled); err != nil {
|
||||
jsonError(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
jsonResp(w, map[string]interface{}{"name": name, "enabled": req.Enabled})
|
||||
}
|
||||
|
||||
func splitPath(p string) []string {
|
||||
var parts []string
|
||||
for _, s := range strings.Split(p, "/") {
|
||||
if s != "" {
|
||||
parts = append(parts, s)
|
||||
}
|
||||
}
|
||||
return parts
|
||||
}
|
||||
|
||||
func jsonResp(w http.ResponseWriter, v interface{}) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
|
||||
func jsonError(w http.ResponseWriter, msg string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(map[string]string{"error": msg})
|
||||
}
|
||||
Reference in New Issue
Block a user