Luigi Pipeline Agent Integration: Two Approaches
Context
PlanExe's core pipeline is implemented as a Luigi DAG (~4000 lines Python). External agent frameworks (Codebuff, OpenClaw, custom swarms) want to interact with individual pipeline stages — either to annotate them or to inject into them. The challenge: TypeScript agent definitions that duplicate Python logic drift and break silently.
Two proposals follow. They are not mutually exclusive.
Option A: @agent_meta Decorator (Metadata Layer)
Concept
Add a lightweight Python decorator to every Luigi task that declares machine-readable metadata. External frameworks read this metadata to understand what each task does without duplicating the logic.
Complete Implementation
File: planexe/agent_meta.py
"""
Agent metadata decorator for Luigi tasks.
Adds self-documenting metadata to pipeline stages for external agent frameworks.
"""
import json
from typing import Dict, List, Any, Callable, Optional
from functools import wraps
import inspect
# Global registry mapping agent name -> metadata dict for every decorated task.
_AGENT_REGISTRY: Dict[str, Dict[str, Any]] = {}


def agent_meta(
    name: str,
    description: str,
    tools: List[str],
    outputs: List[str],
    stage: str,
    tags: Optional[List[str]] = None,  # fixed: default None requires Optional
    timeout_seconds: int = 300,
    max_retries: int = 1,
) -> Callable:
    """
    Decorator to add agent metadata to a Luigi task.

    Args:
        name: Unique identifier for this task (e.g., "identify_risks")
        description: Human-readable description of what this task does
        tools: List of tools/resources this task reads from
        outputs: List of outputs this task produces
        stage: Pipeline stage name (e.g., "risk_assumptions")
        tags: Optional list of tags for categorization
        timeout_seconds: Maximum execution time
        max_retries: How many times to retry on failure

    Returns:
        Class decorator that attaches ``_agent_meta`` to the task class and
        registers that metadata in the module-level registry.
    """
    def decorator(cls):
        # Store metadata on the class so tooling can read it without the registry.
        cls._agent_meta = {
            "name": name,
            "displayName": name.replace("_", " ").title(),
            "description": description,
            "tools": tools,
            "outputs": outputs,
            "stage": stage,
            "tags": tags or [],
            "timeoutSeconds": timeout_seconds,
            "maxRetries": max_retries,
            "pythonClass": f"{cls.__module__}.{cls.__name__}",
            "inputSchema": extract_input_schema(cls),
            "outputSchema": extract_output_schema(cls),
        }
        # Register in global registry.
        # NOTE(review): a duplicate `name` silently overwrites the earlier
        # entry — confirm that is the intended collision policy.
        _AGENT_REGISTRY[name] = cls._agent_meta

        @classmethod
        def get_agent_meta(cls_inner) -> Dict[str, Any]:
            """Return the agent metadata for this task."""
            # Fixed: read from cls_inner rather than the closed-over cls, so a
            # subclass that is re-decorated reports its own metadata instead of
            # the parent's.
            return cls_inner._agent_meta

        cls.get_agent_meta = get_agent_meta
        return cls

    return decorator


def extract_input_schema(cls) -> Dict[str, List[str]]:
    """Extract input parameter names from the task's ``__init__`` signature.

    Fixed: parameters are filtered by *kind* rather than by name, so a real
    parameter that happens to be called ``args`` or ``kwargs`` is kept, while
    ``*args``/``**kwargs`` catch-alls are excluded regardless of their names.

    NOTE(review): Luigi tasks typically declare parameters as class attributes
    and inherit a generic ``__init__(*args, **kwargs)``; for such classes this
    returns an empty parameter list — confirm against real pipeline tasks.
    """
    sig = inspect.signature(cls.__init__)
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    params = [
        pname
        for pname, param in sig.parameters.items()
        if pname != "self" and param.kind not in variadic
    ]
    return {"parameters": params, "context": []}


