Metadata-Version: 2.3
Name: abs-auth-rbac-core
Version: 0.7.1
Summary: RBAC and Auth core utilities including JWT token management.
License: MIT
Author: AutoBridgeSystems
Author-email: info@autobridgesystems.com
Requires-Python: >=3.11,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: abs-exception-core (>=0.3.0,<0.4.0)
Requires-Dist: abs-nosql-repository-core (>=0.12.0,<1.0.0)
Requires-Dist: abs-repository-core (>=0.4.0,<1.0.0)
Requires-Dist: abs-utils (>=0.5.0,<0.6.0)
Requires-Dist: azure-identity (>=1.25.2,<2.0.0)
Requires-Dist: azure-servicebus (>=7.14.3,<8.0.0)
Requires-Dist: cryptography (>=46.0.5)
Requires-Dist: fastapi[standard] (>=0.130.0)
Requires-Dist: passlib (>=1.7.4,<2.0.0)
Requires-Dist: psycopg2-binary (>=2.9.10,<3.0.0)
Requires-Dist: pyjwt (>=2.10.1,<3.0.0)
Requires-Dist: sqlalchemy (>=2.0.40,<3.0.0)
Requires-Dist: starlette (>=0.49.1)
Description-Content-Type: text/markdown

# ABS Auth RBAC Core

An authentication and Role-Based Access Control (RBAC) package for FastAPI applications. It provides JWT-based authentication and role-based permission management, backed by an in-memory per-user permission cache, with Azure Service Bus for cross-server cache invalidation.

## What's New in v0.6.0

**Casbin has been removed.** The RBAC system now uses a per-user in-memory permission cache with targeted Azure Service Bus invalidation. This eliminates the 3-4 second full policy reload that caused 403 errors during permission updates.

### Key Changes

| | Before (Casbin) | After (v0.6.0) |
|---|---|---|
| On permission change | Reloads ALL policies (full table scan) | Invalidates ONLY affected user's cache entry |
| Reload time | 3-4 seconds | ~200-500ms |
| Memory model | Monolithic store of every policy | Per-user granular cache (loaded on demand) |
| Read path | In-memory lookup (stale for 3-4s during reload) | In-memory lookup (stale for max ~500ms) |
| Dependencies removed | - | `casbin`, `casbin-sqlalchemy-adapter`, `casbin-redis-watcher` |

### Migration Guide

**Constructor** — The `RBACService` constructor now accepts `**kwargs` for backward compatibility. Old `redis_config` or `servicebus_config` (for the Casbin watcher) will be silently accepted. To enable cross-server cache invalidation, pass the new `CacheInvalidationNotifierConfig`:

```python
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig

# Single-server mode (cache-only, no cross-server sync)
rbac_service = RBACService(session=db_session)

# Multi-server mode (with Azure Service Bus invalidation)
rbac_service = RBACService(
    session=db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string="Endpoint=sb://your-ns.servicebus.windows.net/;...",
        topic_name="rbac-cache-invalidation",
    ),
    cache_ttl=300,       # 5 min default
    cache_max_size=10000, # LRU eviction cap
)
```

**Removed methods** — The following Casbin-specific methods no longer exist:
- `add_policy()`, `add_policies()`, `remove_policy()`, `remove_policies()`
- `enforce_policy()`, `remove_filter_policy()`
- `reload_policies()`, `get_policy_count()`, `clear_all_policies()`

**Removed exports** — `AzureServiceBusWatcherSchema` is replaced by `CacheInvalidationNotifierConfig`.

**Removed files** — `watcher.py`, `policy.conf`, `gov_casbin_rule.py` are deleted.

**Unchanged** — All decorators (`rbac_require_permission`, `check_permissions`), auth middleware, JWT functions, models, schemas, and permission constants work exactly as before.

## Features

- **JWT-based Authentication**: Secure token-based authentication with customizable expiration
- **Password Security**: Secure password storage using bcrypt with passlib
- **Role-Based Access Control (RBAC)**: Flexible permission management with per-user caching
- **Real-time Cache Invalidation**: Azure Service Bus for targeted cross-server sync
- **User-Role Management**: Dynamic role assignment and revocation
- **Permission Enforcement**: Decorator-based permission checking with nested AND/OR logic
- **Middleware Integration**: Seamless FastAPI middleware integration
- **Comprehensive Error Handling**: Built-in exception handling for security scenarios
- **Dependency Injection Ready**: Compatible with dependency-injector
- **Permission Constants**: Predefined permission constants and enums

## Installation

```bash
pip install abs-auth-rbac-core
```

## Architecture Overview

```
┌─────────────────────────────────────────────────────┐
│                    Server A                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ RBACService              │   │
│  │ Cache        │    │  - check_permission()    │   │
│  │ (per-user)   │    │  - create_role()         │   │
│  └──────────────┘    │  - assign_permissions()  │   │
│         ^            └─────────┬────────────────┘   │
│         │ invalidate           │ publish             │
│         │                      v                     │
│  ┌──────┴───────────────────────────────────────┐   │
│  │ CacheInvalidationNotifier (Service Bus)      │   │
│  └──────────────────────────────────────────────┘   │
└─────────────────────┬───────────────────────────────┘
                      │ Azure Service Bus Topic
                      │ (targeted invalidation messages)
┌─────────────────────v───────────────────────────────┐
│                    Server B                          │
│  ┌──────────────┐    ┌──────────────────────────┐   │
│  │ Permission   │<───│ CacheInvalidationNotifier│   │
│  │ Cache        │    │ (receives & invalidates) │   │
│  │ (per-user)   │    └──────────────────────────┘   │
│  └──────────────┘                                    │
└─────────────────────────────────────────────────────┘
```

### How It Works

1. **Read path**: `check_permission()` looks up the user's cache entry. On hit, it's a sub-millisecond set lookup. On miss, it runs a single SQL query to load all role + direct permissions for that user, caches the result, then checks.

2. **Write path**: After any permission/role change (e.g., `update_role_permissions()`), the local cache is invalidated for affected users. If a notifier is configured, a targeted invalidation message is published via Azure Service Bus.

3. **Cross-server sync**: Other servers receive the invalidation message and delete only the affected users' cache entries. The next permission check for those users will reload from DB.

4. **Safety TTL**: Cache entries expire after a configurable TTL (default 5 minutes) as a fallback, even without explicit invalidation.
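
The four steps above can be sketched with a small standalone cache. This is a minimal illustration, not the package's `cache.py`: the class name `PerUserPermissionCache`, the `loader` callable (standing in for the single SQL query), and the injectable `clock` are all hypothetical.

```python
import threading
import time
from collections import OrderedDict


class PerUserPermissionCache:
    """Illustrative per-user LRU cache with a TTL (hypothetical, not the package's cache.py)."""

    def __init__(self, loader, ttl=300.0, max_size=10000, clock=time.monotonic):
        self._loader = loader          # callable: user_uuid -> iterable of "module:resource:action"
        self._ttl = ttl                # safety TTL in seconds
        self._max_size = max_size      # max cached users before LRU eviction
        self._clock = clock
        self._lock = threading.Lock()
        self._entries = OrderedDict()  # user_uuid -> (expires_at, frozenset of permissions)

    def _permissions_for(self, user_uuid):
        now = self._clock()
        with self._lock:
            entry = self._entries.get(user_uuid)
            if entry is not None and entry[0] > now:
                self._entries.move_to_end(user_uuid)  # mark as most recently used
                return entry[1]
        # Miss or expired entry: load outside the lock, then store the result.
        permissions = frozenset(self._loader(user_uuid))
        with self._lock:
            self._entries[user_uuid] = (now + self._ttl, permissions)
            self._entries.move_to_end(user_uuid)
            while len(self._entries) > self._max_size:
                self._entries.popitem(last=False)  # evict the least recently used user
        return permissions

    def check(self, user_uuid, module, resource, action):
        """Read path: set membership on the user's cached permissions."""
        return f"{module}:{resource}:{action}" in self._permissions_for(user_uuid)

    def invalidate(self, user_uuids):
        """Write path / cross-server sync: drop only the affected users' entries."""
        with self._lock:
            for user_uuid in user_uuids:
                self._entries.pop(user_uuid, None)


def load_from_db(user_uuid):
    # Stand-in for the single SQL query that loads role + direct permissions.
    return {"USER_MANAGEMENT:USER_MANAGEMENT:VIEW"}


cache = PerUserPermissionCache(load_from_db, ttl=300, max_size=10000)
cache.check("user_uuid", "USER_MANAGEMENT", "USER_MANAGEMENT", "VIEW")  # miss: runs the loader
cache.check("user_uuid", "USER_MANAGEMENT", "USER_MANAGEMENT", "VIEW")  # hit: no loader call
cache.invalidate(["user_uuid"])  # the next check for this user reloads
```

Loading outside the lock keeps slow database reads from blocking other users' lookups, at the cost of an occasional duplicate load when two requests miss for the same user at once.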

## Quick Start

### 1. Basic Setup

```python
from abs_auth_rbac_core.auth.jwt_functions import JWTFunctions
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
import os

# Initialize JWT functions
jwt_functions = JWTFunctions(
    secret_key=os.getenv("JWT_SECRET_KEY"),
    algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
    expire_minutes=int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_MINUTES", "60"))
)

# Initialize RBAC service
rbac_service = RBACService(
    session=your_db_session,
    servicebus_config=CacheInvalidationNotifierConfig(
        connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
        topic_name="rbac-cache-invalidation",
    )
)
```

### 2. Authentication Setup

```python
import os

from abs_auth_rbac_core.auth.middleware import auth_middleware
from fastapi import FastAPI, Depends

app = FastAPI()

# Build the authentication dependency
# (a distinct name avoids shadowing the imported auth_middleware factory)
auth_dependency = auth_middleware(
    db_session=your_db_session,
    jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
    jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
)

# Apply to specific routers
app.include_router(
    protected_router,
    dependencies=[Depends(auth_dependency)]
)

# Public routes (no middleware)
app.include_router(public_router)
```

**How it works:**
1. Validates JWT token from Authorization header
2. Extracts user UUID from token payload
3. Fetches user from database using UUID
4. Sets user object in `request.state.user`
5. Returns user object for route handlers
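
Steps 1 and 2 can be made concrete with a standard-library sketch of HS256 validation. The package itself depends on PyJWT for this, so the code below is only an illustration of what the validation involves. The function names and the `uuid` claim name are assumptions, not the package's actual token layout.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build a compact HS256 token (only used here to produce a test input)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url_encode(signature)}"


def user_uuid_from_header(authorization: str, secret: str, now=None) -> str:
    """Validate 'Authorization: Bearer <token>' and return the (assumed) 'uuid' claim."""
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        raise ValueError("expected a Bearer token")
    header_b64, payload_b64, signature_b64 = token.split(".")  # ValueError if malformed
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and payload["exp"] <= current:
        raise ValueError("token expired")
    user_uuid = payload.get("uuid")  # claim name is an assumption
    if not user_uuid:
        raise ValueError("token has no user UUID")
    return user_uuid


token = sign_hs256({"uuid": "user_uuid", "exp": time.time() + 3600}, "your-secret-key-here")
user_uuid = user_uuid_from_header(f"Bearer {token}", "your-secret-key-here")
```

The signature is checked before the payload is trusted, and the algorithm is pinned rather than read from the token and obeyed, which is the usual guard against algorithm-confusion attacks.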

**Accessing the user in routes:**
```python
from fastapi import APIRouter, Request

router = APIRouter()

@router.get("/profile")
async def get_profile(request: Request):
    user = request.state.user
    return {"user_id": user.uuid, "email": user.email}
```

### 3. RBAC Operations

```python
# Create a role with permissions
role = rbac_service.create_role(
    name="admin",
    description="Administrator role with full access",
    permission_ids=["permission_uuid1", "permission_uuid2"]
)

# Assign roles to user
rbac_service.bulk_assign_roles_to_user(
    user_uuid="user_uuid",
    role_uuids=["role_uuid1", "role_uuid2"]
)

# Check user permissions (cache-first, sub-ms on hit)
has_permission = rbac_service.check_permission(
    user_uuid="user_uuid",
    resource="USER_MANAGEMENT",
    action="VIEW",
    module="USER_MANAGEMENT"
)

# Get user permissions and roles
user_permissions = rbac_service.get_user_permissions(user_uuid="user_uuid")
user_roles = rbac_service.get_user_roles(user_uuid="user_uuid")
```

## Core Components

### Authentication (`auth/`)
- **`jwt_functions.py`**: JWT token management and password hashing
- **`middleware.py`**: Authentication middleware for FastAPI
- **`auth_functions.py`**: Core authentication functions

### RBAC (`rbac/`)
- **`service.py`**: Main RBAC service with role and permission management
- **`decorator.py`**: Decorators for permission checking (`rbac_require_permission`, `check_permissions`)
- **`cache.py`**: Thread-safe per-user LRU permission cache with TTL
- **`notifier.py`**: Azure Service Bus cache invalidation notifier
- **`types.py`**: Type definitions for permission system

### Models (`models/`)
- **`user.py`**: User model
- **`roles.py`**: Role model
- **`permissions.py`**: Permission model
- **`user_role.py`**: User-Role association model
- **`role_permission.py`**: Role-Permission association model
- **`user_permission.py`**: User-Permission association model
- **`rbac_model.py`**: Base RBAC model
- **`base_model.py`**: Base model with common fields

### Schema (`schema/`)
- **`permission.py`**: Permission-related schemas

### Utilities (`util/`)
- **`permission_constants.py`**: Predefined permission constants and enums
- **`get_permissions_resources.py`**: Resource ID extraction from permissions

## Dependency Injection Setup

```python
import os

from dependency_injector import containers, providers
from abs_auth_rbac_core.auth.middleware import auth_middleware
from abs_auth_rbac_core.rbac import RBACService, CacheInvalidationNotifierConfig
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

class Container(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(
        modules=[
            "src.api.auth_route",
            "src.api.endpoints.rbac.permission_route",
            "src.api.endpoints.rbac.role_route",
            "src.api.endpoints.rbac.users_route",
        ]
    )

    db_session = providers.Factory(your_db_session_factory)

    rbac_service = providers.Singleton(
        RBACService,
        session=db_session,
        servicebus_config=CacheInvalidationNotifierConfig(
            connection_string=os.getenv("AZURE_SERVICEBUS_CONNECTION_STRING"),
            topic_name="rbac-cache-invalidation",
        )
    )

    get_auth_middleware = providers.Factory(
        auth_middleware,
        db_session=db_session,
        jwt_secret_key=os.getenv("JWT_SECRET_KEY"),
        jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256")
    )
```

## Route Implementation with Permissions

```python
from fastapi import APIRouter, Depends, Request
from dependency_injector.wiring import Provide, inject
from abs_auth_rbac_core.rbac import RBACService, rbac_require_permission, check_permissions
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource
)

# UserProfile, UserService, and Container come from your application code
router = APIRouter(prefix="/users")

# Simple permission check (all listed permissions required)
@router.get("/{user_id}", response_model=UserProfile)
@inject
@rbac_require_permission(
    f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"
)
async def get_user(
    user_id: int,
    request: Request,
    service: UserService = Depends(Provide[Container.user_service]),
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    return service.get_user_profile("id", user_id, rbac_service)

# Advanced permission check with AND/OR logic, placeholders, pre/post callbacks
@router.put("/{entity_id}")
@inject
@check_permissions(
    permissions=[
        ("MODULE", "resource", "edit"),
        ("MODULE", "resource", "admin"),
    ],
    operator="OR",
)
async def update_entity(
    entity_id: str,
    request: Request,
    rbac_service: RBACService = Depends(Provide[Container.rbac_service]),
):
    ...
```
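
To make the AND/OR semantics concrete, here is a minimal standalone evaluator. It is a sketch under assumptions: the nested dict shape (`operator` plus `permissions`) and the function name `is_allowed` are hypothetical, and the structure that `check_permissions` actually accepts for nesting may differ.

```python
def is_allowed(rule, granted):
    """Evaluate a permission rule against a set of granted (module, resource, action) tuples.

    A rule is either a single tuple or a dict with an "operator" ("AND" or "OR")
    and a list of child rules under "permissions". This shape is an assumption.
    """
    if isinstance(rule, tuple):
        return rule in granted
    operator = rule["operator"].upper()
    results = (is_allowed(child, granted) for child in rule["permissions"])
    if operator == "AND":
        return all(results)
    if operator == "OR":
        return any(results)
    raise ValueError(f"unknown operator: {rule['operator']}")


granted = {("MODULE", "resource", "edit"), ("MODULE", "resource", "view")}

# Either edit or admin is enough
either = {
    "operator": "OR",
    "permissions": [("MODULE", "resource", "edit"), ("MODULE", "resource", "admin")],
}

# view AND (edit OR admin)
nested = {"operator": "AND", "permissions": [("MODULE", "resource", "view"), either]}

can_update = is_allowed(nested, granted)
```

Because `all` and `any` consume the generator lazily, evaluation stops at the first child that decides the outcome.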

## Permission System

### Permission Format
Permissions follow the format: `module:resource:action`

- **Module**: The system module (e.g., `USER_MANAGEMENT`, `EMAIL_PROCESS`)
- **Resource**: The specific resource within the module (e.g., `USER_MANAGEMENT`, `ROLE_MANAGEMENT`)
- **Action**: The action being performed (e.g., `VIEW`, `CREATE`, `EDIT`, `DELETE`)
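
As a small illustration of the format, the helper below splits and rebuilds a permission string. `PermissionKey` and `parse_permission` are hypothetical names for this sketch and are not part of the package.

```python
from typing import NamedTuple


class PermissionKey(NamedTuple):
    """One permission in module:resource:action form (illustrative helper)."""

    module: str
    resource: str
    action: str

    def __str__(self) -> str:
        return f"{self.module}:{self.resource}:{self.action}"


def parse_permission(value: str) -> PermissionKey:
    """Split a 'module:resource:action' string, rejecting malformed input."""
    parts = value.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'module:resource:action', got {value!r}")
    return PermissionKey(*parts)


key = parse_permission("USER_MANAGEMENT:ROLE_MANAGEMENT:EDIT")
round_trip = str(key)  # rebuilds the original string
```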

### Using Permission Constants

```python
from abs_auth_rbac_core.util.permission_constants import (
    PermissionAction,
    PermissionModule,
    PermissionResource,
    PermissionConstants
)

# Using enums
permission_string = f"{PermissionModule.USER_MANAGEMENT.value}:{PermissionResource.USER_MANAGEMENT.value}:{PermissionAction.VIEW.value}"

# Using predefined constants
user_view_permission = PermissionConstants.RBAC_USER_MANAGEMENT_VIEW
permission_string = f"{user_view_permission.module}:{user_view_permission.resource}:{user_view_permission.action}"
```

## Configuration

### Environment Variables

```bash
# JWT Configuration
JWT_SECRET_KEY=your-secret-key-here
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=60
JWT_REFRESH_TOKEN_EXPIRE_MINUTES=1440

# Azure Service Bus (for cross-server cache invalidation)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://your-ns.servicebus.windows.net/;...
# Or use managed identity:
# AZURE_SERVICEBUS_NAMESPACE=your-ns.servicebus.windows.net

# Instance ID (auto-detected from HOSTNAME if not set)
RBAC_INSTANCE_ID=server-1

# Database Configuration
DATABASE_URI=postgresql://user:password@localhost/dbname
```

### Cache Configuration

```python
rbac_service = RBACService(
    session=db_session,
    cache_ttl=300,        # Cache entry TTL in seconds (default: 300 = 5 min)
    cache_max_size=10000, # Max cached users before LRU eviction (default: 10,000)
)
```

## Error Handling

```python
from abs_exception_core.exceptions import (
    UnauthorizedError,
    PermissionDeniedError,
    ValidationError,
    DuplicatedError,
    NotFoundError
)
```

## Health Checks

```python
@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "rbac_notifier": rbac_service.is_watcher_active(),
        "rbac_notifier_details": rbac_service.get_watcher_status(),
    }
```

## Troubleshooting

### Common Issues

| Issue | Solution |
|-------|----------|
| **Authentication Fails** | Check JWT secret key, token expiration, user existence |
| **Permission Denied** | Verify user roles, role-permission assignments, permission format |
| **Stale permissions after update** | Check Service Bus connection, or wait for cache TTL expiry |
| **Cache invalidation not working** | Verify that `AZURE_SERVICEBUS_CONNECTION_STRING` is set, the topic exists, and the subscription exists (it is auto-created by default) |
| **High memory usage** | Reduce `cache_max_size` or lower `cache_ttl` |

### Debug Mode

```python
import logging
logging.basicConfig(level=logging.DEBUG)

# Check notifier status
print(f"Notifier active: {rbac_service.is_watcher_active()}")
print(f"Status: {rbac_service.get_watcher_status()}")
```

## API Reference

### RBACService Methods

| Method | Description |
|--------|-------------|
| `check_permission(user_uuid, resource, action, module)` | Check role-based permission (cache-first) |
| `check_permission_by_user(user_uuid, resource, action, module)` | Check direct user permission (cache-first) |
| `check_permission_by_role(role_uuid, resource, action, module)` | Check role permission (direct SQL) |
| `create_role(name, description, permission_ids)` | Create a new role |
| `update_role_permissions(role_uuid, permissions, name, description)` | Update role permissions |
| `delete_role(role_uuid, exception_roles)` | Delete a role |
| `bulk_assign_roles_to_user(user_uuid, role_uuids)` | Assign multiple roles to user |
| `bulk_revoke_roles_from_user(user_uuid, role_uuids)` | Revoke roles from user |
| `bulk_attach_roles_to_user(user_uuid, role_uuids)` | Attach additional roles to user |
| `attach_permissions_to_user(user_uuid, permission_uuids)` | Attach direct permissions |
| `revoke_user_permissions(user_uuid, permission_uuids)` | Revoke direct permissions |
| `revoke_all_user_access(user_uuid)` | Revoke all roles and permissions |
| `get_user_permissions(user_uuid)` | Get role-based permissions |
| `get_user_only_permissions(user_uuid)` | Get direct user permissions |
| `get_user_roles(user_uuid)` | Get user roles |
| `list_roles()` | List all roles |
| `list_permissions()` | List all permissions |
| `is_watcher_active()` | Check notifier status |
| `get_watcher_status()` | Get notifier details |

### CacheInvalidationNotifierConfig

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `connection_string` | `str \| None` | `None` | Service Bus connection string |
| `fully_qualified_namespace` | `str \| None` | `None` | For managed identity auth |
| `topic_name` | `str` | `"rbac-cache-invalidation"` | Service Bus topic name |
| `subscription_name` | `str \| None` | Auto from `RBAC_INSTANCE_ID` | Subscription name |
| `max_wait_time_seconds` | `int` | `5` | Receiver poll interval |
| `message_ttl_seconds` | `int` | `300` | Message time-to-live |
| `auto_create_subscription` | `bool` | `True` | Auto-create subscription |

### JWT Functions

| Method | Description |
|--------|-------------|
| `create_access_token(data)` | Create JWT access token |
| `decode_jwt(token)` | Decode and validate JWT |
| `get_password_hash(password)` | Hash password with bcrypt |
| `verify_password(password, hashed)` | Verify password hash |

## Dependencies

### Core Dependencies
- **pyjwt** (>=2.10.1,<3.0.0): JWT token handling
- **fastapi[standard]** (>=0.130.0): Web framework
- **starlette** (>=0.49.1): ASGI toolkit underlying FastAPI
- **passlib** (>=1.7.4,<2.0.0): Password hashing
- **cryptography** (>=46.0.5): Cryptographic primitives
- **sqlalchemy** (>=2.0.40,<3.0.0): Database ORM
- **psycopg2-binary** (>=2.9.10,<3.0.0): PostgreSQL adapter
- **azure-servicebus** (>=7.14.3,<8.0.0): Cross-server cache invalidation
- **azure-identity** (>=1.25.2,<2.0.0): Azure authentication

### Internal Dependencies
- **abs-exception-core** (>=0.3.0,<0.4.0): Exception handling
- **abs-utils** (>=0.5.0,<0.6.0): Logger and utilities
- **abs-repository-core** (>=0.4.0,<1.0.0): Base repository
- **abs-nosql-repository-core** (>=0.12.0,<1.0.0): NoSQL repository

## License

This project is licensed under the MIT License - see the LICENSE file for details.

---

**Version**: 0.7.1
**Last Updated**: 2026
**Python Version**: >=3.11,<4.0

