Metadata-Version: 2.4
Name: acgp
Version: 0.1.2
Summary: Agentic Cognitive Governance Protocol - Add AI agent oversight with real-time feedback in 5 lines of code
Project-URL: Homepage, https://meaningstack.com
Project-URL: Documentation, https://github.com/meaningstack/acgp/tree/main/docs
Project-URL: Repository, https://github.com/meaningstack/acgp
Project-URL: Issues, https://github.com/meaningstack/acgp/issues
Project-URL: Changelog, https://github.com/meaningstack/acgp/blob/main/CHANGELOG.md
Author-email: MeaningStack <Admin@meaningstack.com>
Maintainer-email: MeaningStack <Admin@meaningstack.com>
License-Expression: MIT
License-File: LICENSE
Keywords: agents,ai,autogen,cognitive,compliance,crewai,feedback,governance,gpt-researcher,langchain,llm,monitoring,oversight,quality,real-time,sse
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Monitoring
Classifier: Typing :: Typed
Requires-Python: >=3.9
Requires-Dist: httpx>=0.25.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: pyyaml>=6.0
Provides-Extra: all
Requires-Dist: anthropic>=0.17.0; extra == 'all'
Requires-Dist: crewai>=0.28.0; extra == 'all'
Requires-Dist: langchain>=0.1.0; extra == 'all'
Provides-Extra: anthropic
Requires-Dist: anthropic>=0.17.0; extra == 'anthropic'
Provides-Extra: crewai
Requires-Dist: crewai>=0.28.0; extra == 'crewai'
Provides-Extra: dev
Requires-Dist: black>=23.0.0; extra == 'dev'
Requires-Dist: mypy>=1.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.0.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Provides-Extra: langchain
Requires-Dist: langchain>=0.1.0; extra == 'langchain'
Description-Content-Type: text/markdown

# ACGP

**Agentic Cognitive Governance Protocol** - Lightweight Python SDK for AI agent oversight integration.

> **Developed and maintained by [MeaningStack](https://meaningstack.com)**

[![PyPI version](https://badge.fury.io/py/acgp.svg)](https://badge.fury.io/py/acgp)
[![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

## Overview

ACGP provides a minimal-footprint way to integrate Steward Agent oversight into any AI agent pipeline. It works with any Python agent framework (CrewAI, LangChain, custom agents) without blocking your agent's execution.

```
Client Code -> ACGP (lightweight wrapper) -> Steward API -> Backend Infrastructure
                                                    |
                                                    v
                                          SSE Stream (real-time feedback)
```
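
Feedback reaches the SDK as a Server-Sent Events stream. The SSE wire format is standard: `event:` and `data:` fields, lines starting with `:` are comments (often keep-alives), and a blank line ends each event. The sketch below parses such a stream into `(event, payload)` pairs to show the kind of work an SSE client does. `parse_sse` is a hypothetical helper, and the assumption that the Steward API sends a JSON object in each `data:` field is not confirmed by this README.

```python
import json
from typing import Iterable, Iterator, List, Optional, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_type, payload) pairs from raw SSE lines.

    Hypothetical helper; assumes each event's data is a JSON object.
    """
    event_type: Optional[str] = None
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the accumulated event
            if data_lines:
                yield event_type or "message", json.loads("\n".join(data_lines))
            event_type, data_lines = None, []
            continue
        if line.startswith(":"):
            continue  # Comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # Strip the single optional leading space
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        # Other fields (id, retry) are ignored in this sketch
```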

## Features

- **Framework-agnostic**: Works with any Python agent (CrewAI, LangChain, AutoGen, custom)
- **Non-blocking**: Async queue ensures traces never slow down your agent
- **Real-time feedback**: SSE streaming delivers oversight feedback via callbacks
- **Auto-batching**: Accumulates events, flushes at configurable thresholds
- **Retry & circuit breaker**: Resilient transport with exponential backoff
- **Graceful degradation**: Works offline, queues events locally
- **Tiny footprint**: ~1.5MB core dependencies

## Installation

```bash
pip install acgp

# With optional framework support (quotes stop shells such as zsh from expanding the brackets)
pip install "acgp[crewai]"
pip install "acgp[langchain]"
pip install "acgp[anthropic]"
pip install "acgp[all]"
```

## Quick Start

### Real-time Feedback (Recommended)

The recommended pattern uses a callback for real-time feedback. Feedback arrives automatically via SSE streaming as traces are processed.

```python
from acgp import ACGPClient

# Define callback to handle feedback
def handle_feedback(trace_id: str, feedback_data: dict):
    print(f"Received feedback for trace {trace_id}")

    ctq = feedback_data.get('ctq_scores', {})
    print(f"  Composite Score: {ctq.get('composite_score')}")
    print(f"  Feedback: {feedback_data.get('feedback')}")

    # React to low quality scores (skip if no score was returned)
    score = ctq.get('composite_score')
    if score is not None and score < 0.5:
        print("  Warning: Low quality score - consider adjustments")

    # Check for human-in-the-loop requirement
    if feedback_data.get('requires_hitl'):
        print("  ALERT: Human review required!")

# Initialize with callback
client = ACGPClient(
    api_key="sk_live_...",
    endpoint="https://api.steward-agent.com",
    on_feedback=handle_feedback,  # Enables real-time SSE streaming
)

# Trace your agents (my_planner_agent, my_executor_agent, and query stand in for your own code).
# Use a unique agent_tag for each agent.
with client.trace(
    agent_type="planner",
    agent_tag="myapp-query-planner",  # Unique identifier for this agent
    goal="Analyze user query"
) as trace:
    plan = my_planner_agent.run(query)
    trace.set_output(plan)
    trace.set_reasoning(my_planner_agent.reasoning_log)

with client.trace(
    agent_type="executor",
    agent_tag="myapp-task-executor",  # Unique identifier for this agent
    goal="Execute the plan"
) as trace:
    result = my_executor_agent.run(plan)
    trace.set_output(result)

# Wait for all feedback before shutdown (auto-flushes pending traces)
client.wait_for_feedback(timeout=30.0)
client.close()
```
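
One way to keep a feedback callback testable is to move its decisions into a pure function and leave only the side effects (printing, alerting) in the callback. The sketch below assumes the payload shape used in the example above (`ctq_scores.composite_score`, `requires_hitl`); `triage_feedback` and the 0.5 threshold are illustrative, not part of the SDK.

```python
from typing import Any, Dict, List

LOW_SCORE_THRESHOLD = 0.5  # Illustrative threshold, matching the example above


def triage_feedback(feedback_data: Dict[str, Any]) -> List[str]:
    """Return action labels for one feedback payload (hypothetical helper)."""
    actions: List[str] = []
    ctq = feedback_data.get("ctq_scores") or {}
    score = ctq.get("composite_score")
    # Treat a missing score as "unknown" rather than as a failure
    if score is not None and score < LOW_SCORE_THRESHOLD:
        actions.append("low_score")
    if feedback_data.get("requires_hitl"):
        actions.append("human_review")
    return actions


def handle_feedback(trace_id: str, feedback_data: dict) -> None:
    # Side effects only; the decision logic lives in triage_feedback
    for action in triage_feedback(feedback_data):
        print(f"[{trace_id}] action required: {action}")
```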

### Manual Polling (Alternative)

For simpler use cases or when callbacks are not suitable:

```python
from acgp import ACGPClient

client = ACGPClient(
    api_key="sk_live_...",
    endpoint="https://api.steward-agent.com",
)

# Capture trace
with client.trace(agent_type="planner", goal="Analyze query") as trace:
    result = my_agent.run(query)
    trace.set_output(result)

# Flush and poll for results
client.flush()
score = client.get_score(trace.trace_id)      # Auto-retries until processed
feedback = client.get_feedback(trace.trace_id)

print(f"Score: {score}")
print(f"Feedback: {feedback}")

client.close()
```
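
`get_score()` and `get_feedback()` already retry until the trace is processed. If you need your own deadline or interval around some other lookup, a generic poll-with-timeout helper looks roughly like this. `poll_until` is hypothetical, does not reflect how the SDK retries internally, and assumes the fetch function returns `None` while the result is not ready.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], Optional[T]],
    timeout: float = 30.0,
    interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[T]:
    """Call fetch() until it returns a non-None value or the timeout expires."""
    deadline = clock() + timeout
    while True:
        result = fetch()
        if result is not None:
            return result
        remaining = deadline - clock()
        if remaining <= 0:
            return None  # Gave up; the caller decides how to handle a missing result
        sleep(min(interval, remaining))
```

Injecting `sleep` and `clock` keeps the helper easy to test without real waiting.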

## API Reference

### ACGPClient

The main client for interacting with the Steward Agent backend.

```python
client = ACGPClient(
    api_key="sk_live_...",           # Required (or set ACGP_API_KEY): Your API key
    endpoint="https://...",           # Required (or set ACGP_ENDPOINT): Backend URL
    on_feedback=callback_fn,          # Optional: Real-time feedback callback
    on_flush_complete=flush_callback, # Optional: Called after each batch flush
    sse_config=SSEConfig(...),        # Optional: SSE configuration
    fail_silently=True,               # Optional: Log errors instead of raising (default: True)
    debug=False,                      # Optional: Enable debug logging
)
```

#### Methods

| Method | Description |
|--------|-------------|
| `trace(agent_type, goal, ...)` | Context manager for tracing agent execution |
| `capture_trace(...)` | Manually capture a trace without context manager |
| `flush()` | Force-send all pending traces immediately |
| `wait_for_feedback(timeout, auto_flush)` | Wait for SSE feedback (auto-flushes by default) |
| `get_score(trace_id)` | Get CTQ score for a trace (with retry) |
| `get_feedback(trace_id)` | Get oversight feedback for a trace (with retry) |
| `get_metrics()` | Get SDK metrics (queue size, SSE status, etc.) |
| `close(timeout)` | Gracefully shutdown client |

#### Properties

| Property | Description |
|----------|-------------|
| `sse_connected` | Boolean indicating if SSE stream is connected |
| `pending_feedback_count` | Number of traces waiting for feedback |

### TraceContext

Returned by the `client.trace()` context manager. `client.trace()` accepts the following parameters:

| Parameter | Required | Description |
|-----------|----------|-------------|
| `agent_type` | Yes | Type of agent: "planner", "executor", or custom |
| `goal` | Yes | What the agent is trying to accomplish |
| `agent_tag` | Recommended | Unique identifier for this agent (defaults to "A" for planners and "B" for executors if omitted) |

```python
with client.trace(
    agent_type="planner",
    agent_tag="myapp-planner",  # Recommended: unique tag
    goal="Plan the task"
) as trace:
    # Set output (required)
    trace.set_output("The plan is...")

    # Set reasoning (recommended)
    trace.set_reasoning("Step 1: Analyzed input. Step 2: ...")

    # Add context information
    trace.add_context("model", "gpt-4")
    trace.add_context("temperature", 0.7)

    # Set documents used (for RAG agents)
    trace.set_documents(
        documents=["doc1.pdf", "doc2.pdf"],
        similarity_scores=[0.95, 0.87]
    )

# Access trace_id after context exits
print(f"Trace ID: {trace.trace_id}")
```
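
Conceptually, a trace context records timestamps around the block, collects whatever you set on it, and hands a finished event to a queue on exit, matching the steps in the capture-flow diagram under Architecture. The class below is a simplified, hypothetical stand-in (`MiniTrace`, with a plain list as the queue and `uuid4` IDs), not the SDK's `TraceContext`.

```python
import time
import uuid
from typing import Any, Dict, List, Optional


class MiniTrace:
    """Simplified, hypothetical trace context; not the SDK's TraceContext."""

    def __init__(self, sink: List[Dict[str, Any]], agent_type: str, goal: str,
                 agent_tag: Optional[str] = None) -> None:
        self.sink = sink  # Stand-in for the async event queue
        self.trace_id = str(uuid.uuid4())
        self.event: Dict[str, Any] = {
            "trace_id": self.trace_id,
            "agent_type": agent_type,
            "agent_tag": agent_tag,
            "goal": goal,
            "context": {},
        }

    def set_output(self, output: str) -> None:
        self.event["output"] = output

    def set_reasoning(self, reasoning: str) -> None:
        self.event["reasoning"] = reasoning

    def add_context(self, key: str, value: Any) -> None:
        self.event["context"][key] = value

    def __enter__(self) -> "MiniTrace":
        self.event["start_time"] = time.time()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.event["end_time"] = time.time()
        if exc is not None:
            self.event["error"] = repr(exc)
        self.sink.append(self.event)  # Hand off without waiting on the network
        return False  # Never swallow the agent's own exceptions
```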

### Configuration

#### Environment Variables

```bash
# Required
ACGP_API_KEY=sk_live_...
ACGP_ENDPOINT=https://api.steward-agent.com

# Optional
ACGP_ENABLED=true              # Master switch (default: true)
ACGP_DEBUG=false               # Debug logging (default: false)
ACGP_FLUSH_INTERVAL=5.0        # Seconds between auto-flushes (default: 5.0)
ACGP_BATCH_SIZE=100            # Max traces per batch (default: 100)
ACGP_MAX_QUEUE_SIZE=10000      # Max queued traces (default: 10000)
ACGP_MAX_RETRIES=3             # Retry attempts (default: 3)
ACGP_FAIL_SILENTLY=true        # Don't raise on errors (default: true)
```
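
These variables map naturally onto a typed settings object. The sketch below reads them with the defaults documented above; `Settings` and `load_settings` are hypothetical names, and the accepted boolean spellings (`true`, `1`, `yes`, `on`) are an assumption rather than the SDK's documented parsing rules.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}  # Assumed boolean spellings


@dataclass(frozen=True)
class Settings:
    api_key: Optional[str]
    endpoint: Optional[str]
    enabled: bool = True
    debug: bool = False
    flush_interval: float = 5.0
    batch_size: int = 100
    max_queue_size: int = 10000
    max_retries: int = 3
    fail_silently: bool = True


def _flag(env: Mapping[str, str], name: str, default: bool) -> bool:
    raw = env.get(name)
    return default if raw is None else raw.strip().lower() in _TRUTHY


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from ACGP_* variables, falling back to the documented defaults."""
    return Settings(
        api_key=env.get("ACGP_API_KEY"),
        endpoint=env.get("ACGP_ENDPOINT"),
        enabled=_flag(env, "ACGP_ENABLED", True),
        debug=_flag(env, "ACGP_DEBUG", False),
        flush_interval=float(env.get("ACGP_FLUSH_INTERVAL", "5.0")),
        batch_size=int(env.get("ACGP_BATCH_SIZE", "100")),
        max_queue_size=int(env.get("ACGP_MAX_QUEUE_SIZE", "10000")),
        max_retries=int(env.get("ACGP_MAX_RETRIES", "3")),
        fail_silently=_flag(env, "ACGP_FAIL_SILENTLY", True),
    )
```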

#### SSEConfig

Customize SSE streaming behavior:

```python
from acgp import ACGPClient, SSEConfig

sse_config = SSEConfig(
    reconnect_delay=1.0,       # Initial reconnect delay (seconds)
    max_reconnect_delay=30.0,  # Maximum reconnect delay
    connection_timeout=10.0,   # Connection timeout
    read_timeout=300.0,        # Read timeout (5 minutes)
)

client = ACGPClient(
    api_key="sk_...",
    on_feedback=handle_feedback,
    sse_config=sse_config,
)
```
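
`reconnect_delay` and `max_reconnect_delay` describe a capped backoff between reconnection attempts. A common scheme, assumed here rather than confirmed for this SDK, doubles the delay after each failed attempt until it reaches the cap. The hypothetical `reconnect_delays` generator below produces that schedule.

```python
from typing import Iterator


def reconnect_delays(initial: float = 1.0, maximum: float = 30.0,
                     factor: float = 2.0) -> Iterator[float]:
    """Yield successive reconnect delays: initial, initial*factor, ... capped at maximum."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)  # Stop growing once the cap is reached
```

With the values above this yields 1, 2, 4, 8, 16, then 30 seconds indefinitely. Production clients often add random jitter so that many clients do not reconnect in lockstep.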

### Decorator Pattern

For simple function-based agents:

```python
from acgp import ACGPClient, acgp_trace

client = ACGPClient(api_key="sk_...")

@acgp_trace(client, agent_type="planner", agent_tag="myapp-planner")
def my_planning_agent(query: str) -> str:
    # Your agent logic goes here
    plan = f"Plan for: {query}"
    return plan

@acgp_trace(client, agent_type="executor", agent_tag="myapp-executor")
def my_execution_agent(plan: str) -> str:
    # Your agent logic goes here
    result = f"Executed: {plan}"
    return result

# Traces are automatically captured
plan = my_planning_agent("What should I do?")
result = my_execution_agent(plan)
```
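
A tracing decorator of this kind only needs to wrap the function, time the call, and record its input and output. The sketch below illustrates the pattern with `functools.wraps`; `record_calls` and the in-memory `events` list are hypothetical stand-ins for `acgp_trace` and the SDK's queue, and this version records successful calls only.

```python
import functools
import time
from typing import Any, Callable, Dict, List, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def record_calls(events: List[Dict[str, Any]], agent_type: str,
                 agent_tag: str) -> Callable[[F], F]:
    """Hypothetical tracing decorator that appends one event per successful call."""
    def decorator(func: F) -> F:
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.time()
            result = func(*args, **kwargs)
            events.append({
                "agent_type": agent_type,
                "agent_tag": agent_tag,
                "goal": func.__name__,  # Illustrative: use the function name as the goal
                "input": args,
                "output": result,
                "duration_s": time.time() - start,
            })
            return result
        return wrapper  # type: ignore[return-value]
    return decorator
```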

## Agent Tagging Best Practices

The `agent_tag` parameter identifies individual agents in your dashboard. While it defaults to "A" for planners and "B" for executors, **you should provide unique, descriptive tags** to avoid confusion when:

- Running multiple applications with the same API key
- Having multiple agents of the same type
- Distinguishing agents across different environments

### Recommended Naming Conventions

```python
# Format: {app-name}-{agent-role} or {app-name}/{agent-role}

# Good: Unique, descriptive tags
with client.trace(
    agent_type="planner",
    agent_tag="inventory-system-planner",
    goal="Plan inventory update"
) as trace:
    ...

with client.trace(
    agent_type="executor",
    agent_tag="inventory-system-executor",
    goal="Execute inventory changes"
) as trace:
    ...

# For multi-agent systems
with client.trace(agent_type="planner", agent_tag="crm-lead-scorer", goal="...") as trace: ...
with client.trace(agent_type="executor", agent_tag="crm-email-sender", goal="...") as trace: ...
with client.trace(agent_type="executor", agent_tag="crm-data-enricher", goal="...") as trace: ...
```

### Why Unique Tags Matter

Without unique tags, agents from different apps get mixed in the dashboard:

```python
# App 1: E-commerce
with client.trace(agent_type="planner", goal="...") as trace:  # Gets tag "A"
    ...

# App 2: Customer Support (same API key)
with client.trace(agent_type="planner", goal="...") as trace:  # Also gets tag "A"
    ...

# Dashboard shows both under "Agent A" - impossible to distinguish!
```

With unique tags:

```python
# App 1: E-commerce
with client.trace(agent_type="planner", agent_tag="ecommerce-order-planner", goal="...") as trace:
    ...

# App 2: Customer Support
with client.trace(agent_type="planner", agent_tag="support-ticket-planner", goal="...") as trace:
    ...

# Dashboard shows each agent separately with clear identification
```
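
To keep tags consistent with the `{app-name}-{agent-role}` convention, you can generate them from a fixed app name instead of typing them by hand. `make_agent_tag` is a hypothetical helper; lowercasing and collapsing everything else to hyphens is one reasonable normalization, not an SDK requirement.

```python
import re


def make_agent_tag(app_name: str, agent_role: str) -> str:
    """Build an '{app-name}-{agent-role}' tag from free-form names (hypothetical helper)."""
    def slug(text: str) -> str:
        # Lowercase, turn runs of non-alphanumerics into single hyphens, trim the edges
        return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")

    app, role = slug(app_name), slug(agent_role)
    if not app or not role:
        raise ValueError("app_name and agent_role must contain letters or digits")
    return f"{app}-{role}"
```

For example, `make_agent_tag("ecommerce", "order planner")` returns `"ecommerce-order-planner"`, the tag used above.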

## Framework Integration Examples

### CrewAI

```python
from crewai import Agent, Task, Crew
from acgp import ACGPClient

client = ACGPClient(api_key="sk_...", on_feedback=handle_feedback)

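agent = Agent(
    role="Researcher",
    goal="Research topics",
    backstory="An analyst who tracks developments in AI",
)
task = Task(
    description="Research AI trends",
    expected_output="A short summary of current AI trends",
    agent=agent,
)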
crew = Crew(agents=[agent], tasks=[task])

with client.trace(
    agent_type="unified",
    agent_tag="crewai-research-agent",  # Unique tag for this CrewAI agent
    goal="Research AI trends"
) as trace:
    result = crew.kickoff()
    trace.set_output(str(result.raw))
    trace.set_reasoning("CrewAI research workflow")

client.wait_for_feedback(timeout=30.0)
client.close()
```

### LangChain

```python
from langchain.agents import initialize_agent
from acgp import ACGPClient

client = ACGPClient(api_key="sk_...", on_feedback=handle_feedback)

# `tools`, `llm`, and `user_query` come from your application
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")

with client.trace(
    agent_type="executor",
    agent_tag="langchain-react-agent",  # Unique tag
    goal=user_query
) as trace:
    result = agent.run(user_query)
    trace.set_output(result)

client.wait_for_feedback(timeout=30.0)
client.close()
```

### GPT-Researcher

```python
import asyncio

from gpt_researcher import GPTResearcher
from acgp import ACGPClient

client = ACGPClient(api_key="sk_...", on_feedback=handle_feedback)


async def run_research(query: str) -> None:
    researcher = GPTResearcher(query=query)

    # Trace planner phase
    with client.trace(
        agent_type="planner",
        agent_tag="gpt-researcher-planner",  # Unique tag
        goal=f"Plan research: {query}"
    ) as trace:
        await researcher.conduct_research()
        trace.set_output(f"Agent: {researcher.agent}")

    # Trace executor phase
    with client.trace(
        agent_type="executor",
        agent_tag="gpt-researcher-writer",  # Unique tag
        goal=f"Generate report: {query}"
    ) as trace:
        report = await researcher.write_report()
        trace.set_output(report)


# `await` is only valid inside a coroutine, so drive the workflow with asyncio.run()
asyncio.run(run_research("Latest trends in AI agent governance"))

client.wait_for_feedback(timeout=30.0)
client.close()
```

## Metrics & Monitoring

```python
metrics = client.get_metrics()

print(f"SDK Enabled: {metrics['enabled']}")
print(f"Configured: {metrics['configured']}")
print(f"Queue Started: {metrics['queue_started']}")
print(f"SSE Enabled: {metrics['sse_enabled']}")
print(f"SSE Connected: {metrics['sse_connected']}")
print(f"Pending Feedback: {metrics['pending_feedback']}")
print(f"Events Sent: {metrics.get('events_sent', 0)}")
print(f"Events Dropped: {metrics.get('events_dropped', 0)}")
```
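
These fields are enough for a basic health check. The sketch below flags a disabled SDK, an SSE stream that should be connected but is not, and a high drop rate. The function name, the 1% threshold, and the assumption that `events_sent` and `events_dropped` are cumulative counters are illustrative.

```python
from typing import Any, Dict, List


def metric_warnings(metrics: Dict[str, Any], max_drop_rate: float = 0.01) -> List[str]:
    """Return human-readable warnings for a get_metrics()-style dict (hypothetical helper)."""
    warnings: List[str] = []
    if not metrics.get("enabled", False):
        warnings.append("SDK disabled")
    if metrics.get("sse_enabled") and not metrics.get("sse_connected"):
        warnings.append("SSE enabled but not connected")
    sent = metrics.get("events_sent", 0)
    dropped = metrics.get("events_dropped", 0)
    total = sent + dropped
    if total and dropped / total > max_drop_rate:
        warnings.append(f"drop rate {dropped / total:.1%} exceeds {max_drop_rate:.1%}")
    return warnings
```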

## Error Handling

By default, the SDK fails silently to avoid disrupting your agent's execution:

```python
# Default: fail_silently=True
client = ACGPClient(api_key="sk_...")  # Errors logged but not raised

# Strict mode: raise exceptions
client = ACGPClient(api_key="sk_...", fail_silently=False)

try:
    with client.trace(agent_type="planner", goal="Analyze query") as trace:
        result = my_agent.run(query)
        trace.set_output(result)
except Exception as e:
    print(f"Trace failed: {e}")
```
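
The `fail_silently` switch follows a common pattern: SDK-side errors are logged and swallowed so they cannot break the host application, while strict mode re-raises them. A minimal sketch of that pattern, assuming nothing about how the SDK implements it (`guarded` is a hypothetical name):

```python
import logging
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger("acgp.sketch")


@contextmanager
def guarded(fail_silently: bool = True) -> Iterator[None]:
    """Log and suppress exceptions when fail_silently is True; re-raise otherwise."""
    try:
        yield
    except Exception:
        if not fail_silently:
            raise
        # Returning normally from here tells contextmanager to suppress the exception
        logger.exception("Governance call failed; continuing without it")
```

Wrap only governance calls (for example a flush) in such a guard, never your agent's own logic, so that errors in your code are not hidden.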

## Architecture

```
acgp/
├── __init__.py        # Public exports
├── client.py          # ACGPClient - main entry point
├── config.py          # Configuration management
├── transport/         # Network layer
│   ├── async_queue.py    # Non-blocking event queue
│   ├── batch_sender.py   # Batched HTTP delivery
│   ├── sse_client.py     # Real-time SSE streaming
│   └── retry.py          # Retry with circuit breaker
├── models/            # Data models
└── adapters/          # Framework-specific adapters
```
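
`retry.py` is described as retry with a circuit breaker. The sketch below shows the general technique: exponential backoff between attempts, plus a breaker that opens after a run of consecutive failures so later calls fail fast until a cool-down elapses. `RetryingBreaker`, `CircuitOpenError`, and all thresholds are hypothetical; the SDK's actual timings are not documented here.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and calls are short-circuited."""


class RetryingBreaker:
    def __init__(self, max_retries: int = 3, base_delay: float = 0.5,
                 failure_threshold: int = 5, reset_after: float = 30.0,
                 sleep: Callable[[float], None] = time.sleep,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.sleep, self.clock = sleep, clock
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    def call(self, fn: Callable[[], T]) -> T:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; skipping call")
            self.opened_at = None  # Cool-down over: allow a trial call (half-open)
        for attempt in range(self.max_retries + 1):
            try:
                result = fn()
            except Exception:
                self.consecutive_failures += 1
                if self.consecutive_failures >= self.failure_threshold:
                    self.opened_at = self.clock()  # Open the breaker; stop retrying now
                    raise
                if attempt == self.max_retries:
                    raise
                self.sleep(self.base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
            else:
                self.consecutive_failures = 0
                return result
        raise AssertionError("unreachable")
```

A trial call after the cool-down that fails again reopens the breaker immediately, because the failure count is only reset by a success.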

### SDK Trace Capture Flow

How the SDK captures and transmits traces without blocking agent execution.

```mermaid
sequenceDiagram
    autonumber
    participant Agent as Client Agent
    participant Wrapper as StewardAgentWrapper
    participant Trace as TraceContext
    participant Queue as AsyncEventQueue
    participant Sender as BatchSender
    participant API as Backend API

    Agent->>Wrapper: wrapped_agent.run(input)
    Wrapper->>Trace: Create trace context
    Trace->>Trace: Record start timestamp
    Trace->>Trace: Capture goal

    Wrapper->>Agent: Execute original agent
    Agent-->>Wrapper: Return result

    Wrapper->>Trace: Set output
    Wrapper->>Trace: Set reasoning (if available)
    Wrapper->>Trace: Record end timestamp

    Trace->>Queue: Enqueue trace event
    Note over Queue: Non-blocking enqueue
    Queue-->>Trace: Queued (immediate return)

    Wrapper-->>Agent: Return result (no delay)

    Note over Queue,Sender: Background processing

    loop Every flush_interval (5s default)
        Queue->>Sender: Flush batch
        Sender->>API: POST /ingest/batch
        API-->>Sender: 202 Accepted
    end
```

**Key Points:**
- Wrapper intercepts agent execution transparently
- Trace context captures timing, goal, reasoning, output
- Queue is non-blocking - agent never waits for governance
- BatchSender groups traces for efficient transmission
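
The enqueue-and-batch flow in the diagram can be approximated with the standard library: `put_nowait` never blocks the caller, a full queue drops (and counts) the event instead of stalling the agent, and a background thread drains up to `batch_size` items per send every `flush_interval` seconds. `TraceBuffer` is a hypothetical sketch; the SDK's `AsyncEventQueue` and `BatchSender` may behave differently, and the actual `POST /ingest/batch` call is left to the injected `send` callback.

```python
import queue
import threading
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class TraceBuffer:
    def __init__(self, send: Callable[[List[Event]], None], batch_size: int = 100,
                 max_queue_size: int = 10000, flush_interval: float = 5.0) -> None:
        self._q: "queue.Queue[Event]" = queue.Queue(maxsize=max_queue_size)
        self._send = send  # e.g. an HTTP POST of one batch; injected here
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.events_dropped = 0  # Not thread-safe across many producers; fine for a sketch
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)

    def enqueue(self, event: Event) -> None:
        """Never blocks the agent: drop and count the event if the queue is full."""
        try:
            self._q.put_nowait(event)
        except queue.Full:
            self.events_dropped += 1

    def flush(self) -> int:
        """Drain the queue in batches of at most batch_size; return the number sent."""
        sent = 0
        while True:
            batch: List[Event] = []
            while len(batch) < self.batch_size:
                try:
                    batch.append(self._q.get_nowait())
                except queue.Empty:
                    break
            if not batch:
                return sent
            self._send(batch)
            sent += len(batch)

    def start(self) -> None:
        self._worker.start()

    def _run(self) -> None:
        # wait() returns False on timeout, so this flushes every flush_interval until stopped
        while not self._stop.wait(self.flush_interval):
            self.flush()

    def close(self) -> None:
        self._stop.set()
        if self._worker.is_alive():
            self._worker.join()
        self.flush()  # Send whatever is left after the worker stops
```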

## Core Dependencies

- `httpx>=0.25.0` - Modern async HTTP client
- `pydantic>=2.0.0` - Data validation
- `python-dotenv>=1.0.0` - Environment variable loading
- `pyyaml>=6.0` - YAML parsing

## Development

```bash
# Clone repository
git clone https://github.com/meaningstack/acgp.git
cd acgp

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Format and lint code
black acgp/
ruff check acgp/

# Type checking
mypy acgp/
```

## Contributing

Contributions are welcome! Please read our [Contributing Guide](CONTRIBUTING.md) for details.

## License

MIT License - see [LICENSE](LICENSE) for details.

Copyright (c) 2025 [MeaningStack](https://meaningstack.com)

## Support

- Website: https://meaningstack.com
- Documentation: https://github.com/meaningstack/acgp/tree/main/docs
- Issues: https://github.com/meaningstack/acgp/issues
- Email: Admin@meaningstack.com