def extract_output_schema(cls) -> Dict[str, Any]:
    """Extract output schema from the task's output() method.

    Currently a static stub: every task is reported as producing a JSON
    document with a single ``result`` field.
    NOTE(review): the example manifest in the docs shows ``fields`` mirroring
    the decorator's ``outputs`` — confirm whether this stub should do that.
    """
    return {
        "format": "json",
        "fields": ["result"]
    }
def generate_manifest(output_path: str = ".agents/manifest.json") -> None:
    """
    Generate manifest.json from all registered agents.

    Call this after importing all task modules, so every ``@agent_meta``
    decorator has run and populated the registry.

    Args:
        output_path: Destination file; parent directories are created.
    """
    manifest = {
        "version": "1.0",
        # sorted() accepts any iterable — the extra list() wrap was redundant.
        # Deterministic ordering keeps the generated file diff-friendly.
        "agents": sorted(_AGENT_REGISTRY.values(), key=lambda meta: meta["name"]),
        "count": len(_AGENT_REGISTRY),
    }
    # Ensure output directory exists
    from pathlib import Path
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the manifest is UTF-8 regardless of platform locale.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    print(f"Generated manifest with {manifest['count']} agents at {output_path}")
def list_agents() -> List[Dict[str, Any]]:
    """Return the metadata of every registered agent as a list."""
    return [meta for meta in _AGENT_REGISTRY.values()]


def get_agent(name: str) -> Optional[Dict[str, Any]]:
    """Look up one agent's metadata by name; None when not registered."""
    try:
        return _AGENT_REGISTRY[name]
    except KeyError:
        return None
Using the decorator in a Luigi task:
import luigi
from planexe.agent_meta import agent_meta
@agent_meta(
    name="identify_risks",
    description="Identifies project risks from assumptions and context. Returns structured risk registry.",
    tools=["read_context", "read_assumptions"],
    outputs=["risk_registry"],
    stage="risk_assumptions",
    tags=["analysis", "risk"],
)
class IdentifyRisks(luigi.Task):
    """Analyzes plan assumptions to identify risks."""

    # Path to the JSON context file that run() reads alongside assumptions.
    context_file = luigi.PathParameter(default="./context.json")

    def requires(self):
        # Risks are derived from the resolved-assumptions artifact.
        return ResolveAssumptions()

    def output(self):
        # Fixed: the path has no placeholders, so the f-prefix was noise.
        return luigi.LocalTarget("./artifacts/risks.json")

    def run(self):
        # NOTE(review): this snippet uses json but the example imports only
        # luigi and agent_meta — `import json` is needed in the real module.
        with self.input().open('r') as f:
            assumptions = json.load(f)
        with open(self.context_file, 'r') as f:
            context = json.load(f)
        # Your analysis logic here
        risks = analyze_risks(assumptions, context)
        with self.output().open('w') as f:
            json.dump(risks, f, indent=2)
Generating the manifest:
# In your main.py or __init__.py
import luigi  # fixed: the snippet called luigi.build without importing luigi

from planexe.agent_meta import generate_manifest
import planexe.pipeline.tasks  # Import all task modules

# Generate manifest after all tasks are imported
if __name__ == "__main__":
    generate_manifest(".agents/manifest.json")
    # Fixed: the original line `luigi.build([...)` had an unbalanced bracket.
    luigi.build([...])
