package api

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"
)

// Filter is one stage of a Pipeline.
type Filter interface {
	// Name returns the key the filter is registered, toggled and reported
	// under.
	Name() string
	// Process inspects req and returns a verdict. Pipeline.Run treats a
	// response with Allowed == false as a block and a non-empty Modified as
	// the replacement user message for the stages that follow.
	Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error)
}

// FilterRequest is the input handed to every filter in a pipeline run.
type FilterRequest struct {
	UserMessage string            `json:"user_message"`
	Provider    string            `json:"provider"`
	Model       string            `json:"model"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

// FilterResponse is a filter's verdict on a request.
type FilterResponse struct {
	// Allowed reports whether the request may continue; false stops the run.
	Allowed bool `json:"allowed"`
	// Modified, when non-empty, replaces the request's UserMessage.
	Modified string `json:"modified,omitempty"`
	// Reason is included in the error returned when the request is blocked.
	Reason     string            `json:"reason,omitempty"`
	TokenCount int               `json:"token_count,omitempty"`
	Metadata   map[string]string `json:"metadata,omitempty"`
}

// Pipeline is a registry of named filters that Run applies to a request in
// registration order. It is safe for concurrent use.
type Pipeline struct {
	mu      sync.RWMutex // guards filters, enabled, order and the stats map
	filters map[string]Filter
	enabled map[string]bool
	stats   map[string]*FilterStats
	order   []string // filter names in first-registration order

	// statsMu guards the fields of every FilterStats value. It is separate
	// from mu so that concurrent Run calls, which only need read access to the
	// registry, can still update counters without racing each other.
	statsMu sync.Mutex
}

// FilterStats holds per-filter usage counters.
type FilterStats struct {
	Invocations int64     `json:"invocations"`
	Blocked     int64     `json:"blocked"`
	LastUsed    time.Time `json:"last_used"`
}

// NewPipeline returns a Pipeline with the built-in filters registered and
// enabled, in the order rate_limit, token_count, logging, toxicity.
func NewPipeline() *Pipeline {
	p := &Pipeline{
		filters: make(map[string]Filter),
		enabled: make(map[string]bool),
		stats:   make(map[string]*FilterStats),
	}
	p.Register(&RateLimitFilter{})
	p.Register(&TokenCountFilter{})
	p.Register(&LoggingFilter{})
	p.Register(&ToxicityFilter{})
	for name := range p.filters {
		p.enabled[name] = true
	}
	return p
}

// Register adds f under f.Name() and resets that name's stats. Registering a
// name again replaces the filter but keeps its original position in the run
// order. Register does not change the enabled flag: a newly registered filter
// stays disabled until Toggle enables it.
func (p *Pipeline) Register(f Filter) {
	name := f.Name()

	p.mu.Lock()
	defer p.mu.Unlock()
	if _, exists := p.filters[name]; !exists {
		p.order = append(p.order, name)
	}
	p.filters[name] = f
	p.stats[name] = &FilterStats{}
}

// Run passes req through every enabled filter in registration order and
// returns the resulting user message.
//
// A filter that returns Allowed == false stops the run with an error naming the
// filter and its reason. A non-empty Modified overwrites req.UserMessage (the
// caller's request is mutated) before the next filter runs. A filter that
// returns an error or a nil response is skipped: the pipeline fails open, as it
// always has. Run returns an error if req is nil.
//
// The set of stages is snapshotted up front, so a concurrent Toggle or Register
// affects only later runs, and a slow filter never blocks those calls.
func (p *Pipeline) Run(ctx context.Context, req *FilterRequest) (string, error) {
	if req == nil {
		return "", fmt.Errorf("nil filter request")
	}

	type stage struct {
		name   string
		filter Filter
		stats  *FilterStats
	}

	p.mu.RLock()
	stages := make([]stage, 0, len(p.order))
	for _, name := range p.order {
		if p.enabled[name] {
			stages = append(stages, stage{name: name, filter: p.filters[name], stats: p.stats[name]})
		}
	}
	p.mu.RUnlock()

	for _, st := range stages {
		resp, err := st.filter.Process(ctx, req)
		blocked := err == nil && resp != nil && !resp.Allowed

		if st.stats != nil {
			p.statsMu.Lock()
			st.stats.Invocations++
			st.stats.LastUsed = time.Now()
			if blocked {
				st.stats.Blocked++
			}
			p.statsMu.Unlock()
		}

		if err != nil || resp == nil {
			// Fail open: a broken filter must not reject the request.
			continue
		}
		if blocked {
			return "", fmt.Errorf("blocked by filter %s: %s", st.name, resp.Reason)
		}
		if resp.Modified != "" {
			req.UserMessage = resp.Modified
		}
	}
	return req.UserMessage, nil
}

// Toggle enables or disables the named filter. It returns an error if no
// filter with that name is registered.
func (p *Pipeline) Toggle(name string, enabled bool) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.filters[name]; !ok {
		return fmt.Errorf("filter not found: %s", name)
	}
	p.enabled[name] = enabled
	return nil
}

// IsEnabled reports whether the named filter is enabled. Unknown names report
// false.
func (p *Pipeline) IsEnabled(name string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.enabled[name]
}

// ListFilters returns one entry per registered filter, in registration order,
// with the keys "name" and "enabled" plus, when stats exist, "invocations",
// "blocked" and "last_used". The result is nil when no filter is registered.
func (p *Pipeline) ListFilters() []map[string]interface{} {
	p.mu.RLock()
	defer p.mu.RUnlock()

	var result []map[string]interface{}
	for _, name := range p.order {
		entry := map[string]interface{}{
			"name":    name,
			"enabled": p.enabled[name],
		}
		if stats, ok := p.stats[name]; ok {
			// Copy the counters under statsMu so a concurrent Run cannot
			// update them mid-read.
			p.statsMu.Lock()
			entry["invocations"] = stats.Invocations
			entry["blocked"] = stats.Blocked
			entry["last_used"] = stats.LastUsed
			p.statsMu.Unlock()
		}
		result = append(result, entry)
	}
	return result
}

// ── Built-in Filters ──

// RateLimitFilter blocks a request once more than 30 requests with the same
// Provider value have been seen within the last minute. Blocked requests are
// recorded too, so they count against the window.
type RateLimitFilter struct {
	mu       sync.Mutex
	counters map[string][]time.Time // request timestamps, keyed by provider
}

// Name returns "rate_limit".
func (f *RateLimitFilter) Name() string { return "rate_limit" }

// Process records the request against req.Provider and blocks it when the
// number of requests seen in the last minute, including this one, exceeds the
// limit.
func (f *RateLimitFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
	const (
		window = time.Minute
		limit  = 30
	)

	f.mu.Lock()
	defer f.mu.Unlock()
	if f.counters == nil {
		f.counters = make(map[string][]time.Time)
	}

	key := req.Provider
	now := time.Now()
	cutoff := now.Add(-window)

	// Drop expired timestamps in place; the old slice is replaced below, so
	// reusing its backing array is safe.
	recent := f.counters[key][:0]
	for _, t := range f.counters[key] {
		if t.After(cutoff) {
			recent = append(recent, t)
		}
	}
	recent = append(recent, now)
	f.counters[key] = recent

	if len(recent) > limit {
		return &FilterResponse{
			Allowed: false,
			Reason:  fmt.Sprintf("rate limit exceeded: %d requests/minute (limit: %d)", len(recent), limit),
		}, nil
	}
	return &FilterResponse{Allowed: true}, nil
}

// TokenCountFilter estimates the token count of the user message. It never
// blocks a request.
type TokenCountFilter struct{}

// Name returns "token_count".
func (f *TokenCountFilter) Name() string { return "token_count" }

// Process reports len(UserMessage)/4 (a byte-length estimate) as TokenCount and
// sets Reason when the estimate exceeds 50000.
func (f *TokenCountFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) {
	count := len(req.UserMessage) / 4
	resp := &FilterResponse{Allowed: true, TokenCount: count}
	if count > 50000 {
		resp.Reason = fmt.Sprintf("large message: ~%d tokens", count)
	}
	return resp, nil
}
type LoggingFilter struct{} func (f *LoggingFilter) Name() string { return "logging" } func (f *LoggingFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) { return &FilterResponse{Allowed: true, Metadata: map[string]string{ "provider": req.Provider, "model": req.Model, }}, nil } type ToxicityFilter struct{} func (f *ToxicityFilter) Name() string { return "toxicity" } func (f *ToxicityFilter) Process(ctx context.Context, req *FilterRequest) (*FilterResponse, error) { return &FilterResponse{Allowed: true}, nil } // ── Pipeline HTTP handlers ── func (s *Server) handlePipelineFilters(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodGet { filters := s.pipeline.ListFilters() if filters == nil { filters = []map[string]interface{}{} } jsonResp(w, map[string]interface{}{"filters": filters}) return } http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) } func (s *Server) handlePipelineToggle(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed) return } name := "" if parts := splitPath(r.URL.Path); len(parts) > 0 { name = parts[len(parts)-1] } if strings.HasSuffix(r.URL.Path, "/toggle") { name = strings.TrimSuffix(name, "/toggle") } var req struct { Enabled bool `json:"enabled"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { jsonError(w, "invalid request") return } if err := s.pipeline.Toggle(name, req.Enabled); err != nil { jsonError(w, err.Error()) return } jsonResp(w, map[string]interface{}{"name": name, "enabled": req.Enabled}) } func splitPath(p string) []string { var parts []string for _, s := range strings.Split(p, "/") { if s != "" { parts = append(parts, s) } } return parts } func jsonResp(w http.ResponseWriter, v interface{}) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(v) } func jsonError(w http.ResponseWriter, msg string) { 
w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadRequest) json.NewEncoder(w).Encode(map[string]string{"error": msg}) }