# ML Orchestrator — Architecture, Isolation Plan & Implementation Tasks

> **Flow**: `UI → Kong → BFF → ML Service → ML Orchestrator → ML Container`
> **Scope**: Introduce a clean 4-layer ML stack. The ML Service owns all business logic. The ML Orchestrator routes pure inference requests to stateless ML containers. Everything below Layer 3 is fully independent and reusable in any project.
> **Pattern**: Mirrors the AI orchestrator design (BFF → domain service → orchestrator → stateless workers).

### Target Folder Structure

```
/ml_orchestrator/               ← routing, WS proxy, worker registry, circuit breaker
/ml_services/                   ← stateless inference containers (pure ML, no business logic)
    /face_detection/            ← InsightFace ArcFace, FaceNet MTCNN, MediaPipe FaceMesh
    /audio_analysis/            ← Silero VAD, pyannote diarization, speaker embedding
    /video_monitoring/          ← YOLOv8n, ORB feature matcher (OpenCV)
    /doc_verification/          ← EasyOCR, InsightFace (ID document face extraction)
    /lip_sync/                  ← SyncNet (Wav2Lip discriminator)
    /response_analysis/         ← plagiarism engine, LLM response moderator
```

Business-logic services (e.g. `ai_interviewer` refactored) live **outside** this folder — they own DB/session state and call into `ml_orchestrator/` for all ML work.

---

## Table of Contents

1. [Problem Statement](#1-problem-statement)
2. [Current State](#2-current-state)
3. [Target Architecture](#3-target-architecture)
4. [The Four Layers Explained](#4-the-four-layers-explained)
5. [ML Services Catalogue](#5-ml-services-catalogue)
6. [ML Orchestrator Design](#6-ml-orchestrator-design)
7. [Wire Protocol](#7-wire-protocol)
8. [Fixes Required in ai_interviewer](#8-fixes-required-in-ai_interviewer)
9. [Docker Compose Topology](#9-docker-compose-topology)
10. [Kong Routing](#10-kong-routing)
11. [Horizontal Scaling Strategy](#11-horizontal-scaling-strategy)
12. [Implementation Tasks](#12-implementation-tasks)
13. [Before vs After Summary](#13-before-vs-after-summary)

---

## 1. Problem Statement

`ai_interviewer` currently does **two unrelated jobs**:

| Job | Should Be In |
|-----|-------------|
| Interview session management (WebSocket, OpenAI Realtime, question flow, scoring) | `ai_interviewer` (as ML Service, Layer 3) ✅ |
| ML inference (face verify, voice enroll, object detect, OCR, lip sync, diarization, plagiarism) | Stateless `ml_services/` containers via `ml_orchestrator` ❌ |

Three concrete problems today:

1. **Memory bloat** — Every `ai_interviewer` replica loads 8 ML models simultaneously (~3.3 GB RAM). Scaling the interview service means scaling all that dead weight.
2. **No independent scaling** — Face verification spikes at interview start; YOLOv8 runs the whole time; plagiarism only runs at the end. All models are pinned to one replica count.
3. **Three broken sub-services** — `proctoring-service`, `recording-service`, `syncnet-service` were all started but never wired up. Code is duplicated between `ai_interviewer/proctoring/` and `ai_interviewer/proctoring-service/proctoring/`.

---

## 2. Current State

### 2.1 Service Map

```
ai_interviewer (port 8008)           ← RUNNING — loads all ML in-process
├── proctoring/                      ← EMBEDDED in main process
│   ├── model_registry.py            ← Singleton: 8 ML models per replica
│   ├── monitoring/
│   │   ├── face_analysis_pipeline.py    (MediaPipe FaceMesh, gaze, blink, head pose)
│   │   ├── audio_pipeline.py            (Silero VAD, pyannote diarization)
│   │   ├── video_pipeline.py            (YOLOv8 object/phone/person detection)
│   │   ├── behavior_engine.py           (pure logic aggregator — no models)
│   │   └── room_scan.py                 (YOLOv8 + ORB feature matching)
│   ├── verification/
│   │   ├── face_verification.py         (InsightFace ArcFace buffalo_l)
│   │   ├── id_verification.py           (InsightFace + EasyOCR)
│   │   └── voice_enrollment.py          (pyannote speaker embeddings)
│   └── analysis/
│       ├── plagiarism_engine.py         (stylometry + LLM hybrid)
│       └── response_moderator.py        (LLM call — already external)
│
├── proctoring-service/              ← DISABLED in docker-compose (causes 502)
├── recording-service/               ← NOT in docker-compose (orphaned)
└── syncnet-service/                 ← NOT in docker-compose (orphaned)
```

### 2.2 ML Models Currently Loaded Per Replica

| Model | Library | RAM (CPU) | Used For |
|-------|---------|-----------|---------|
| YOLOv8n | ultralytics | ~200 MB | Person count, phone/device detection |
| InsightFace ArcFace buffalo_l | insightface | ~600 MB | Face recognition (verify-face, verify-id) |
| FaceNet MTCNN + InceptionResnetV1 | facenet_pytorch | ~400 MB | Face detection + embedding (fallback) |
| Silero VAD | torch.hub | ~40 MB | Voice activity detection |
| pyannote diarization 3.1 | pyannote.audio | ~1.2 GB | Speaker diarization |
| EasyOCR (en + hi) | easyocr | ~600 MB | ID document text extraction |
| MediaPipe FaceMesh | mediapipe | ~30 MB | 478-landmark mesh, gaze, blink |
| SyncNet (Wav2Lip) | torch | ~200 MB | Lip sync detection |
| **Total** | | **~3.3 GB** | **Every replica pays this cost** |
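
The total can be checked by summing the per-model estimates from the table. This is only a quick arithmetic sketch: the figures are the approximate CPU RAM numbers listed above, and the replica count of 4 is a hypothetical value chosen for illustration.

```python
# Approximate per-model RAM in MB, copied from the table above.
MODEL_RAM_MB = {
    "YOLOv8n": 200,
    "InsightFace ArcFace buffalo_l": 600,
    "FaceNet MTCNN + InceptionResnetV1": 400,
    "Silero VAD": 40,
    "pyannote diarization 3.1": 1200,
    "EasyOCR (en + hi)": 600,
    "MediaPipe FaceMesh": 30,
    "SyncNet (Wav2Lip)": 200,
}


def total_ram_gb(replicas: int = 1) -> float:
    """Model memory paid across `replicas` copies of ai_interviewer, in GB."""
    return replicas * sum(MODEL_RAM_MB.values()) / 1000


print(f"{total_ram_gb():.2f} GB per replica")      # 3.27 GB per replica
print(f"{total_ram_gb(4):.2f} GB for 4 replicas")  # 13.08 GB for 4 replicas
```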

---

## 3. Target Architecture

### 3.1 Full Request Flow

```
 UI (Browser)
   │
   │  WSS / HTTPS
   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  LAYER 1 — KONG GATEWAY (:8082)                                         │
│  TLS termination, rate-limit, auth header injection, routing            │
│  /ai-interviewer → :8008   (ml-orchestrator is internal only)           │
└─────────────────────────────────────┬───────────────────────────────────┘
                                      │  HTTPS / WSS
                                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  LAYER 2 — BFF  (Go, :8080)                                             │
│  Zitadel authentication, tenant_id resolution, quota token issuance     │
│  Routes /bff/interview/* → ML Service   /bff/ai/* → AI Service          │
└─────────────────────────────────────┬───────────────────────────────────┘
                                      │  HTTPS / WSS  (internal network)
                                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  LAYER 3 — ML SERVICE  (business logic, :8008)                          │
│  ai_interviewer (refactored) — or any future domain service             │
│                                                                         │
│  OWNS:                            DOES NOT OWN:                         │
│  ✅ Interview session state        ❌ ML model weights                   │
│  ✅ OpenAI Realtime WebSocket      ❌ ML inference logic                 │
│  ✅ Question flow + scoring        ❌ Model loading/unloading            │
│  ✅ Violation rules + thresholds                                         │
│  ✅ DB writes (results, records)                                         │
│  ✅ Decides when to start/stop ML streams                                │
│                                                                         │
│  → calls ml-orchestrator for ALL ML work via MLOrchestratorClient       │
└─────────────────────────────────────┬───────────────────────────────────┘
                                      │  HTTP / WS  (ai-network)
                                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  LAYER 4a — ML ORCHESTRATOR  (:8015)                                    │
│  /ml_orchestrator/                                                      │
│                                                                         │
│  OWNS:                            DOES NOT OWN:                         │
│  ✅ Task routing to containers     ❌ Business logic                     │
│  ✅ WS bidirectional proxy         ❌ Database access                    │
│  ✅ Worker registry + affinity     ❌ ML models                          │
│  ✅ Circuit breaker per worker                                           │
│  ✅ Prometheus metrics                                                   │
└────────┬────────────────────┬────────────────────┬────────────────────┬─┘
         │  HTTP/WS           │  HTTP/WS           │  HTTP/WS           │  HTTP/WS
         ▼                    ▼                    ▼                    ▼
┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
│  LAYER 4b      │  │  LAYER 4b      │  │  LAYER 4b      │  │  LAYER 4b      │
│  ML CONTAINER  │  │  ML CONTAINER  │  │  ML CONTAINER  │  │  ML CONTAINER  │
│ /ml_services/  │  │ /ml_services/  │  │ /ml_services/  │  │ /ml_services/  │
│                │  │                │  │                │  │                │
│face_detection  │  │audio_analysis  │  │video_monitoring│  │doc_verification│
│    :9001       │  │    :9002       │  │    :9003       │  │    :9004       │
│                │  │                │  │                │  │                │
│  STATELESS     │  │  STATELESS     │  │  STATELESS     │  │  STATELESS     │
│  pure infer.   │  │  pure infer.   │  │  pure infer.   │  │  pure infer.   │
└────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘

┌────────────────┐  ┌─────────────────┐
│  LAYER 4b      │  │  LAYER 4b       │
│  ML CONTAINER  │  │  ML CONTAINER   │
│ /ml_services/  │  │ /ml_services/   │
│                │  │                 │
│   lip_sync     │  │response_analysis│
│    :9005       │  │    :9006        │
│                │  │                 │
│  STATELESS     │  │  STATELESS      │
│  pure infer.   │  │  pure infer.    │
└────────────────┘  └─────────────────┘
```

### 3.2 Layer Responsibility Matrix

| Layer | Service | Owns | Does NOT Own |
|-------|---------|------|--------------|
| 1 | Kong Gateway | TLS, auth headers, rate-limit, routing | Business logic |
| 2 | BFF (Go) | Zitadel auth, tenant resolution, quota token | ML models, session state |
| 3 | ML Service (`ai_interviewer`) | Business logic, session state, DB writes, violation rules | ML inference, model weights |
| 4a | ML Orchestrator (`ml_orchestrator/`) | Request routing, WS proxy, worker health, affinity | Business logic, DB, models |
| 4b | ML Containers (`ml_services/*/`) | ML inference only — pure input → output | State, DB, business rules |

### 3.3 Why This Architecture Is Reusable

Layers 4a and 4b are **completely domain-agnostic**:
- Any future service in any project (background screening, recruitment, KYC) calls `ml-orchestrator` to get face verification, audio analysis, or any other ML capability
- ML containers are infrastructure, not application code
- GPU upgrades, model version changes, horizontal scale: all happen at Layer 4b without touching Layer 3
- A Layer 3 service for a different domain is just a new Python/Go service + a `depends_on: ml-orchestrator` entry

---

## 4. The Four Layers Explained

### Layer 3 — ML Service (Business Logic)

What `ai_interviewer` becomes after refactoring. It:
- Receives the WebSocket connection from the frontend (via Kong → BFF)
- Manages the interview lifecycle: start → verification → questions → scoring → finalization
- Decides **when** to trigger ML work (start face streaming on interview start, run plagiarism on end)
- Calls `ml-orchestrator` via `MLOrchestratorClient` for every ML task — **never loads models directly**
- Writes results back to the database
- Owns all violation thresholds, scoring weights, business rules

Any future domain creates its own Layer 3 service and calls the same Layer 4.

### Layer 4a — ML Orchestrator (`ml_orchestrator/`)

Single internal entry point for all ML inference:

```
POST /ml/infer                      ← synchronous HTTP (face verify, id verify, plagiarism)
WS   /ml/stream/{session_id}/{type} ← bidirectional WebSocket proxy (live face/audio/video)
GET  /health                        ← worker health summary
```

Internal pipeline per request:
1. Validate `task_type` + payload schema
2. Look up `session_id → assigned_worker` in Redis (create if new)
3. Check circuit breaker state for assigned worker
4. Forward to worker via HTTP or WS proxy
5. Return typed `MLInferResponse` to caller
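
Step 3 of the pipeline above can be sketched as a small per-worker circuit breaker. The class name, the thresholds, and the half-open behaviour are assumptions for illustration; the document does not specify the contents of `circuit_breaker.py`.

```python
import time
from typing import Callable, Dict, Optional


class CircuitBreaker:
    """Tracks consecutive failures for one worker (hypothetical sketch)."""

    def __init__(self, failure_threshold: int = 3, reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold  # assumed value
        self.reset_after_s = reset_after_s          # assumed value
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None     # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cool-down, let trial requests through again (half-open).
        return self._clock() - self._opened_at >= self.reset_after_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # Open (or re-open) the circuit and restart the cool-down.
            self._opened_at = self._clock()


# One breaker per worker URL, created on first use.
breakers: Dict[str, CircuitBreaker] = {}


def breaker_for(worker_url: str) -> CircuitBreaker:
    return breakers.setdefault(worker_url, CircuitBreaker())
```

The router would call `allow_request()` before forwarding, then `record_success()` or `record_failure()` depending on the worker's response.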

### Layer 4b — ML Containers (`ml_services/`)

Six stateless FastAPI containers, each owning exactly one ML domain:

| Container | Folder | Port | Models Loaded | Transport |
|-----------|--------|------|---------------|-----------|
| Face Detection | `ml_services/face_detection/` | 9001 | InsightFace ArcFace buffalo_l, FaceNet MTCNN, MediaPipe FaceMesh | HTTP + WS |
| Audio Analysis | `ml_services/audio_analysis/` | 9002 | Silero VAD, pyannote diarization 3.1, speaker embedder | HTTP + WS |
| Video Monitoring | `ml_services/video_monitoring/` | 9003 | YOLOv8n, ORB matcher (OpenCV) | HTTP + WS |
| Document Verification | `ml_services/doc_verification/` | 9004 | EasyOCR (en+hi), InsightFace (ID face extract) | HTTP only |
| Lip Sync | `ml_services/lip_sync/` | 9005 | SyncNet (Wav2Lip SyncNet_color) | WS only |
| Response Analysis | `ml_services/response_analysis/` | 9006 | Stylometry (pure Python), LLM via proxy | HTTP only |

**Every container**: no DB access, no session state, context passed in payload, scales independently.

---

## 5. ML Services Catalogue

### 5.1 face_detection (port 9001)

**Folder**: `ml_services/face_detection/`
**Source to migrate**: `ai_interviewer/proctoring/verification/face_verification.py` + `monitoring/face_analysis_pipeline.py` + `monitoring/face_analysis_helpers.py`
**Models**: InsightFace ArcFace buffalo_l, FaceNet MTCNN + InceptionResnetV1, MediaPipe FaceMesh

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `POST /face/verify` | HTTP | `{image_b64, reference_embedding_b64}` | `{match: bool, score: float, confidence: str}` |
| `POST /face/embed` | HTTP | `{image_b64}` | `{embedding_b64, face_count: int}` |
| `WS /face/stream/{session_id}` | WebSocket | JPEG frames (binary) | JSON `{gaze, blink, head_pose, violations[]}` |

**Stateless**: Embeddings passed in per-request. No DB. No cross-session state.
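
Because embeddings travel inside each request, caller and container need to agree on an encoding. The following is a minimal sketch that assumes embeddings are float32 vectors serialized as base64 and compared by cosine similarity. The helper names and the 0.5 threshold are illustrative assumptions, not values taken from the service.

```python
import base64
import math
import struct
from typing import List


def encode_embedding(vec: List[float]) -> str:
    """Pack floats as little-endian float32 and base64-encode them."""
    return base64.b64encode(struct.pack(f"<{len(vec)}f", *vec)).decode("ascii")


def decode_embedding(b64: str) -> List[float]:
    """Inverse of encode_embedding."""
    raw = base64.b64decode(b64)
    return list(struct.unpack(f"<{len(raw) // 4}f", raw))


def cosine_similarity(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def verify(probe: List[float], reference_embedding_b64: str,
           threshold: float = 0.5) -> dict:
    """Returns the `match` and `score` fields of a /face/verify result."""
    score = cosine_similarity(probe, decode_embedding(reference_embedding_b64))
    return {"match": score >= threshold, "score": score}
```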

---

### 5.2 audio_analysis (port 9002)

**Folder**: `ml_services/audio_analysis/`
**Source to migrate**: `ai_interviewer/proctoring/verification/voice_enrollment.py` + `monitoring/audio_pipeline.py`
**Models**: Silero VAD, pyannote diarization 3.1, speaker embedding extractor

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `POST /audio/enroll` | HTTP | `{audio_b64, sample_rate}` | `{embedding_b64, quality_score}` |
| `POST /audio/verify` | HTTP | `{audio_b64, reference_embedding_b64}` | `{match: bool, score: float}` |
| `POST /audio/diarize` | HTTP | `{audio_b64}` | `{segments: [{speaker, start_s, end_s}]}` |
| `WS /audio/stream/{session_id}` | WebSocket | PCM chunks (binary) | JSON `{has_speech, rms, speaker_change}` |

**Stateless**: Reference embeddings passed per-request. No session storage in service.
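
The `rms` field in the stream output can be computed directly from each PCM chunk. A minimal sketch, assuming 16-bit signed little-endian mono PCM (the sample format is not stated above) and a hypothetical helper name:

```python
import math
import struct


def pcm16_rms(chunk: bytes) -> float:
    """Root-mean-square level of a 16-bit little-endian PCM chunk, scaled to 0.0-1.0."""
    n = len(chunk) // 2          # number of complete 2-byte samples
    if n == 0:
        return 0.0
    samples = struct.unpack(f"<{n}h", chunk[: n * 2])
    mean_square = sum(s * s for s in samples) / n
    return math.sqrt(mean_square) / 32768.0
```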

---

### 5.3 video_monitoring (port 9003)

**Folder**: `ml_services/video_monitoring/`
**Source to migrate**: `ai_interviewer/proctoring/monitoring/video_pipeline.py` + `monitoring/room_scan.py`
**Models**: YOLOv8n, ORB feature matcher (OpenCV — no weights file)

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `POST /video/room-scan/frame` | HTTP | `{frame_b64, scan_id}` | `{objects[], person_count, reference_stored}` |
| `POST /video/room-scan/complete` | HTTP | `{scan_id}` | `{result: RoomScanResult}` |
| `WS /video/stream/{session_id}` | WebSocket | JPEG frames (binary) | JSON `{objects[], phone_detected, multi_person, violations[]}` |

**Stateless**: Room scan reference frame held in Redis by orchestrator, not by this service.

---

### 5.4 doc_verification (port 9004)

**Folder**: `ml_services/doc_verification/`
**Source to migrate**: `ai_interviewer/proctoring/verification/id_verification.py`
**Models**: EasyOCR (en + hi), InsightFace ArcFace (face extraction from ID photo)

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `POST /doc/id-verify` | HTTP | `{id_image_b64, face_image_b64}` | `{face_match: bool, ocr_fields: {name, dob, id_no}, confidence}` |
| `POST /doc/ocr` | HTTP | `{image_b64, language}` | `{text: str, blocks[]}` |

**Stateless**: Pure inference. No storage.

---

### 5.5 lip_sync (port 9005)

**Folder**: `ml_services/lip_sync/`
**Source to migrate**: `ai_interviewer/syncnet-service/` (already complete — copy as-is, add health endpoint)
**Models**: SyncNet (Wav2Lip SyncNet_color discriminator)

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `WS /lipsync/stream/{session_id}` | WebSocket | JSON `{type: video_frame\|audio_chunk, data_b64, ts}` | JSON `{type: sync_score\|lip_sync_violation, score, ts}` |

**Stateless**: Sliding window state per-connection only, cleared on disconnect.

---

### 5.6 response_analysis (port 9006)

**Folder**: `ml_services/response_analysis/`
**Source to migrate**: `ai_interviewer/proctoring/analysis/plagiarism_engine.py` + `analysis/response_moderator.py`
**Models**: None (LLM calls through LiteLLM proxy; stylometry is pure Python)

| Endpoint | Transport | Input | Output |
|----------|-----------|-------|--------|
| `POST /analysis/plagiarism` | HTTP | `{transcript: str, proxyllm_url, proxyllm_key}` | `{risk_score, stylometry, llm_verdict, reasoning}` |
| `POST /analysis/moderate` | HTTP | `{text: str, partial: bool}` | `{category, action, confidence}` |

**Stateless**: LLM credentials passed per-request. No stored state.

---

## 6. ML Orchestrator Design

`ml_orchestrator/` is a standalone FastAPI service — the single internal entry point for all ML inference.

### 6.1 Folder Structure

```
ml_orchestrator/
├── main.py             ← FastAPI app, lifespan, endpoints
├── schemas.py          ← MLTaskType, MLInferRequest, MLInferResponse
├── router.py           ← MLRouter: dispatch_http() + proxy_websocket()
├── worker_registry.py  ← WORKER_POOLS, TASK_TO_SERVICE, session affinity
├── circuit_breaker.py  ← per-worker health tracking
├── Dockerfile
└── requirements.txt
```

### 6.2 Schemas

```python
# ml_orchestrator/schemas.py

from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel

class MLTaskType(str, Enum):
    FACE_VERIFY      = "face_verify"
    FACE_EMBED       = "face_embed"
    FACE_STREAM      = "face_stream"         # → WS proxy to face_detection
    AUDIO_ENROLL     = "audio_enroll"
    AUDIO_VERIFY     = "audio_verify"
    AUDIO_DIARIZE    = "audio_diarize"
    AUDIO_STREAM     = "audio_stream"        # → WS proxy to audio_analysis
    VIDEO_ROOM_SCAN  = "video_room_scan"
    VIDEO_STREAM     = "video_stream"        # → WS proxy to video_monitoring
    DOC_ID_VERIFY    = "doc_id_verify"
    LIPSYNC_STREAM   = "lipsync_stream"      # → WS proxy to lip_sync
    PLAGIARISM       = "plagiarism_analyze"
    MODERATION       = "response_moderate"


class MLInferRequest(BaseModel):
    session_id: str              # interview session (for worker affinity)
    task_type: MLTaskType
    payload: Dict[str, Any]      # task-specific — typed per task_type
    timeout_s: float = 30.0


class MLInferResponse(BaseModel):
    session_id: str
    task_type: MLTaskType
    success: bool
    result: Dict[str, Any]
    latency_ms: int
    worker_id: str               # which container handled this
    error: Optional[str] = None
```

### 6.3 Worker Registry

```python
# ml_orchestrator/worker_registry.py

from schemas import MLTaskType

WORKER_POOLS = {
    "face":     ["http://ml-face-detection-1:9001", "http://ml-face-detection-2:9001"],
    "audio":    ["http://ml-audio-analysis-1:9002"],
    "video":    ["http://ml-video-monitoring-1:9003"],
    "doc":      ["http://ml-doc-verification-1:9004"],
    "lipsync":  ["http://ml-lip-sync-1:9005"],
    "analysis": ["http://ml-response-analysis-1:9006"],
}

TASK_TO_SERVICE = {
    MLTaskType.FACE_VERIFY:     "face",
    MLTaskType.FACE_EMBED:      "face",
    MLTaskType.FACE_STREAM:     "face",
    MLTaskType.AUDIO_ENROLL:    "audio",
    MLTaskType.AUDIO_VERIFY:    "audio",
    MLTaskType.AUDIO_DIARIZE:   "audio",
    MLTaskType.AUDIO_STREAM:    "audio",
    MLTaskType.VIDEO_ROOM_SCAN: "video",
    MLTaskType.VIDEO_STREAM:    "video",
    MLTaskType.DOC_ID_VERIFY:   "doc",
    MLTaskType.LIPSYNC_STREAM:  "lipsync",
    MLTaskType.PLAGIARISM:      "analysis",
    MLTaskType.MODERATION:      "analysis",
}
```
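
The pools above are hard-coded, while the Compose file in Section 9 passes worker URLs through `ML_*_WORKERS` environment variables. One way to reconcile the two is sketched below. It assumes each variable holds a comma-separated URL list (the document shows only single-URL values), and the function name is hypothetical.

```python
import os
from typing import Dict, List, Mapping

# Pool name → environment variable, using the names from the Compose file.
POOL_ENV_VARS = {
    "face": "ML_FACE_WORKERS",
    "audio": "ML_AUDIO_WORKERS",
    "video": "ML_VIDEO_WORKERS",
    "doc": "ML_DOC_WORKERS",
    "lipsync": "ML_LIPSYNC_WORKERS",
    "analysis": "ML_ANALYSIS_WORKERS",
}


def load_worker_pools(env: Mapping[str, str] = os.environ) -> Dict[str, List[str]]:
    """Build the pool mapping from the environment; a missing variable gives an empty pool."""
    pools: Dict[str, List[str]] = {}
    for pool, var in POOL_ENV_VARS.items():
        urls = [u.strip().rstrip("/") for u in env.get(var, "").split(",")]
        pools[pool] = [u for u in urls if u]
    return pools
```

With this approach, adding a second face worker would only require changing `ML_FACE_WORKERS` in the Compose file rather than editing `worker_registry.py`.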

### 6.4 Session Affinity (Redis)

Streaming tasks must stay on the same worker for the lifetime of the session, because the WS connection is long-lived and the worker holds in-memory frame buffers for it:

```
Redis key:  ml:session:{session_id}:face    → "http://ml-face-detection-2:9001"
            ml:session:{session_id}:audio   → "http://ml-audio-analysis-1:9002"
            ml:session:{session_id}:video   → "http://ml-video-monitoring-1:9003"
TTL:        6 hours (max interview duration)
```

One-shot HTTP tasks (face verify, id verify, plagiarism) use round-robin — no affinity needed.
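
The two selection policies can be sketched together. This minimal illustration uses an in-memory dict as a stand-in for Redis, so the TTL is omitted; the class and method names are hypothetical.

```python
import itertools
from typing import Dict, Iterator, List


class WorkerSelector:
    """Sticky assignment for streams, round-robin for one-shot tasks."""

    def __init__(self, pools: Dict[str, List[str]]):
        # One endless iterator per non-empty pool.
        self._cycles: Dict[str, Iterator[str]] = {
            name: itertools.cycle(urls) for name, urls in pools.items() if urls
        }
        # Stand-in for Redis keys of the form ml:session:{session_id}:{service}.
        self._affinity: Dict[str, str] = {}

    def pick_round_robin(self, service: str) -> str:
        return next(self._cycles[service])

    def pick_sticky(self, session_id: str, service: str) -> str:
        key = f"ml:session:{session_id}:{service}"
        if key not in self._affinity:
            # First stream for this session: assign a worker and remember it.
            self._affinity[key] = self.pick_round_robin(service)
        return self._affinity[key]
```

With Redis, the assignment step would typically be an atomic `SET key value NX EX 21600` (6 hours) followed by a `GET`, so that concurrent connections for the same session agree on one worker.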

### 6.5 FastAPI App Skeleton

```python
# ml_orchestrator/main.py

from fastapi import FastAPI, WebSocket
from schemas import MLInferRequest, MLInferResponse
from router import MLRouter

app = FastAPI(title="ML Orchestrator", version="1.0.0")
ml_router = MLRouter()

@app.post("/ml/infer", response_model=MLInferResponse)
async def ml_infer(req: MLInferRequest):
    return await ml_router.dispatch_http(req)

@app.websocket("/ml/stream/{session_id}/{stream_type}")
async def ml_stream(websocket: WebSocket, session_id: str, stream_type: str):
    # stream_type: face | audio | video | lipsync
    await ml_router.proxy_websocket(websocket, session_id, stream_type)

@app.get("/health")
async def health():
    return await ml_router.health_summary()
```

The WebSocket proxy in `router.py` must be bidirectional: `asyncio.gather` on both `client→worker` and `worker→client` coroutines simultaneously.
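
A minimal sketch of that bidirectional pump follows. To stay runnable without a network, it uses an in-memory `Endpoint` class with `send`/`recv` methods in place of real WebSocket objects, and a `None` message as a hypothetical close signal. A real proxy would also need to cancel the opposite direction when one side disconnects.

```python
import asyncio
from typing import Any, List, Optional, Tuple


class Endpoint:
    """In-memory stand-in for one side of a WebSocket connection."""

    def __init__(self) -> None:
        self.from_peer: asyncio.Queue = asyncio.Queue()  # messages the peer sends to the proxy
        self.to_peer: List[Any] = []                     # messages the proxy delivers to the peer

    async def recv(self) -> Optional[Any]:
        return await self.from_peer.get()

    async def send(self, message: Any) -> None:
        self.to_peer.append(message)


async def pump(source: Endpoint, sink: Endpoint) -> None:
    """Forward messages one way until the source signals close with None."""
    while (message := await source.recv()) is not None:
        await sink.send(message)


async def proxy(client: Endpoint, worker: Endpoint) -> None:
    # Run both directions concurrently so neither side blocks the other.
    await asyncio.gather(pump(client, worker), pump(worker, client))


async def demo() -> Tuple[List[Any], List[Any]]:
    client, worker = Endpoint(), Endpoint()
    for msg in (b"jpeg-frame-1", b"jpeg-frame-2", None):
        client.from_peer.put_nowait(msg)
    for msg in ('{"violations": []}', None):
        worker.from_peer.put_nowait(msg)
    await proxy(client, worker)
    return worker.to_peer, client.to_peer
```

Running `asyncio.run(demo())` forwards the two binary frames to the worker and the JSON event back to the client.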

---

## 7. Wire Protocol

### 7.1 ML Service → ml-orchestrator (HTTP)

```
POST http://ml-orchestrator:8015/ml/infer
Content-Type: application/json

{
  "session_id": "sched_abc123",
  "task_type": "face_verify",
  "payload": {
    "image_b64": "...",
    "reference_embedding_b64": "..."
  }
}

── Response 200 ────────────────────────────────────────
{
  "session_id": "sched_abc123",
  "task_type": "face_verify",
  "success": true,
  "result": { "match": true, "score": 0.87, "confidence": "high" },
  "latency_ms": 142,
  "worker_id": "ml-face-detection-1"
}
```

### 7.2 ML Service → ml-orchestrator → ml-container (WebSocket stream)

```
ML Service connects:
  WS ws://ml-orchestrator:8015/ml/stream/sched_abc123/face

ml-orchestrator:
  1. Looks up or assigns face_detection worker for sched_abc123 → Redis
  2. Opens WS connection to ws://ml-face-detection-1:9001/face/stream/sched_abc123
  3. asyncio.gather → bidirectional proxy:
       ML Service → orchestrator → face_detection  (JPEG binary frames)
       face_detection → orchestrator → ML Service  (JSON violation events)
```

### 7.3 Full End-to-End Path

```
Browser ──WSS──► Kong (:8082) ──WSS──► BFF (:8080) ──WSS──► ML Service (:8008)
                                                                    │
                                              HTTP/WS (ai-network) │
                                                                    ▼
                                                        ml-orchestrator (:8015)
                                                                    │
                                                    ┌───────────────┼───────────────┐
                                                    ▼               ▼               ▼
                                             face_detection  audio_analysis  video_monitoring
                                                (:9001)          (:9002)          (:9003)
```

The browser and BFF never know about `ml-orchestrator`. Only the ML Service calls it directly.

### 7.4 MLOrchestratorClient (shared library)

```python
# shared/ml_orchestrator_client.py

import os

import httpx

class MLOrchestratorClient:
    def __init__(self):
        self._base_url = os.environ["ML_ORCHESTRATOR_URL"]
        self._http = httpx.AsyncClient(timeout=60.0)

    async def infer(self, session_id: str, task_type: str, payload: dict) -> dict:
        resp = await self._http.post(
            f"{self._base_url}/ml/infer",
            json={"session_id": session_id, "task_type": task_type, "payload": payload}
        )
        resp.raise_for_status()
        data = resp.json()
        if not data["success"]:
            # `error` may be present but null, so fall back explicitly.
            raise RuntimeError(data.get("error") or "ML inference failed")
        return data["result"]

    def stream_url(self, session_id: str, stream_type: str) -> str:
        """Returns WS URL for a streaming session. stream_type: face|audio|video|lipsync"""
        base = self._base_url.replace("http://", "ws://").replace("https://", "wss://")
        return f"{base}/ml/stream/{session_id}/{stream_type}"

    async def aclose(self):
        await self._http.aclose()
```

---

## 8. Fixes Required in ai_interviewer

### 8.1 Remove Embedded ML Model Loading

**File**: `ai_interviewer/proctoring/model_registry.py`
**Fix**: Delete entirely. `ai_interviewer` must not load any ML model. `ModelRegistry` singleton must not exist in the main process.

### 8.2 Replace Direct Proctoring Imports with MLOrchestratorClient Calls

Every `from proctoring.xxx import YYY` in `ai_interviewer/` becomes a client call:

| Current (direct import) | New (client call) |
|------------------------|-------------------|
| `FaceVerificationService().verify_face(...)` | `await ml_client.infer(sid, "face_verify", payload)` |
| `VoiceEnrollmentService().enroll_voice(...)` | `await ml_client.infer(sid, "audio_enroll", payload)` |
| `IDVerificationService().verify_document(...)` | `await ml_client.infer(sid, "doc_id_verify", payload)` |
| `RoomScanService().start(...)` | `await ml_client.infer(sid, "video_room_scan", payload)` |
| `ProctoringEngine(...)` startup | Connect WS streams to `ml_client.stream_url(sid, "face\|audio\|video")` |
| `PlagiarismEngine(...).analyze(...)` | `await ml_client.infer(sid, "plagiarism_analyze", payload)` |

**Affected files**:
- `ai_interviewer/routers/proctoring.py` — 6 service instantiations → client calls
- `ai_interviewer/routers/finalization.py` — broken HTTP call + PlagiarismEngine → client calls
- `ai_interviewer/manager_monitoring_mixin.py` — ProctoringEngine startup → WS stream connect
- `ai_interviewer/routers/websocket.py` — frame forwarding → forward to ml-orchestrator WS
- `ai_interviewer/routers/proctoring_helpers.py` — helper instantiations → client calls

### 8.3 Fix the Broken Finalization Call

**File**: `ai_interviewer/routers/finalization.py` (~line 301)
**Current**: `httpx.post(PROCTORING_SERVICE_URL + "/proctoring/stop-and-save/...")` → 502 every time
**Fix**:
- Proctoring results (behavioral, violations) arrive at `ai_interviewer` as WS stream events during the live interview and are persisted to Redis/DB by `ai_interviewer`, since ML containers hold no state; finalization reads them from Redis/DB
- Plagiarism: `await ml_client.infer(session_id, "plagiarism_analyze", {"transcript": transcript})`
- Remove `PROCTORING_SERVICE_URL` entirely from config and docker-compose

### 8.4 Add ML_ORCHESTRATOR_URL to Config

**File**: `ai_interviewer/config.py`:
```python
# Add:
ML_ORCHESTRATOR_URL: str = os.getenv("ML_ORCHESTRATOR_URL", "http://ml-orchestrator:8015")

# Remove:
# PROCTORING_SERVICE_URL  ← no longer needed
```

### 8.5 Clean Up Dead Code

Delete after migration is complete:
- `ai_interviewer/proctoring/` — superseded by `ml_services/face_detection/`, `audio_analysis/`, `video_monitoring/`, `doc_verification/`, and `response_analysis/`
- `ai_interviewer/proctoring-service/` — incomplete duplicate, superseded
- `ai_interviewer/recording-service/` — orphaned, never deployed, superseded
- `ai_interviewer/syncnet-service/` — migrated to `ml_services/lip_sync/`

### 8.6 Fix Kong Dead Route

**File**: `kong/ai/kong.yml`
- Remove `proctoring-service` service + route block (currently causes 502 on any request)
- `ml-orchestrator` is internal only — no Kong exposure needed

---

## 9. Docker Compose Topology

```yaml
# Additions to docker-compose.ai.yml

  # ──── ML Orchestrator ──────────────────────────────────────────
  ml-orchestrator:
    build:
      context: ./ml_orchestrator
      dockerfile: Dockerfile
    container_name: ml-orchestrator
    env_file: .env.ai
    environment:
      PORT: 8015
      REDIS_URL: redis://redis:6379
      ML_FACE_WORKERS: "http://ml-face-detection:9001"
      ML_AUDIO_WORKERS: "http://ml-audio-analysis:9002"
      ML_VIDEO_WORKERS: "http://ml-video-monitoring:9003"
      ML_DOC_WORKERS: "http://ml-doc-verification:9004"
      ML_LIPSYNC_WORKERS: "http://ml-lip-sync:9005"
      ML_ANALYSIS_WORKERS: "http://ml-response-analysis:9006"
    expose:
      - "8015"
    depends_on:
      redis:
        condition: service_healthy
      ml-face-detection:
        condition: service_healthy
      ml-audio-analysis:
        condition: service_healthy
      ml-video-monitoring:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8015/health')"]
      interval: 15s
      timeout: 5s
      retries: 3
      start_period: 30s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── face_detection (:9001) ───────────────────────────────────
  ml-face-detection:
    build:
      context: ./ml_services/face_detection
      dockerfile: Dockerfile
    container_name: ml-face-detection
    env_file: .env.ai
    environment:
      PORT: 9001
      PROCTORING_ML_DEVICE: "cpu"     # set "cuda" for GPU node
      PROCTORING_ML_WORKERS: "2"
      PROCTORING_FACE_WORKERS: "1"
    expose:
      - "9001"
    volumes:
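      # Short-syntax mounts are SOURCE:TARGET[:MODE], so each cache path needs its own entry.
      - ml_models_face:/root/.insightface          # InsightFace model weights
      - ml_models_face_torch:/root/.cache/torch    # torch weight cache; also declare this volume under top-level `volumes:`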
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9001/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s    # model loading 10-30s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── audio_analysis (:9002) ───────────────────────────────────
  ml-audio-analysis:
    build:
      context: ./ml_services/audio_analysis
      dockerfile: Dockerfile
    container_name: ml-audio-analysis
    env_file: .env.ai
    environment:
      PORT: 9002
      PROCTORING_ML_WORKERS: "2"
      HUGGINGFACE_TOKEN: "${HUGGINGFACE_TOKEN}"
    expose:
      - "9002"
    volumes:
      - ml_models_audio:/root/.cache
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9002/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 90s    # pyannote is slow to load
    networks:
      - ai-network
    restart: unless-stopped

  # ──── video_monitoring (:9003) ─────────────────────────────────
  ml-video-monitoring:
    build:
      context: ./ml_services/video_monitoring
      dockerfile: Dockerfile
    container_name: ml-video-monitoring
    env_file: .env.ai
    environment:
      PORT: 9003
      PROCTORING_ML_WORKERS: "4"
      YOLO_MODEL: "yolov8n.pt"
    expose:
      - "9003"
    volumes:
      - ml_models_video:/root/.cache/ultralytics
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9003/health')"]
      interval: 20s
      timeout: 5s
      retries: 3
      start_period: 45s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── doc_verification (:9004) ─────────────────────────────────
  ml-doc-verification:
    build:
      context: ./ml_services/doc_verification
      dockerfile: Dockerfile
    container_name: ml-doc-verification
    env_file: .env.ai
    environment:
      PORT: 9004
      OCR_LANGUAGES: "en,hi"
      PROCTORING_ML_WORKERS: "2"
    expose:
      - "9004"
    volumes:
      - ml_models_doc:/root/.EasyOCR
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9004/health')"]
      interval: 20s
      timeout: 5s
      retries: 3
      start_period: 60s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── lip_sync (:9005) ─────────────────────────────────────────
  ml-lip-sync:
    build:
      context: ./ml_services/lip_sync
      dockerfile: Dockerfile
    container_name: ml-lip-sync
    env_file: .env.ai
    environment:
      PORT: 9005
    expose:
      - "9005"
    volumes:
      - ml_models_lipsync:/app/weights
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9005/health')"]
      interval: 20s
      timeout: 5s
      retries: 3
      start_period: 45s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── response_analysis (:9006) ────────────────────────────────
  ml-response-analysis:
    build:
      context: ./ml_services/response_analysis
      dockerfile: Dockerfile
    container_name: ml-response-analysis
    env_file: .env.ai
    environment:
      PORT: 9006
      PROXYLLM_URL: "${PROXYLLM_URL}"
      PROXYLLM_KEY: "${PROXYLLM_KEY}"
      PROXYLLM_MODEL: "${PROXYLLM_MODEL}"
    expose:
      - "9006"
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9006/health')"]
      interval: 15s
      timeout: 5s
      retries: 3
      start_period: 20s
    networks:
      - ai-network
    restart: unless-stopped

  # ──── Update ai-interviewer ─────────────────────────────────────
  # Changes to existing ai-interviewer service:
  #
  # environment — ADD:
  #   ML_ORCHESTRATOR_URL: "http://ml-orchestrator:8015"
  # environment — REMOVE:
  #   PROCTORING_SERVICE_URL   (dead, caused 502 on every finalization)
  #
  # depends_on — ADD:
  #   ml-orchestrator:
  #     condition: service_healthy

volumes:
  # (existing volumes stay)
  ml_models_face:
  ml_models_audio:
  ml_models_video:
  ml_models_doc:
  ml_models_lipsync:
```
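
Every healthcheck above uses the same inline `urllib` one-liner. As a minimal sketch, the same check could live in a small script with an explicit socket timeout. The names `probe` and `main` are hypothetical and not part of the existing services; the only behaviour assumed is that `/health` answers with a 2xx status when the container is ready.

```python
import http.client
import urllib.error
import urllib.request


def probe(url: str, timeout: float = 5.0) -> bool:
    """Return True if `url` answers with a 2xx status within `timeout` seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        # Covers connection refused, timeouts, and HTTP 4xx/5xx (HTTPError).
        return False


def main(argv: list[str]) -> int:
    """Map the probe result to an exit code: 0 = healthy, 1 = unhealthy."""
    # Hypothetical default; each container would pass its own port.
    url = argv[1] if len(argv) > 1 else "http://localhost:9001/health"
    return 0 if probe(url) else 1
```

A container would run this with `sys.exit(main(sys.argv))` as its entry point. The inline one-liner remains sufficient as long as Docker's own `timeout:` setting is acceptable as the only time limit.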

---

## 10. Kong Routing

Changes to `kong/ai/kong.yml`:

```yaml
# REMOVE — dead route, points to disabled container, causes 502:
# - name: proctoring-service
#   url: http://proctoring-service:8009
#   routes: ...

# ml-orchestrator is INTERNAL ONLY — ML Service calls it service-to-service.
# No Kong route needed for ml-orchestrator itself.
# If a frontend proctoring endpoint ever needs ML access, route it through the
# existing /ai-interviewer paths instead of exposing ml-orchestrator.
```

---

## 11. Horizontal Scaling Strategy

### 11.1 Scale Triggers

| Container | Scale Metric | Min | Max | Rationale |
|-----------|-------------|-----|-----|-----------|
| `ml-face-detection` | 1 replica per 10 concurrent WS streaming sessions | 1 | 10 | Face mesh runs every frame |
| `ml-audio-analysis` | 1 replica per 20 concurrent WS streaming sessions | 1 | 5 | Audio chunks are smaller |
| `ml-video-monitoring` | 1 replica per 15 concurrent WS streaming sessions | 1 | 8 | YOLO per-frame is CPU-heavy |
| `ml-doc-verification` | HTTP queue depth > 10 | 1 | 3 | Only at interview start |
| `ml-lip-sync` | 1 replica per 8 concurrent WS streaming sessions | 1 | 6 | SyncNet is heavy |
| `ml-response-analysis` | HTTP queue depth > 5 | 1 | 4 | Only at interview end |
| `ml-orchestrator` | CPU > 60% | 1 | 3 | Proxy overhead is low |
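
The session-based rows of the table reduce to one formula: divide concurrent sessions by the per-replica ratio, round up, and clamp to the min/max bounds. The sketch below encodes only those rows; `SessionScaleRule` and `desired_replicas` are hypothetical names, and the queue-depth and CPU triggers are left to whatever autoscaler is used.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class SessionScaleRule:
    sessions_per_replica: int
    min_replicas: int
    max_replicas: int


# Ratios and bounds copied from the scale-trigger table (session-based rows only).
RULES = {
    "ml-face-detection": SessionScaleRule(10, 1, 10),
    "ml-audio-analysis": SessionScaleRule(20, 1, 5),
    "ml-video-monitoring": SessionScaleRule(15, 1, 8),
    "ml-lip-sync": SessionScaleRule(8, 1, 6),
}


def desired_replicas(container: str, concurrent_sessions: int) -> int:
    """Round sessions up to whole replicas, then clamp to the table's bounds."""
    rule = RULES[container]
    needed = math.ceil(concurrent_sessions / rule.sessions_per_replica)
    return max(rule.min_replicas, min(rule.max_replicas, needed))
```

For example, 25 concurrent sessions give `desired_replicas("ml-face-detection", 25) == 3`, and an idle system still keeps one replica of each container.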

### 11.2 Session Affinity on Scale-Out

When `ml-face-detection` scales 1 → 2:
- Existing sessions stay on their assigned worker (Redis affinity mapping persists)
- New sessions are assigned to the least-loaded replica
- On scale-down: drain existing sessions (wait for WS disconnect) before terminating
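
The three rules above can be sketched with an in-memory table standing in for the Redis affinity mapping. `AffinityTable` and its method names are hypothetical; a real implementation would keep the same mapping in Redis so that every orchestrator replica sees it.

```python
from collections import Counter


class AffinityTable:
    """In-memory stand-in for the Redis session-affinity mapping."""

    def __init__(self, workers: list[str]) -> None:
        self.workers: list[str] = list(workers)
        self.draining: set[str] = set()
        self.session_to_worker: dict[str, str] = {}

    def load(self) -> Counter:
        """Number of sessions currently pinned to each worker."""
        return Counter(self.session_to_worker.values())

    def add_worker(self, worker: str) -> None:
        self.workers.append(worker)

    def assign(self, session_id: str) -> str:
        # Existing sessions keep their worker, even after a scale-out.
        if session_id in self.session_to_worker:
            return self.session_to_worker[session_id]
        candidates = [w for w in self.workers if w not in self.draining]
        if not candidates:
            raise RuntimeError("no worker accepts new sessions")
        load = self.load()
        # New sessions go to the least-loaded replica that is not draining.
        worker = min(candidates, key=lambda w: load[w])
        self.session_to_worker[session_id] = worker
        return worker

    def release(self, session_id: str) -> None:
        """Call on WS disconnect."""
        self.session_to_worker.pop(session_id, None)

    def start_drain(self, worker: str) -> None:
        self.draining.add(worker)

    def can_terminate(self, worker: str) -> bool:
        # Safe to stop only once the worker is draining and has no sessions left.
        return worker in self.draining and self.load()[worker] == 0
```

Scale-down then becomes `start_drain(worker)` followed by polling `can_terminate(worker)` until the remaining WebSocket sessions have disconnected.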

### 11.3 GPU Path

Each ML container reads the `PROCTORING_ML_DEVICE` env var (`cpu` or `cuda`). Expected gains with `cuda`:
- ArcFace inference: ~10× faster on GPU
- YOLOv8 inference: ~15× faster on GPU
- One GPU-enabled replica can serve ~5× more sessions

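No application code changes are needed: set the env var and add a GPU resource request in Kubernetes (or a `deploy.resources.reservations.devices` entry under Compose). The container image must also ship CUDA-enabled builds of the inference libraries; a CPU-only image will not use the GPU regardless of the env var.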
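
A minimal sketch of how a container might resolve the device at startup is shown below. `resolve_device` is a hypothetical helper, and falling back to `cpu` when CUDA is requested but unavailable is an assumption; failing fast would be an equally valid policy. The caller supplies `cuda_available` (for instance from its ML framework's own check) so that the sketch needs no GPU libraries.

```python
import os
from typing import Mapping, Optional

ALLOWED_DEVICES = ("cpu", "cuda")


def resolve_device(
    env: Optional[Mapping[str, str]] = None,
    cuda_available: bool = False,
) -> str:
    """Pick the inference device from PROCTORING_ML_DEVICE, defaulting to cpu."""
    env = os.environ if env is None else env
    requested = env.get("PROCTORING_ML_DEVICE", "cpu").strip().lower()
    if requested not in ALLOWED_DEVICES:
        raise ValueError(f"unsupported PROCTORING_ML_DEVICE: {requested!r}")
    if requested == "cuda" and not cuda_available:
        # Assumed policy: degrade to cpu instead of refusing to start.
        return "cpu"
    return requested
```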

### 11.4 Model Weight Caching

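Named volumes (`ml_models_face`, `ml_models_audio`, etc.) prevent re-downloading weights on restart. New replicas on the same Docker host mount the same volume: the first replica downloads once and subsequent replicas reuse the cached files. Named volumes are local to a host, so replicas scheduled on other nodes need shared storage or will download their own copy.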
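
When several replicas share one volume, a replica that starts while another is still downloading should never see a half-written file. The sketch below shows one way to guarantee that with a temp file and an atomic rename. `ensure_weights` and the `fetch` callback are hypothetical; the model libraries used here manage their own caches, so this applies only to weights the services download themselves.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable


def ensure_weights(target: Path, fetch: Callable[[], bytes]) -> Path:
    """Return `target`, downloading it only if it is not already cached."""
    if target.exists():
        return target
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then rename atomically so
    # other replicas never observe a partially written weight file.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(fetch())
        os.replace(tmp_name, target)
    finally:
        # Only true if the download or rename failed; clean up the leftover.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
    return target
```

Two replicas that start at exactly the same time may still both download, but each rename is atomic, so the cached file is always complete.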

---

## 12. Implementation Tasks

### Task 1 — Create ml_services/ Folder Structure

Create `ml_services/` directory with 6 subdirectories. Each gets `main.py`, `Dockerfile`, `requirements.txt`.

**Migrate existing code** (move, do not rewrite — only add FastAPI HTTP/WS wrappers):
- `ml_services/face_detection/` ← `ai_interviewer/proctoring/verification/face_verification.py` + `monitoring/face_analysis_pipeline.py` + `monitoring/face_analysis_helpers.py` + `proctoring/model_registry.py`
- `ml_services/audio_analysis/` ← `ai_interviewer/proctoring/verification/voice_enrollment.py` + `monitoring/audio_pipeline.py`
- `ml_services/video_monitoring/` ← `ai_interviewer/proctoring/monitoring/video_pipeline.py` + `monitoring/room_scan.py`
- `ml_services/doc_verification/` ← `ai_interviewer/proctoring/verification/id_verification.py`
- `ml_services/lip_sync/` ← `ai_interviewer/syncnet-service/` (copy as-is + add `/health` endpoint)
- `ml_services/response_analysis/` ← `ai_interviewer/proctoring/analysis/plagiarism_engine.py` + `analysis/response_moderator.py`

Each service wraps its existing class in a FastAPI endpoint. Zero logic rewrite.

---

### Task 2 — Build ml_orchestrator/ Service

Create `ml_orchestrator/` with:
- `main.py` — FastAPI app: `POST /ml/infer`, `WS /ml/stream/{session_id}/{type}`, `GET /health`
- `schemas.py` — `MLTaskType`, `MLInferRequest`, `MLInferResponse`
- `router.py` — `MLRouter.dispatch_http()` (httpx to worker) + `MLRouter.proxy_websocket()` (asyncio.gather bidirectional proxy)
- `worker_registry.py` — `WORKER_POOLS`, `TASK_TO_SERVICE`, Redis session affinity
- `circuit_breaker.py` — per-worker health state (reuse `shared/circuit_breaker.py` pattern)
- `Dockerfile`
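
As an illustration of the per-worker health state in `circuit_breaker.py`, here is a minimal sketch of a consecutive-failure breaker with a cooldown. It is not the `shared/circuit_breaker.py` implementation, which is not shown in this section. The class, its thresholds, and `pick_worker` are hypothetical.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after N consecutive failures; allows a trial request after a cooldown."""

    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True  # closed: traffic flows normally
        # Open: permit a trial request once the cooldown has elapsed (half-open).
        return self._clock() - self._opened_at >= self.cooldown_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # (Re)open and restart the cooldown, including after a failed trial.
            self._opened_at = self._clock()


def pick_worker(pool: list[str], breakers: dict[str, CircuitBreaker]) -> str:
    """Return the first worker in the pool whose breaker allows traffic."""
    for url in pool:
        if breakers[url].allow_request():
            return url
    raise RuntimeError("no healthy worker available")
```

The injectable `clock` keeps the breaker deterministic in unit tests. `MLRouter.dispatch_http()` would call `record_success()` or `record_failure()` after each worker response.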

---

### Task 3 — Add MLOrchestratorClient to shared/

Create `shared/ml_orchestrator_client.py` (see Section 7.4 for full implementation).

This client is the only thing `ai_interviewer` (and any future ML Service) needs to call the entire ML stack.

---

### Task 4 — Refactor ai_interviewer Proctoring Router

Rewrite `ai_interviewer/routers/proctoring.py`:
- Remove all `from proctoring.xxx import ...` statements
- Remove all service class instantiations
- Each endpoint body becomes `result = await ml_client.infer(session_id, task_type, payload)`
- All endpoint URL paths + request/response shapes stay identical — zero frontend breakage

Rewrite `ai_interviewer/routers/proctoring_helpers.py` the same way.
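
The shape of a refactored endpoint body can be sketched without FastAPI. `FakeMLClient` stands in for `MLOrchestratorClient` (Section 7.4), and the returned dictionary is a placeholder, not the real response schema. Only the `infer(session_id, task_type, payload)` call and the `face_verify` task name come from this document.

```python
import asyncio
from typing import Any


class FakeMLClient:
    """Stand-in for MLOrchestratorClient: records calls and returns canned data."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, dict[str, Any]]] = []

    async def infer(
        self, session_id: str, task_type: str, payload: dict[str, Any]
    ) -> dict[str, Any]:
        self.calls.append((session_id, task_type, payload))
        return {"task_type": task_type, "ok": True}  # placeholder response


ml_client = FakeMLClient()


async def verify_face(session_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    # Refactored endpoint body: no proctoring imports and no model classes,
    # only a delegation to the orchestrator client.
    result = await ml_client.infer(session_id, "face_verify", payload)
    return result
```

Because the endpoint only forwards the payload, a unit test can swap in a fake client like this one and assert on the recorded calls without loading any model.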

---

### Task 5 — Refactor ai_interviewer WebSocket to Use ML Streams

Rewrite `ai_interviewer/manager_monitoring_mixin.py`:
- Remove `ProctoringEngine` instantiation (which loaded all models in-process)
- On interview start: connect to ml-orchestrator WS streams for face, audio, video
- Use `ml_client.stream_url(session_id, "face")` etc. for WS URLs
- Forward incoming candidate browser frames to the appropriate ML stream
- Receive violation events back from ML streams, dispatch them to the candidate WS session as before

Use `asyncio.create_task` to run face + audio + video streams concurrently without blocking the interview WS.
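
The concurrency pattern can be sketched with plain `asyncio`. `fake_ml_stream` is a hypothetical async generator standing in for a WebSocket opened against `ml_client.stream_url(session_id, stream_type)`; the event fields are placeholders.

```python
import asyncio
from typing import AsyncIterator

STREAM_TYPES = ("face", "audio", "video")


async def fake_ml_stream(session_id: str, stream_type: str) -> AsyncIterator[dict]:
    """Stand-in for reading violation events from one ML stream."""
    for seq in range(2):
        await asyncio.sleep(0)  # yield control, as a real socket read would
        yield {"session_id": session_id, "stream": stream_type, "seq": seq}


async def pump(session_id: str, stream_type: str, out: asyncio.Queue) -> None:
    # Forward every event from one ML stream towards the candidate session.
    async for event in fake_ml_stream(session_id, stream_type):
        await out.put(event)


async def run_monitoring(session_id: str) -> list[dict]:
    out: asyncio.Queue = asyncio.Queue()
    # One task per stream, so a slow stream never blocks the other two.
    tasks = [asyncio.create_task(pump(session_id, st, out)) for st in STREAM_TYPES]
    try:
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()  # no-op for finished tasks; stops stragglers on error
    events = []
    while not out.empty():
        events.append(out.get_nowait())
    return events
```

The sketch awaits `gather` only so that it terminates. In the interview WebSocket handler the tasks would be left running in the background and cancelled when the session ends.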

---

### Task 6 — Fix Finalization (Broken Since Deployment)

Rewrite `ai_interviewer/routers/finalization.py`:
- Remove broken `httpx.post(PROCTORING_SERVICE_URL + "/proctoring/stop-and-save/...")` (currently 502)
- Proctoring results (violations, behavioral scores) are now stored in Redis by ML containers during the interview; finalization reads from Redis key `ml:session:{session_id}:results`
- Plagiarism: replace `PlagiarismEngine(proxyllm_url, ...).analyze(transcript)` with `await ml_client.infer(session_id, "plagiarism_analyze", {...})`
- Update `ai_interviewer/config.py`: add `ML_ORCHESTRATOR_URL`, remove `PROCTORING_SERVICE_URL`
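
A minimal sketch of the Redis read in finalization follows. It assumes the results are stored as a single JSON document under the key named above, and that a missing key should yield an empty result, not an error. Both are assumptions, as are the helper names and the `violations` / `behavioral_scores` field names. Any client exposing `get(key)`, such as a Redis client, satisfies `KeyValueStore`.

```python
import json
from typing import Any, Optional, Protocol


class KeyValueStore(Protocol):
    def get(self, key: str) -> Optional[bytes]: ...


def results_key(session_id: str) -> str:
    return f"ml:session:{session_id}:results"


def load_proctoring_results(store: KeyValueStore, session_id: str) -> dict[str, Any]:
    """Read the proctoring results written during the interview."""
    raw = store.get(results_key(session_id))
    if raw is None:
        # Assumed fallback: nothing was recorded, so report an empty result.
        return {"violations": [], "behavioral_scores": {}}
    return json.loads(raw)
```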

---

### Task 7 — Clean Up Dead Code

After Tasks 1–6 are complete and tested:
- Delete `ai_interviewer/proctoring/` (entire directory)
- Delete `ai_interviewer/proctoring-service/` (entire directory)
- Delete `ai_interviewer/recording-service/` (never deployed, superseded)
- Delete `ai_interviewer/syncnet-service/` (migrated to `ml_services/lip_sync/`)
- Remove `PROCTORING_SERVICE_URL` from `docker-compose.ai.yml` and `.env.ai`
- Remove commented-out `proctoring-service:` block from `docker-compose.ai.yml`
- Remove dead Kong `proctoring-service` route from `kong/ai/kong.yml`

---

### Task 8 — Update docker-compose.ai.yml + .env.ai

**docker-compose.ai.yml**:
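- Add all 7 new service definitions (see Section 9); omit `container_name` on any service that will run more than one replica under Compose, because a fixed container name prevents scaling beyond one container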
- Add `ml_models_*` named volumes
- Update `ai-interviewer`: add `ML_ORCHESTRATOR_URL`, remove `PROCTORING_SERVICE_URL`, add `depends_on: ml-orchestrator: condition: service_healthy`

**`.env.ai`** — new keys to add:
```
ML_ORCHESTRATOR_URL=http://ml-orchestrator:8015
# required for pyannote diarization
HUGGINGFACE_TOKEN=hf_...
YOLO_MODEL=yolov8n.pt
OCR_LANGUAGES=en,hi
PROCTORING_ML_DEVICE=cpu
```

---

### Task 9 — Update Kong AI Gateway

**`kong/ai/kong.yml`**:
- Remove `proctoring-service` service + route block (dead route, produces 502)
- No new Kong routes needed for `ml-orchestrator` (it is internal only — ML Service calls it directly)
- Verify existing `/ai-interviewer` route still works after `ai_interviewer` config changes

---

### Task 10 — Integration Testing + Observability

**Integration tests**:
- `POST /ml/infer` with `face_verify` — round-trip with a known face image pair
- `POST /ml/infer` with `doc_id_verify` — OCR + face extraction from test ID
- WS stream session: connect face + video + audio streams simultaneously, send synthetic frames, assert violation events are emitted correctly
- Finalization end-to-end: full interview → stop → verify that proctoring results are present in the DB (previously always empty because of the 502)

**Prometheus metrics** (each ML container and `ml-orchestrator` expose `/metrics`):
- `ml_infer_duration_seconds{task_type}` — inference latency per task
- `ml_model_load_seconds{model}` — startup model load time
- `ml_active_sessions{stream_type}` — concurrent WS sessions per container
- `ml_orchestrator_dispatch_latency_ms{service}` — routing overhead
- `ml_worker_health{worker_id}` — circuit breaker state

---

## 13. Before vs After Summary

| Concern | Before | After |
|---------|--------|-------|
| RAM per ai_interviewer replica | ~3.3 GB (8 ML models) | ~150 MB (zero models) |
| Scale unit for ML work | Entire ai_interviewer pod | Individual ML container per domain |
| proctoring-service | Disabled — 502 on every finalization | Replaced by ml_orchestrator + face_detection + audio_analysis |
| recording-service | Not deployed (orphaned) | Replaced by video_monitoring + audio_analysis |
| syncnet-service | Not deployed (orphaned) | Replaced by lip_sync (copy as-is + wired) |
| Duplicate proctoring code | Two copies: `proctoring/` + `proctoring-service/proctoring/` | Single canonical copy per ml_services/* container |
| Finalization proctoring results | Silent failure every time | Read from Redis (populated live by ML containers during interview) |
| Adding a new ML domain (e.g. emotion detection) | Modify ai_interviewer, re-deploy everything | Add one new container to `ml_services/`, register in `ml_orchestrator/worker_registry.py` |
| GPU-accelerating face detection | Requires GPU on every ai_interviewer replica | Add GPU resource to only `ml-face-detection` replicas |
| Reuse ML in another project | Not possible (embedded, coupled to interview logic) | Any service calls `ml-orchestrator` — fully decoupled |