Auto-generated manifest (.agents/manifest.json):
{
"version": "1.0",
"agents": [
{
"name": "identify_risks",
"displayName": "Identify Risks",
"description": "Identifies project risks from assumptions and context. Returns structured risk registry.",
"tools": ["read_context", "read_assumptions"],
"outputs": ["risk_registry"],
"stage": "risk_assumptions",
"tags": ["analysis", "risk"],
"timeoutSeconds": 300,
"maxRetries": 1,
"pythonClass": "planexe.pipeline.tasks.IdentifyRisks",
"inputSchema": {
"parameters": ["context_file"],
"context": []
},
"outputSchema": {
"format": "json",
"fields": ["risk_registry"]
}
},
{
"name": "resolve_assumptions",
"displayName": "Resolve Assumptions",
"description": "Validates and resolves plan assumptions against known data.",
"tools": ["read_knowledge_base"],
"outputs": ["assumptions_registry"],
"stage": "risk_assumptions",
"tags": ["analysis"],
"timeoutSeconds": 300,
"maxRetries": 1,
"pythonClass": "planexe.pipeline.tasks.ResolveAssumptions",
"inputSchema": {
"parameters": ["plan_file"],
"context": []
},
"outputSchema": {
"format": "json",
"fields": ["assumptions_registry"]
}
}
],
"count": 2
}
Benefits
- Single source of truth: metadata lives next to the Python code
- Framework-agnostic: any agent framework can read the manifest
- Low engineering cost: decorator pattern, ~50 lines to implement
- Drift-proof: changing the task forces the developer to look at the decorator
Limitations
- Still read-only from agent frameworks — they can read what tasks exist but can't inject into execution
- Requires discipline: developers must keep metadata accurate when changing task logic
Option C: RPC Injection Interface
Concept
Expose each Luigi task stage as an RPC endpoint. External agent frameworks call the Python pipeline directly rather than duplicating logic in TypeScript. The TypeScript agent definitions become thin RPC wrappers.
Complete Implementation
File: planexe/rpc/task_runner.py
"""
FastAPI RPC interface for Luigi pipeline tasks.
Allows external frameworks (TypeScript, Java, etc.) to invoke pipeline stages via HTTP.
"""
import json
import asyncio
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import luigi
from planexe.agent_meta import list_agents, get_agent
from planexe.pipeline import TASK_REGISTRY, build_luigi_task
logger = logging.getLogger("task-rpc")
# Request/Response models
class TaskInvocationRequest(BaseModel):
    """Body of POST /run: which task to execute and with what inputs."""

    # Registry key of the task to run (must exist in TASK_REGISTRY).
    task_name: str
    # Luigi parameter values for the task. Pydantic copies field defaults
    # per instance, so the mutable {} defaults are safe here.
    parameters: Dict[str, Any] = {}
    # Free-form caller context forwarded to build_luigi_task.
    context: Dict[str, Any] = {}
    # NOTE(review): task_executor hard-codes its own 300s timeout and this
    # field is never forwarded to it in the code visible here — confirm.
    timeout_seconds: int = 300
class TaskResult(BaseModel):
    """Outcome of a single task execution, returned by the /run endpoints."""

    task_name: str
    status: str  # "success", "failed", "timeout"
    # Parsed JSON output of the task; None when the task failed or timed out.
    result: Optional[Dict[str, Any]] = None
    # Human-readable failure reason; None on success.
    error: Optional[str] = None
    # Wall-clock duration of the execution in milliseconds.
    execution_time_ms: int
    # Rough token estimate of the serialized result (see estimate_tokens).
    tokens_used: int = 0
class TaskMetadata(BaseModel):
    """Describes one task for discovery clients.

    NOTE(review): this model is not referenced anywhere in this module —
    the /tasks endpoints return raw agent-meta dicts instead. Confirm
    whether it should be wired into those endpoints or removed.
    """

    name: str
    displayName: str
    description: str
    stage: str
    inputs: list
    outputs: list
    timeout_seconds: int
