OpenFlow Protocol

Core Concepts

Flow Execution Lifecycle

PhaseOperationsValidationNext Phase
Validation• Schema validation of flow JSON
• Dependency analysis and cycle detection
• Variable reference validation
All checks must passQueue or Reject
Queueing• Flow added to in-memory execution queue
• Assigned unique identifier (timestamp + execution number)
• Waits for available execution slot
Concurrency limitsExecute when slot available
Execution• Flow execution context created with isolated variable registry
• Input variables merged with flow variables (input variables take precedence)
• Nodes execute sequentially according to dependency order
• Variables resolved just-in-time during node execution
• Parallel branches handle failures independently
Runtime validation per nodeComplete or Fail
Completion• Temporary files cleaned up
• Variable registry cleared
• Flow removed from active execution tracking
Resource cleanup verificationReturn results

Model Context Protocol (MCP) Integration

The OpenFlow Protocol supports integration with Model Context Protocol (MCP) servers, enabling LLMs to access external tools and data sources in real-time. MCP integration extends LLM capabilities beyond their training data.

MCP Server Configuration

{
  name: "server_identifier (alphanumeric_underscore, unique across all MCP servers, e.g., 'github_api', 'database_tools')",
  url: "mcp_endpoint_url (https_websocket_url, e.g., 'wss://api.example.com/mcp')",
  description: "server_description (human_readable, optional, explains server capabilities)",
  timeout: "request_timeout_ms (integer, 5000-120000, default: 30000)",
  retry_attempts: "max_retry_attempts (integer, 0-5, default: 3)",
  auth: "authentication_config (MCPAuthConfig object, see schema below)"
}

MCP Authentication Configuration

{
  type: "auth_type (enum: 'none' | 'api_key' | 'bearer' | 'basic' | 'custom_headers' | 'query_params')",
  api_key: "api_key_value (base64_string, optional, required when type='api_key')",
  header_name: "custom_header_name (string, optional, default: 'X-API-Key' for api_key type)",
  token: "bearer_token (jwt_string or access_token, optional, required when type='bearer')",
  username: "basic_auth_username (string, optional, required when type='basic')",
  password: "basic_auth_password (string, optional, required when type='basic')",
  headers: "custom_headers (key_value_object, optional, e.g., {'X-Custom': 'value', 'X-Version': '1.0'})",
  params: "query_parameters (key_value_object, optional, e.g., {'key': 'value', 'version': '2'})"
}

MCP Tools Configuration

{
  mcp_servers: "enabled_server_names (array of server_identifiers, optional, e.g., ['github_api', 'database_tools'])",
  builtin_tools: "included_builtin_tools (array of tool_names, optional, e.g., ['file_read', 'web_search'])",
  available_tools: "specific_tool_whitelist (array of tool_names, optional, overrides auto_discover)",
  auto_discover: "enable_auto_discovery (boolean, default: true, automatically includes all discovered tools)",
  filter: "excluded_tool_names (array of tool_names, optional, blacklist for auto-discovered tools)",
  tool_selection: "selection_mode (enum: 'auto' | 'manual', default: 'auto', how LLM selects tools)"
}

Core Components

Component Overview

ComponentPurposeScopeCharacteristics
FlowComplete AI workflow definitionGlobalSelf-contained, executable, versioned
NodeSingle processing stepFlow-scopedUnique ID, input/output schema, configurable
VariableData binding mechanismFlow/Node-scopedTyped, resolvable, interpolated
ProviderExternal service integrationSystem-wideAuthenticated, rate-limited, standardized
Execution ContextRuntime state containerExecution-scopedIsolated, temporary, tracked

Flow Structure

ElementDescriptionRequiredExamples
MetadataName, version, description, author"document_analyzer", "v1.2.0"
VariablesInput/output parameters with types{"id": "user_input", "type": "string"}
NodesOrdered processing steps[{"id": "llm_1", "type": "LLM"}]
DependenciesImplicit from variable referencesAuto-derived{{llm_1.output}} → dependency

Variable Types

Variable TypePurposeScopeResolution
Flow VariablesDeclared parameters with defaultsFlow-levelAt flow definition
Node OutputsGenerated by node executionFlow-levelRuntime, sequential
Input VariablesRuntime values from executionFlow-levelAt execution start
Scoped VariablesTemporary within control flowNode-levelWithin control structures

Provider Categories

Provider TypeServicesCapabilitiesIntegration Pattern
LLM ProvidersOpenAI, Anthropic, Grok, BedrockText generation, chat, visionAPI key + endpoint
Vector DatabasePinecone, Weaviate, ChromaEmbedding CRUD, similarity searchConnection + index
EmbeddingOpenAI, Nvidia, MicrosoftText-to-vector conversionModel selection + API
CustomUser-defined APIsDomain-specific operationsCustom interface implementation

