---
name: duration-governance
description: Monitor and govern automated AI tasks for time spent, API costs, and error rates. Detect compounding errors, recommend kill switches, and generate monitoring code. Use when analyzing multi-step AI workflows, debugging expensive automations, or implementing cost controls.
---

# Duration Governance

Monitor, analyze, and control the **time and cost** of multi-step AI automation workflows.

## The Problem

Multi-step AI workflows suffer from **compounding error rates**:
- A workflow with 95% per-step accuracy over 20 steps succeeds only **~36% of the time** (0.95^20 ≈ 0.358)
- Small errors at Step 3 compound into expensive failures at Step 18
- By the time humans notice, agents have burned significant compute creating liabilities

**Example math:**
- Single agent with retry loops: $2,500/month
- 500 agents × $50/day in errors × 30 days = **$750k/month waste**
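
The compounding figure above is just per-step accuracy raised to the number of steps — a quick sanity check:

```python
# 95% per-step accuracy compounds to ~36% over 20 steps
per_step_accuracy = 0.95
num_steps = 20

overall = per_step_accuracy ** num_steps
print(f"{overall:.1%}")  # 35.8%
```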

## What This Skill Teaches

How to implement **Duration Governance**: tracking time, cost, and errors across automated tasks to prevent invisible budget leaks.

### Core Metrics to Track

1. **Time Spent** — wall-clock duration per task
2. **API Costs** — cumulative $ spent (LLM calls, external APIs)
3. **Error Rate** — failures, retries, drift events

### When to Use This Skill

Trigger this skill when:
- Building multi-step AI automation
- Debugging expensive workflows
- Setting up cost monitoring
- Analyzing task performance
- Implementing kill switches
- User asks about "duration governance", "task budgets", "automation costs"

---

## 1. Instrumentation Patterns

### Basic Task Logging Schema

Every automated task should log:

```python
{
  "task_id": "uuid",
  "task_type": "content_pipeline | rank_check | audit | email_response",
  "started_at": "2026-02-10T14:30:00Z",
  "ended_at": "2026-02-10T14:35:23Z",
  "duration_seconds": 323,
  "status": "success | failed | killed",
  "cost_usd": 0.45,
  "steps_completed": 12,
  "steps_total": 12,
  "errors": [
    {
      "step": 4,
      "type": "api_error | hallucination | timeout",
      "message": "Vendor name drift detected",
      "cost_impact_usd": 0.12
    }
  ],
  "kill_reason": null  # or "budget_exceeded | timeout | error_threshold"
}
```

### Database Table (Recommended)

```sql
CREATE TABLE task_runs (
  id UUID PRIMARY KEY,
  task_type VARCHAR(100) NOT NULL,
  started_at TIMESTAMP NOT NULL,
  ended_at TIMESTAMP,
  duration_seconds INTEGER,
  status VARCHAR(20),
  cost_usd DECIMAL(10,4),
  steps_completed INTEGER,
  steps_total INTEGER,
  error_count INTEGER DEFAULT 0,
  kill_reason VARCHAR(100),
  metadata JSONB,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_task_runs_type_status ON task_runs(task_type, status);
CREATE INDEX idx_task_runs_started ON task_runs(started_at DESC);
CREATE INDEX idx_task_runs_cost ON task_runs(cost_usd DESC);
```

---

## 2. Instrumentation Code Examples

### Python Decorator Pattern

```python
import json
import logging
import uuid
from datetime import datetime
from decimal import Decimal
from functools import wraps

logger = logging.getLogger(__name__)

class BudgetExceededError(Exception):
    """Raised when a task's cumulative cost exceeds its configured limit."""
    pass

class TaskMonitor:
    def __init__(self, db_session):
        self.db = db_session
        
    def track_task(self, task_type: str, cost_limit_usd: float | None = None,
                   time_limit_seconds: int | None = None):
        """
        Decorator to track task duration, cost, and errors.
        
        Usage:
            @task_monitor.track_task("content_pipeline", cost_limit_usd=5.0)
            async def generate_content(topic: str):
                ...
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                task_id = uuid.uuid4()
                started_at = datetime.utcnow()
                cost_tracker = CostTracker()
                error_log = []
                
                try:
                    # Inject cost tracker into function context
                    kwargs['_cost_tracker'] = cost_tracker
                    kwargs['_error_log'] = error_log
                    
                    # Enforce the wall-clock limit, if one was configured
                    if time_limit_seconds:
                        import asyncio  # local import keeps this sketch self-contained
                        result = await asyncio.wait_for(
                            func(*args, **kwargs), timeout=time_limit_seconds
                        )
                    else:
                        result = await func(*args, **kwargs)
                    
                    # Check cost limit
                    if cost_limit_usd and cost_tracker.total > cost_limit_usd:
                        raise BudgetExceededError(
                            f"Task exceeded ${cost_limit_usd} limit (spent ${cost_tracker.total})"
                        )
                    
                    # Log success
                    self._log_task(
                        task_id=task_id,
                        task_type=task_type,
                        started_at=started_at,
                        ended_at=datetime.utcnow(),
                        status="success",
                        cost_usd=cost_tracker.total,
                        errors=error_log
                    )
                    
                    return result
                    
                except Exception as e:
                    # Log failure
                    self._log_task(
                        task_id=task_id,
                        task_type=task_type,
                        started_at=started_at,
                        ended_at=datetime.utcnow(),
                        status="failed",
                        cost_usd=cost_tracker.total,
                        errors=error_log + [{"type": "exception", "message": str(e)}],
                        kill_reason=type(e).__name__
                    )
                    raise
                    
            return wrapper
        return decorator
    
    def _log_task(self, task_id, task_type, started_at, ended_at, 
                  status, cost_usd, errors, kill_reason=None):
        duration = (ended_at - started_at).total_seconds()
        
        self.db.execute(
            """
            INSERT INTO task_runs 
            (id, task_type, started_at, ended_at, duration_seconds, 
             status, cost_usd, error_count, kill_reason, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (task_id, task_type, started_at, ended_at, int(duration),
             status, cost_usd, len(errors), kill_reason, json.dumps({"errors": errors}))
        )
        self.db.commit()
        
        logger.info(
            f"Task {task_type} [{task_id}]: {status} in {duration:.1f}s, "
            f"cost ${cost_usd:.4f}, {len(errors)} errors"
        )

class CostTracker:
    def __init__(self):
        self.total = Decimal('0.00')
        self.breakdown = []
    
    def add(self, cost: float, label: str):
        self.total += Decimal(str(cost))
        self.breakdown.append({"label": label, "cost": cost})
        
    def add_llm_call(self, model: str, input_tokens: int, output_tokens: int):
        cost = calculate_llm_cost(model, input_tokens, output_tokens)
        self.add(cost, f"LLM: {model}")
```

### Usage in Task Function

```python
@task_monitor.track_task("seo_content_pipeline", cost_limit_usd=5.0, time_limit_seconds=600)
async def generate_seo_article(topic: str, _cost_tracker=None, _error_log=None):
    # Step 1: Generate outline
    outline_response = await llm.complete("gpt-4", f"Create outline for {topic}")
    _cost_tracker.add_llm_call("gpt-4", outline_response.input_tokens, outline_response.output_tokens)
    
    # Step 2: Validate outline
    if not validate_outline(outline_response.text):
        _error_log.append({
            "step": 2,
            "type": "validation_failed",
            "message": "Outline missing key sections"
        })
        raise ValidationError("Invalid outline structure")
    
    # Step 3-12: Continue pipeline...
    # Each step logs cost and errors
    
    return final_article
```

---

## 3. Analysis Queries

### Daily Cost Summary

```sql
SELECT 
  task_type,
  DATE(started_at) as date,
  COUNT(*) as runs,
  SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
  SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
  ROUND(AVG(duration_seconds), 1) as avg_duration_sec,
  ROUND(SUM(cost_usd), 2) as total_cost_usd,
  ROUND(AVG(cost_usd), 4) as avg_cost_usd
FROM task_runs
WHERE started_at >= NOW() - INTERVAL '7 days'
GROUP BY task_type, DATE(started_at)
ORDER BY date DESC, total_cost_usd DESC;
```

### Find Expensive Failures

```sql
SELECT 
  id,
  task_type,
  started_at,
  duration_seconds,
  cost_usd,
  error_count,
  kill_reason,
  metadata->>'errors' as error_details
FROM task_runs
WHERE status = 'failed' 
  AND cost_usd > 1.0
ORDER BY cost_usd DESC
LIMIT 20;
```

### Calculate Compounding Error Rate

```python
def calculate_workflow_success_rate(per_step_accuracy: float, num_steps: int) -> float:
    """
    Calculate actual success rate for multi-step workflow.
    
    Example:
        95% per-step accuracy over 20 steps = 35.8% overall success
    """
    return per_step_accuracy ** num_steps

def analyze_pipeline_drift(task_type: str, days: int = 7):
    """
    Detect if error rates are increasing over time.
    """
    query = """
    SELECT 
      DATE(started_at) as date,
      COUNT(*) as total_runs,
      AVG(error_count) as avg_errors_per_run,
      SUM(cost_usd) as daily_cost
    FROM task_runs
    WHERE task_type = %s 
      AND started_at >= NOW() - (%s * INTERVAL '1 day')
    GROUP BY DATE(started_at)
    ORDER BY date;
    """
    
    results = db.execute(query, (task_type, days))
    
    # Detect if avg errors trending up
    error_trend = [r['avg_errors_per_run'] for r in results]
    if len(error_trend) >= 6:  # need non-overlapping head/tail windows
        recent_avg = sum(error_trend[-3:]) / 3
        older_avg = sum(error_trend[:3]) / 3
        if recent_avg > older_avg * 1.5:
            return {
                "status": "WARNING",
                "message": f"Error rate increased 50%+ in last 3 days",
                "older_avg": older_avg,
                "recent_avg": recent_avg
            }
    
    return {"status": "OK"}
```

---

## 4. Kill Switch Patterns

### Budget Kill Switch

```python
class BudgetKillSwitch:
    def __init__(self, limit_usd: float):
        self.limit = limit_usd
        self.spent = Decimal('0.00')
    
    def check(self, new_cost: float):
        self.spent += Decimal(str(new_cost))
        if self.spent > self.limit:
            raise BudgetExceededError(
                f"Task budget ${self.limit} exceeded (spent ${self.spent})"
            )

# Usage in workflow
kill_switch = BudgetKillSwitch(limit_usd=5.0)

for step in pipeline_steps:
    result, cost = await step.execute()
    kill_switch.check(cost)
```

### Time Kill Switch

```python
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def time_limit(seconds: int):
    """
    Context manager to enforce time limits on tasks (Python 3.11+).
    
    Usage:
        async with time_limit(300):  # 5 minutes
            await long_running_task()
    """
    # Note: raising from a detached background task (a common mistake) loses
    # the exception and never interrupts the governed code. asyncio.timeout
    # cancels the block itself; on Python < 3.11 use asyncio.wait_for instead.
    try:
        async with asyncio.timeout(seconds):
            yield
    except TimeoutError:
        raise TimeoutError(f"Task exceeded {seconds}s limit") from None
```

### Drift Gate Pattern

```python
class DriftGate:
    """
    Detect when task behavior changes unexpectedly.
    """
    def __init__(self, task_type: str, baseline_days: int = 7):
        self.task_type = task_type
        self.baseline_days = baseline_days
        self.baseline = self._get_baseline()
    
    def _get_baseline(self):
        """Get average duration + cost for recent successful runs."""
        query = """
        SELECT 
          AVG(duration_seconds) as avg_duration,
          AVG(cost_usd) as avg_cost
        FROM task_runs
        WHERE task_type = %s 
          AND status = 'success'
          AND started_at >= NOW() - (%s * INTERVAL '1 day')
        """
        result = db.execute(query, (self.task_type, self.baseline_days)).fetchone()
        return {
            # Fall back to sane defaults when there's no history yet
            "avg_duration": result['avg_duration'] or 60,
            "avg_cost": result['avg_cost'] or 0.50
        }
    
    def check(self, duration: float, cost: float):
        """Flag if current run deviates >2x from baseline."""
        if duration > self.baseline['avg_duration'] * 2:
            logger.warning(
                f"Drift detected: duration {duration}s vs baseline {self.baseline['avg_duration']}s"
            )
        
        if cost > self.baseline['avg_cost'] * 2:
            logger.warning(
                f"Drift detected: cost ${cost} vs baseline ${self.baseline['avg_cost']}"
            )
```

---

## 5. Monitoring Dashboard Queries

### Real-Time Task Health

```sql
-- Last 100 task runs with health indicators
SELECT 
  task_type,
  started_at,
  duration_seconds,
  cost_usd,
  status,
  error_count,
  CASE 
    WHEN cost_usd > (
      SELECT AVG(cost_usd) * 2 
      FROM task_runs t2 
      WHERE t2.task_type = t1.task_type 
        AND t2.status = 'success'
    ) THEN 'COST_ANOMALY'
    WHEN duration_seconds > (
      SELECT AVG(duration_seconds) * 2 
      FROM task_runs t2 
      WHERE t2.task_type = t1.task_type 
        AND t2.status = 'success'
    ) THEN 'DURATION_ANOMALY'
    WHEN error_count > 3 THEN 'HIGH_ERRORS'
    ELSE 'OK'
  END as health_flag
FROM task_runs t1
ORDER BY started_at DESC
LIMIT 100;
```

### Cost Leaderboard (Top Spenders)

```sql
SELECT 
  task_type,
  SUM(cost_usd) as total_cost_7d,
  COUNT(*) as runs,
  ROUND(AVG(cost_usd), 4) as avg_cost_per_run,
  SUM(CASE WHEN status = 'failed' THEN cost_usd ELSE 0 END) as wasted_on_failures
FROM task_runs
WHERE started_at >= NOW() - INTERVAL '7 days'
GROUP BY task_type
ORDER BY total_cost_7d DESC;
```

---

## 6. Alert Rules (When to Notify)

Trigger alerts when:

1. **Single task cost > $10** (possible runaway)
2. **Daily cost for task type > 2x weekly average**
3. **Error rate > 30% for task type** (over last hour)
4. **Any task duration > 30 minutes** (stuck?)
5. **Drift detected:** duration or cost > 2x baseline

Example alert function:

```python
def check_alerts():
    alerts = []
    
    # Check for expensive single tasks
    expensive = db.execute("""
        SELECT id, task_type, cost_usd 
        FROM task_runs 
        WHERE cost_usd > 10 
          AND started_at >= NOW() - INTERVAL '1 hour'
    """).fetchall()
    
    for task in expensive:
        alerts.append({
            "severity": "HIGH",
            "message": f"Task {task['id']} cost ${task['cost_usd']:.2f}",
            "type": "expensive_task"
        })
    
    # Check error rate spike
    error_rate = db.execute("""
        SELECT 
          task_type,
          COUNT(*) as total,
          SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures
        FROM task_runs
        WHERE started_at >= NOW() - INTERVAL '1 hour'
        GROUP BY task_type
        HAVING SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END)::float / COUNT(*) > 0.3
    """).fetchall()
    
    for task in error_rate:
        alerts.append({
            "severity": "MEDIUM",
            "message": f"{task['task_type']} error rate: {task['failures']}/{task['total']}",
            "type": "high_error_rate"
        })
    
    return alerts
```

---

## 7. Cost Calculation Helpers

### LLM Pricing Table (2026)

```python
LLM_PRICING = {
    # Model: (input_cost_per_1M_tokens, output_cost_per_1M_tokens)
    "gpt-4": (30.00, 60.00),
    "gpt-4-turbo": (10.00, 30.00),
    "gpt-3.5-turbo": (0.50, 1.50),
    "claude-opus-4": (15.00, 75.00),
    "claude-sonnet-4": (3.00, 15.00),
    "gemini-pro": (0.50, 1.50),
}

def calculate_llm_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Calculate cost for LLM API call."""
    if model not in LLM_PRICING:
        logger.warning(f"Unknown model {model}, using default pricing")
        input_cost, output_cost = 5.00, 15.00
    else:
        input_cost, output_cost = LLM_PRICING[model]
    
    total_cost = (
        (input_tokens / 1_000_000) * input_cost +
        (output_tokens / 1_000_000) * output_cost
    )
    
    return round(total_cost, 6)
```
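
A quick sanity check of the formula, using the gpt-4 rates from the table above (1,000 input + 500 output tokens):

```python
# (1,000 / 1M tokens) * $30 input + (500 / 1M tokens) * $60 output
cost = (1_000 / 1_000_000) * 30.00 + (500 / 1_000_000) * 60.00
print(round(cost, 6))  # 0.06
```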

---

## Quick Start Checklist

To add Duration Governance to a project:

1. **Create task_runs table** (see schema above)
2. **Add TaskMonitor class** to your project
3. **Wrap automated functions** with `@track_task` decorator
4. **Set cost/time limits** based on expected task value
5. **Add kill switches** for budget and time
6. **Create alerts** for expensive failures
7. **Dashboard** → query task_runs for daily summaries

---

## Key Gotchas

1. **Don't log synchronously** — use async inserts or queue to avoid blocking tasks
2. **Timestamp precision** — use UTC always, store with timezone
3. **Cost attribution** — track which step/API caused each cost, not just totals
4. **Baseline drift** — recalculate baselines weekly as workflows improve
5. **Alert fatigue** — start with high thresholds, tune down based on real failures
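
Gotcha #1 can be handled with a small background consumer. A minimal sketch using `asyncio.Queue` — the `print` call is a placeholder for your own DB insert:

```python
import asyncio

log_queue: asyncio.Queue = asyncio.Queue()

async def log_task_run(record: dict) -> None:
    """Hot path: enqueue and return immediately — never blocks on the DB."""
    await log_queue.put(record)

async def log_writer() -> None:
    """Background consumer: drains the queue and writes rows one by one."""
    while True:
        record = await log_queue.get()
        # Placeholder — swap in your real insert, e.g. db.execute(INSERT_SQL, record)
        print("insert:", record)
        log_queue.task_done()
```

Start `log_writer()` once at startup with `asyncio.create_task(log_writer())`; task code then calls `await log_task_run(...)` instead of inserting synchronously.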

---

## Example Output (When Analyzing a Workflow)

```
📊 Duration Governance Report: seo_content_pipeline

Time Range: Last 7 days
Total Runs: 247
Success Rate: 78% (193 success, 54 failed)

💰 Cost Analysis:
  Total Spent: $1,247.80
  Avg per Run: $5.05
  Wasted (failures): $312.40 (25%)

⏱️ Duration Analysis:
  Avg Duration: 4m 23s
  Longest: 18m 12s (killed: timeout)
  Shortest: 1m 45s

⚠️ Compounding Error Analysis:
  Steps per Run: 12
  Per-Step Accuracy: 92%
  Predicted Success Rate: 36.8%
  Actual Success Rate: 78%
  ✅ Better than predicted (good breakpoint validation)

🚨 Issues Detected:
  - 14 tasks exceeded $10 budget (avg: $12.40)
  - 3 tasks exceeded 15min timeout
  - Error rate spiked 40% on Feb 8 (vendor API change)

💡 Recommendations:
  1. Add budget kill switch at $8 (before $10 failures)
  2. Add breakpoint validation at Step 6 (highest error step)
  3. Reduce timeout to 10min (18min outliers are failures anyway)
  4. Monitor vendor API response times (new drift pattern)

Estimated Monthly Savings: $468/month
```

---

## When NOT to Use This

- Single-step, one-off tasks (overhead not worth it)
- Tasks that cost <$0.01 (logging costs more than insights)
- Purely deterministic workflows with no API calls

---

## Further Reading

- Original concept: Srini Annamaraju (LinkedIn, Feb 2026)
- Compounding probability: https://en.wikipedia.org/wiki/Compound_probability
- OpenTelemetry for distributed tracing: https://opentelemetry.io/

---

**Status:** Production-ready  
**Last Updated:** 2026-02-10  
**Maintained by:** Sandeep Kelvadi / Thrivemattic
