Updated June 2026. When I first published this in March 2025, I framed it around the term “Model Context Protocol.” That was a mistake: “MCP” now refers to Anthropic’s open standard for connecting models to tools and data, which is a different (and complementary) concern. This revision renames the home-grown layer described here, keeps the engineering intact, and adds a section on where the two fit together. See “How this relates to MCP” below.
After exploring the architecture and implementation of autonomous multi-agent systems for telecom customer service in my previous article, I want to address one of the hardest problems I outlined: context engineering. How do you decide what goes into a model’s limited context window, how do you keep it relevant as a conversation grows, and how do you transfer it cleanly when one agent hands off to another? This post builds a small context-packaging layer (the basis of my PyContext library) to answer those questions.
An initial implementation of this context-management layer for autonomous multi-agent systems is available here. Contributions are most welcome.
The problem: context is a scarce, shared resource
In a multi-agent system, the context window is the workspace every agent shares. Even as windows have grown to hundreds of thousands of tokens, what you put in that window still determines cost, latency, and answer quality. Four problems recur:
- Heterogeneous agents need a common way to exchange context, not ad-hoc string concatenation
- Transmission cost grows with every token moved between agents, so payloads need minimizing
- Not all context is equal, so blocks need semantic tagging for routing and prioritization
- Multi-step operations need versioned context so handoffs are reproducible
The layer in this post (I call it PyContext) addresses these with a typed context-package format, relevance-based pruning, and a distributed store. It is a library pattern, not an industry standard, and it is deliberately small. Let’s dig into the implementation.
How this relates to MCP
If you arrived here searching for the Model Context Protocol, here is the short version. MCP is an open JSON-RPC standard, introduced by Anthropic in late 2024 and now supported across Anthropic, OpenAI, Google, Microsoft, and most major developer tools. It defines a host/client/server model with three primitives: tools (actions a model can call), resources (read-only context a server exposes), and prompts (reusable templates), over stdio and Streamable HTTP transports with OAuth 2.1 auth.
MCP answers a different question than this post. MCP is about connectivity: how a model or agent reaches external tools and data in a standard way. The layer here is about economics: once that context is available, what actually goes into the limited window, how it is scored and pruned, and how it moves between agents. The two compose cleanly. An MCP server (or several) feeds resources and tool results in; a context-packaging layer like the one below decides what survives into the prompt and travels across a handoff. If you are building agents in 2026, you almost certainly want both.
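To make that composition concrete, here is a minimal sketch of the boundary between the two: a function that turns the result of an MCP `tools/call` request into block dicts shaped like the ones `ContextPackage.to_dict()` emits, tagged as TOOL context. It assumes the result shape from the MCP specification (a `content` list of typed items plus an optional `isError` flag) and keeps only `text` items. The function name, the 0.8 starting relevance, and halving it for error results are my own illustrative choices, not part of MCP or PyContext.

```python
import time
import uuid
from typing import Any, Dict, List


def mcp_tool_result_to_blocks(result: Dict[str, Any], tool_name: str,
                              relevance_score: float = 0.8) -> List[Dict[str, Any]]:
    """Convert an MCP tools/call result into TOOL context-block dicts.

    Hypothetical adapter: only "text" content items are kept; images, audio,
    and embedded resources would need their own handling.
    """
    is_error = bool(result.get("isError", False))
    blocks = []
    for item in result.get("content", []):
        if item.get("type") != "text":
            continue
        text = item.get("text", "")
        blocks.append({
            "id": str(uuid.uuid4()),
            "content": text,
            # Errors are still useful context, but rank them lower (assumption)
            "relevance_score": relevance_score / 2 if is_error else relevance_score,
            "type": "TOOL",
            # Metadata values are strings to match map<string, string> in the schema
            "metadata": {"tool_name": tool_name, "source": "mcp",
                         "is_error": str(is_error).lower()},
            "timestamp": int(time.time() * 1000),
            "token_count": len(text.split()),  # same rough proxy as ContextBlock
            "references": [],
        })
    return blocks


example_result = {
    "content": [
        {"type": "text", "text": "Signal strength: -71 dBm, 2 dropped calls today"},
        {"type": "image", "data": "<base64-encoded PNG>", "mimeType": "image/png"},
    ],
    "isError": False,
}
blocks = mcp_tool_result_to_blocks(example_result, "network_diagnostics")
```

The resulting dicts can go straight into the `blocks` list of an exported package, or be turned into `ContextBlock` objects the same way `import_session()` does.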
Technical Implementation
Core Protocol Definition
At its heart, the layer defines a protocol-buffer schema that any agent in the system can produce and consume. Here’s a simplified version of the core format:
syntax = "proto3";

package pycontext;

message ContextBlock {
  string id = 1;
  uint64 timestamp = 2;
  string content = 3;
  float relevance_score = 4;
  map<string, string> metadata = 5;
  ContextType type = 6;
  uint32 token_count = 7;
  repeated string references = 8;
}

enum ContextType {
  SYSTEM = 0;
  USER = 1;
  AGENT = 2;
  MEMORY = 3;
  KNOWLEDGE = 4;
  TOOL = 5;
}

message ContextPackage {
  string session_id = 1;
  string agent_id = 2;
  repeated ContextBlock blocks = 3;
  ContextMetrics metrics = 4;
  uint32 version = 5;
  string trace_id = 6;
}

message ContextMetrics {
  uint32 total_tokens = 1;
  float context_saturation = 2;
  map<string, float> type_distribution = 3;
}
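Serializing packages with this schema lets agents written in any language hand context to one another while keeping the metadata that drives context decisions: relevance scores for pruning, types for routing, token counts for budgeting, and version and trace IDs for reproducible handoffs.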
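The schema carries `version` and `trace_id` but leaves open how they change when context moves between agents. One possible convention, sketched here on the dict form of a package: the receiving agent gets a deep copy with its own `agent_id` and an incremented `version`, while `trace_id` stays fixed so every hop of a multi-step operation can be tied back together and replayed. The `hand_off` name and the convention itself are assumptions, not something the format enforces.

```python
import copy
from typing import Any, Dict


def hand_off(package: Dict[str, Any], to_agent: str) -> Dict[str, Any]:
    """Return a new package version addressed to another agent (hypothetical convention)."""
    new_pkg = copy.deepcopy(package)  # never mutate the sender's snapshot
    new_pkg["agent_id"] = to_agent
    new_pkg["version"] = package["version"] + 1
    # trace_id is deliberately unchanged: it ties all hops of one operation together
    return new_pkg


v1 = {"session_id": "s-1", "agent_id": "intent_agent", "blocks": [],
      "version": 1, "trace_id": "t-42"}
v2 = hand_off(v1, "technical_agent")
```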
Python Implementation
Let’s mirror this format in Python so every agent in the architecture can build, inspect, and serialize packages:
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class ContextType(Enum):
    SYSTEM = 0
    USER = 1
    AGENT = 2
    MEMORY = 3
    KNOWLEDGE = 4
    TOOL = 5


@dataclass
class ContextBlock:
    id: str
    content: str
    relevance_score: float
    type: ContextType
    metadata: Optional[Dict[str, str]] = None
    timestamp: Optional[int] = None
    token_count: Optional[int] = None
    references: Optional[List[str]] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = int(time.time() * 1000)
        if self.metadata is None:
            self.metadata = {}
        if self.references is None:
            self.references = []
        if self.token_count is None:
            # Rough word-count proxy for token count.
            # In production, use the model's tokenizer or a count-tokens
            # endpoint (e.g. tiktoken, or the provider's token-counting API);
            # whitespace splitting under-counts real tokens by 15-30%.
            self.token_count = len(self.content.split())


@dataclass
class ContextMetrics:
    total_tokens: int
    context_saturation: float
    type_distribution: Dict[str, float]


@dataclass
class ContextPackage:
    session_id: str
    agent_id: str
    blocks: List[ContextBlock]
    metrics: ContextMetrics
    version: int = 1
    trace_id: Optional[str] = None

    def __post_init__(self):
        if self.trace_id is None:
            self.trace_id = str(uuid.uuid4())

    def calculate_metrics(self) -> None:
        """Calculate metrics based on current context blocks"""
        total_tokens = sum(block.token_count for block in self.blocks)

        # Saturation is measured against a working budget, not the model's
        # hard limit. Modern models offer 200K-1M token windows, but you still
        # cap a working window: cost scales with tokens, latency grows, and
        # very long contexts dilute attention ("context rot"). 8192 here is an
        # illustrative budget; tune it per workload.
        context_saturation = min(1.0, total_tokens / 8192)

        # Distribution of context types, as each type's share of total tokens
        type_counts: Dict[str, int] = {}
        for block in self.blocks:
            type_name = block.type.name
            type_counts[type_name] = type_counts.get(type_name, 0) + block.token_count
        type_distribution = {
            k: v / total_tokens if total_tokens > 0 else 0
            for k, v in type_counts.items()
        }

        self.metrics = ContextMetrics(
            total_tokens=total_tokens,
            context_saturation=context_saturation,
            type_distribution=type_distribution
        )

    def to_dict(self) -> Dict:
        """Convert to dictionary representation for serialization"""
        return {
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "blocks": [
                {
                    "id": b.id,
                    "content": b.content,
                    "relevance_score": b.relevance_score,
                    "type": b.type.name,
                    "metadata": b.metadata,
                    "timestamp": b.timestamp,
                    "token_count": b.token_count,
                    "references": b.references
                }
                for b in self.blocks
            ],
            "metrics": {
                "total_tokens": self.metrics.total_tokens,
                "context_saturation": self.metrics.context_saturation,
                "type_distribution": self.metrics.type_distribution
            },
            "version": self.version,
            "trace_id": self.trace_id
        }
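The word-count proxy in `ContextBlock.__post_init__` is only a placeholder, and since `token_count` is an ordinary field you can pass a real count in when building a block. A minimal sketch of a counter that uses tiktoken’s `cl100k_base` encoding when the package is available and otherwise falls back to the common rule of thumb of roughly four characters per English token. Two assumptions to flag: `cl100k_base` matches only some OpenAI models, so for other providers use their own tokenizer or token-counting API; and the fallback is an estimate, not a count.

```python
from typing import Callable


def make_token_counter() -> Callable[[str], int]:
    """Return a str -> token-count function, preferring a real tokenizer."""
    try:
        import tiktoken  # optional dependency
        encoding = tiktoken.get_encoding("cl100k_base")
        return lambda text: len(encoding.encode(text))
    except Exception:
        # ImportError, or the encoding file could not be fetched or loaded.
        # Fall back to ~4 characters per token (rough English heuristic).
        return lambda text: (len(text) + 3) // 4


count_tokens = make_token_counter()
n = count_tokens("Customer reports dropped calls since the firmware update.")
```

Pass the result in as `token_count=count_tokens(content)` when constructing a `ContextBlock`, and `__post_init__` will leave it alone.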
Now let’s implement a ContextManager class that handles context operations over this format:
import uuid
from typing import Callable, Dict, Optional


class ContextManager:
    """Manages context operations over PyContext context packages"""

    def __init__(self,
                 max_tokens: int = 8192,
                 relevance_threshold: float = 0.2):
        self.max_tokens = max_tokens
        self.relevance_threshold = relevance_threshold
        self.sessions: Dict[str, ContextPackage] = {}

    def create_session(self, agent_id: str) -> str:
        """Create a new context session"""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=agent_id,
            blocks=[],
            metrics=ContextMetrics(
                total_tokens=0,
                context_saturation=0.0,
                type_distribution={}
            )
        )
        return session_id

    def add_context(self,
                    session_id: str,
                    content: str,
                    context_type: ContextType,
                    relevance_score: float = 1.0,
                    metadata: Optional[Dict[str, str]] = None) -> str:
        """Add context to an existing session"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        block_id = str(uuid.uuid4())
        block = ContextBlock(
            id=block_id,
            content=content,
            relevance_score=relevance_score,
            type=context_type,
            metadata=metadata or {}
        )
        session = self.sessions[session_id]
        session.blocks.append(block)
        session.calculate_metrics()

        # Prune once the session reaches 90% of this manager's budget.
        # Compare against self.max_tokens directly: the package's
        # context_saturation is computed against its own fixed 8192 budget.
        if session.metrics.total_tokens >= 0.9 * self.max_tokens:
            self._prune_context(session_id)
        return block_id

    def _prune_context(self, session_id: str) -> None:
        """Prune least relevant context down to ~80% of max_tokens"""
        session = self.sessions[session_id]

        # SYSTEM context is never pruned
        system_tokens = sum(b.token_count for b in session.blocks
                            if b.type == ContextType.SYSTEM)
        # Pruning candidates, least relevant first
        candidates = sorted(
            (b for b in session.blocks if b.type != ContextType.SYSTEM),
            key=lambda b: b.relevance_score
        )

        # Drop the least relevant blocks until we're under target (80% usage)
        target_tokens = int(self.max_tokens * 0.8) - system_tokens
        current_tokens = sum(b.token_count for b in candidates)
        removed_ids = set()
        for block in candidates:
            if current_tokens <= target_tokens:
                break
            removed_ids.add(block.id)
            current_tokens -= block.token_count

        # Keep the survivors in their original (chronological) order
        session.blocks = [b for b in session.blocks if b.id not in removed_ids]
        session.calculate_metrics()

    def get_formatted_context(self,
                              session_id: str,
                              formatter: Optional[Callable[[ContextPackage], str]] = None) -> str:
        """Get formatted context for model input"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        session = self.sessions[session_id]

        # Default formatter concatenates content with block type as separator,
        # skipping blocks below the relevance threshold
        if formatter is None:
            result = []
            for block in session.blocks:
                if block.relevance_score >= self.relevance_threshold:
                    result.append(f"[{block.type.name}]\n{block.content}")
            return "\n\n".join(result)
        return formatter(session)

    def export_session(self, session_id: str) -> Dict:
        """Export session as serializable dict"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        return self.sessions[session_id].to_dict()

    def import_session(self, session_data: Dict) -> str:
        """Import a session from serialized data"""
        session_id = session_data["session_id"]
        blocks = []
        for block_data in session_data["blocks"]:
            blocks.append(ContextBlock(
                id=block_data["id"],
                content=block_data["content"],
                relevance_score=block_data["relevance_score"],
                type=ContextType[block_data["type"]],
                metadata=block_data["metadata"],
                timestamp=block_data["timestamp"],
                token_count=block_data["token_count"],
                references=block_data["references"]
            ))
        metrics = ContextMetrics(
            total_tokens=session_data["metrics"]["total_tokens"],
            context_saturation=session_data["metrics"]["context_saturation"],
            type_distribution=session_data["metrics"]["type_distribution"]
        )
        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=session_data["agent_id"],
            blocks=blocks,
            metrics=metrics,
            version=session_data["version"],
            trace_id=session_data["trace_id"]
        )
        return session_id
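Why prune at 90% but trim down to 80%? The gap acts as hysteresis: after a prune, the session has roughly 10% of the budget in headroom before the next one, instead of pruning again on every subsequent `add_context` call. The helper below only does that arithmetic for the default 8,192-token budget; it is an illustration with a hypothetical name, not part of the manager.

```python
def prune_thresholds(max_tokens: int, system_tokens: int,
                     trigger_ratio: float = 0.9, target_ratio: float = 0.8):
    """Return (trigger, non-SYSTEM target, headroom) as the manager applies them."""
    trigger = trigger_ratio * max_tokens               # pruning starts at or above this total
    target_total = int(max_tokens * target_ratio)      # pruning stops at or below this total
    non_system_target = target_total - system_tokens   # SYSTEM blocks are never pruned
    headroom = trigger - target_total                  # tokens addable before the next prune
    return trigger, non_system_target, headroom


trigger, target, headroom = prune_thresholds(8192, system_tokens=500)
print(f"prune at >= {trigger:.1f} tokens, trim non-SYSTEM blocks to <= {target}, "
      f"then ~{headroom:.0f} tokens of headroom")
```

With a 500-token system prompt, pruning starts at about 7,373 tokens, trims non-SYSTEM blocks to 6,053 tokens, and leaves about 820 tokens of headroom before it runs again.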
Integration with Existing Agent Systems
Now let’s integrate the context layer into the multi-agent telecom customer service system from the previous article:
import json
from typing import Dict, Optional


class ContextAwareAgent:
    """Base class for agents that use the PyContext context layer.

    llm_client is assumed to expose an async generate(prompt: str) -> str method.
    """

    def __init__(self,
                 llm_client,
                 agent_role: str,
                 context_manager: Optional[ContextManager] = None):
        self.llm = llm_client
        self.role = agent_role
        self.context_manager = context_manager or ContextManager()
        self.session_id: Optional[str] = None

    async def initialize_session(self) -> str:
        """Initialize a new context session"""
        self.session_id = self.context_manager.create_session(self.role)
        # Add system prompt as SYSTEM context
        system_prompt = await self._load_role_prompt()
        self.context_manager.add_context(
            session_id=self.session_id,
            content=system_prompt,
            context_type=ContextType.SYSTEM,
            relevance_score=1.0,  # System prompts always max relevance
            metadata={"type": "system_prompt", "role": self.role}
        )
        return self.session_id

    async def _load_role_prompt(self) -> str:
        """Load role-specific prompt - implement in subclasses"""
        raise NotImplementedError()

    async def add_user_context(self,
                               content: str,
                               metadata: Optional[Dict] = None) -> str:
        """Add user input to context"""
        if self.session_id is None:
            await self.initialize_session()
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.USER,
            relevance_score=0.9,  # User context starts with high relevance
            metadata=metadata or {}
        )

    async def add_memory_context(self,
                                 content: str,
                                 relevance_score: float,
                                 metadata: Optional[Dict] = None) -> str:
        """Add memory (from episodic or semantic memory) to context"""
        if self.session_id is None:
            await self.initialize_session()
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.MEMORY,
            relevance_score=relevance_score,
            metadata=metadata or {}
        )

    async def add_tool_context(self,
                               content: str,
                               tool_name: str,
                               metadata: Optional[Dict] = None) -> str:
        """Add tool usage results to context"""
        if self.session_id is None:
            await self.initialize_session()
        # Copy so the caller's dict is not mutated
        metadata = dict(metadata or {})
        metadata["tool_name"] = tool_name
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.TOOL,
            relevance_score=0.8,  # Tool outputs generally have high relevance
            metadata=metadata
        )

    async def process_with_llm(self,
                               prompt: Optional[str] = None) -> str:
        """Process the current context with LLM"""
        if self.session_id is None:
            await self.initialize_session()
        formatted_context = self.context_manager.get_formatted_context(self.session_id)
        if prompt:
            # Append the extra prompt to this call's input only; it is not
            # stored in the session
            formatted_context += f"\n\n[PROMPT]\n{prompt}"

        # Call LLM with formatted context
        response = await self.llm.generate(formatted_context)

        # Add agent response to context
        self.context_manager.add_context(
            session_id=self.session_id,
            content=response,
            context_type=ContextType.AGENT,
            relevance_score=0.9,  # Agent responses are highly relevant
            metadata={"type": "agent_response"}
        )
        return response

    def export_context(self) -> Dict:
        """Export current context for transfer to another agent"""
        if self.session_id is None:
            raise ValueError("No active session")
        return self.context_manager.export_session(self.session_id)

    def import_context(self, context_data: Dict) -> None:
        """Import context from another agent"""
        self.session_id = self.context_manager.import_session(context_data)
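`export_context()` ships every block in the session. Because transmission cost grows with every token moved between agents, you may want to trim the export before sending it. A minimal sketch that works on the dict `export_session()` returns: it keeps all SYSTEM blocks, drops other blocks below a relevance floor, and recomputes the metrics the same way `calculate_metrics()` does. The function name and the 0.5 floor are hypothetical.

```python
import copy
from typing import Any, Dict


def slim_for_handoff(exported: Dict[str, Any], min_relevance: float = 0.5,
                     token_budget: int = 8192) -> Dict[str, Any]:
    """Drop low-relevance non-SYSTEM blocks from an exported session dict."""
    slim = copy.deepcopy(exported)  # leave the sender's export untouched
    slim["blocks"] = [
        b for b in slim["blocks"]
        if b["type"] == "SYSTEM" or b["relevance_score"] >= min_relevance
    ]
    # Recompute metrics as ContextPackage.calculate_metrics does
    total = sum(b["token_count"] for b in slim["blocks"])
    by_type: Dict[str, int] = {}
    for b in slim["blocks"]:
        by_type[b["type"]] = by_type.get(b["type"], 0) + b["token_count"]
    slim["metrics"] = {
        "total_tokens": total,
        "context_saturation": min(1.0, total / token_budget),
        "type_distribution": {k: v / total for k, v in by_type.items()} if total else {},
    }
    return slim


exported = {
    "session_id": "s-1", "agent_id": "intent_agent", "version": 1, "trace_id": "t-42",
    "metrics": {},
    "blocks": [
        {"type": "SYSTEM", "relevance_score": 1.0, "token_count": 120,
         "content": "You are an Intent Analysis Agent..."},
        {"type": "USER", "relevance_score": 0.9, "token_count": 40,
         "content": "My bill doubled this month"},
        {"type": "MEMORY", "relevance_score": 0.2, "token_count": 300,
         "content": "Customer asked about roaming last year"},
    ],
}
slim = slim_for_handoff(exported)
```

The slimmed dict keeps the export’s shape, so it can be passed to the receiving agent’s `import_context()` like any other export.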
Now we can implement our specialized telecom agents on top of the context layer. Each is a thin subclass of ContextAwareAgent; here are the intent analysis agent and the technical support agent:
class ContextAwareIntentAgent(ContextAwareAgent):
    """Intent analysis agent built on the context layer"""

    async def _load_role_prompt(self) -> str:
        return """You are an Intent Analysis Agent in a telecom customer service system.
Your role is to precisely identify customer intent from queries, detect multiple
or hidden intents, assess intent confidence, and identify required context for resolution."""

    async def analyze_intent(self, query: str) -> Dict:
        """Analyze customer intent with managed context"""
        await self.add_user_context(query, {"type": "customer_query"})

        analysis_prompt = """Based on the customer query above, provide the following analysis:
1. Primary Intent: The main customer goal
2. Secondary Intents: Additional or implied needs
3. Required Information: What we need to know to resolve this
4. Confidence Score: How certain are you (0-1)

Return your analysis in JSON format.
"""
        result = await self.process_with_llm(analysis_prompt)

        # Parse JSON response (in production, add error handling)
        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            # Fallback: Extract manually using regex or prompt again
            analysis = {
                "primary_intent": "unknown",
                "secondary_intents": [],
                "required_information": [],
                "confidence": 0.5
            }
        return analysis


class ContextAwareTechnicalAgent(ContextAwareAgent):
    """Technical support agent built on the context layer"""

    def __init__(self, llm_client, network_api, context_manager=None):
        super().__init__(llm_client, "technical_agent", context_manager)
        self.network_api = network_api

    async def _load_role_prompt(self) -> str:
        return """You are a Technical Support Agent specializing in telecom issues.
Your role is to diagnose technical issues from symptoms, design step-by-step
troubleshooting plans, interpret diagnostic results, and recommend solutions."""

    async def diagnose_issue(self, issue_description: str, customer_id: str) -> Dict:
        """Diagnose technical issue with managed context"""
        await self.add_user_context(issue_description, {"customer_id": customer_id})

        # Add relevant customer technical data from memory
        network_data = await self.network_api.get_customer_network_data(customer_id)
        await self.add_memory_context(
            content=json.dumps(network_data),
            relevance_score=0.85,
            metadata={"type": "network_data", "customer_id": customer_id}
        )

        # Run network diagnostics and add results to context
        diagnostics = await self.network_api.run_diagnostics(customer_id)
        await self.add_tool_context(
            content=json.dumps(diagnostics),
            tool_name="network_diagnostics",
            metadata={"customer_id": customer_id}
        )

        analysis_prompt = """Based on the customer issue, network data, and diagnostic results, provide a technical analysis with:
1. Root Cause: The most likely cause of the issue
2. Recommended Solution: Step-by-step resolution plan
3. Alternative Solutions: Other approaches if the primary solution fails
4. Confidence: How certain are you about this diagnosis (0-1)

Return your analysis in JSON format.
"""
        result = await self.process_with_llm(analysis_prompt)
        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            analysis = {
                "root_cause": "unknown",
                "recommended_solution": [],
                "alternative_solutions": [],
                "confidence": 0.5
            }
        return analysis
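Both agents fall back to a placeholder whenever `json.loads` fails, which happens often in practice because models wrap JSON in prose or Markdown fences. Before falling back (or re-prompting), it is worth trying to pull the first JSON object out of the response. A minimal sketch using the standard library’s `json.JSONDecoder.raw_decode`; the helper name is hypothetical, and the structured-output or JSON modes some model APIs offer are a sturdier option where available.

```python
import json
from typing import Any, Dict, Optional


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in text, or None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and
            # ignores whatever follows it (closing fences, trailing prose)
            obj, _end = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass
        start = text.find("{", start + 1)
    return None


reply = 'Here is my analysis:\n```json\n{"primary_intent": "billing_dispute", "confidence": 0.82}\n```'
analysis = extract_json_object(reply) or {"primary_intent": "unknown", "confidence": 0.5}
```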
Context Optimization Strategies
At scale, every token in the window costs money and latency, so agents have to decide deliberately what stays in it. The context layer’s typed, scored blocks enable the following techniques:
Context Window Management
class ContextWindowOptimizer:
    """Optimizes context window usage using context-block metadata"""

    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager

    async def optimize_session(self, session_id: str, max_tokens: int = 4096) -> None:
        """Optimize context window to fit within token limit"""
        if session_id not in self.context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        session = self.context_manager.sessions[session_id]

        # Calculate current usage
        current_tokens = session.metrics.total_tokens
        if current_tokens <= max_tokens:
            return  # Already within limits

        # Group blocks by type
        typed_blocks = {t: [] for t in ContextType}
        for block in session.blocks:
            typed_blocks[block.type].append(block)

        # Always keep SYSTEM blocks
        selected = list(typed_blocks[ContextType.SYSTEM])
        used_tokens = sum(b.token_count for b in selected)

        # Keep the most recent USER and AGENT blocks within 70% of the budget
        # (the remaining 30% is reserved for tool/memory/knowledge context)
        for block_type in (ContextType.USER, ContextType.AGENT):
            recent_first = sorted(typed_blocks[block_type],
                                  key=lambda b: b.timestamp, reverse=True)
            for block in recent_first:
                if used_tokens + block.token_count <= max_tokens * 0.7:
                    selected.append(block)
                    used_tokens += block.token_count

        # Use remaining space for TOOL, MEMORY and KNOWLEDGE blocks by relevance
        remaining_blocks = (typed_blocks[ContextType.TOOL]
                            + typed_blocks[ContextType.MEMORY]
                            + typed_blocks[ContextType.KNOWLEDGE])
        remaining_blocks.sort(key=lambda b: b.relevance_score, reverse=True)
        for block in remaining_blocks:
            if used_tokens + block.token_count <= max_tokens:
                selected.append(block)
                used_tokens += block.token_count

        # Keep the selected blocks in their original order so USER/AGENT
        # turns still read chronologically
        kept_ids = {b.id for b in selected}
        session.blocks = [b for b in session.blocks if b.id in kept_ids]
        session.calculate_metrics()
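The last loop in `optimize_session` is a greedy, first-fit pass: candidates are visited in descending relevance, and a block that does not fit is skipped rather than ending the loop, so a smaller, less relevant block can still use the leftover space. A stripped-down illustration on (id, relevance, tokens) tuples, with made-up numbers and a hypothetical helper name:

```python
from typing import List, Tuple


def greedy_fill(blocks: List[Tuple[str, float, int]], budget: int) -> List[str]:
    """Pick block ids by descending relevance, skipping any that don't fit."""
    chosen, used = [], 0
    for block_id, _relevance, tokens in sorted(blocks, key=lambda b: b[1], reverse=True):
        if used + tokens <= budget:
            chosen.append(block_id)
            used += tokens
    return chosen


candidates = [("tool:diagnostics", 0.9, 300),
              ("memory:history", 0.8, 900),
              ("knowledge:faq", 0.5, 200)]
print(greedy_fill(candidates, budget=600))  # ['tool:diagnostics', 'knowledge:faq']
```

Greedy selection is fast and predictable but not optimal in general (maximizing total relevance under a token budget is a knapsack problem); for window packing that trade-off is usually acceptable.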
Contextual Relevance Scoring
For production systems, fixed relevance scores aren’t sufficient: the values assigned when a block is added (0.9 for user input, 0.8 for tool output) don’t change as the conversation moves on. The TF-IDF calculator below is a lightweight baseline that needs only scikit-learn. In 2026 you would typically swap it for embedding-based similarity (encode each block and the query with an embedding model and rank by cosine similarity in a vector store), which captures semantic relatedness that TF-IDF misses. The interface stays the same; here’s the baseline:
from typing import Dict, List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class ContextualRelevanceCalculator:
    """Calculates contextual relevance between blocks using TF-IDF"""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(stop_words='english')

    def calculate_relevance(self,
                            query: str,
                            context_blocks: List[ContextBlock]) -> Dict[str, float]:
        """Calculate relevance scores between query and context blocks"""
        texts = [query] + [block.content for block in context_blocks]
        # Replace empty texts with a placeholder the vectorizer can handle
        cleaned_texts = [t if t.strip() else "empty" for t in texts]
        try:
            # Fit TF-IDF on the query plus all blocks and transform them to vectors
            tfidf_matrix = self.vectorizer.fit_transform(cleaned_texts)
            # Cosine similarity between the query (row 0) and each block
            query_vector = tfidf_matrix[0:1]
            block_vectors = tfidf_matrix[1:]
            similarities = cosine_similarity(query_vector, block_vectors).flatten()
            # Map block IDs to relevance scores
            return {
                block.id: float(similarities[i])
                for i, block in enumerate(context_blocks)
            }
        except Exception:
            # Fallback to neutral scores if vectorization fails (e.g. an empty
            # vocabulary because every text consists of stop words)
            return {block.id: 0.5 for block in context_blocks}

    def update_relevance_scores(self,
                                query: str,
                                context_manager: ContextManager,
                                session_id: str) -> None:
        """Update relevance scores for all blocks in a session"""
        if session_id not in context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        session = context_manager.sessions[session_id]
        non_system_blocks = [b for b in session.blocks if b.type != ContextType.SYSTEM]
        relevance_scores = self.calculate_relevance(query, non_system_blocks)

        # Update blocks with new scores
        for block in non_system_blocks:
            if block.id in relevance_scores:
                block.relevance_score = relevance_scores[block.id]

        # System blocks always keep max relevance
        for block in session.blocks:
            if block.type == ContextType.SYSTEM:
                block.relevance_score = 1.0
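To show that the interface survives the switch to embeddings, here is a sketch that ranks blocks by the cosine similarity of vectors produced by any `embed` function you pass in, returning the same block-id to score mapping as `calculate_relevance`. The `toy_embed` function (character trigrams hashed into a fixed-size vector) is a hypothetical stand-in so the example runs without a model; in production you would call an embedding model and usually keep block vectors in a vector index rather than re-embedding every block per query.

```python
import math
import zlib
from typing import Callable, Dict, List, Sequence

Vector = List[float]


def toy_embed(text: str, dims: int = 256) -> Vector:
    """Hypothetical stand-in for an embedding model: hashed character trigrams."""
    vec = [0.0] * dims
    padded = f"  {text.lower()}  "
    for i in range(len(padded) - 2):
        # crc32 is deterministic across runs, unlike Python's str hash()
        bucket = zlib.crc32(padded[i:i + 3].encode("utf-8")) % dims
        vec[bucket] += 1.0
    return vec


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def embedding_relevance(query: str, blocks: Dict[str, str],
                        embed: Callable[[str], Vector] = toy_embed) -> Dict[str, float]:
    """Map block id -> cosine similarity between the query and the block content."""
    q = embed(query)
    return {block_id: cosine(q, embed(content)) for block_id, content in blocks.items()}


scores = embedding_relevance(
    "router keeps restarting",
    {"b1": "Customer's router restarts every hour", "b2": "Invoice for March is overdue"},
)
```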
Performance Benchmarking and Optimization
Let’s implement a benchmarking suite for the context layer:
import statistics
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

import matplotlib.pyplot as plt


@dataclass
class BenchmarkResult:
    operation: str
    times: List[float]
    token_counts: List[int]

    @property
    def mean_time(self) -> float:
        return statistics.mean(self.times)

    @property
    def p95_time(self) -> float:
        # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
        return statistics.quantiles(self.times, n=20)[18]

    @property
    def tokens_per_second(self) -> float:
        total_tokens = sum(self.token_counts)
        total_time = sum(self.times)
        return total_tokens / total_time if total_time > 0 else 0


class ContextBenchmark:
    """Benchmark performance of context-layer operations"""

    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.results: Dict[str, BenchmarkResult] = {}

    async def run_add_context_benchmark(self,
                                        iterations: int = 100,
                                        content_size: int = 500) -> BenchmarkResult:
        """Benchmark context addition operations"""
        session_id = self.context_manager.create_session("benchmark")
        times = []
        token_counts = []
        for i in range(iterations):
            # Generate content of specified size
            content = f"Benchmark content iteration {i} " + "word " * content_size
            # perf_counter is monotonic and high-resolution, unlike time.time()
            start_time = time.perf_counter()
            block_id = self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )
            times.append(time.perf_counter() - start_time)

            # Record the token count of the block we just added
            block = next(b for b in self.context_manager.sessions[session_id].blocks
                         if b.id == block_id)
            token_counts.append(block.token_count)

        result = BenchmarkResult(
            operation="add_context",
            times=times,
            token_counts=token_counts
        )
        self.results["add_context"] = result
        return result

    async def run_format_context_benchmark(self,
                                           iterations: int = 100,
                                           block_count: int = 20) -> BenchmarkResult:
        """Benchmark context formatting operations"""
        session_id = self.context_manager.create_session("benchmark")
        # Prepare session with specified number of blocks
        for i in range(block_count):
            content = f"Benchmark content block {i} " + "word " * 100
            self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )

        times = []
        token_counts = []
        for i in range(iterations):
            start_time = time.perf_counter()
            formatted = self.context_manager.get_formatted_context(session_id)
            times.append(time.perf_counter() - start_time)
            # Approximate token count of formatted output
            token_counts.append(len(formatted.split()))

        result = BenchmarkResult(
            operation="format_context",
            times=times,
            token_counts=token_counts
        )
        self.results["format_context"] = result
        return result

    async def run_prune_context_benchmark(self,
                                          iterations: int = 100,
                                          initial_blocks: int = 100) -> BenchmarkResult:
        """Benchmark context pruning operations"""
        times = []
        token_counts = []
        for i in range(iterations):
            # Create new session for each iteration
            session_id = self.context_manager.create_session("benchmark")
            # Add initial blocks
            for j in range(initial_blocks):
                content = f"Benchmark content block {j} " + "word " * 50
                self.context_manager.add_context(
                    session_id=session_id,
                    content=content,
                    context_type=ContextType.MEMORY,
                    relevance_score=j / initial_blocks  # Vary relevance
                )

            # _prune_context is called directly, so no saturation flag is needed.
            # It only removes blocks above 80% of max_tokens; with the defaults
            # (100 blocks x 54 words = 5,400 proxy tokens vs. an 8,192 budget)
            # nothing is removed, so this measures the sort-and-scan cost. Use a
            # smaller max_tokens or larger blocks to exercise actual removals.
            start_time = time.perf_counter()
            self.context_manager._prune_context(session_id)
            times.append(time.perf_counter() - start_time)

            # Count tokens in pruned context
            token_count = sum(b.token_count for b in
                              self.context_manager.sessions[session_id].blocks)
            token_counts.append(token_count)

        result = BenchmarkResult(
            operation="prune_context",
            times=times,
            token_counts=token_counts
        )
        self.results["prune_context"] = result
        return result

    def plot_results(self, save_path: Optional[str] = None) -> None:
        """Plot benchmark results"""
        if not self.results:
            raise ValueError("No benchmark results to plot")
        fig, axs = plt.subplots(1, 3, figsize=(18, 6))
        operations = list(self.results.keys())

        # Plot 1: Mean operation time
        mean_times = [self.results[op].mean_time for op in operations]
        axs[0].bar(operations, mean_times)
        axs[0].set_title('Mean Operation Time (s)')
        axs[0].set_ylabel('Seconds')

        # Plot 2: 95th percentile operation time
        p95_times = [self.results[op].p95_time for op in operations]
        axs[1].bar(operations, p95_times)
        axs[1].set_title('P95 Operation Time (s)')
        axs[1].set_ylabel('Seconds')

        # Plot 3: Tokens per second
        tps = [self.results[op].tokens_per_second for op in operations]
        axs[2].bar(operations, tps)
        axs[2].set_title('Throughput (tokens/s)')
        axs[2].set_ylabel('Tokens per Second')

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
        plt.show()
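Two details matter when reading these numbers. `time.perf_counter()` is the clock intended for timing short intervals (`time.time()` follows the wall clock, which can be adjusted), and `statistics.quantiles` needs at least two samples on many Python versions, so a one-iteration run can break `p95_time`. A small standalone sketch covering both; the helper names are hypothetical, and it falls back to the maximum when there are too few samples for a percentile.

```python
import statistics
import time
from typing import Callable, Dict, List


def time_calls(fn: Callable[[], object], iterations: int = 100) -> List[float]:
    """Time fn() repeatedly with the monotonic, high-resolution perf_counter."""
    samples = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return samples


def summarize(samples: List[float]) -> Dict[str, float]:
    """Mean and p95; with fewer than 2 samples, p95 falls back to the max."""
    if not samples:
        raise ValueError("no samples")
    p95 = (statistics.quantiles(samples, n=20)[18]
           if len(samples) >= 2 else max(samples))
    return {"mean": statistics.mean(samples), "p95": p95}


stats = summarize(time_calls(lambda: " ".join(["word"] * 500).split(), iterations=50))
```

For the values 1 to 100, `summarize` reports a p95 of 95.95: the default "exclusive" method interpolates between the 95th and 96th sorted values.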
Running the Context Layer in Production
Running this in production exposes the limits of the in-memory ContextManager: sessions live in a single process’s memory, they are never evicted, and pruning only runs when a block is added. The two components below address shared storage and continuous optimization:
Distributed Context Store
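ContextManager keeps sessions in a process-local dict, so agents running in different processes or on different hosts cannot share them. For larger deployments, keep sessions in a shared store instead; this Redis-backed version lets agent workers scale horizontally: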
import json
from typing import Dict, Optional

import redis


class DistributedContextStore:
    """Distributed context store using Redis (JSON serialization)"""

    @staticmethod
    def _block_to_dict(block: ContextBlock) -> Dict:
        return {
            "id": block.id,
            "content": block.content,
            "relevance_score": block.relevance_score,
            "type": block.type.name,
            "metadata": block.metadata,
            "timestamp": block.timestamp,
            "token_count": block.token_count,
            "references": block.references,
        }

    @staticmethod
    def _block_from_dict(d: Dict) -> ContextBlock:
        return ContextBlock(
            id=d["id"],
            content=d["content"],
            relevance_score=d["relevance_score"],
            type=ContextType[d["type"]],
            metadata=d["metadata"],
            timestamp=d["timestamp"],
            token_count=d["token_count"],
            references=d["references"],
        )

    def __init__(self, redis_url: str, ttl: int = 3600):
        # Without decode_responses=True, redis-py returns bytes keys and values,
        # hence the b"..." lookups below
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl  # Default TTL in seconds

    def _session_key(self, session_id: str) -> str:
        """Generate Redis key for a session"""
        return f"pyctx:session:{session_id}"

    def _block_key(self, session_id: str, block_id: str) -> str:
        """Generate Redis key for a block"""
        return f"pyctx:block:{session_id}:{block_id}"

    def store_session(self, session: ContextPackage) -> None:
        """Store session metadata in Redis"""
        session_key = self._session_key(session.session_id)

        # Store main session metadata. Avoid pickle: deserializing pickled
        # data from a shared store is a remote-code-execution risk. JSON keeps
        # the payload language-agnostic and safe to read from any service.
        session_meta = {
            "agent_id": session.agent_id,
            "version": str(session.version),
            "trace_id": session.trace_id,
            "metrics": json.dumps({
                "total_tokens": session.metrics.total_tokens,
                "context_saturation": session.metrics.context_saturation,
                "type_distribution": session.metrics.type_distribution,
            }),
            "block_ids": json.dumps([b.id for b in session.blocks]),
        }

        # Store in Redis with a pipeline for performance
        pipe = self.redis.pipeline()
        pipe.hset(session_key, mapping=session_meta)
        pipe.expire(session_key, self.ttl)

        # Store each block separately for efficient partial updates
        for block in session.blocks:
            block_key = self._block_key(session.session_id, block.id)
            pipe.set(block_key, json.dumps(self._block_to_dict(block)), ex=self.ttl)
        pipe.execute()

    def retrieve_session(self, session_id: str) -> Optional[ContextPackage]:
        """Retrieve complete session from Redis"""
        session_key = self._session_key(session_id)

        # Get session metadata
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return None

        # Decode metrics
        metrics_raw = json.loads(session_data[b"metrics"])
        metrics = ContextMetrics(
            total_tokens=metrics_raw["total_tokens"],
            context_saturation=metrics_raw["context_saturation"],
            type_distribution=metrics_raw["type_distribution"],
        )

        # Get block IDs and retrieve blocks (expired blocks are skipped)
        block_ids = json.loads(session_data[b"block_ids"])
        pipe = self.redis.pipeline()
        for block_id in block_ids:
            pipe.get(self._block_key(session_id, block_id))
        blocks = [
            self._block_from_dict(json.loads(raw))
            for raw in pipe.execute() if raw
        ]

        # Reconstruct session
        return ContextPackage(
            session_id=session_id,
            agent_id=session_data[b"agent_id"].decode(),
            blocks=blocks,
            metrics=metrics,
            version=int(session_data[b"version"]),
            trace_id=session_data[b"trace_id"].decode()
        )

    def update_block(self, session_id: str, block: ContextBlock) -> None:
        """Update a specific block in a session"""
        block_key = self._block_key(session_id, block.id)
        self.redis.set(block_key, json.dumps(self._block_to_dict(block)), ex=self.ttl)

    def delete_session(self, session_id: str) -> None:
        """Delete a session and all its blocks"""
        session_key = self._session_key(session_id)

        # Get block IDs
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return
        block_ids = json.loads(session_data[b"block_ids"])

        # Delete all blocks and session
        pipe = self.redis.pipeline()
        for block_id in block_ids:
            block_key = self._block_key(session_id, block_id)
            pipe.delete(block_key)
        pipe.delete(session_key)
        pipe.execute()
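One caveat if you run this on Redis Cluster: a session’s keys (`pyctx:session:<id>` and `pyctx:block:<id>:<block>`) will generally land in different hash slots, and cluster mode only allows multi-key operations such as MULTI/EXEC transactions and Lua scripts when every key maps to the same slot. Redis hash tags address this: when a key contains a non-empty `{...}` section, only the text inside the first such braces is hashed. The sketch below computes the cluster slot (CRC16/XMODEM of the key, mod 16384, as described in the Redis Cluster specification) to show that hash-tagged session and block keys co-locate. The `{session_id}` key layout and the helper names are my suggestion, not what the class above uses.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16/XMODEM (poly 0x1021, init 0), the checksum Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot for a key, honoring {hash tags} the way Redis Cluster does."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # non-empty tag: hash only the tag
            key = key[start + 1:end]
    return crc16_xmodem(key.encode("utf-8")) % 16384


# Hypothetical hash-tagged layout: every key of a session shares the tag {session_id}
def session_key(session_id: str) -> str:
    return f"pyctx:session:{{{session_id}}}"


def block_key(session_id: str, block_id: str) -> str:
    return f"pyctx:block:{{{session_id}}}:{block_id}"


same_slot = key_slot(session_key("abc123")) == key_slot(block_key("abc123", "b-1"))
```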
Real-time Context Optimization
In high-volume systems, real-time context optimization becomes critical for cost and performance reasons:
```python
import asyncio
import time
from collections import deque

# ContextManager, ContextPackage and ContextType are defined earlier in this post.


class RealTimeContextOptimizer:
    """Real-time context optimization strategies for production systems"""

    def __init__(self,
                 context_manager: ContextManager,
                 token_budget: int = 4096,
                 optimization_interval: float = 0.1,  # seconds
                 relevance_decay_factor: float = 0.95):  # applied per minute
        self.context_manager = context_manager
        self.token_budget = token_budget
        self.optimization_interval = optimization_interval
        self.relevance_decay_factor = relevance_decay_factor
        self.running = False
        self.performance_metrics = deque(maxlen=1000)  # Last 1000 cycle durations
        # Time of the previous decay pass per session, so decay does not compound
        self._last_decay_ms: dict[str, int] = {}

    async def start_optimization_loop(self, session_id: str) -> None:
        """Start continuous optimization in background"""
        self.running = True
        try:
            while self.running:
                start_time = time.perf_counter()
                # Perform optimization
                await self.optimize_session(session_id)
                # Measure cycle duration
                self.performance_metrics.append(time.perf_counter() - start_time)
                # Sleep until next interval
                await asyncio.sleep(self.optimization_interval)
        except Exception as e:
            print(f"Optimization loop error: {e}")
            self.running = False

    def stop_optimization_loop(self) -> None:
        """Stop background optimization"""
        self.running = False

    async def optimize_session(self, session_id: str) -> None:
        """Perform a single optimization cycle"""
        if session_id not in self.context_manager.sessions:
            return
        session = self.context_manager.sessions[session_id]
        # Apply time-based relevance decay
        self._apply_relevance_decay(session)
        # Update relevance based on recency
        self._update_recency_relevance(session)
        # Enforce token budget
        if session.metrics.total_tokens > self.token_budget:
            self._enforce_token_budget(session)
        # Re-calculate metrics
        session.calculate_metrics()

    def _apply_relevance_decay(self, session: ContextPackage) -> None:
        """Apply time-based decay to relevance scores.

        Each cycle decays only by the time elapsed since the previous cycle
        (or since the block was created). Ignoring the floor and recency
        boosts, a block's cumulative decay is then
        relevance_decay_factor ** age_minutes instead of compounding on
        every pass of the loop.
        """
        now_ms = int(time.time() * 1000)
        last_ms = self._last_decay_ms.get(session.session_id, 0)
        self._last_decay_ms[session.session_id] = now_ms
        for block in session.blocks:
            # Don't decay system blocks
            if block.type == ContextType.SYSTEM:
                continue
            start_ms = max(last_ms, block.timestamp)
            elapsed_minutes = max(0, now_ms - start_ms) / 60_000
            # Apply exponential decay for the elapsed time only
            block.relevance_score *= self.relevance_decay_factor ** elapsed_minutes
            # Ensure minimum relevance
            block.relevance_score = max(0.1, block.relevance_score)

    def _update_recency_relevance(self, session: ContextPackage) -> None:
        """Boost relevance of recent conversational turns"""
        # Most recent USER/AGENT blocks first
        recent_blocks = sorted(
            [b for b in session.blocks if b.type in (ContextType.USER, ContextType.AGENT)],
            key=lambda b: b.timestamp,
            reverse=True
        )
        # Boost the 10 most recent turns, newest most. Boosts range from
        # 1.0 down to about 0.63, so most of these end up at the 1.0 cap.
        for i, block in enumerate(recent_blocks[:10]):
            recency_boost = 0.95 ** i
            block.relevance_score = min(1.0, block.relevance_score + recency_boost)

    def _enforce_token_budget(self, session: ContextPackage) -> None:
        """Ensure token count stays within budget"""
        # Always keep SYSTEM blocks, then greedily keep the highest-relevance
        # non-system blocks that still fit in the remaining budget
        system_blocks = [b for b in session.blocks if b.type == ContextType.SYSTEM]
        non_system_blocks = sorted(
            (b for b in session.blocks if b.type != ContextType.SYSTEM),
            key=lambda b: b.relevance_score,
            reverse=True
        )
        used_tokens = sum(b.token_count for b in system_blocks)
        kept_ids = {b.id for b in system_blocks}
        for block in non_system_blocks:
            if used_tokens + block.token_count <= self.token_budget:
                kept_ids.add(block.id)
                used_tokens += block.token_count
            # else discard this block
        # Keep survivors in their original (chronological) order
        session.blocks = [b for b in session.blocks if b.id in kept_ids]
```
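To see what the optimizer does to scores in isolation, here is a small standalone sketch of its two core steps:

- **Per-minute exponential decay:** after t seconds, a block keeps `factor ** (t / 60)` of its score, clamped at 0.1.
- **Greedy budget enforcement:** system blocks are pinned, then the most relevant remaining blocks that still fit are kept, in their original order.

The `Block` dataclass is a simplified, hypothetical stand-in for `ContextBlock`, not its real definition.

```python
from dataclasses import dataclass


@dataclass
class Block:
    """Simplified stand-in for ContextBlock (hypothetical fields)."""
    id: str
    kind: str          # "system", "user", "agent", "tool", ...
    tokens: int
    relevance: float


def decayed(score: float, age_seconds: float, factor: float = 0.95,
            floor: float = 0.1) -> float:
    """Exponential decay per minute of age, clamped at a minimum relevance."""
    return max(floor, score * factor ** (age_seconds / 60))


def enforce_budget(blocks: list, budget: int) -> list:
    """Keep all system blocks, then the most relevant others that still fit."""
    used = sum(b.tokens for b in blocks if b.kind == "system")
    keep = {b.id for b in blocks if b.kind == "system"}
    others = sorted((b for b in blocks if b.kind != "system"),
                    key=lambda b: b.relevance, reverse=True)
    for b in others:
        if used + b.tokens <= budget:
            keep.add(b.id)
            used += b.tokens
    # Preserve the original (chronological) order of the surviving blocks
    return [b for b in blocks if b.id in keep]


blocks = [
    Block("sys", "system", 500, 1.0),
    Block("u1", "user", 300, 0.4),
    Block("a1", "agent", 400, 0.9),
    Block("t1", "tool", 900, 0.7),
]
kept = enforce_budget(blocks, budget=1300)
print([b.id for b in kept])  # ['sys', 'u1', 'a1']
```

With the default factor of 0.95, a ten-minute-old block keeps 0.95 ** 10 ≈ 0.60 of its score; with the 0.98 used in the telecom configuration, it keeps about 0.82. The budget example also shows the greedy trade-off: the tool block is more relevant than `u1` but does not fit, so the smaller block is kept instead. This is fast, but it is not an optimal knapsack solution.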
Hypothetical Implementation Example: Telecom Customer Service
Let’s explore a hypothetical scenario showing how disciplined context engineering could transform a telecom customer service system handling 50,000+ customer interactions daily.
Hypothetical Baseline (Before Optimization)
Consider a telecom company using a conventional LLM-based customer service system with these theoretical performance characteristics:
- Average completion time: 12.5 seconds per query
- Useful context-window utilization: 32% (the remaining tokens are wasted)
- Coherence over multi-turn conversations: 68% (measured by user satisfaction)
- Agent handoff success rate: 52% (context lost during transfers)
- Daily token costs: $4,200 (for 50,000 interactions)
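For scale, the hypothetical baseline works out to $4,200 / 50,000 ≈ $0.084 per interaction. The small helper below shows how a prompt-size reduction would flow through to daily cost. It assumes cost is linear in tokens processed, which ignores fixed overheads and changes in output length. The reduction fraction is an input to experiment with, not a measured result.

```python
def per_interaction_cost(daily_cost: float, interactions: int) -> float:
    """Average spend per interaction."""
    return daily_cost / interactions


def projected_daily_cost(daily_cost: float, token_reduction: float) -> float:
    """Daily cost if tokens processed fall by `token_reduction` (0.0 to 1.0),
    assuming cost scales linearly with tokens. Illustrative only."""
    if not 0.0 <= token_reduction <= 1.0:
        raise ValueError("token_reduction must be between 0 and 1")
    return daily_cost * (1.0 - token_reduction)


baseline_per_interaction = per_interaction_cost(4_200, 50_000)  # about $0.084
```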
Hypothetical Optimized Approach
In this scenario, we could wire in the context layer and optimize context management with the following approach:
```python
async def telecom_service_enhancement():
    # anthropic_client, network_api, ContextualRelevanceCalculator, the
    # ContextAware* agents, AgentCoordinator, MetricsCollector and start_service
    # stand for components defined earlier or in your own codebase.
    # Initialize context-layer components
    redis_url = "redis://redis-master.production:6379/0"
    context_store = DistributedContextStore(redis_url)
    # Create optimized context manager
    optimized_manager = ContextManager(max_tokens=8192)
    # Relevance calculator (pass it to whatever scores blocks in your manager)
    relevance_calculator = ContextualRelevanceCalculator()
    # Real-time optimizer; its loop still has to be started per session,
    # e.g. optimizer.start_optimization_loop(session_id) as a background task
    optimizer = RealTimeContextOptimizer(
        context_manager=optimized_manager,
        token_budget=4096,
        optimization_interval=0.05,
        relevance_decay_factor=0.98
    )
    # Create context-aware agents
    intent_agent = ContextAwareIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=optimized_manager
    )
    technical_agent = ContextAwareTechnicalAgent(
        llm_client=anthropic_client,
        network_api=network_api,
        context_manager=optimized_manager
    )
    # Create coordinator with context transfer capabilities
    coordinator = AgentCoordinator(
        agents={
            "intent": intent_agent,
            "technical": technical_agent,
            # Other specialized agents
        },
        context_manager=optimized_manager,
        context_store=context_store
    )
    # Initialize metrics collection
    metrics_collector = MetricsCollector(
        prometheus_endpoint="http://prometheus.monitoring:9090/metrics"
    )
    # Start service
    return await start_service(coordinator, metrics_collector)
```
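In the wiring above, the `RealTimeContextOptimizer` is constructed, but its loop is never started. One way to run it is as a background `asyncio` task that lives as long as the conversation and is stopped when the conversation ends. The minimal sketch below uses a hypothetical `StubOptimizer` with the same `start_optimization_loop` / `stop_optimization_loop` shape so that it runs standalone.

```python
import asyncio


class StubOptimizer:
    """Hypothetical stand-in mirroring RealTimeContextOptimizer's loop API."""

    def __init__(self, optimization_interval: float = 0.01):
        self.optimization_interval = optimization_interval
        self.running = False
        self.cycles = 0

    async def optimize_session(self, session_id: str) -> None:
        self.cycles += 1  # the real optimizer decays, boosts and enforces the budget

    async def start_optimization_loop(self, session_id: str) -> None:
        self.running = True
        while self.running:
            await self.optimize_session(session_id)
            await asyncio.sleep(self.optimization_interval)

    def stop_optimization_loop(self) -> None:
        self.running = False


async def handle_conversation(optimizer, session_id: str) -> None:
    # Run the optimizer alongside the conversation, not in front of it
    task = asyncio.create_task(optimizer.start_optimization_loop(session_id))
    try:
        await asyncio.sleep(0.05)  # placeholder for the actual agent turns
    finally:
        optimizer.stop_optimization_loop()
        await task  # the loop exits after its current sleep


optimizer = StubOptimizer()
asyncio.run(handle_conversation(optimizer, "session-123"))
print(optimizer.cycles > 0)  # True
```

Because `running` is a single flag on the instance, this pattern assumes one optimizer instance per active session.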
Projected Results (Illustrative)
Based on the architecture described above, this hypothetical system could plausibly move each metric in the right direction. I am deliberately not putting hard numbers here: I never ran the controlled benchmark that would justify them, and inventing precise percentages would be exactly the kind of false precision this post should avoid. Directionally, you would expect:
- Lower average completion time, from smaller, better-targeted prompts
- Higher useful context-window utilization, with fewer wasted tokens
- Better multi-turn coherence, from relevance-aware retention
- Higher agent-handoff success, from structured context transfer
- Lower daily token cost, from pruning and deduplication
These improvements would result from:
- Efficient context packaging and transmission between agents
- Dynamic relevance scoring to prioritize important information
- Structured context exchange enabling seamless agent handoffs
- Automatic optimization of context window utilization
- Reduced token waste through intelligent pruning
Key Performance Considerations
To run a context layer like this in your own production system, consider these performance best practices:
- Memory optimization: deduplicate, compress, and truncate session content before it accumulates:

```python
# ContextManager, ContextPackage and ContextType are defined earlier in this post.


class ContextMemoryOptimizer:
    """Optimizes memory usage for the context layer in production"""

    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager

    def optimize_memory_usage(self, session_id: str) -> float:
        """Optimize memory usage and return memory saved in MB"""
        session = self.context_manager.sessions[session_id]
        # Calculate current memory usage
        initial_memory = self._estimate_memory_usage(session)
        # Perform optimizations
        self._deduplicate_content(session)
        self._compress_metadata(session)
        self._truncate_long_blocks(session)
        # Keep token totals in sync with the pruned blocks
        session.calculate_metrics()
        # Calculate new memory usage
        final_memory = self._estimate_memory_usage(session)
        # Return memory saved in MB
        return (initial_memory - final_memory) / (1024 * 1024)

    def _estimate_memory_usage(self, session: ContextPackage) -> int:
        """Roughly estimate memory usage in bytes (a heuristic, not a measurement)"""
        # Session metadata plus a fixed base overhead
        memory_usage = len(session.session_id) + len(session.agent_id) + 16
        for block in session.blocks:
            # Content is the main memory user; assume ~2 bytes per character
            memory_usage += len(block.content) * 2
            # Metadata
            if block.metadata:
                for key, value in block.metadata.items():
                    memory_usage += len(key) + len(str(value))
            # Fixed per-block overhead for the remaining fields
            memory_usage += 64
        return memory_usage

    def _deduplicate_content(self, session: ContextPackage) -> None:
        """Remove blocks whose content exactly duplicates an earlier block"""
        content_set = set()
        blocks_to_keep = []
        for block in session.blocks:
            # Always keep system blocks
            if block.type == ContextType.SYSTEM:
                blocks_to_keep.append(block)
                continue
            # Skip exact duplicates of content already seen
            if block.content in content_set:
                continue
            content_set.add(block.content)
            blocks_to_keep.append(block)
        session.blocks = blocks_to_keep

    def _compress_metadata(self, session: ContextPackage) -> None:
        """Drop empty metadata values and truncate long string values"""
        for block in session.blocks:
            if not block.metadata:
                continue
            block.metadata = {
                key: (value[:97] + "..." if isinstance(value, str) and len(value) > 100 else value)
                for key, value in block.metadata.items()
                if value  # Remove empty values
            }

    def _truncate_long_blocks(self, session: ContextPackage) -> None:
        """Truncate extremely long content blocks"""
        for block in session.blocks:
            # Blocks over ~1000 tokens; words are used as a rough token proxy
            if block.token_count > 1000:
                words = block.content.split()
                if len(words) <= 950:
                    continue  # Nothing to cut at word granularity
                truncated_content = " ".join(words[:950]) + " [... content truncated ...]"
                block.content = truncated_content
                block.token_count = len(truncated_content.split())
```

- Concurrency management: for high-throughput systems, managing concurrent access to sessions is critical:
```python
import asyncio
from contextlib import asynccontextmanager


class ContextConcurrencyManager:
    """Manages concurrent access to context resources"""

    def __init__(self, max_concurrent_sessions: int = 1000,
                 max_concurrent_contexts: int = 5000):
        self.session_semaphore = asyncio.Semaphore(max_concurrent_sessions)
        self.context_semaphore = asyncio.Semaphore(max_concurrent_contexts)
        self.session_locks = {}

    @asynccontextmanager
    async def session_context(self, session_id: str):
        """Manage concurrent access to a session"""
        # Create a lock for this session if it doesn't exist yet
        lock = self.session_locks.setdefault(session_id, asyncio.Lock())
        # Acquire the session semaphore, then the per-session lock
        async with self.session_semaphore:
            async with lock:
                yield

    @asynccontextmanager
    async def context_operation(self):
        """Manage concurrent context operations"""
        async with self.context_semaphore:
            yield

    def cleanup_session(self, session_id: str):
        """Remove locks for a session when it's no longer needed"""
        self.session_locks.pop(session_id, None)
```
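The per-session lock matters whenever an agent does a read-modify-write on a session with an `await` in the middle. Without the lock, two concurrent handlers can both read the old state and one update is silently lost. The sketch below reproduces that race with a simplified, hypothetical `SessionLocks` registry written in the spirit of `session_context`. The `asyncio.sleep` stands in for a round trip to the context store.

```python
import asyncio
from contextlib import asynccontextmanager


class SessionLocks:
    """Simplified per-session lock registry (hypothetical helper)."""

    def __init__(self):
        self._locks = {}

    @asynccontextmanager
    async def hold(self, session_id: str):
        lock = self._locks.setdefault(session_id, asyncio.Lock())
        async with lock:
            yield


async def append_turn(store: dict, locks: SessionLocks, session_id: str,
                      turn: str, use_lock: bool) -> None:
    async def read_modify_write():
        turns = list(store.get(session_id, []))  # read
        await asyncio.sleep(0.01)                # simulated store round trip
        store[session_id] = turns + [turn]       # write
    if use_lock:
        async with locks.hold(session_id):
            await read_modify_write()
    else:
        await read_modify_write()


async def demo(use_lock: bool) -> list:
    store, locks = {}, SessionLocks()
    await asyncio.gather(
        append_turn(store, locks, "s1", "user: hi", use_lock),
        append_turn(store, locks, "s1", "agent: hello", use_lock),
    )
    return store["s1"]


print(len(asyncio.run(demo(use_lock=False))))  # 1: one turn was lost
print(asyncio.run(demo(use_lock=True)))        # ['user: hi', 'agent: hello']
```

An `asyncio.Lock` only coordinates coroutines inside one process. If several workers share the same Redis store, you still need a distributed lock or optimistic concurrency, for example Redis `WATCH` with a transaction.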
Production Deployment Strategy
To deploy a context layer like this in production, I recommend this phased approach:
- Pilot phase: Roll the context layer out to a single agent type with low traffic
- Gradual rollout: Extend to specialized agents one by one
- A/B testing: Compare performance metrics between the context-managed and baseline systems
- Full deployment: Scale horizontally with distributed context stores
- Continuous optimization: Implement real-time monitors to tune parameters
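For the A/B step, rates such as handoff success are proportions. A standard two-proportion z-test is one simple way to check whether the difference between the context-managed arm and the baseline is larger than noise. Below is a minimal standard-library sketch. The counts in the usage line are made up for illustration, and continuous metrics such as latency need a different test (for example, Welch's t-test).

```python
import math


def two_proportion_z_test(successes_a: int, n_a: int,
                          successes_b: int, n_b: int) -> tuple:
    """Return (z, two-sided p-value) for H0: both arms share one success rate."""
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0  # both arms all-success or all-failure: no evidence
    z = (p_b - p_a) / se
    # Two-sided p-value from the standard normal CDF, via the error function
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))
    return z, p_value


# Made-up counts: baseline 520/1000 handoffs succeed, pilot 560/1000
z, p = two_proportion_z_test(520, 1000, 560, 1000)
```

With these made-up counts, z is about 1.79 and p is about 0.07. A four-point gap on 1,000 interactions per arm would therefore not clear a 0.05 threshold, which is a useful reminder to size the pilot before reading its results.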
The pilot deployment may look like this:
```python
async def pilot_deployment():
    # anthropic_client, ContextAwareIntentAgent, ProductionMonitor and start_pilot
    # stand for components defined earlier or in your own codebase.
    # Initialize distributed components with lower capacity
    redis_url = "redis://redis-staging:6379/0"
    context_store = DistributedContextStore(redis_url, ttl=1800)  # 30 minute TTL
    # Create context manager with conservative limits
    context_manager = ContextManager(max_tokens=4096)
    # Route 5% of traffic to the pilot
    traffic_ratio = 0.05
    # Create context-aware intent agent
    intent_agent = ContextAwareIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=context_manager
    )
    # Create monitoring
    monitor = ProductionMonitor(
        datadog_api_key="YOUR_API_KEY",  # load from a secret store in practice
        experiment_name="pycontext_pilot",
        sample_rate=0.1  # Sample 10% of interactions for detailed analysis
    )
    # Start pilot with traffic splitting
    return await start_pilot(
        agent=intent_agent,
        context_store=context_store,
        traffic_ratio=traffic_ratio,
        monitor=monitor
    )
```
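`start_pilot` is left abstract above. A common way to honor `traffic_ratio` is deterministic hash bucketing:

1. Hash a stable key such as the customer ID.
2. Map the hash to a number in [0, 1).
3. Route the request to the pilot when that number falls below the ratio.

This keeps a customer in the same arm across sessions, which matters when comparing multi-turn coherence. The sketch assumes a string customer ID is available at routing time. Using the experiment name as the salt is also an assumption.

```python
import hashlib


def in_pilot(customer_id: str, traffic_ratio: float,
             salt: str = "pycontext_pilot") -> bool:
    """Deterministically assign a customer to the pilot arm.

    The salt keeps assignments independent across experiments.
    """
    digest = hashlib.sha256(f"{salt}:{customer_id}".encode("utf-8")).digest()
    # Top 53 bits of the hash, scaled exactly into [0, 1)
    bucket = (int.from_bytes(digest[:8], "big") >> 11) / 2**53
    return bucket < traffic_ratio


# Roughly 5% of a large population should land in the pilot
share = sum(in_pilot(f"cust-{i}", 0.05) for i in range(100_000)) / 100_000
```

Hashing is preferred here over `random.random()` because the same customer always gets the same answer, and no assignment table needs to be stored.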
Concluding thoughts
Deliberate context engineering is one of the highest-leverage parts of multi-agent system design. A typed context-packaging layer like the one here addresses context optimization, agent collaboration, and memory management directly, where ad-hoc string passing falls apart at scale.
Key takeaways from our implementation:
- A shared format matters: A common context schema enables interoperability between diverse agent systems
- Memory optimization is critical: Real-time context management directly impacts cost and performance
- Production deployments require careful scaling: Distributed context stores and concurrency management are essential
- Relevance scoring drives optimization: Dynamic scoring decides what stays in the window, so it is the main lever on context window utilization
While this telecom example is hypothetical, the architectural patterns described show how disciplined context management could deliver tangible business benefits through reduced costs, faster responses, and improved customer satisfaction in real-world applications.
As autonomous agent systems continue to evolve, this kind of context discipline pairs naturally with connectivity standards like MCP: MCP brings tools and data to your agents, and a context layer decides what actually makes it into each prompt and across each handoff.
Future directions for this layer include:
- Cross-modal context representation: Supporting efficient encoding of multimodal content
- Federated context management: Enabling privacy-preserving context sharing across organizations
- Self-optimizing context strategies: Using reinforcement learning to dynamically tune context parameters
By adding a context layer like this to your own systems, you can pursue the kinds of improvements outlined above while establishing a foundation for future enhancements to your agent architecture.
Want to go deeper on context-aware agent architectures? Check out my previous articles on autonomous multi-agent systems and context-aware data pipelines.