Building an AI Agent That Responds to Real-Time Events with AWS Bedrock, Kinesis, DynamoDB, and S3 A developer built a real-time AI recommendation agent using AWS Bedrock, Kinesis, DynamoDB, and S3. The system reacts to streaming user behavior within seconds, updating recommendation sets asynchronously without blocking the user. The architecture uses Kinesis for ingestion, Lambda and Bedrock for processing, and DynamoDB for caching. Most recommendation systems are batch jobs. They crunch last night's data, write a recommendations table, and serve it all day. That works fine until your user watches three thriller movies in a row at 9pm and your system is still recommending rom-coms because the batch hasn't run yet. In this post I'll walk through building an agent system that reacts to streaming user behavior in real time using: By the end you'll have an architecture where a user's recommendation set updates within seconds of their behavior changing. The system has three layers: | Layer | Services | Responsibility | |---|---|---| Ingest | Kinesis Data Streams, Kinesis Firehose | Capture and fan-out user events | Process & Reason | Lambda, Amazon Bedrock Agent | Enrich events, build context, invoke LLM | Store & Serve | DynamoDB, S3 | Persist profiles, cache recs, store artifacts | The key design decision is keeping the hot path Kinesis → Lambda → Bedrock → DynamoDB fully async and the serving path API → DynamoDB cache completely decoupled. The user never waits for Bedrock to respond; they get the last cached recommendation set while a fresh one is already being computed in the background. Here's what happens end to end when a user clicks on a product: user.interaction event to This is your app-side producer. Keep it thin — just serialize and publish. Do all enrichment downstream. python import boto3 import json import uuid from datetime import datetime, timezone kinesis = boto3.client 'kinesis', region name='us-east-1' def publish interaction user id: str, item id: str, event type: str, metadata: dict = {} : """ Publish a user interaction event to Kinesis Data Streams. Partition key is user id so all events for a user land on the same shard. """ event = { 'event id': str uuid.uuid4 , 'user id': user id, 'item id': item id, 'event type': event type, 'click', 'purchase', 'dwell', 'skip' 'timestamp': datetime.now timezone.utc .isoformat , 'metadata': metadata, } response = kinesis.put record StreamName='user-interactions', Data=json.dumps event .encode 'utf-8' , PartitionKey=user id, consistent routing per user return response 'SequenceNumber' Example call publish interaction user id='u 8821', item id='prod thriller 042', event type='purchase', metadata={'price': 14.99, 'category': 'thriller', 'session id': 'sess xyz'} Tip:Use user id as the partition key so all events for a given user land on the same shard and arrive in order. This matters when Lambda is building a recency-ordered event window. This is the core of the pipeline. The Lambda reads from the Kinesis stream, pulls user context from DynamoDB, and invokes the Bedrock Agent with a structured prompt. python import boto3 import json import os from datetime import datetime, timezone dynamodb = boto3.resource 'dynamodb' bedrock = boto3.client 'bedrock-agent-runtime', region name='us-east-1' profiles table = dynamodb.Table os.environ 'PROFILES TABLE' DynamoDB User Profiles rec table = dynamodb.Table os.environ 'REC CACHE TABLE' DynamoDB Rec Cache AGENT ID = os.environ 'BEDROCK AGENT ID' AGENT ALIAS = os.environ 'BEDROCK AGENT ALIAS' MAX HISTORY = 20 last N events to include in context def handler event, context : for record in event 'Records' : Kinesis payload is base64-encoded payload = json.loads record 'kinesis' 'data' process event payload def process event payload: dict : user id = payload 'user id' item id = payload 'item id' evt type = payload 'event type' 1. Fetch user profile + recent history from DynamoDB response = profiles table.get item Key={'user id': user id} profile = response.get 'Item', {'user id': user id, 'history': , 'preferences': {}} 2. Append current event and trim to window profile 'history' .append { 'item id': item id, 'event type': evt type, 'timestamp': payload 'timestamp' , 'metadata': payload.get 'metadata', {} , } profile 'history' = profile 'history' -MAX HISTORY: 3. Write enriched profile back profiles table.put item Item=profile 4. Build prompt for Bedrock Agent prompt = build personalization prompt profile 5. Invoke Bedrock Agent agent response = bedrock.invoke agent agentId=AGENT ID, agentAliasId=AGENT ALIAS, sessionId=user id, session per user keeps conversational context inputText=prompt, 6. Parse streaming response chunks recommendations = parse agent response agent response 7. Write to recommendation cache rec table.put item Item={ 'user id': user id, 'recommendations': recommendations, 'generated at': datetime.now timezone.utc .isoformat , 'ttl': int datetime.now timezone.utc .timestamp + 3600, 1hr TTL } def build personalization prompt profile: dict - str: history summary = '\n'.join f"- {e 'event type' .upper } item={e 'item id' } category={e 'metadata' .get 'category','unknown' }" for e in profile 'history' -10: return f"""You are a real-time personalization agent. User profile: {json.dumps profile.get 'preferences', {} } Recent interactions most recent last : {history summary} Based on this behavior, return exactly 5 personalized item recommendations as a JSON array. Each item must include: item id, category, reasoning 1 sentence , confidence score 0-1 . Return only valid JSON. No explanation outside the JSON block.""" def parse agent response agent response - list: full text = '' for event in agent response 'completion' : if 'chunk' in event: full text += event 'chunk' 'bytes' .decode 'utf-8' try: Extract JSON from response start = full text.index ' ' end = full text.rindex ' ' + 1 return json.loads full text start:end except ValueError, json.JSONDecodeError : return The serving layer never touches Bedrock. It reads purely from the DynamoDB cache, keeping p99 latency well under 10ms. python import boto3 import json import os from datetime import datetime, timezone dynamodb = boto3.resource 'dynamodb' rec table = dynamodb.Table os.environ 'REC CACHE TABLE' FALLBACK RECS = 'popular 001', 'popular 002', 'popular 003' cold-start fallback def handler event, context : user id = event 'pathParameters' 'userId' response = rec table.get item Key={'user id': user id} item = response.get 'Item' if not item: Cold start: user has no history yet return api response 200, { 'user id': user id, 'recommendations': FALLBACK RECS, 'source': 'fallback', 'generated at': None, } age seconds = datetime.now timezone.utc - datetime.fromisoformat item 'generated at' .total seconds return api response 200, { 'user id': user id, 'recommendations': item 'recommendations' , 'source': 'cache', 'generated at': item 'generated at' , 'cache age sec': int age seconds , } def api response status: int, body: dict - dict: return { 'statusCode': status, 'headers': { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': ' ', }, 'body': json.dumps body , } | Service | Why it's here | Alternative considered | |---|---|---| Kinesis Data Streams | Ordered, replayable, millisecond latency fan-out | SQS no ordering guarantee per user , EventBridge higher latency | Kinesis Firehose | Zero-code delivery to S3 for archiving | Writing to S3 directly in Lambda adds failure surface | Lambda | Event-driven, scales to 0, tight Kinesis integration | ECS Fargate overkill for stateless enrichment | Amazon Bedrock | Managed LLM with agent runtime, no infra to maintain | Self-hosted model on SageMaker more control, much more ops | DynamoDB | Single-digit ms reads, TTL support, scales automatically | RDS too slow for hot path , ElastiCache extra cost for separate store | S3 | Cheap durable archive + model artifact store | DynamoDB for raw events expensive and unnecessary | Bedrock latency is variable. Claude Sonnet typically responds in 1-4 seconds but can spike. Since recs are written async to cache, this doesn't affect user-facing latency, but it does affect freshness. Monitor bedrock:InvokeAgent duration in CloudWatch. Kinesis shard scaling. One shard handles 1MB/s write or 1000 records/s. At 10k active users you'll need to plan shard count carefully. Use Enhanced Fan-Out if you have multiple Lambda consumers reading the same stream. DynamoDB TTL for cache eviction. The serving Lambda sets a 1-hour TTL on each rec entry. If Bedrock hasn't updated the cache in over an hour e.g. Lambda errors , users fall back to the popular items list. Adjust TTL based on how stale you can tolerate. Cold start users. New users have no history so the Bedrock prompt has nothing useful to reason over. I recommend a popularity-based fallback as shown in the serving Lambda, and switching to personalized recs after the user's first 3-5 interactions. The pattern here is worth generalizing: keep the reasoning layer Bedrock fully off the hot serving path. Write results to a fast cache DynamoDB , serve from the cache, and let the agent pipeline update it continuously in the background. This gives you the intelligence of an LLM-powered agent without the latency of one. The same pattern applies to fraud scoring, content moderation queues, ops alerting — anywhere you need a reasoning system that reacts to real-time streams without blocking the user experience.