feat(filters): Add configurable filters for incoming jobs

Signed-off-by: Richard Palethorpe <io@richiejp.com>
This commit is contained in:
Richard Palethorpe
2025-04-29 21:07:40 +01:00
parent 02c6b5ad4e
commit f2c3b9dbdb
19 changed files with 511 additions and 5 deletions

View File

@@ -492,6 +492,73 @@ func (a *Agent) processUserInputs(job *types.Job, role string, conv Messages) Me
return conv
}
func (a *Agent) filterJob(job *types.Job) (ok bool, err error) {
hasTriggers := false
triggeredBy := ""
failedBy := ""
if job.DoneFilter {
return true, nil
}
job.DoneFilter = true
if len(a.options.jobFilters) < 1 {
xlog.Debug("No filters")
return true, nil
}
for _, filter := range a.options.jobFilters {
name := filter.Name()
if triggeredBy != "" && filter.IsTrigger() {
continue
}
ok, err = filter.Apply(job)
if err != nil {
xlog.Error("Error in job filter", "filter", name, "error", err)
failedBy = name
break
}
if filter.IsTrigger() {
hasTriggers = true
if ok {
triggeredBy = name
xlog.Info("Job triggered by filter", "filter", name)
}
} else if !ok {
failedBy = name
xlog.Info("Job failed filter", "filter", name)
break
} else {
xlog.Debug("Job passed filter", "filter", name)
}
}
if a.Observer() != nil {
obs := a.Observer().NewObservable()
obs.Name = "filter"
obs.Icon = "shield"
obs.ParentID = job.Obs.ID
if err == nil {
obs.Completion = &types.Completion{
FilterResult: &types.FilterResult{
HasTriggers: hasTriggers,
TriggeredBy: triggeredBy,
FailedBy: failedBy,
},
}
} else {
obs.Completion = &types.Completion{
Error: err.Error(),
}
}
a.Observer().Update(*obs)
}
return failedBy == "" && (!hasTriggers || triggeredBy != ""), nil
}
func (a *Agent) consumeJob(job *types.Job, role string, retries int) {
if err := job.GetContext().Err(); err != nil {
@@ -533,6 +600,14 @@ func (a *Agent) consumeJob(job *types.Job, role string, retries int) {
}
conv = a.processPrompts(conv)
if ok, err := a.filterJob(job); !ok || err != nil {
if err != nil {
job.Result.Finish(fmt.Errorf("Error in job filter: %w", err))
} else {
job.Result.Finish(nil)
}
return
}
conv = a.processUserInputs(job, role, conv)
// RAG

View File

@@ -24,6 +24,7 @@ type options struct {
randomIdentityGuidance string
randomIdentity bool
userActions types.Actions
jobFilters types.JobFilters
enableHUD, standaloneJob, showCharacter, enableKB, enableSummaryMemory, enableLongTermMemory bool
stripThinkingTags bool
@@ -373,6 +374,13 @@ func WithActions(actions ...types.Action) Option {
}
}
func WithJobFilters(filters ...types.JobFilter) Option {
return func(o *options) error {
o.jobFilters = filters
return nil
}
}
func WithObserver(observer Observer) Option {
return func(o *options) error {
o.observer = observer

View File

@@ -31,6 +31,11 @@ func (d DynamicPromptsConfig) ToMap() map[string]string {
return config
}
type FiltersConfig struct {
Type string `json:"type"`
Config string `json:"config"`
}
type AgentConfig struct {
Connector []ConnectorConfig `json:"connectors" form:"connectors" `
Actions []ActionsConfig `json:"actions" form:"actions"`
@@ -39,6 +44,7 @@ type AgentConfig struct {
MCPSTDIOServers []agent.MCPSTDIOServer `json:"mcp_stdio_servers" form:"mcp_stdio_servers"`
MCPPrepareScript string `json:"mcp_prepare_script" form:"mcp_prepare_script"`
MCPBoxURL string `json:"mcp_box_url" form:"mcp_box_url"`
Filters []FiltersConfig `json:"filters" form:"filters"`
Description string `json:"description" form:"description"`
@@ -71,6 +77,7 @@ type AgentConfig struct {
}
type AgentConfigMeta struct {
Filters []config.FieldGroup
Fields []config.Field
Connectors []config.FieldGroup
Actions []config.FieldGroup
@@ -82,6 +89,7 @@ func NewAgentConfigMeta(
actionsConfig []config.FieldGroup,
connectorsConfig []config.FieldGroup,
dynamicPromptsConfig []config.FieldGroup,
filtersConfig []config.FieldGroup,
) AgentConfigMeta {
return AgentConfigMeta{
Fields: []config.Field{
@@ -319,6 +327,7 @@ func NewAgentConfigMeta(
DynamicPrompts: dynamicPromptsConfig,
Connectors: connectorsConfig,
Actions: actionsConfig,
Filters: filtersConfig,
}
}

View File

@@ -38,6 +38,7 @@ type AgentPool struct {
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action
connectors func(*AgentConfig) []Connector
dynamicPrompt func(*AgentConfig) []DynamicPrompt
filters func(*AgentConfig) types.JobFilters
timeout string
conversationLogs string
}
@@ -78,6 +79,7 @@ func NewAgentPool(
availableActions func(*AgentConfig) func(ctx context.Context, pool *AgentPool) []types.Action,
connectors func(*AgentConfig) []Connector,
promptBlocks func(*AgentConfig) []DynamicPrompt,
filters func(*AgentConfig) types.JobFilters,
timeout string,
withLogs bool,
) (*AgentPool, error) {
@@ -110,6 +112,7 @@ func NewAgentPool(
connectors: connectors,
availableActions: availableActions,
dynamicPrompt: promptBlocks,
filters: filters,
timeout: timeout,
conversationLogs: conversationPath,
}, nil
@@ -135,6 +138,7 @@ func NewAgentPool(
connectors: connectors,
localRAGAPI: LocalRAGAPI,
dynamicPrompt: promptBlocks,
filters: filters,
availableActions: availableActions,
timeout: timeout,
conversationLogs: conversationPath,
@@ -337,6 +341,8 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
if config.Model != "" {
model = config.Model
} else {
config.Model = model
}
if config.MCPBoxURL != "" {
@@ -347,12 +353,17 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
config.PeriodicRuns = "10m"
}
// XXX: Why do we update the pool config from an Agent's config?
if config.APIURL != "" {
a.apiURL = config.APIURL
} else {
config.APIURL = a.apiURL
}
if config.APIKey != "" {
a.apiKey = config.APIKey
} else {
config.APIKey = a.apiKey
}
if config.LocalRAGURL != "" {
@@ -366,6 +377,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
connectors := a.connectors(config)
promptBlocks := a.dynamicPrompt(config)
actions := a.availableActions(config)(ctx, a)
filters := a.filters(config)
stateFile, characterFile := a.stateFiles(name)
actionsLog := []string{}
@@ -378,6 +390,11 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
connectorLog = append(connectorLog, fmt.Sprintf("%+v", connector))
}
filtersLog := []string{}
for _, filter := range filters {
filtersLog = append(filtersLog, filter.Name())
}
xlog.Info(
"Creating agent",
"name", name,
@@ -385,6 +402,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
"api_url", a.apiURL,
"actions", actionsLog,
"connectors", connectorLog,
"filters", filtersLog,
)
// dynamicPrompts := []map[string]string{}
@@ -406,6 +424,7 @@ func (a *AgentPool) startAgentWithConfig(name string, config *AgentConfig, obs O
WithMCPSTDIOServers(config.MCPSTDIOServers...),
WithMCPBoxURL(a.mcpBoxURL),
WithPrompts(promptBlocks...),
WithJobFilters(filters...),
WithMCPPrepareScript(config.MCPPrepareScript),
// WithDynamicPrompts(dynamicPrompts...),
WithCharacter(Character{

15
core/types/filters.go Normal file
View File

@@ -0,0 +1,15 @@
package types
type JobFilter interface {
Name() string
Apply(job *Job) (bool, error)
IsTrigger() bool
}
type JobFilters []JobFilter
type FilterResult struct {
HasTriggers bool `json:"has_triggers"`
TriggeredBy string `json:"triggered_by,omitempty"`
FailedBy string `json:"failed_by,omitempty"`
}

View File

@@ -19,6 +19,7 @@ type Job struct {
ConversationHistory []openai.ChatCompletionMessage
UUID string
Metadata map[string]interface{}
DoneFilter bool
pastActions []*ActionRequest
nextAction *Action

View File

@@ -24,7 +24,8 @@ type Completion struct {
ChatCompletionResponse *openai.ChatCompletionResponse `json:"chat_completion_response,omitempty"`
Conversation []openai.ChatCompletionMessage `json:"conversation,omitempty"`
ActionResult string `json:"action_result,omitempty"`
AgentState *AgentInternalState `json:"agent_state"`
AgentState *AgentInternalState `json:"agent_state,omitempty"`
FilterResult *FilterResult `json:"filter_result,omitempty"`
}
type Observable struct {