[ARTICLE · art-40654] src=dev.to ↗ pub= topic=artificial-intelligence verified=true sentiment=· neutral

Real-Time Network Telemetry for AI: Building an Asynchronous NetFlow/sFlow Ingestion Pipeline in Python

A developer built a production-grade asynchronous NetFlow/sFlow ingestion pipeline in Python using Scapy to capture live network traffic and stream structured JSON events for AI-driven anomaly detection and network observability. The pipeline decouples packet capture from processing using a multi-threaded producer-consumer pattern with a bounded, thread-safe queue that absorbs traffic bursts and sheds load only when the queue is full.

6 min read · 1 view · published Jun 26, 2026

As AI architectures transition from static data processing to real-time agentic monitoring, the data engineering pipelines feeding them must adapt. If you are building an AI-driven anomaly detection model, a security agent, or an automated network observability tool, you can't rely on historical batch logs. You need live, structural telemetry straight from the network edge.

When monitoring network traffic at scale, two industry standards dominate: NetFlow (stateful conversation aggregation) and sFlow (stateless packet-level sampling).
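To make the distinction concrete, here is a minimal, Scapy-free sketch. The packet tuples, the helper names aggregate_flows and sample_packets, and the 1-in-N sampling rate are illustrative assumptions; this is not either protocol's wire format, only the contrast between keeping per-flow state and sampling without state.

```python
import random
from collections import defaultdict

# Hypothetical packets: (src_ip, dst_ip, src_port, dst_port, protocol, size_bytes)
PACKETS = [
    ("10.0.0.1", "10.0.0.2", 5000, 443, 6, 1000),
    ("10.0.0.1", "10.0.0.2", 5000, 443, 6, 500),
    ("10.0.0.3", "10.0.0.2", 6000, 53, 17, 80),
    ("10.0.0.1", "10.0.0.2", 5000, 443, 6, 1500),
]

def aggregate_flows(packets):
    """NetFlow-style: keep running per-flow state keyed by the 5-tuple."""
    flows = defaultdict(lambda: {"packets": 0, "bytes": 0})
    for *five_tuple, size in packets:
        entry = flows[tuple(five_tuple)]
        entry["packets"] += 1
        entry["bytes"] += size
    return dict(flows)

def sample_packets(packets, rate, rng):
    """sFlow-style: keep roughly 1 in `rate` packets, with no per-flow state."""
    return [p for p in packets if rng.randrange(rate) == 0]

flows = aggregate_flows(PACKETS)
sampled = sample_packets(PACKETS, rate=2, rng=random.Random(0))
```

The aggregated view costs memory per active flow but gives exact counts; the sampled view costs almost nothing per packet but only estimates totals.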

In this post, we will build a production-grade, asynchronous network ingestion engine using Python and Scapy that captures live wireless traffic, extracts 5-tuple flow metrics, and streams structured JSON events ready to be consumed by a message broker (like Kafka) or an AI vector database.

The biggest pitfall when writing a packet sniffer in Python is blocking the execution loop. If your script captures a packet, processes it, formats a JSON string, and logs it sequentially, your script will choke and drop thousands of packets during a sudden burst of network traffic.

To feed an AI infrastructure safely, we must decouple the Capture Phase from the Processing Phase using a Producer-Consumer multi-threaded pattern.
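Stripped of Scapy, the pattern might look like the sketch below. The run_pipeline helper, the sentinel object, and the "multiply by two" processing step are all hypothetical stand-ins; the point is only the shape: a fast non-blocking handoff on the capture side, a bounded queue in the middle, and worker threads draining it.

```python
import queue
import threading

def run_pipeline(items, num_workers=2, maxsize=100):
    """Push `items` through a bounded queue to a pool of worker threads."""
    buffer = queue.Queue(maxsize=maxsize)
    results = []
    results_lock = threading.Lock()
    stop = object()  # sentinel telling a worker to exit

    def worker():
        while True:
            item = buffer.get()
            if item is stop:
                break
            processed = item * 2  # stand-in for decoding and aggregation
            with results_lock:
                results.append(processed)

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    dropped = 0
    for item in items:  # the "capture" side: hand off and move on
        try:
            buffer.put_nowait(item)
        except queue.Full:
            dropped += 1  # shed load instead of blocking capture

    for _ in workers:
        buffer.put(stop)  # blocking put so every worker receives a sentinel
    for w in workers:
        w.join()
    return results, dropped

results, dropped = run_pipeline(range(50))
```

Because the producer never blocks, a slow worker pool shows up as a rising dropped count rather than as a stalled capture loop.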

[ Wireless Adapter ]
       │
       ▼ (Raw Packets)
┌────────────────────────────────────────────────────────┐
│ PHASE 1: Capture (Scapy AsyncSniffer Thread)           │
│ - Sniffs raw frames in a non-blocking background loop  │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Fast Handoff: .put_nowait)
┌────────────────────────────────────────────────────────┐
│ PHASE 2: Buffer (Thread-Safe bounded Queue)            │
│ - Acts as a shock absorber for network traffic bursts  │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Distributed to Worker Pool)
┌────────────────────────────────────────────────────────┐
│ PHASE 3: Process & Emit (Concurrent Thread Workers)    │
│ - Worker 1  │  Worker 2  │  Worker 3  │  Worker 4      │
│ - Decodes Layers (IP, TCP, UDP)                        │
│ - Updates Stateful NetFlow Cache (Aggregates)          │
│ - Outputs Structured JSON Streaming Event              │
└────────────────────────────────────────────────────────┘

Below is the complete, modern Python implementation utilizing Scapy's explicit layer APIs, asynchronous execution, and thread-safe buffering.

Ensure you have the native packet capture library installed on your host system (libpcap for Linux/macOS, Npcap for Windows) and install Scapy:

pip install scapy

import os
import sys
import queue
import threading
import time
import json
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Tuple, Optional

from scapy.all import AsyncSniffer, Packet, conf
from scapy.layers.inet import IP, TCP, UDP

MAX_QUEUE_SIZE = 10000  # upper bound on buffered packets before new ones are dropped
NUM_WORKERS = 4
# Example value: a Windows interface index, resolved to a device in the entry point.
# Use an interface name (e.g. "wlan0") or None for Scapy's default interface instead.
# Note: the `str | int` annotation syntax requires Python 3.10+.
INTERFACE: Optional[str | int] = 24

@dataclass(frozen=True)
class FlowKey:
    """Immutable, hashable 5-tuple identifying one direction of a conversation."""
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: int

@dataclass
class FlowMetrics:
    """Running totals for a single flow."""
    packet_count: int = 0
    byte_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0

class AIStreamIngestionPipeline:
    def __init__(self, interface: Optional[str | int] = None):
        self.interface = interface
        self.packet_queue: queue.Queue[Packet] = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self.flow_cache: Dict[FlowKey, FlowMetrics] = {}
        # Guards flow_cache: all workers update the same dictionary.
        self.cache_lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=NUM_WORKERS)
        self.is_running = False
        self.sniffer: Optional[AsyncSniffer] = None

    def _extract_packet_meta(self, packet: Packet) -> Optional[Tuple[FlowKey, int]]:
        """Builds the 5-tuple flow key; returns None for non-IPv4 packets."""
        if IP not in packet:
            return None

        ip_layer = packet[IP]
        # Ports stay 0 for protocols without them (e.g. ICMP).
        src_port, dst_port = 0, 0

        if TCP in packet:
            src_port = packet[TCP].sport
            dst_port = packet[TCP].dport
        elif UDP in packet:
            src_port = packet[UDP].sport
            dst_port = packet[UDP].dport

        key = FlowKey(
            src_ip=ip_layer.src,
            dst_ip=ip_layer.dst,
            src_port=src_port,
            dst_port=dst_port,
            protocol=ip_layer.proto
        )
        return key, len(packet)

    def _worker_process_queue(self) -> None:
        """Consumer loop run by each pool thread: extracts flow features and emits JSON."""
        while self.is_running or not self.packet_queue.empty():
            try:
                packet = self.packet_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                meta = self._extract_packet_meta(packet)
                if not meta:
                    continue

                key, packet_len = meta
                current_time = time.time()

                # The read-modify-write on shared flow state must be serialized,
                # otherwise concurrent workers can lose updates.
                with self.cache_lock:
                    metrics = self.flow_cache.get(key)
                    if metrics is None:
                        metrics = FlowMetrics(first_seen=current_time)
                        self.flow_cache[key] = metrics
                    metrics.packet_count += 1
                    metrics.byte_count += packet_len
                    metrics.last_seen = max(metrics.last_seen, current_time)
                    snapshot = asdict(metrics)  # copy while still holding the lock

                ingest_payload = {
                    "event_type": "network_flow_sample",
                    "timestamp": current_time,
                    "flow_key": asdict(key),
                    "packet_size_bytes": packet_len,
                    "aggregate_metrics": snapshot
                }

                print(json.dumps(ingest_payload))
            finally:
                # Always mark the item done, including for skipped packets.
                self.packet_queue.task_done()

    def _enqueue_packet(self, packet: Packet) -> None:
        """High-speed producer callback. Hands off raw socket frames immediately."""
        try:
            self.packet_queue.put_nowait(packet)
        except queue.Full:
            pass

    def start(self) -> None:
        """Spawns workers and executes the non-blocking packet sniffer thread."""
        print(f"[*] Initializing pipeline on interface: {self.interface or 'Default Active OS Interface'}",
              file=sys.stderr)
        self.is_running = True

        for _ in range(NUM_WORKERS):
            self.executor.submit(self._worker_process_queue)

        self.sniffer = AsyncSniffer(
            iface=self.interface,
            prn=self._enqueue_packet,
            store=False  # Crucial: Drops raw historical packets out of memory instantly
        )
        self.sniffer.start()
        print("[*] Ingestion pipeline is live and streaming...", file=sys.stderr)

    def stop(self) -> None:
        """Gracefully signs off threads and closes open sockets."""
        print("\n[*] Shutting down ingestion pipeline gracefully...", file=sys.stderr)
        if self.sniffer:
            self.sniffer.stop()

        self.is_running = False
        self.executor.shutdown(wait=True)
        print("[*] Pipeline stopped cleanly.", file=sys.stderr)

if __name__ == "__main__":
    if os.name != 'nt' and os.getuid() != 0:
        print("[-] Critical Error: Root privileges required for raw network socket access.", file=sys.stderr)
        sys.exit(1)

    target_interface = INTERFACE
    if isinstance(INTERFACE, int):
        if os.name == 'nt':
            try:
                target_interface = conf.ifaces.dev_from_index(INTERFACE)
                print(f"[*] Resolved index {INTERFACE} to device: {target_interface.description}", file=sys.stderr)
            except Exception as e:
                print(f"[-] Warning: Could not resolve interface index {INTERFACE}: {e}. Falling back.", file=sys.stderr)
                target_interface = None  # actually fall back to Scapy's default interface
        else:
            # Numeric indexes are only resolved on Windows in this script.
            print(f"[-] Warning: Ignoring numeric interface index {INTERFACE}; using default interface.", file=sys.stderr)
            target_interface = None

    pipeline = AIStreamIngestionPipeline(interface=target_interface)
    try:
        pipeline.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pipeline.stop()

Instead of calling packet.haslayer(IP) directly, this script checks for a layer with the expression IP in packet. Scapy implements Python's __contains__ special method on its Packet class so that the in operator delegates to haslayer(). The two forms are equivalent in behavior; the in form is simply more idiomatic and pairs naturally with packet[IP] indexing.
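For readers unfamiliar with how the in operator reaches a method, here is a small sketch of the delegation mechanism. ToyPacket is a hypothetical stand-in that uses plain strings for layers; it is not Scapy's implementation.

```python
class ToyPacket:
    """Hypothetical stand-in for a layered packet; not Scapy's implementation."""

    def __init__(self, *layers):
        self.layers = layers

    def haslayer(self, layer_name):
        return layer_name in self.layers

    def __contains__(self, layer_name):
        # Python routes `name in packet` here; this simply delegates.
        return self.haslayer(layer_name)

pkt = ToyPacket("Ether", "IP", "TCP")
has_ip = "IP" in pkt
has_udp = "UDP" in pkt
```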

By default, Scapy's sniffer keeps every captured packet in an in-memory list. Running a default sniffer on a busy enterprise link for an hour lets that list grow without bound until the process runs out of memory. By passing store=False to our AsyncSniffer, we take each packet from the adapter, hand it to our local queue, and let Scapy drop its own reference, so the packet can be garbage-collected as soon as a worker has processed it.
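The same concern applies to any per-flow state kept for aggregation, since a dictionary keyed by 5-tuple gains an entry for every new flow it sees. NetFlow exporters typically handle this with inactivity timeouts. A minimal sketch of such a sweep follows; the evict_idle_flows helper, the plain-dict metrics, and the 30-second timeout are assumptions for illustration and are not part of the script above.

```python
import time
from typing import Dict, Optional, Tuple

FlowKey = Tuple[str, str, int, int, int]

def evict_idle_flows(cache: Dict[FlowKey, dict], idle_timeout: float,
                     now: Optional[float] = None) -> Dict[FlowKey, dict]:
    """Remove and return flows whose last_seen is older than idle_timeout seconds."""
    now = time.time() if now is None else now
    expired = {k: m for k, m in cache.items() if now - m["last_seen"] > idle_timeout}
    for key in expired:
        del cache[key]
    return expired

cache = {
    ("10.0.0.1", "10.0.0.2", 5000, 443, 6): {"last_seen": 100.0},
    ("10.0.0.3", "10.0.0.2", 6000, 53, 17): {"last_seen": 160.0},
}
expired = evict_idle_flows(cache, idle_timeout=30.0, now=170.0)
```

In a multi-threaded pipeline, a sweep like this would need to run under whatever lock guards the cache, and the expired entries could be emitted as final flow records before being discarded.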

Our ingestion queue enforces a hard cap of maxsize=10000, and the capture callback enqueues with the non-blocking put_nowait(). If the workers fall behind (for example, because a downstream consumer such as your LLM evaluator or vector engine experiences a latency hiccup), the pipeline intentionally drops incoming packets at the edge rather than letting the memory buffer balloon out of control.
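Silent drops are hard to reason about later, so it can help to count them. The sketch below wraps a bounded queue with accept and drop counters; DropCountingBuffer and its method names are hypothetical, and the plain integer counters assume a single producer thread, which matches a single sniffer callback.

```python
import queue

class DropCountingBuffer:
    """Bounded buffer that counts, rather than hides, rejected items."""

    def __init__(self, maxsize: int):
        self._queue = queue.Queue(maxsize=maxsize)
        self.accepted = 0
        self.dropped = 0

    def offer(self, item) -> bool:
        """Try to enqueue without blocking; return False if the item was dropped."""
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            self.dropped += 1
            return False
        self.accepted += 1
        return True

    def drop_ratio(self) -> float:
        total = self.accepted + self.dropped
        return self.dropped / total if total else 0.0

buf = DropCountingBuffer(maxsize=3)
outcomes = [buf.offer(n) for n in range(5)]
```

A periodically logged drop ratio tells a downstream model how much of the traffic it is actually seeing, which matters when interpreting packet and byte counts.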

The pipeline keeps a stateful aggregate for each flow, keyed by its 5-tuple, and for every packet it immediately emits a JSON event carrying both that packet's size and the flow's running totals. This gives your downstream AI immediate access to critical parameters:

{
  "event_type": "network_flow_sample", 
  "timestamp": 1719409375.42, 
  "flow_key": {
    "src_ip": "10.17.245.72", 
    "dst_ip": "142.250.180.142", 
    "src_port": 54211, 
    "dst_port": 443, 
    "protocol": 6
  }, 
  "packet_size_bytes": 1420, 
  "aggregate_metrics": {
    "packet_count": 42, 
    "byte_count": 59640, 
    "first_seen": 1719409371.11, 
    "last_seen": 1719409375.42
  }
}

This structural metadata provides your AI system with a comprehensive macro-view map of network behavior, allowing agents to accurately cross-reference anomalies without drowning your entire feature store in costly, raw packet captures.
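As one example of how a downstream consumer might use these events, the sketch below turns a single JSON event into a few numeric features. The derive_features helper and the choice of features are assumptions for illustration; the field names are the ones emitted by the pipeline.

```python
import json

def derive_features(event_json: str) -> dict:
    """Turn one pipeline event into a few numeric features."""
    event = json.loads(event_json)
    agg = event["aggregate_metrics"]
    duration = agg["last_seen"] - agg["first_seen"]
    return {
        "dst_port": event["flow_key"]["dst_port"],
        "protocol": event["flow_key"]["protocol"],
        "mean_packet_bytes": agg["byte_count"] / agg["packet_count"],
        "duration_s": duration,
        # Guard against single-packet flows, where duration is zero.
        "bytes_per_s": agg["byte_count"] / duration if duration > 0 else 0.0,
    }

sample = json.dumps({
    "event_type": "network_flow_sample",
    "timestamp": 1719409375.42,
    "flow_key": {"src_ip": "10.17.245.72", "dst_ip": "142.250.180.142",
                 "src_port": 54211, "dst_port": 443, "protocol": 6},
    "packet_size_bytes": 1420,
    "aggregate_metrics": {"packet_count": 42, "byte_count": 59640,
                          "first_seen": 1719409371.11, "last_seen": 1719409375.42},
})
features = derive_features(sample)
```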