# Create FastAPI app
app = FastAPI(title="PlanExe Task RPC", version="1.0")
# Task registry and results cache (can be backed by Redis)
_task_registry: Dict[str, Any] = {}
_results_cache: Dict[str, TaskResult] = {}
async def task_executor(
    task_name: str,
    parameters: Dict[str, Any],
    context: Dict[str, Any],
    timeout_seconds: float = 300.0,
) -> TaskResult:
    """
    Execute a Luigi task and return the result.

    Runs luigi.build in a worker thread so the event loop is not blocked.
    Fixed: the timeout is now a parameter (default 300s) instead of being
    hard-coded, so callers can honor a per-request timeout.

    Args:
        task_name: Registry key of the task (must exist in TASK_REGISTRY).
        parameters: Luigi parameter values for the task.
        context: Free-form caller context forwarded to build_luigi_task.
        timeout_seconds: Maximum wall-clock time before reporting "timeout".

    Returns:
        TaskResult with status "success", "failed", or "timeout".
    """
    start_time = datetime.now()
    try:
        if task_name not in TASK_REGISTRY:
            raise ValueError(f"Unknown task: {task_name}")
        # Build the task instance
        task_class = TASK_REGISTRY[task_name]
        task_instance = build_luigi_task(task_class, parameters, context)
        logger.info("Executing task: %s", task_name)
        # get_running_loop() is the modern, non-deprecated way to grab the
        # loop from inside a coroutine (get_event_loop() is deprecated here).
        loop = asyncio.get_running_loop()
        result = await asyncio.wait_for(
            loop.run_in_executor(None, luigi.build, [task_instance]),
            timeout=timeout_seconds,
        )
        execution_time = int((datetime.now() - start_time).total_seconds() * 1000)
        if result:
            # Extract output from the task
            output_data = extract_task_output(task_instance)
            return TaskResult(
                task_name=task_name,
                status="success",
                result=output_data,
                execution_time_ms=execution_time,
                tokens_used=estimate_tokens(str(output_data)),
            )
        return TaskResult(
            task_name=task_name,
            status="failed",
            error="Task execution returned False",
            execution_time_ms=execution_time,
        )
    except asyncio.TimeoutError:
        execution_time = int((datetime.now() - start_time).total_seconds() * 1000)
        return TaskResult(
            task_name=task_name,
            status="timeout",
            # Fixed: report the actual limit, not a hard-coded "300".
            error=f"Task execution exceeded {timeout_seconds} seconds",
            execution_time_ms=execution_time,
        )
    except Exception as e:
        execution_time = int((datetime.now() - start_time).total_seconds() * 1000)
        # logger.exception records the traceback, not just the message.
        logger.exception("Task %s failed: %s", task_name, e)
        return TaskResult(
            task_name=task_name,
            status="failed",
            error=str(e),
            execution_time_ms=execution_time,
        )
def extract_task_output(task_instance) -> Dict[str, Any]:
    """Read and parse the JSON artifact written by a completed task.

    Falls back to a generic completion message when the task's target has
    no filesystem path or the artifact cannot be read/parsed.
    """
    fallback = {"message": "Task completed"}
    try:
        target = task_instance.output()
        if not hasattr(target, 'path'):
            # Non-file target — nothing to read back.
            return fallback
        with open(target.path, 'r') as fh:
            return json.load(fh)
    except Exception as exc:
        logger.warning(f"Could not extract task output: {exc}")
        return fallback
def estimate_tokens(text: str) -> int:
    """Cheap token estimate using the ~4-characters-per-token heuristic."""
    quarter, _ = divmod(len(text), 4)
    return quarter
# API Endpoints
@app.get("/health")
async def health_check():
    """Health check endpoint: liveness probe for load balancers / compose."""
    return {"status": "ok", "service": "task-rpc"}
@app.get("/tasks")
async def list_tasks():
    """Return every registered task along with its agent metadata."""
    available = list_agents()
    return {"count": len(available), "tasks": available}
@app.get("/tasks/{task_name}")
async def get_task_metadata(task_name: str):
    """Return the metadata for one task, or 404 if it is not registered."""
    meta = get_agent(task_name)
    if not meta:
        raise HTTPException(status_code=404, detail=f"Task not found: {task_name}")
    return meta
