# Real-Time Network Telemetry for AI: Building an Asynchronous NetFlow/sFlow Ingestion Pipeline in Python

> Source: <https://dev.to/ussdlover/real-time-network-telemetry-for-ai-building-an-asynchronous-netflowsflow-ingestion-pipeline-in-35op>
> Published: 2026-06-26 10:56:50+00:00

As AI architectures transition from static data processing to real-time agentic monitoring, the data engineering pipelines feeding them must adapt. If you are building an AI-driven anomaly detection model, a security agent, or an automated network observability tool, you can't rely on historical batch logs. You need live, structural telemetry straight from the network edge.

When monitoring network traffic at scale, two industry standards dominate: **NetFlow** (stateful conversation aggregation) and **sFlow** (stateless packet-level sampling).
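To make the distinction concrete, here is a toy sketch on synthetic packets. Everything in it (the tuples, `netflow_style`, `sflow_style`, the sampling rate) is a hypothetical illustration rather than either protocol's wire format, and real sFlow agents sample randomly at a mean rate of 1-in-N, whereas this sketch takes every Nth packet to stay deterministic.

```python
from collections import defaultdict

# Synthetic packets: (src_ip, dst_ip, src_port, dst_port, protocol, size_bytes).
# All values are made up for illustration.
packets = (
    [("10.0.0.1", "10.0.0.2", 50000, 443, 6, 100)] * 6
    + [("10.0.0.3", "10.0.0.2", 50001, 53, 17, 60)] * 4
)


def netflow_style(pkts):
    """Stateful: keep one running record per 5-tuple."""
    flows = defaultdict(lambda: {"packets": 0, "bytes": 0})
    for *five_tuple, size in pkts:
        record = flows[tuple(five_tuple)]
        record["packets"] += 1
        record["bytes"] += size
    return dict(flows)


def sflow_style(pkts, sampling_rate):
    """Stateless: export every Nth packet and let the collector scale up."""
    return [pkt for i, pkt in enumerate(pkts) if i % sampling_rate == 0]


flows = netflow_style(packets)
samples = sflow_style(packets, sampling_rate=5)
# The collector multiplies sampled bytes by the rate to estimate the total.
estimated_total_bytes = sum(pkt[-1] for pkt in samples) * 5
```

The aggregated view is exact but needs per-flow state on the exporter; the sampled view needs no state but only estimates totals (here the estimate is 1000 bytes against a true 840).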

In this post, we will build an asynchronous network ingestion engine using **Python** and **Scapy** that captures live wireless traffic, extracts 5-tuple flow metrics, and streams structured JSON events ready to be consumed by a message broker (like Kafka) or an AI vector database. The engine does not speak the NetFlow or sFlow wire protocols; it produces NetFlow-style per-flow aggregates and sFlow-style per-packet events from its own capture.

The biggest pitfall when writing a packet sniffer in Python is blocking the capture loop. If your script captures a packet, processes it, formats a JSON string, and logs it sequentially, no new packet is read while that work runs. During a sudden burst of network traffic the operating system's capture buffer fills up and packets are dropped before your script ever sees them.

To feed an AI infrastructure safely, we must decouple the **Capture Phase** from the **Processing Phase** using a Producer-Consumer multi-threaded pattern.

```
[ Wireless Adapter ] 
       │
       ▼ (Raw Packets)
┌────────────────────────────────────────────────────────┐
│ PHASE 1: Capture (Scapy AsyncSniffer Thread)           │
│ - Sniffs raw frames in a non-blocking background loop  │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Fast Handoff: .put_nowait)
┌────────────────────────────────────────────────────────┐
│ PHASE 2: Buffer (Thread-Safe Bounded Queue)            │
│ - Acts as a shock absorber for network traffic bursts  │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Distributed to Worker Pool)
┌────────────────────────────────────────────────────────┐
│ PHASE 3: Process & Emit (Concurrent Thread Workers)    │
│ - Worker 1  │  Worker 2  │  Worker 3  │  Worker 4      │
│ - Decodes Layers (IP, TCP, UDP)                        │
│ - Updates Stateful NetFlow Cache (Aggregates)          │
│ - Outputs Structured JSON Streaming Event              │
└────────────────────────────────────────────────────────┘
```

Below is the complete, modern Python implementation utilizing Scapy's explicit layer APIs, asynchronous execution, and thread-safe buffering.

Ensure you have the native packet capture library installed on your host system (`libpcap` for Linux/macOS, `Npcap` for Windows) and install Scapy:

```
pip install scapy
```

```python
import os
import sys
import queue
import threading
import time
import json
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Optional, Tuple, Union

# Import only the names we use instead of the wildcard `from scapy.all import *`
from scapy.all import AsyncSniffer, Packet, conf
from scapy.layers.inet import IP, TCP, UDP

# --- CONFIGURATION ---
MAX_QUEUE_SIZE = 10000
NUM_WORKERS = 4
# Replace with your active network interface: an index, a string name, or None for the default
INTERFACE: Optional[Union[str, int]] = 24

# --- DATA MODELS ---
@dataclass(frozen=True)
class FlowKey:
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: int

@dataclass
class FlowMetrics:
    packet_count: int = 0
    byte_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0

# --- INGESTION PIPELINE ENGINE ---
class AIStreamIngestionPipeline:
    def __init__(self, interface: Optional[Union[str, int]] = None):
        self.interface = interface
        self.packet_queue: queue.Queue[Packet] = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self.flow_cache: Dict[FlowKey, FlowMetrics] = {}
        # Guards flow_cache: several workers may update the same FlowMetrics object
        self.cache_lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=NUM_WORKERS)
        self.is_running = False
        self.sniffer: Optional[AsyncSniffer] = None

    def _extract_packet_meta(self, packet: Packet) -> Optional[Tuple[FlowKey, int]]:
        """Extracts the 5-tuple flow key and the packet length; returns None for non-IP frames."""
        if IP not in packet:
            return None

        ip_layer = packet[IP]
        # Ports stay 0 for protocols without them (e.g. ICMP)
        src_port, dst_port = 0, 0

        # Safe inspection of Layer 4 using direct class indexing
        if TCP in packet:
            src_port = packet[TCP].sport
            dst_port = packet[TCP].dport
        elif UDP in packet:
            src_port = packet[UDP].sport
            dst_port = packet[UDP].dport

        key = FlowKey(
            src_ip=ip_layer.src,
            dst_ip=ip_layer.dst,
            src_port=src_port,
            dst_port=dst_port,
            protocol=ip_layer.proto
        )
        return key, len(packet)

    def _worker_process_queue(self) -> None:
        """Consumer loop run by each pool thread: aggregates flows and emits events."""
        # Keep draining after stop() so packets already queued are not lost
        while self.is_running or not self.packet_queue.empty():
            try:
                packet = self.packet_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            meta = self._extract_packet_meta(packet)
            if meta is None:
                self.packet_queue.task_done()
                continue

            key, packet_len = meta
            current_time = time.time()

            # 1. Stateful Aggregation (NetFlow Telemetry Layer)
            # The lock keeps the read-modify-write on shared counters atomic across workers
            with self.cache_lock:
                metrics = self.flow_cache.get(key)
                if metrics is None:
                    metrics = FlowMetrics(first_seen=current_time)
                    self.flow_cache[key] = metrics
                metrics.packet_count += 1
                metrics.byte_count += packet_len
                metrics.last_seen = current_time
                # Copy the counters while they are consistent
                metrics_snapshot = asdict(metrics)

            # 2. Structured Streaming Payload (sFlow Event Layer)
            ingest_payload = {
                "event_type": "network_flow_sample",
                "timestamp": current_time,
                "flow_key": asdict(key),
                "packet_size_bytes": packet_len,
                "aggregate_metrics": metrics_snapshot
            }

            # Emit clean JSON to stdout (pipe into a stream wrapper, Kafka, or vector db)
            print(json.dumps(ingest_payload))
            self.packet_queue.task_done()

    def _enqueue_packet(self, packet: Packet) -> None:
        """High-speed producer callback. Hands off raw socket frames immediately."""
        try:
            self.packet_queue.put_nowait(packet)
        except queue.Full:
            # Backpressure handling: Drops edge frames gracefully if queue overflows
            pass

    def start(self) -> None:
        """Spawns workers and executes the non-blocking packet sniffer thread."""
        print(f"[*] Initializing pipeline on interface: {self.interface or 'Default Active OS Interface'}",
              file=sys.stderr)
        self.is_running = True

        for _ in range(NUM_WORKERS):
            self.executor.submit(self._worker_process_queue)

        self.sniffer = AsyncSniffer(
            iface=self.interface,
            prn=self._enqueue_packet,
            store=False  # Crucial: the sniffer keeps no list of captured packets in memory
        )
        self.sniffer.start()
        print("[*] Ingestion pipeline is live and streaming...", file=sys.stderr)

    def stop(self) -> None:
        """Gracefully signs off threads and closes open sockets."""
        print("\n[*] Shutting down ingestion pipeline gracefully...", file=sys.stderr)
        if self.sniffer:
            self.sniffer.stop()

        self.is_running = False
        self.executor.shutdown(wait=True)
        print("[*] Pipeline stopped cleanly.", file=sys.stderr)

if __name__ == "__main__":
    # Administrative privilege guard for raw socket access
    if os.name != 'nt' and os.getuid() != 0:
        print("[-] Critical Error: Root privileges required for raw network socket access.", file=sys.stderr)
        sys.exit(1)

    # Resolve a numeric interface index to a Scapy interface object
    # (mainly needed on Windows, where devices are identified by GUID)
    target_interface = INTERFACE
    if isinstance(INTERFACE, int):
        try:
            target_interface = conf.ifaces.dev_from_index(INTERFACE)
            print(f"[*] Resolved index {INTERFACE} to device: {target_interface.description}", file=sys.stderr)
        except Exception as e:
            print(f"[-] Warning: Could not resolve interface index {INTERFACE}: {e}. Falling back.", file=sys.stderr)
            # A bare integer is not a valid iface value, so fall back to Scapy's default interface
            target_interface = None

    pipeline = AIStreamIngestionPipeline(interface=target_interface)
    try:
        pipeline.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pipeline.stop()
```
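One thing the listing above leaves open is that `flow_cache` only ever grows. NetFlow exporters typically bound their cache with idle timeouts, and a similar idea could be added here. The following is a minimal sketch under stated assumptions: `expire_idle_flows` and the tuple-based `FlowKey` alias are hypothetical names used so the example stays self-contained, and the 15-second timeout is arbitrary.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Simplified stand-in for the FlowKey dataclass used in the pipeline.
FlowKey = Tuple[str, str, int, int, int]


@dataclass
class FlowMetrics:
    packet_count: int = 0
    byte_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0


def expire_idle_flows(
    cache: Dict[FlowKey, FlowMetrics],
    idle_timeout: float,
    now: Optional[float] = None,
) -> List[Tuple[FlowKey, FlowMetrics]]:
    """Remove and return every flow not updated within idle_timeout seconds."""
    now = time.time() if now is None else now
    # Collect keys first so the dict is not mutated while iterating over it.
    expired_keys = [k for k, m in cache.items() if now - m.last_seen > idle_timeout]
    return [(k, cache.pop(k)) for k in expired_keys]


cache = {
    ("10.0.0.1", "10.0.0.2", 50000, 443, 6): FlowMetrics(3, 300, 100.0, 101.0),
    ("10.0.0.3", "10.0.0.2", 50001, 53, 17): FlowMetrics(1, 60, 129.0, 129.0),
}
expired = expire_idle_flows(cache, idle_timeout=15.0, now=130.0)
```

In the threaded pipeline, a call like this would have to run under the same synchronization that protects the cache, for example from a periodic housekeeping thread, and the returned records could be emitted as final "flow ended" events.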

Instead of calling `packet.haslayer(IP)`, this script checks containment with `IP in packet`. Scapy's `Packet` class implements Python's `__contains__` magic method, so the `in` operator performs the same layer lookup at runtime. The two forms are equivalent; the `in` form simply reads more naturally and matches the `packet[IP]` indexing used right after it.
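For readers unfamiliar with how the `in` operator reaches a class, here is a toy stand-in. `ToyPacket` is a hypothetical class written for this illustration, not Scapy's `Packet`, and it uses layer names as strings where Scapy uses layer classes.

```python
class ToyPacket:
    """Hypothetical stand-in for a layered packet; not Scapy's Packet class."""

    def __init__(self, *layers: str) -> None:
        self.layers = layers

    def haslayer(self, layer: str) -> bool:
        return layer in self.layers

    def __contains__(self, layer: str) -> bool:
        # `layer in pkt` is evaluated at runtime as `pkt.__contains__(layer)`.
        return self.haslayer(layer)


pkt = ToyPacket("Ether", "IP", "TCP")
has_ip = "IP" in pkt
has_udp = "UDP" in pkt
```

Both spellings end up in the same lookup, which is why choosing between them is a matter of readability.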

By default, Scapy keeps every captured packet in an in-memory results list (`store=True`). Running a default sniffer on an enterprise pipeline for an hour lets that list grow without bound until the process runs out of memory. By setting `store=False` on our `AsyncSniffer`, the sniffer hands each packet to our callback and keeps no reference of its own, so the packet can be garbage-collected as soon as a worker has processed it.

Our ingestion loop enforces a strict `maxsize=10000` limit and uses a non-blocking `put_nowait()` call. If the downstream data processor (e.g., your LLM evaluator or vector engine) experiences a latency hiccup, the pipeline intentionally drops incoming packets at the edge rather than letting the memory buffer balloon out of control.
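Silent drops are hard to reason about downstream, so it can help to count them. Below is a small sketch of a bounded buffer that records how many items it accepted and rejected. `DropCountingBuffer`, `offer`, and `take` are hypothetical names for this illustration, and the counters assume a single producer thread, which matches a single sniffer callback.

```python
import queue


class DropCountingBuffer:
    """Bounded buffer that counts rejected items instead of blocking."""

    def __init__(self, maxsize: int) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue(maxsize=maxsize)
        # Plain ints are enough here because only one producer updates them.
        self.accepted = 0
        self.dropped = 0

    def offer(self, item: object) -> bool:
        """Try to enqueue without blocking; return False if the item was dropped."""
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            self.dropped += 1
            return False
        self.accepted += 1
        return True

    def take(self, timeout: float = 0.5) -> object:
        """Blocking read for consumers; raises queue.Empty after the timeout."""
        return self._queue.get(timeout=timeout)


buffer = DropCountingBuffer(maxsize=3)
results = [buffer.offer(n) for n in range(5)]
```

With no consumer running, the first three offers succeed and the last two are counted as drops. Exposing `dropped` alongside the event stream lets a downstream model know when its view of the traffic is incomplete.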

The pipeline aggregates connection metadata into stateful records keyed by the 5-tuple, while emitting one structured JSON event per processed packet. This gives your downstream AI immediate access to critical parameters:

```
{
  "event_type": "network_flow_sample", 
  "timestamp": 1719409375.42, 
  "flow_key": {
    "src_ip": "10.17.245.72", 
    "dst_ip": "142.250.180.142", 
    "src_port": 54211, 
    "dst_port": 443, 
    "protocol": 6
  }, 
  "packet_size_bytes": 1420, 
  "aggregate_metrics": {
    "packet_count": 42, 
    "byte_count": 59640, 
    "first_seen": 1719409371.11, 
    "last_seen": 1719409375.42
  }
}
```

This structured metadata gives your AI system a flow-level view of network behavior, allowing agents to cross-reference anomalies without filling your entire feature store with costly raw packet captures.
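As one example of what a consumer might do with these events, the sketch below turns a single JSON line into a few numeric features. The function name `derive_features` and the chosen features are assumptions for illustration; a real model would pick its own. It relies on `packet_count` being at least 1, which holds for every event the pipeline emits.

```python
import json
from typing import Dict


def derive_features(event_line: str) -> Dict[str, float]:
    """Compute simple per-flow features from one emitted JSON event."""
    event = json.loads(event_line)
    agg = event["aggregate_metrics"]
    duration = agg["last_seen"] - agg["first_seen"]
    return {
        "duration_s": duration,
        "mean_packet_bytes": agg["byte_count"] / agg["packet_count"],
        # A flow seen only once has zero duration; avoid dividing by zero.
        "bytes_per_second": agg["byte_count"] / duration if duration > 0 else 0.0,
    }


sample_line = json.dumps({
    "event_type": "network_flow_sample",
    "timestamp": 1719409375.42,
    "flow_key": {"src_ip": "10.17.245.72", "dst_ip": "142.250.180.142",
                 "src_port": 54211, "dst_port": 443, "protocol": 6},
    "packet_size_bytes": 1420,
    "aggregate_metrics": {"packet_count": 42, "byte_count": 59640,
                          "first_seen": 1719409371.11, "last_seen": 1719409375.42},
})
features = derive_features(sample_line)
```

Because the pipeline writes one JSON object per line to stdout, a consumer like this could read `sys.stdin` line by line when the two processes are connected with a shell pipe.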
