Real-Time Network Telemetry for AI: Building an Asynchronous NetFlow/sFlow Ingestion Pipeline in Python A developer built a production-grade asynchronous NetFlow/sFlow ingestion pipeline in Python using Scapy to capture live network traffic and stream structured JSON events for AI-driven anomaly detection and network observability. The pipeline decouples packet capture from processing using a producer-consumer multi-threaded pattern with a thread-safe queue to handle traffic bursts without dropping packets. As AI architectures transition from static data processing to real-time agentic monitoring, the data engineering pipelines feeding them must adapt. If you are building an AI-driven anomaly detection model, a security agent, or an automated network observability tool, you can't rely on historical batch logs. You need live, structural telemetry straight from the network edge. When monitoring network traffic at scale, two industry standards dominate: NetFlow stateful conversation aggregation and sFlow stateless packet-level sampling . In this post, we will build a production-grade, asynchronous network ingestion engine using Python and Scapy that captures live wireless traffic, extracts 5-tuple flow metrics, and streams structured JSON events ready to be swallowed by a message broker like Kafka or an AI vector database. The biggest pitfall when writing a packet sniffer in Python is blocking the execution loop. If your script captures a packet, processes it, formats a JSON string, and logs it sequentially, your script will choke and drop thousands of packets during a sudden burst of network traffic. To feed an AI infrastructure safely, we must decouple the Capture Phase from the Processing Phase using a Producer-Consumer multi-threaded pattern. Wireless Adapter │ ▼ Raw Packets ┌────────────────────────────────────────────────────────┐ │ PHASE 1: Capture Scapy AsyncSniffer Thread │ │ - Sniffs raw frames in a non-blocking background loop │ └────────────────────────────────────────────────────────┘ │ ▼ Fast Handoff: .put nowait ┌────────────────────────────────────────────────────────┐ │ PHASE 2: Buffer Thread-Safe bounded Queue │ │ - Acts as a shock absorber for network traffic bursts │ └────────────────────────────────────────────────────────┘ │ ▼ Distributed to Worker Pool ┌────────────────────────────────────────────────────────┐ │ PHASE 3: Process & Emit Concurrent Thread Workers │ │ - Worker 1 │ Worker 2 │ Worker 3 │ Worker 4 │ │ - Decodes Layers IP, TCP, UDP │ │ - Updates Stateful NetFlow Cache Aggregates │ │ - Outputs Structured JSON Streaming Event │ └────────────────────────────────────────────────────────┘ Below is the complete, modern Python implementation utilizing Scapy's explicit layer APIs, asynchronous execution, and thread-safe buffering. Ensure you have the native packet capture library installed on your host system libpcap for Linux/macOS, Npcap for Windows and install Scapy: pip install scapy python import os import sys import queue import time import json from dataclasses import dataclass, asdict from concurrent.futures import ThreadPoolExecutor from typing import Dict, Tuple, Optional Explicitly target specific submodules for deterministic enterprise scopes from scapy.all import AsyncSniffer, Packet, conf from scapy.layers.inet import IP, TCP, UDP --- CONFIGURATION --- MAX QUEUE SIZE = 10000 NUM WORKERS = 4 Replace with your specific active network interface index or string name INTERFACE: Optional str | int = 24 --- DATA MODELS --- @dataclass frozen=True class FlowKey: src ip: str dst ip: str src port: int dst port: int protocol: int @dataclass class FlowMetrics: packet count: int = 0 byte count: int = 0 first seen: float = 0.0 last seen: float = 0.0 --- INGESTION PIPELINE ENGINE --- class AIStreamIngestionPipeline: def init self, interface: Optional str | int = None : self.interface = interface self.packet queue: queue.Queue Packet = queue.Queue maxsize=MAX QUEUE SIZE self.flow cache: Dict FlowKey, FlowMetrics = {} self.executor = ThreadPoolExecutor max workers=NUM WORKERS self.is running = False self.sniffer: Optional AsyncSniffer = None def extract packet meta self, packet: Packet - Optional Tuple FlowKey, int : """Parses raw layers using container-optimized syntax.""" if IP not in packet: return None ip layer = packet IP src port, dst port = 0, 0 Safe inspection of Layer 4 using direct class indexing if TCP in packet: src port = packet TCP .sport dst port = packet TCP .dport elif UDP in packet: src port = packet UDP .sport dst port = packet UDP .dport key = FlowKey src ip=ip layer.src, dst ip=ip layer.dst, src port=src port, dst port=dst port, protocol=ip layer.proto return key, len packet def worker process queue self - None: """Isolated consumer thread pool extracting flow features.""" while self.is running or not self.packet queue.empty : try: packet = self.packet queue.get timeout=0.5 except queue.Empty: continue meta = self. extract packet meta packet if not meta: self.packet queue.task done continue key, packet len = meta current time = time.time 1. Stateful Aggregation NetFlow Telemetry Layer if key not in self.flow cache: self.flow cache key = FlowMetrics first seen=current time metrics = self.flow cache key metrics.packet count += 1 metrics.byte count += packet len metrics.last seen = current time 2. Structured Streaming Payload sFlow Event Layer ingest payload = { "event type": "network flow sample", "timestamp": current time, "flow key": asdict key , "packet size bytes": packet len, "aggregate metrics": asdict metrics } Emit clean JSON to stdout pipe into a stream wrapper, Kafka, or vector db print json.dumps ingest payload self.packet queue.task done def enqueue packet self, packet: Packet - None: """High-speed producer callback. Hands off raw socket frames immediately.""" try: self.packet queue.put nowait packet except queue.Full: Backpressure handling: Drops edge frames gracefully if queue overflows pass def start self - None: """Spawns workers and executes the non-blocking packet sniffer thread.""" print f" Initializing pipeline on interface: {self.interface or 'Default Active OS Interface'}", file=sys.stderr self.is running = True for in range NUM WORKERS : self.executor.submit self. worker process queue self.sniffer = AsyncSniffer iface=self.interface, prn=self. enqueue packet, store=False Crucial: Drops raw historical packets out of memory instantly self.sniffer.start print " Ingestion pipeline is live and streaming...", file=sys.stderr def stop self - None: """Gracefully signs off threads and closes open sockets.""" print "\n Shutting down ingestion pipeline gracefully...", file=sys.stderr if self.sniffer: self.sniffer.stop self.is running = False self.executor.shutdown wait=True print " Pipeline stopped cleanly.", file=sys.stderr if name == " main ": Administrative privilege guard for raw socket access if os.name = 'nt' and os.getuid = 0: print " - Critical Error: Root privileges required for raw network socket access.", file=sys.stderr sys.exit 1 Windows Device GUID Resolution block target interface = INTERFACE if os.name == 'nt' and isinstance INTERFACE, int : try: target interface = conf.ifaces.dev from index INTERFACE print f" Resolved index {INTERFACE} to device: {target interface.description}", file=sys.stderr except Exception as e: print f" - Warning: Could not resolve interface index {INTERFACE}: {e}. Falling back.", file=sys.stderr pipeline = AIStreamIngestionPipeline interface=target interface try: pipeline.start while True: time.sleep 1 except KeyboardInterrupt: pipeline.stop Instead of executing high-overhead inspection functions like packet.haslayer IP , this script checks containment explicitly using IP in packet . Scapy overloads Python’s internal contains magic methods, ensuring layer validation occurs at the compiler layer. store=False By default, Scapy stores every single packet captured in an internal history list inside RAM. Running a default sniffer on an enterprise pipeline for an hour will result in a quick memory leak crash. By assigning store=False in our AsyncSniffer , we consume the packet from the adapter, pass it to our local queue, and free its memory instantly. Our ingestion loop forces a strict maxsize=10000 rule and uses a non-blocking put nowait execution. If the downstream data processor e.g., your LLM evaluator or vector engine experiences a latency hiccup, the pipeline intentionally drops incoming packets at the edge rather than letting the memory buffer balloon out of control. The pipeline aggregates connection metadata into a stateful 5-tuple format, while instantly streaming stateless structural JSON tokens. This gives your downstream AI immediate access to critical parameters: { "event type": "network flow sample", "timestamp": 1719409375.42, "flow key": { "src ip": "10.17.245.72", "dst ip": "142.250.180.142", "src port": 54211, "dst port": 443, "protocol": 6 }, "packet size bytes": 1420, "aggregate metrics": { "packet count": 42, "byte count": 59640, "first seen": 1719409371.11, "last seen": 1719409375.42 } } This structural metadata provides your AI system with a comprehensive macro-view map of network behavior, allowing agents to accurately cross-reference anomalies without drowning your entire feature store in costly, raw packet captures.