@app.post("/run")
async def run_task(request: TaskInvocationRequest, background_tasks: BackgroundTasks):
    """
    Execute a task and return the result.

    Request body:
    {
        "task_name": "identify_risks",
        "parameters": {"context_file": "./context.json"},
        "context": {"user_id": "user-123"},
        "timeout_seconds": 300
    }

    The response carries the TaskResult fields plus "result_id", which can
    be used with GET /results/{result_id}. Fixed: the id was previously
    generated and cached but never returned, so cached results were
    unreachable by clients.
    """
    # Validate task exists
    if request.task_name not in TASK_REGISTRY:
        raise HTTPException(status_code=404, detail=f"Unknown task: {request.task_name}")
    # Execute the task.
    # NOTE(review): request.timeout_seconds is accepted but not forwarded to
    # task_executor here — confirm whether it should be.
    result = await task_executor(
        request.task_name,
        request.parameters,
        request.context
    )
    # Cache the result so it can be fetched later via /results/{result_id}.
    # NOTE(review): the cache grows without bound — consider an LRU/TTL
    # (or the Redis backing mentioned above).
    result_id = f"{request.task_name}-{datetime.now().timestamp()}"
    _results_cache[result_id] = result
    # Additive change: original TaskResult fields plus the cache key.
    return {"result_id": result_id, **result.dict()}
@app.post("/run/{task_name}")
async def run_task_by_name(
    task_name: str,
    # Fixed: a default of None requires Optional[...]; the bare
    # `Dict[str, Any] = None` implicit-Optional form is rejected by
    # modern pydantic/FastAPI versions.
    parameters: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
):
    """
    Shorthand endpoint to run a task by name.

    Example:
    POST /run/identify_risks
    {
        "context_file": "./context.json"
    }

    NOTE(review): with two dict body parameters FastAPI expects them embedded
    as {"parameters": ..., "context": ...}; the flat body in the example above
    may not bind as shown — verify against a running server.
    """
    parameters = parameters or {}
    context = context or {}
    result = await task_executor(task_name, parameters, context)
    return result
@app.get("/results/{result_id}")
async def get_result(result_id: str):
    """Fetch a previously cached task result by its id (404 if unknown)."""
    cached = _results_cache.get(result_id)
    if cached is None:
        raise HTTPException(status_code=404, detail="Result not found")
    return cached
# Startup event
@app.on_event("startup")
async def startup_event():
    """Log the loaded task registry when the server starts."""
    # Fixed: the originals were f-strings with no placeholders (lint F541)
    # or eagerly formatted; lazy %-style args are the logging convention.
    logger.info("Task RPC Server starting")
    logger.info("Loaded %d tasks from registry", len(TASK_REGISTRY))
    for task_name in sorted(TASK_REGISTRY.keys()):
        logger.info("  - %s", task_name)
File: planexe/rpc/client.py
"""
Python client for the Task RPC server.
For local use without HTTP overhead.
"""
import asyncio
from typing import Any, Dict, Optional