Execution Model

Concurrency Control

AspectImplementationConfigurationBehavior
Global LimitMax concurrent flows system-wideglobal_limit: 3Queue overflow flows
Flow QueueIn-memory FIFO queueAuto-managedWait for available slots
Flow IsolationSeparate variable registriesPer-execution contextPrevent cross-flow contamination
Flow IdentityTimestamp + execution number20240115_143022_001Unique tracking identifier

Node Execution Patterns

PatternBehaviorFailure HandlingUse Cases
SequentialNodes execute in dependency orderStop on first failureLinear processing pipelines
Parallel BranchesIndependent execution pathsContinue other branchesConcurrent operations
FOR_EACHSequential iteration processingStop on iteration failureBatch processing
ConditionalBranch based on conditionsSkip false branchesDecision trees

Variable Resolution

AspectMechanismFormatTiming
ResolutionJust-in-time evaluation{{variable_name}}Before node execution
RegistryFlow-scoped storage{{node_id.output.property}}Runtime maintenance
InterpolationTemplate string replacement{{llm_1.output.text}}Dynamic substitution
IsolationPer-execution contextSeparate namespacesPrevent conflicts

Error and Resource Management

Management TypeStrategyScopeCleanup
Error HandlingFail-fast, no automatic retryNode/Flow levelImmediate termination
Temporary FilesAuto-cleanup after completionFlow executionPost-execution cleanup
Registry StateClear after completionPer-executionMemory management
Status TrackingReal-time execution monitoringSystem-wideObservability support

Flow Definition Schema

Schema Overview

Every OpenFlow flow MUST conform to the following JSON schema structure. The schema enforces type safety, validates relationships between components, and ensures protocol compliance.

Root Flow Object

{
  name: "flow_name (alphanumeric_underscore, unique identifier, e.g., 'document_analysis_pipeline')",
  version: "semantic_version (semver format, e.g., '1.2.0', '2.0.0-beta.1')",
  description: "flow_description (detailed explanation of flow purpose and functionality)",
  author: "author_identifier (string, e.g., 'team@company.com' or 'john.doe')",
  variables: "flow_variables (array of FlowVariable objects, defines flow interface)",
  input: "input_variable_names (array of variable_ids that must be provided at runtime)",
  output: "output_variable_names (array of variable_ids that will be returned after execution)",
  nodes: "processing_nodes (array of FlowNode objects, ordered by dependencies)",
  metadata: "additional_metadata (key_value_object, optional, e.g., {category: 'nlp', complexity: 'medium'})",
  tags: "classification_tags (array of strings, optional, e.g., ['nlp', 'document-processing', 'ai'])",
  timeout: "execution_timeout_ms (integer, 10000-3600000, optional, max flow execution time)",
  concurrency: "concurrency_settings (ConcurrencyConfig object, optional, controls execution limits)"
}

FlowVariable Schema

Flow variables define the data interface of the flow:

{
  id: "variable_identifier (alphanumeric_underscore, unique within flow scope, e.g., 'input_document', 'llm_response')",
  type: "data_type (enum: 'string'|'number'|'boolean'|'array'|'object'|'file'|'null'|'any', default: 'string')",
  description: "variable_description (explains purpose and expected content, optional but recommended)",
  default: "default_value (any type matching the specified type annotation, optional)",
  required: "is_required (boolean, default: true if no default value provided, false if default exists)",
  validation: "validation_rules (ValidationRule object, optional, adds constraints and format requirements)"
}

Supported Data Types

TypeDescriptionExamplesValidation
stringText values, template strings"Hello World", "{{user_input}}"Length, pattern, format
numberNumeric values (int/float)42, 3.14, -100Min/max, range validation
booleanTrue/false valuestrue, falseExact match only
arrayOrdered collections["a", "b", "c"], [1, 2, 3]Item type, min/max length
objectKey-value structures{"key": "value", "nested": {}}Property validation
fileFile paths for processing"/path/to/file.pdf"Path validation, existence
nullExplicit null valuesnullNull check only
anyDynamic typing (use sparingly)Any valid JSONMinimal validation

Validation Rules

{
  pattern: "regex_pattern (string, for string validation, e.g., '^[a-zA-Z0-9_]+$', '^\\d{4}-\\d{2}-\\d{2}$')",
  minLength: "minimum_string_length (integer, 0-10000, for string types)",
  maxLength: "maximum_string_length (integer, 1-100000, for string types)",
  minimum: "minimum_numeric_value (number, for number types, e.g., 0, -100.5)",
  maximum: "maximum_numeric_value (number, for number types, e.g., 1000, 99.99)",
  enum: "allowed_values (array of valid options, e.g., ['draft', 'published', 'archived'])",
  format: "format_constraint (predefined format, e.g., 'email', 'url', 'date', 'uuid', 'base64')"
}

On this page