from planexe.rpc.task_runner import task_executor, TaskResult
class TaskRPCClient:
    """Synchronous client for task execution.

    NOTE(review): despite the name and the ``base_url`` parameter, this
    client never issues HTTP requests — it calls ``task_executor`` in
    process. Confirm whether an HTTP transport is still planned.
    """

    def __init__(self, base_url: str = "http://localhost:8001"):
        # Kept for interface compatibility; unused by the in-process calls.
        self.base_url = base_url

    def run_task(
        self,
        task_name: str,
        # Fixed: defaults of None require Optional[...] annotations.
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        timeout_seconds: int = 300,
    ) -> TaskResult:
        """Execute a task synchronously.

        Fixed: uses asyncio.run(), which creates and disposes a fresh event
        loop per call; the previous get_event_loop()/run_until_complete
        pattern is deprecated and fails once the default loop is closed.
        Must not be called from inside a running event loop.
        """
        parameters = parameters or {}
        context = context or {}
        return asyncio.run(task_executor(task_name, parameters, context))

    async def run_task_async(
        self,
        task_name: str,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> TaskResult:
        """Execute a task asynchronously on the caller's event loop."""
        parameters = parameters or {}
        context = context or {}
        return await task_executor(task_name, parameters, context)
File: planexe/rpc/agents.ts (TypeScript)
Thin wrapper agents that call the RPC interface:
/**
* Auto-generated TypeScript agents that wrap RPC calls to Python tasks.
* These agents are framework-agnostic and can be used with any LLM framework.
*/
/** Minimal contract for an agent that proxies one pipeline task. */
interface TaskAgent {
  id: string;
  displayName: string;
  // Fixed: the `async` modifier is not allowed on interface members
  // (TS1042); declaring a Promise return type is the correct form.
  invoke(context: any): Promise<any>;
}
// Base RPC agent
/**
 * Base RPC agent: forwards invoke(context) to POST {rpcUrl}/run/{taskName}
 * on the Python task server and returns the `result` field of the
 * TaskResult payload.
 */
class RPCAgent implements TaskAgent {
  constructor(
    public id: string,
    public displayName: string,
    private taskName: string,
    private rpcUrl: string = "http://localhost:8001"
  ) {}

  /** POST the context as the JSON body; throws on any non-2xx response. */
  async invoke(context: any): Promise<any> {
    const response = await fetch(`${this.rpcUrl}/run/${this.taskName}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(context),
    });
    if (!response.ok) {
      throw new Error(`Task failed: ${response.statusText}`);
    }
    const result = await response.json();
    // Unwrap: callers get the task output, not the TaskResult envelope.
    return result.result;
  }
}
// Auto-generated agent instances
export const identifyRisksAgent = new RPCAgent(
  "identify-risks",
  "Identify Risks",
  "identify_risks"
);
export const resolveAssumptionsAgent = new RPCAgent(
  "resolve-assumptions",
  "Resolve Assumptions",
  "resolve_assumptions"
);

/**
 * Factory for creating agents dynamically from a task name.
 *
 * Fixed: empty segments produced by doubled or leading underscores are
 * filtered out before title-casing — the original `w[0].toUpperCase()`
 * threw a TypeError on such input (e.g. "my__task" or "_private").
 */
export function createRPCAgent(taskName: string, rpcUrl?: string): TaskAgent {
  const displayName = taskName
    .split("_")
    .filter((word) => word.length > 0)
    .map((word) => word[0].toUpperCase() + word.slice(1))
    .join(" ");
  return new RPCAgent(taskName, displayName, taskName, rpcUrl);
}
// Usage example
// const result = await identifyRisksAgent.invoke({ context: {...} });
Setup & Deployment:
# Install dependencies
pip install fastapi uvicorn pydantic
# Run the RPC server
uvicorn planexe.rpc.task_runner:app --host 0.0.0.0 --port 8001
# Or add to docker-compose.yml:
services:
task-rpc:
build: .
ports:
- "8001:8001"
command: uvicorn planexe.rpc.task_runner:app --host 0.0.0.0 --port 8001
depends_on:
- postgres
Example Usage:
# List available tasks
curl http://localhost:8001/tasks
# Get task metadata
curl http://localhost:8001/tasks/identify_risks
# Run a task
curl -X POST http://localhost:8001/run/identify_risks \
-H "Content-Type: application/json" \
-d '{
"context_file": "./context.json"
}'
# Expected response
{
"task_name": "identify_risks",
"status": "success",
"result": {
"risk_registry": [
{
"id": "RISK-001",
"title": "Technical debt in core module",
"severity": "high",
"mitigation": "Refactor module X in Q2"
}
]
},
"execution_time_ms": 2500,
"tokens_used": 1250
}
Benefits
- No drift: TypeScript agents call Python, never duplicate it
- Framework-agnostic: any HTTP client can invoke pipeline stages
- Enables true injection: agents can pre/post-process at any stage
- Composable: stages can be called independently or as part of the full DAG
Limitations
- Higher engineering cost than Option A
- Requires careful API design (context schema, error handling, auth)
- Full DAG execution still goes through Luigi; RPC is for individual stage invocation
Recommendation
Do A now — cheap, immediate, makes the codebase self-documenting for agent frameworks. Generate manifest.json and it becomes the source of truth for any TypeScript definitions.
Plan C for next quarter — RPC injection is the right long-term answer once the manifest (Option A) has proven out which stages external frameworks actually want to call. Build the RPC surface around real usage, not speculation.
Implementation Roadmap
Phase 1: Option A (Week 1)
- Create `planexe/agent_meta.py` with decorator and manifest generation
- Decorate 3-5 key tasks (IdentifyRisks, ResolveAssumptions, etc.)
- Generate manifest at build time
- Verify manifest correctness with existing TypeScript agent definitions
- Document decorator pattern for new tasks
Commands:
# Create the decorator module
touch planexe/agent_meta.py
# Decorate key tasks in planexe/pipeline/tasks.py
# Then generate manifest
python -c "from planexe.agent_meta import generate_manifest; generate_manifest()"
# Verify manifest was created
ls -la .agents/manifest.json
cat .agents/manifest.json
Phase 2: Option C (Week 2-3)
- Create `planexe/rpc/task_runner.py` with FastAPI app
- Start RPC server alongside Luigi runner
- Add to docker-compose.yml for automated deployment
- Create TypeScript client library to wrap RPC calls
- Test with sample external agents (Codebuff, OpenClaw)
Commands:
# Install RPC dependencies
pip install fastapi uvicorn
# Create RPC module directory
mkdir -p planexe/rpc
touch planexe/rpc/__init__.py
touch planexe/rpc/task_runner.py
touch planexe/rpc/client.py
# Run RPC server
python -m uvicorn planexe.rpc.task_runner:app --host 0.0.0.0 --port 8001
# Test it
curl http://localhost:8001/health
curl http://localhost:8001/tasks
Phase 3: Integration (Week 3-4)
- Update TypeScript agent definitions to use auto-generated manifests
- Create agent wrapper factories from manifest
- Test end-to-end with external frameworks
- Document usage in AGENTS.md
Full Example: Migrate One Task
Before (Task + Duplicate TypeScript Agent)
Python task (planexe/pipeline/tasks.py):
class IdentifyRisks(luigi.Task):
    """Illustrative 'before' version: task logic only, no agent metadata."""

    # JSON file with the project context consumed by run().
    context_file = luigi.PathParameter(default="./context.json")

    def requires(self):
        # Depends on the resolved-assumptions artifact.
        return ResolveAssumptions()

    def output(self):
        return luigi.LocalTarget("./artifacts/risks.json")

    def run(self):
        # 50 lines of logic
        pass
TypeScript agent (duplicate logic):
export const identifyRisksAgent = {
id: "identify-risks",
async invoke(context) {
// DUPLICATED: 50 lines of logic from Python
const risks = [];
for (const assumption of context.assumptions) {
// ... analyze
risks.push(...);
}
return risks;
}
};
Problem: When Python logic changes, TypeScript diverges.
After (Task + Decorator + Thin Agent Wrapper)
Python task (unchanged, but decorated):
@agent_meta(
    name="identify_risks",
    description="Identifies project risks from assumptions and context.",
    tools=["read_context", "read_assumptions"],
    outputs=["risk_registry"],
    stage="risk_assumptions",
    tags=["analysis", "risk"],
)
class IdentifyRisks(luigi.Task):
    """'After' version: identical logic, now annotated with @agent_meta."""

    context_file = luigi.PathParameter(default="./context.json")

    def requires(self):
        return ResolveAssumptions()

    def output(self):
        return luigi.LocalTarget("./artifacts/risks.json")

    def run(self):
        # Same 50 lines of logic
        pass
TypeScript agent (thin wrapper, no duplicated logic):
// Thin wrapper: no analysis logic lives in TypeScript anymore.
export const identifyRisksAgent = new RPCAgent(
  "identify-risks",
  "Identify Risks",
  "identify_risks"
);
// Or from manifest:
// const manifest = await fetch("/.agents/manifest.json").then(r => r.json());
// const identifyRisks = manifest.agents.find(a => a.name === "identify_risks");
// export const identifyRisksAgent = createRPCAgent(identifyRisks.name);
Manifest (auto-generated):
{
"agents": [
{
"name": "identify_risks",
"displayName": "Identify Risks",
"description": "Identifies project risks from assumptions and context.",
"pythonClass": "planexe.pipeline.tasks.IdentifyRisks",
...
}
]
}
Result: Single source of truth. TypeScript never diverges from Python.