
Context Engineering for Autonomous Multi-Agent Systems - Packaging, Pruning, and Transferring Context at Scale

Updated June 2026. When I first published this in March 2025, I framed it around the term “Model Context Protocol.” That was a mistake: “MCP” now refers to Anthropic’s open standard for connecting models to tools and data, which is a different (and complementary) concern. This revision renames the home-grown layer described here, keeps the engineering intact, and adds a section on where the two fit together. See How this relates to MCP below.

After exploring the architecture and implementation of autonomous multi-agent systems for telecom customer service in my previous article, I want to address one of the hardest problems I outlined: context engineering. How do you decide what goes into a model’s limited context window, how do you keep it relevant as a conversation grows, and how do you transfer it cleanly when one agent hands off to another? This post builds a small context-packaging layer (the basis of my PyContext library) to answer those questions.

An initial implementation of this context-management layer for autonomous multi-agent systems is available here. Contributions are most welcome.


The problem: context is a scarce, shared resource

In a multi-agent system, the context window is the workspace every agent shares. Even as windows have grown to hundreds of thousands of tokens, what you put in that window still determines cost, latency, and answer quality. Four problems recur:

  1. Heterogeneous agents need a common way to exchange context, not ad-hoc string concatenation
  2. Transmission cost grows with every token moved between agents, so payloads need minimizing
  3. Not all context is equal, so blocks need semantic tagging for routing and prioritization
  4. Multi-step operations need versioned context so handoffs are reproducible

The layer in this post (I call it PyContext) addresses these with a typed context-package format, relevance-based pruning, and a distributed store. It is a library pattern, not an industry standard, and it is deliberately small. Let’s dig into the implementation.

How this relates to MCP

If you arrived here searching for the Model Context Protocol, here is the short version. MCP is an open JSON-RPC standard, introduced by Anthropic in late 2024 and now supported across Anthropic, OpenAI, Google, Microsoft, and most major developer tools. It defines a host/client/server model with three server primitives: tools (actions a model can call), resources (read-only context a server exposes), and prompts (reusable templates). Messages travel over stdio or Streamable HTTP, with optional OAuth 2.1-based authorization for the HTTP transport.

MCP answers a different question than this post. MCP is about connectivity: how a model or agent reaches external tools and data in a standard way. The layer here is about economics: once that context is available, what actually goes into the limited window, how it is scored and pruned, and how it moves between agents. The two compose cleanly. An MCP server (or several) feeds resources and tool results in; a context-packaging layer like the one below decides what survives into the prompt and travels across a handoff. If you are building agents in 2026, you almost certainly want both.
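To make the composition concrete, here is a minimal sketch of the seam between the two. It assumes the result shapes as I read the MCP specification: a `resources/read` result carries a `contents` list whose items have a `uri` plus either `text` or `blob`, and a `tools/call` result carries a `content` list of typed items plus an `isError` flag. Check these against the current spec revision before relying on them. `Block` is a simplified stand-in for the `ContextBlock` defined later, and the helper names and default relevance scores are my own choices, not part of MCP or PyContext.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Block:
    """Simplified stand-in for the ContextBlock defined later in this post."""
    content: str
    type: str                  # "KNOWLEDGE" or "TOOL"
    relevance_score: float
    metadata: Dict[str, str] = field(default_factory=dict)


def blocks_from_resource_read(result: Dict[str, Any],
                              relevance: float = 0.7) -> List[Block]:
    """Map an MCP resources/read result to KNOWLEDGE blocks (text only)."""
    blocks = []
    for item in result.get("contents", []):
        if "text" not in item:     # skip binary "blob" contents
            continue
        blocks.append(Block(
            content=item["text"],
            type="KNOWLEDGE",
            relevance_score=relevance,
            metadata={"source": "mcp_resource", "uri": item.get("uri", "")},
        ))
    return blocks


def blocks_from_tool_call(tool_name: str, result: Dict[str, Any],
                          relevance: float = 0.8) -> List[Block]:
    """Map an MCP tools/call result to a single TOOL block (text items only)."""
    texts = [c["text"] for c in result.get("content", [])
             if c.get("type") == "text"]
    if not texts:
        return []
    metadata = {
        "source": "mcp_tool",
        "tool_name": tool_name,
        "is_error": str(result.get("isError", False)).lower(),
    }
    return [Block("\n".join(texts), "TOOL", relevance, metadata)]
```

From there, these blocks go through the same scoring and pruning as any other context; MCP itself never decides what stays in the window.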


Technical Implementation


Core Protocol Definition

At its heart, the layer defines a protocol-buffer schema that any agent in the system can produce and consume. Here’s a simplified version of the core format:

syntax = "proto3";

package pycontext;

message ContextBlock {
  string id = 1;
  uint64 timestamp = 2;
  string content = 3;
  float relevance_score = 4;
  map<string, string> metadata = 5;
  ContextType type = 6;
  uint32 token_count = 7;
  repeated string references = 8;
}

enum ContextType {
  SYSTEM = 0;
  USER = 1;
  AGENT = 2;
  MEMORY = 3;
  KNOWLEDGE = 4;
  TOOL = 5;
}

message ContextPackage {
  string session_id = 1;
  string agent_id = 2;
  repeated ContextBlock blocks = 3;
  ContextMetrics metrics = 4;
  uint32 version = 5;
  string trace_id = 6;
}

message ContextMetrics {
  uint32 total_tokens = 1;
  float context_saturation = 2;
  map<string, float> type_distribution = 3;
}

This schema lets any agent serialize a context package, send it across a process or service boundary, and decode it on the other side with the metadata that drives context decisions (type, relevance score, token count, timestamp) intact.
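One consequence of the schema is worth guarding against. In proto3 an enum field that was never set decodes as its zero value, and here that value is `SYSTEM`, which the pruning code later in this post never removes. A production schema would usually reserve 0 for an `UNSPECIFIED` value; either way, it helps to validate packages at the receiving boundary. The sketch below checks the JSON form produced by the Python client's `to_dict()`. `validate_package` and the specific rules it enforces (including keeping relevance scores in [0, 1]) are my assumptions rather than part of the format.

```python
from typing import Any, Dict, List

# Must mirror the ContextType enum in the schema above.
VALID_TYPES = {"SYSTEM", "USER", "AGENT", "MEMORY", "KNOWLEDGE", "TOOL"}
REQUIRED_BLOCK_FIELDS = ("id", "content", "relevance_score", "type", "token_count")


def validate_package(pkg: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a serialized ContextPackage dict."""
    problems = []
    for name in ("session_id", "agent_id", "blocks", "version"):
        if name not in pkg:
            problems.append(f"package missing '{name}'")

    total = 0
    for i, block in enumerate(pkg.get("blocks", [])):
        for name in REQUIRED_BLOCK_FIELDS:
            if name not in block:
                # An absent type must not silently become SYSTEM (enum value 0).
                problems.append(f"block {i} missing '{name}'")
        if "type" in block and block["type"] not in VALID_TYPES:
            problems.append(f"block {i} has unknown type {block['type']!r}")
        score = block.get("relevance_score", 0.0)
        if not 0.0 <= score <= 1.0:
            problems.append(f"block {i} relevance_score {score} outside [0, 1]")
        total += block.get("token_count", 0)

    # The reported total should match the blocks actually present.
    reported = pkg.get("metrics", {}).get("total_tokens")
    if reported is not None and reported != total:
        problems.append(f"metrics.total_tokens={reported}, blocks sum to {total}")
    return problems
```

An empty list means the package passed; otherwise the receiver can reject it or ask the sender to re-export.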



Python Implementation

Let’s implement a Python client for this format that we can use across the agent architecture:

import time
import uuid
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum

class ContextType(Enum):
    SYSTEM = 0
    USER = 1
    AGENT = 2
    MEMORY = 3
    KNOWLEDGE = 4
    TOOL = 5

@dataclass
class ContextBlock:
    id: str
    content: str
    relevance_score: float
    type: ContextType
    metadata: Optional[Dict[str, str]] = None
    timestamp: Optional[int] = None
    token_count: Optional[int] = None
    references: Optional[List[str]] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = int(time.time() * 1000)
        if self.metadata is None:
            self.metadata = {}
        if self.references is None:
            self.references = []
        if self.token_count is None:
            # Rough word-count proxy for token count.
            # In production, use the model's tokenizer or a count-tokens
            # endpoint (e.g. tiktoken, or the provider's token-counting API);
            # whitespace splitting under-counts real tokens by 15-30% for
            # English prose, and by far more for JSON or code.
            self.token_count = len(self.content.split())

@dataclass
class ContextMetrics:
    total_tokens: int
    context_saturation: float
    type_distribution: Dict[str, float]

@dataclass
class ContextPackage:
    session_id: str
    agent_id: str
    blocks: List[ContextBlock]
    metrics: ContextMetrics
    version: int = 1
    trace_id: str = None

    def __post_init__(self):
        if self.trace_id is None:
            self.trace_id = str(uuid.uuid4())

    def calculate_metrics(self) -> None:
        """Calculate metrics based on current context blocks"""
        total_tokens = sum(block.token_count for block in self.blocks)

        # Saturation is measured against a working budget, not the model's
        # hard limit. Modern models offer 200K-1M token windows, but you still
        # cap a working window: cost scales with tokens, latency grows, and
        # very long contexts dilute attention ("context rot"). 8192 here is an
        # illustrative budget; tune it per workload.
        context_saturation = min(1.0, total_tokens / 8192)

        # Calculate distribution of context types
        type_counts = {}
        for block in self.blocks:
            type_name = block.type.name
            if type_name not in type_counts:
                type_counts[type_name] = 0
            type_counts[type_name] += block.token_count

        type_distribution = {
            k: v / total_tokens if total_tokens > 0 else 0
            for k, v in type_counts.items()
        }

        self.metrics = ContextMetrics(
            total_tokens=total_tokens,
            context_saturation=context_saturation,
            type_distribution=type_distribution
        )

    def to_dict(self) -> Dict:
        """Convert to dictionary representation for serialization"""
        return {
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "blocks": [
                {
                    "id": b.id,
                    "content": b.content,
                    "relevance_score": b.relevance_score,
                    "type": b.type.name,
                    "metadata": b.metadata,
                    "timestamp": b.timestamp,
                    "token_count": b.token_count,
                    "references": b.references
                }
                for b in self.blocks
            ],
            "metrics": {
                "total_tokens": self.metrics.total_tokens,
                "context_saturation": self.metrics.context_saturation,
                "type_distribution": self.metrics.type_distribution
            },
            "version": self.version,
            "trace_id": self.trace_id
        }
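The word-count proxy in `ContextBlock` is the weakest part of the metrics. Since `token_count` is an ordinary field, you can compute it with something better and pass it in explicitly when you construct a `ContextBlock`. Below is a minimal sketch of a pluggable counter. `make_counter` and `char_heuristic_count` are hypothetical helpers, the four-characters-per-token figure is a common rule of thumb for English text rather than a property of any particular model, and `cl100k_base` is only one of tiktoken's encodings. Other providers' models use their own tokenizers, so for exact counts prefer the provider's token-counting API.

```python
import math
from typing import Callable

# Any function from text to an integer token count.
TokenCounter = Callable[[str], int]


def whitespace_count(text: str) -> int:
    """The word-count proxy ContextBlock uses by default."""
    return len(text.split())


def char_heuristic_count(text: str, chars_per_token: float = 4.0) -> int:
    """Rough estimate: about four characters per token for English text."""
    return math.ceil(len(text) / chars_per_token)


def make_counter(prefer_exact: bool = True) -> TokenCounter:
    """Return a tiktoken-based counter when available, else the heuristic."""
    if prefer_exact:
        try:
            import tiktoken  # optional dependency
            # May download the encoding file on first use.
            encoding = tiktoken.get_encoding("cl100k_base")
            return lambda text: len(encoding.encode(text))
        except Exception:  # not installed, or the encoding is unavailable
            pass
    return char_heuristic_count
```

Create the counter once at startup (`counter = make_counter()`) and pass `token_count=counter(text)` when building blocks, so saturation and pruning decisions track real token usage more closely.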



Now let’s implement a ContextManager class that handles context operations over this format:


import uuid
from typing import Callable, Dict

class ContextManager:
    """Manages context sessions built on the ContextPackage format"""

    def __init__(self,
                max_tokens: int = 8192,
                relevance_threshold: float = 0.2):
        self.max_tokens = max_tokens
        self.relevance_threshold = relevance_threshold
        self.sessions: Dict[str, ContextPackage] = {}

    def create_session(self, agent_id: str) -> str:
        """Create a new context session"""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=agent_id,
            blocks=[],
            metrics=ContextMetrics(
                total_tokens=0,
                context_saturation=0.0,
                type_distribution={}
            )
        )
        return session_id

    def add_context(self,
                   session_id: str,
                   content: str,
                   context_type: ContextType,
                   relevance_score: float = 1.0,
                   metadata: Dict[str, str] = None) -> str:
        """Add context to an existing session"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")

        block_id = str(uuid.uuid4())
        block = ContextBlock(
            id=block_id,
            content=content,
            relevance_score=relevance_score,
            type=context_type,
            metadata=metadata or {}
        )

        self.sessions[session_id].blocks.append(block)
        self.sessions[session_id].calculate_metrics()

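        # Prune once the session reaches 90% of this manager's budget. Compare
        # against self.max_tokens directly: calculate_metrics() measures
        # context_saturation against a fixed 8192-token working budget.
        if self.sessions[session_id].metrics.total_tokens >= 0.9 * self.max_tokens:
            self._prune_context(session_id)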

        return block_id

    def _prune_context(self, session_id: str) -> None:
        """Drop the least relevant non-SYSTEM blocks until the session fits
        within 80% of max_tokens, preserving the original block order"""
        session = self.sessions[session_id]

        # SYSTEM context is never pruned, but it does consume budget
        system_tokens = sum(b.token_count for b in session.blocks
                            if b.type == ContextType.SYSTEM)
        target_tokens = int(self.max_tokens * 0.8) - system_tokens  # Target 80% usage

        # Rank prunable blocks from least to most relevant
        candidates = sorted(
            (b for b in session.blocks if b.type != ContextType.SYSTEM),
            key=lambda b: b.relevance_score
        )
        current_tokens = sum(b.token_count for b in candidates)

        # Keep removing the least relevant blocks until we're under target
        dropped = set()
        for block in candidates:
            if current_tokens <= target_tokens:
                break
            dropped.add(block.id)
            current_tokens -= block.token_count

        # Filter the original list (rather than re-concatenating the sorted
        # one) so conversation order survives pruning
        session.blocks = [b for b in session.blocks if b.id not in dropped]
        session.calculate_metrics()

    def get_formatted_context(self,
                             session_id: str,
                             formatter: Callable = None) -> str:
        """Get formatted context for model input"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")

        session = self.sessions[session_id]

        # Default formatter concatenates content with block type as separator
        if formatter is None:
            result = []
            for block in session.blocks:
                if block.relevance_score >= self.relevance_threshold:
                    result.append(f"[{block.type.name}]\n{block.content}")
            return "\n\n".join(result)

        return formatter(session)

    def export_session(self, session_id: str) -> Dict:
        """Export session as serializable dict"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")

        return self.sessions[session_id].to_dict()

    def import_session(self, session_data: Dict) -> str:
        """Import a session from serialized data"""
        session_id = session_data["session_id"]

        blocks = []
        for block_data in session_data["blocks"]:
            blocks.append(ContextBlock(
                id=block_data["id"],
                content=block_data["content"],
                relevance_score=block_data["relevance_score"],
                type=ContextType[block_data["type"]],
                metadata=block_data["metadata"],
                timestamp=block_data["timestamp"],
                token_count=block_data["token_count"],
                references=block_data["references"]
            ))

        metrics = ContextMetrics(
            total_tokens=session_data["metrics"]["total_tokens"],
            context_saturation=session_data["metrics"]["context_saturation"],
            type_distribution=session_data["metrics"]["type_distribution"]
        )

        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=session_data["agent_id"],
            blocks=blocks,
            metrics=metrics,
            version=session_data["version"],
            trace_id=session_data["trace_id"]
        )

        return session_id
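Because the pruning policy is the part most worth unit-testing, it helps to have it in a form you can exercise without sessions or UUIDs. Here is a self-contained sketch of the policy `_prune_context` applies (SYSTEM blocks exempt, least relevant dropped first, stop at 80% of the budget), written over a simplified `Block` type of my own and keeping survivors in their original order:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Block:
    id: str
    type: str          # "SYSTEM", "USER", "AGENT", "MEMORY", "KNOWLEDGE", "TOOL"
    relevance: float
    tokens: int


def prune(blocks: List[Block], max_tokens: int, target: float = 0.8) -> List[Block]:
    """Drop least-relevant non-SYSTEM blocks until the total fits within
    target * max_tokens; survivors keep their original order."""
    budget = int(max_tokens * target)
    total = sum(b.tokens for b in blocks)
    dropped = set()
    prunable = sorted((b for b in blocks if b.type != "SYSTEM"),
                      key=lambda b: b.relevance)   # least relevant first
    for b in prunable:
        if total <= budget:
            break
        dropped.add(b.id)
        total -= b.tokens
    return [b for b in blocks if b.id not in dropped]


if __name__ == "__main__":
    blocks = [
        Block("sys", "SYSTEM", 1.0, 20),
        Block("u1", "USER", 0.9, 30),
        Block("m1", "MEMORY", 0.3, 25),
        Block("t1", "TOOL", 0.8, 20),
        Block("m2", "MEMORY", 0.6, 15),
    ]
    print([b.id for b in prune(blocks, max_tokens=100)])  # ['sys', 'u1', 't1']
```

With a 100-token budget the target is 80. The five blocks total 110 tokens, so the two lowest-relevance blocks (`m1` at 0.3, then `m2` at 0.6) are dropped, leaving 70 tokens with `sys`, `u1`, `t1` still in conversation order.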


Integration with Existing Agent Systems

Now let’s integrate the context layer into the multi-agent telecom customer service system from the previous article:

from typing import Dict, List, Optional, Union
import asyncio
import json

class ContextAwareAgent:
    """Base class for agents that use the PyContext context layer"""

    def __init__(self,
                llm_client,
                agent_role: str,
                context_manager: ContextManager = None):
        self.llm = llm_client
        self.role = agent_role
        self.context_manager = context_manager or ContextManager()
        self.session_id = None

    async def initialize_session(self) -> str:
        """Initialize a new context session"""
        self.session_id = self.context_manager.create_session(self.role)

        # Add system prompt as SYSTEM context
        system_prompt = await self._load_role_prompt()
        self.context_manager.add_context(
            session_id=self.session_id,
            content=system_prompt,
            context_type=ContextType.SYSTEM,
            relevance_score=1.0,  # System prompts always max relevance
            metadata={"type": "system_prompt", "role": self.role}
        )

        return self.session_id

    async def _load_role_prompt(self) -> str:
        """Load role-specific prompt - implement in subclasses"""
        raise NotImplementedError()

    async def add_user_context(self,
                              content: str,
                              metadata: Dict = None) -> str:
        """Add user input to context"""
        if self.session_id is None:
            await self.initialize_session()

        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.USER,
            relevance_score=0.9,  # User context starts with high relevance
            metadata=metadata or {}
        )

    async def add_memory_context(self,
                                content: str,
                                relevance_score: float,
                                metadata: Dict = None) -> str:
        """Add memory (from episodic or semantic memory) to context"""
        if self.session_id is None:
            await self.initialize_session()

        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.MEMORY,
            relevance_score=relevance_score,
            metadata=metadata or {}
        )

    async def add_tool_context(self,
                              content: str,
                              tool_name: str,
                              metadata: Dict = None) -> str:
        """Add tool usage results to context"""
        if self.session_id is None:
            await self.initialize_session()

        # Copy so the caller's dict is not mutated
        metadata = dict(metadata or {})
        metadata["tool_name"] = tool_name

        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.TOOL,
            relevance_score=0.8,  # Tool outputs generally have high relevance
            metadata=metadata
        )

    async def process_with_llm(self,
                              prompt: str = None) -> str:
        """Process the current context with LLM"""
        if self.session_id is None:
            await self.initialize_session()

        formatted_context = self.context_manager.get_formatted_context(self.session_id)

        if prompt:
            # Add additional prompt as temporary context
            formatted_context += f"\n\n[PROMPT]\n{prompt}"

        # Call LLM with formatted context
        response = await self.llm.generate(formatted_context)

        # Add agent response to context
        self.context_manager.add_context(
            session_id=self.session_id,
            content=response,
            context_type=ContextType.AGENT,
            relevance_score=0.9,  # Agent responses are highly relevant
            metadata={"type": "agent_response"}
        )

        return response

    def export_context(self) -> Dict:
        """Export current context for transfer to another agent"""
        if self.session_id is None:
            raise ValueError("No active session")

        return self.context_manager.export_session(self.session_id)

    def import_context(self, context_data: Dict) -> None:
        """Import context from another agent"""
        self.session_id = self.context_manager.import_session(context_data)
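One caveat with `import_context`: it replaces the receiving agent's session wholesale, including the sender's SYSTEM block, so a technical agent that imports the intent agent's package would keep running under the intent agent's role prompt. It also reuses the sender's `session_id`, so if both agents share one `ContextManager`, the import overwrites the sender's session. Below is a minimal sketch of one fix, operating on the dict from `export_context()`: swap in the receiver's role prompt, bump `version` so each handoff is a distinct snapshot, and recompute the metrics. `prepare_handoff` and the `handoff_from` metadata key are hypothetical, and the 8192 default mirrors the illustrative budget in `calculate_metrics`.

```python
import copy
import time
import uuid
from typing import Any, Dict


def prepare_handoff(package: Dict[str, Any],
                    receiver_id: str,
                    receiver_system_prompt: str,
                    token_budget: int = 8192,
                    fresh_session_id: bool = False) -> Dict[str, Any]:
    """Re-target an exported ContextPackage dict at a different agent."""
    pkg = copy.deepcopy(package)              # never mutate the sender's export
    sender_id = pkg["agent_id"]

    # Replace the sender's role prompt with the receiver's; keep everything else.
    system_block = {
        "id": str(uuid.uuid4()),
        "content": receiver_system_prompt,
        "relevance_score": 1.0,
        "type": "SYSTEM",
        "metadata": {"type": "system_prompt", "role": receiver_id,
                     "handoff_from": sender_id},   # hypothetical key
        "timestamp": int(time.time() * 1000),
        "token_count": len(receiver_system_prompt.split()),
        "references": [],
    }
    pkg["blocks"] = [system_block] + [b for b in pkg["blocks"]
                                      if b["type"] != "SYSTEM"]

    # Recompute metrics the same way calculate_metrics() does.
    total = sum(b["token_count"] for b in pkg["blocks"])
    per_type: Dict[str, int] = {}
    for b in pkg["blocks"]:
        per_type[b["type"]] = per_type.get(b["type"], 0) + b["token_count"]
    pkg["metrics"] = {
        "total_tokens": total,
        "context_saturation": min(1.0, total / token_budget),
        "type_distribution": {k: (v / total if total else 0.0)
                              for k, v in per_type.items()},
    }

    pkg["agent_id"] = receiver_id
    pkg["version"] = pkg.get("version", 1) + 1   # each handoff is a new snapshot
    if fresh_session_id:
        pkg["session_id"] = str(uuid.uuid4())    # avoid overwriting the sender
    return pkg                                    # trace_id is kept on purpose
```

Usage would look like `receiver.import_context(prepare_handoff(sender.export_context(), receiver.role, prompt))`, where `prompt` is the receiver's role prompt, awaited from `_load_role_prompt()` beforehand because `import_context` is synchronous. Keeping `trace_id` unchanged lets you follow one customer request across every agent it touches.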



Now we can implement our specialized telecom agents on top of the context layer. Each is a thin subclass of ContextAwareAgent. First the intent analysis agent, then the technical support agent:

class ContextAwareIntentAgent(ContextAwareAgent):
    """Intent analysis agent built on the context layer"""

    async def _load_role_prompt(self) -> str:
        return """You are an Intent Analysis Agent in a telecom customer service system.
Your role is to precisely identify customer intent from queries, detect multiple or
hidden intents, assess intent confidence, and identify required context for resolution."""

    async def analyze_intent(self, query: str) -> Dict:
        """Analyze customer intent with managed context"""
        await self.add_user_context(query, {"type": "customer_query"})

        analysis_prompt = """Based on the customer query above, provide the following analysis:
1. Primary Intent: The main customer goal
2. Secondary Intents: Additional or implied needs
3. Required Information: What we need to know to resolve this
4. Confidence Score: How certain are you (0-1)

Return your analysis in JSON format.
"""

        result = await self.process_with_llm(analysis_prompt)

        # Parse the JSON response, falling back to a neutral result on failure
        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            # Fallback: Extract manually using regex or prompt again
            analysis = {
                "primary_intent": "unknown",
                "secondary_intents": [],
                "required_information": [],
                "confidence": 0.5
            }

        return analysis


class ContextAwareTechnicalAgent(ContextAwareAgent):
    """Technical support agent built on the context layer"""

    def __init__(self, llm_client, network_api, context_manager=None):
        super().__init__(llm_client, "technical_agent", context_manager)
        self.network_api = network_api

    async def _load_role_prompt(self) -> str:
        return """You are a Technical Support Agent specializing in telecom issues.
Your role is to diagnose technical issues from symptoms, design step-by-step
troubleshooting plans, interpret diagnostic results, and recommend solutions."""

    async def diagnose_issue(self, issue_description: str, customer_id: str) -> Dict:
        """Diagnose technical issue with managed context"""
        await self.add_user_context(issue_description, {"customer_id": customer_id})

        # Add relevant customer technical data from memory
        network_data = await self.network_api.get_customer_network_data(customer_id)
        await self.add_memory_context(
            content=json.dumps(network_data),
            relevance_score=0.85,
            metadata={"type": "network_data", "customer_id": customer_id}
        )

        # Run network diagnostics and add results to context
        diagnostics = await self.network_api.run_diagnostics(customer_id)
        await self.add_tool_context(
            content=json.dumps(diagnostics),
            tool_name="network_diagnostics",
            metadata={"customer_id": customer_id}
        )

        analysis_prompt = """Based on the customer issue, network data, and diagnostic results,
provide a technical analysis with:
1. Root Cause: The most likely cause of the issue
2. Recommended Solution: Step-by-step resolution plan
3. Alternative Solutions: Other approaches if the primary solution fails
4. Confidence: How certain are you about this diagnosis (0-1)

Return your analysis in JSON format.
"""

        result = await self.process_with_llm(analysis_prompt)

        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            analysis = {
                "root_cause": "unknown",
                "recommended_solution": [],
                "alternative_solutions": [],
                "confidence": 0.5
            }

        return analysis
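Both agents fall back to a placeholder result whenever `json.loads` fails, and in practice models often wrap otherwise valid JSON in prose or a Markdown code fence. A more forgiving parser, tried before the fallback, recovers most of those replies. This is a sketch with a hypothetical helper name; if your provider offers a structured-output or JSON mode, that is the more robust fix.

```python
import json
import re
from typing import Any, Dict, Optional

# Three backticks: the Markdown code-fence marker models often emit.
_TICKS = "`" * 3
_FENCED = re.compile(re.escape(_TICKS) + r"(?:json)?\s*(.*?)" + re.escape(_TICKS),
                     re.DOTALL)


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object found in an LLM reply, or None."""
    candidates = []
    match = _FENCED.search(text)
    if match:                      # prefer the body of a fenced block
        candidates.append(match.group(1))
    candidates.append(text)        # then the raw reply

    decoder = json.JSONDecoder()
    for candidate in candidates:
        start = candidate.find("{")
        while start != -1:         # try each "{" until one decodes
            try:
                obj, _ = decoder.raw_decode(candidate, start)
                if isinstance(obj, dict):
                    return obj
            except json.JSONDecodeError:
                pass
            start = candidate.find("{", start + 1)
    return None
```

In `analyze_intent`, calling `extract_json_object(result)` and using the existing fallback dict only when it returns `None` would replace the `try`/`except` around `json.loads`.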
    



Context Optimization Strategies

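At scale, pruning on saturation alone is not enough; you also want control over which kinds of context get the budget and how relevance tracks the current query. Because every block carries a type, a relevance score, a timestamp, and a token count, the context layer supports both: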


Context Window Management


class ContextWindowOptimizer:
    """Optimizes context window usage using context-block metadata"""

    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager

    async def optimize_session(self, session_id: str, max_tokens: int = 4096) -> None:
        """Optimize context window to fit within token limit"""
        if session_id not in self.context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")

        session = self.context_manager.sessions[session_id]

        # Calculate current usage
        current_tokens = session.metrics.total_tokens
        if current_tokens <= max_tokens:
            return  # Already within limits

        # Group blocks by type (recency and relevance are applied below)
        typed_blocks = {
            ContextType.SYSTEM: [],
            ContextType.USER: [],
            ContextType.AGENT: [],
            ContextType.MEMORY: [],
            ContextType.KNOWLEDGE: [],
            ContextType.TOOL: []
        }

        for block in session.blocks:
            typed_blocks[block.type].append(block)

        # Always keep SYSTEM blocks
        optimized_blocks = typed_blocks[ContextType.SYSTEM].copy()
        used_tokens = sum(b.token_count for b in optimized_blocks)

        # Keep most recent USER and AGENT blocks
        for block_type in [ContextType.USER, ContextType.AGENT]:
            blocks = sorted(typed_blocks[block_type], key=lambda b: b.timestamp, reverse=True)

            for block in blocks:
                if used_tokens + block.token_count <= max_tokens * 0.7:  # Keep 30% for tools/memory
                    optimized_blocks.append(block)
                    used_tokens += block.token_count

        # Use remaining space for TOOL, MEMORY and KNOWLEDGE blocks by relevance
        remaining_types = [ContextType.TOOL, ContextType.MEMORY, ContextType.KNOWLEDGE]
        remaining_blocks = []

        for block_type in remaining_types:
            remaining_blocks.extend(typed_blocks[block_type])

        # Sort by relevance score
        remaining_blocks.sort(key=lambda b: b.relevance_score, reverse=True)

        for block in remaining_blocks:
            if used_tokens + block.token_count <= max_tokens:
                optimized_blocks.append(block)
                used_tokens += block.token_count

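        # Update the session, restoring the original block order: the
        # selection above groups blocks by type, which would otherwise
        # separate USER turns from the AGENT replies they belong to.
        kept_ids = {b.id for b in optimized_blocks}
        session.blocks = [b for b in session.blocks if b.id in kept_ids]
        session.calculate_metrics()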
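The optimizer walks USER and AGENT blocks newest-first but skips any block that does not fit and keeps going, so a single long turn can leave a gap in the retained history. The sketch below is a variant, not a drop-in replacement: it treats USER and AGENT turns as one timeline, stops at the first turn that does not fit so the kept history is contiguous, and returns blocks in their original order. `Block` and `fit_window` are simplified stand-ins of my own; the 70% conversation share matches the optimizer above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Block:
    id: str
    type: str        # "SYSTEM", "USER", "AGENT", "TOOL", "MEMORY", "KNOWLEDGE"
    relevance: float
    timestamp: int
    tokens: int


CONVERSATION = {"USER", "AGENT"}


def fit_window(blocks: List[Block], max_tokens: int,
               conversation_share: float = 0.7) -> List[Block]:
    """Two-tier budget: SYSTEM always, then the newest contiguous run of
    USER/AGENT turns up to conversation_share * max_tokens, then the most
    relevant remaining blocks up to max_tokens. Output keeps input order."""
    keep = {b.id for b in blocks if b.type == "SYSTEM"}
    used = sum(b.tokens for b in blocks if b.type == "SYSTEM")

    # Tier 1: newest turns first; stop at the first one that does not fit,
    # so the retained history has no gaps.
    turns = sorted((b for b in blocks if b.type in CONVERSATION),
                   key=lambda b: b.timestamp, reverse=True)
    for b in turns:
        if used + b.tokens > max_tokens * conversation_share:
            break
        keep.add(b.id)
        used += b.tokens

    # Tier 2: fill the rest with the most relevant supporting blocks.
    support = sorted((b for b in blocks
                      if b.type not in CONVERSATION and b.type != "SYSTEM"),
                     key=lambda b: b.relevance, reverse=True)
    for b in support:
        if used + b.tokens <= max_tokens:
            keep.add(b.id)
            used += b.tokens

    return [b for b in blocks if b.id in keep]
```

For example, with a 100-token budget, a 10-token SYSTEM block, and turns of 5, 60 and 10 tokens (oldest first), only the newest turn is kept: adding the 60-token turn would exceed the 70-token conversation share, and the 5-token turn before it is deliberately not used to fill the gap.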



Contextual Relevance Scoring

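For production systems, the fixed relevance scores assigned when a block is added (0.9 for user input, 0.8 for tool output, and so on) are not sufficient, because relevance depends on what the agent is being asked right now. The TF-IDF calculator below rescores blocks against the current query and serves as a lightweight baseline. In 2026 you would typically swap it for embedding-based similarity (encode each block and the query with an embedding model and rank by cosine similarity, often via a vector store), which captures semantic relatedness that TF-IDF misses: "no signal" and "dropped calls" share no terms, so TF-IDF scores them as unrelated. The interface stays the same; here's the baseline: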

from typing import Dict, List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

class ContextualRelevanceCalculator:
    """Calculates contextual relevance between blocks using TF-IDF"""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(stop_words='english')

    def calculate_relevance(self,
                           query: str,
                           context_blocks: List[ContextBlock]) -> Dict[str, float]:
        """Calculate relevance scores between query and context blocks"""
        texts = [query] + [block.content for block in context_blocks]

        # Handle empty texts
        cleaned_texts = [t if t.strip() else "empty" for t in texts]

        try:
            # Transform texts to TF-IDF vectors
            tfidf_matrix = self.vectorizer.fit_transform(cleaned_texts)

            # Calculate cosine similarity between query and each block
            query_vector = tfidf_matrix[0:1]
            block_vectors = tfidf_matrix[1:]

            similarities = cosine_similarity(query_vector, block_vectors).flatten()

            # Create mapping of block IDs to relevance scores
            relevance_scores = {}
            for i, block in enumerate(context_blocks):
                relevance_scores[block.id] = float(similarities[i])

            return relevance_scores
        except Exception as e:
            # Fallback to default scores if vectorization fails
            return {block.id: 0.5 for block in context_blocks}

    def update_relevance_scores(self,
                               query: str,
                               context_manager: ContextManager,
                               session_id: str) -> None:
        """Update relevance scores for all blocks in a session"""
        if session_id not in context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")

        session = context_manager.sessions[session_id]
        non_system_blocks = [b for b in session.blocks if b.type != ContextType.SYSTEM]

        relevance_scores = self.calculate_relevance(query, non_system_blocks)

        # Update blocks with new scores
        for block in non_system_blocks:
            if block.id in relevance_scores:
                block.relevance_score = relevance_scores[block.id]

        # System blocks always keep max relevance
        for block in session.blocks:
            if block.type == ContextType.SYSTEM:
                block.relevance_score = 1.0
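Swapping TF-IDF for embeddings mostly changes how the vectors are produced. Since `update_relevance_scores` only calls `self.calculate_relevance`, overriding that one method is enough. The sketch below keeps the same contract (a query and blocks with `id` and `content`, returning a dict of block IDs to scores) and takes the embedding function as a parameter. `EmbeddingRelevanceCalculator` and `hashing_embedder` are hypothetical; the hashing embedder is a toy bag-of-words stand-in for tests only, and in practice you would pass something like a sentence-transformers model's `encode` method, which returns one vector per input text.

```python
import zlib
from typing import Callable, Dict, Sequence

import numpy as np

# An embedder maps n texts to an (n, d) array of vectors.
Embedder = Callable[[Sequence[str]], np.ndarray]


def hashing_embedder(texts: Sequence[str], dim: int = 256) -> np.ndarray:
    """Toy bag-of-words embedder (hashed word counts). Tests only."""
    vectors = np.zeros((len(texts), dim))
    for i, text in enumerate(texts):
        for word in text.lower().split():
            vectors[i, zlib.crc32(word.encode()) % dim] += 1.0
    return vectors


class EmbeddingRelevanceCalculator:
    """Scores blocks by cosine similarity between embeddings, using the
    same calculate_relevance contract as the TF-IDF baseline."""

    def __init__(self, embed: Embedder = hashing_embedder):
        self.embed = embed

    def calculate_relevance(self, query: str, context_blocks) -> Dict[str, float]:
        if not context_blocks:
            return {}
        vectors = np.asarray(
            self.embed([query] + [b.content for b in context_blocks]),
            dtype=float)
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        unit = vectors / np.where(norms == 0, 1.0, norms)  # avoid divide-by-zero
        similarities = unit[1:] @ unit[0]
        # Clip to [0, 1] so scores stay on the same scale as the baseline
        return {b.id: float(np.clip(s, 0.0, 1.0))
                for b, s in zip(context_blocks, similarities)}
```

Injecting the embedder keeps the calculator testable offline and lets you change embedding models without touching the scoring or pruning code.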



Performance Benchmarking and Optimization

Let’s implement a benchmarking suite for the context layer:

import time
import statistics
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class BenchmarkResult:
    operation: str
    times: List[float]
    token_counts: List[int]

    @property
    def mean_time(self) -> float:
        return statistics.mean(self.times)

    @property
    def p95_time(self) -> float:
        return statistics.quantiles(self.times, n=20)[18]  # 95th percentile

    @property
    def tokens_per_second(self) -> float:
        total_tokens = sum(self.token_counts)
        total_time = sum(self.times)
        return total_tokens / total_time if total_time > 0 else 0

class ContextBenchmark:
    """Benchmark performance of context-layer operations"""

    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.results = {}

    async def run_add_context_benchmark(self,
                                      iterations: int = 100,
                                      content_size: int = 500) -> BenchmarkResult:
        """Benchmark context addition operations"""
        session_id = self.context_manager.create_session("benchmark")

        times = []
        token_counts = []

        for i in range(iterations):
            # Generate content of specified size
            content = f"Benchmark content iteration {i} " + "word " * content_size

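            # perf_counter is monotonic and high-resolution; time.time() can
            # be too coarse for sub-millisecond operations. Once the session
            # nears its budget, each call here also triggers pruning.
            start_time = time.perf_counter()
            block_id = self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )
            end_time = time.perf_counter()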

            elapsed = end_time - start_time
            times.append(elapsed)

            # Get token count
            block = next(b for b in self.context_manager.sessions[session_id].blocks
                        if b.id == block_id)
            token_counts.append(block.token_count)

        result = BenchmarkResult(
            operation="add_context",
            times=times,
            token_counts=token_counts
        )

        self.results["add_context"] = result
        return result

    async def run_format_context_benchmark(self,
                                        iterations: int = 100,
                                        block_count: int = 20) -> BenchmarkResult:
        """Benchmark context formatting operations"""
        session_id = self.context_manager.create_session("benchmark")

        # Prepare session with specified number of blocks
        for i in range(block_count):
            content = f"Benchmark content block {i} " + "word " * 100
            self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )

        times = []
        token_counts = []

        for i in range(iterations):
            start_time = time.perf_counter()
            formatted = self.context_manager.get_formatted_context(session_id)
            end_time = time.perf_counter()

            elapsed = end_time - start_time
            times.append(elapsed)

            # Approximate token count of formatted output
            token_counts.append(len(formatted.split()))

        result = BenchmarkResult(
            operation="format_context",
            times=times,
            token_counts=token_counts
        )

        self.results["format_context"] = result
        return result

    async def run_prune_context_benchmark(self,
                                       iterations: int = 100,
                                       initial_blocks: int = 100) -> BenchmarkResult:
        """Benchmark context pruning operations"""
        times = []
        token_counts = []

        for i in range(iterations):
            # Create new session for each iteration
            session_id = self.context_manager.create_session("benchmark")

            # Add initial blocks
            for j in range(initial_blocks):
                content = f"Benchmark content block {j} " + "word " * 50
                self.context_manager.add_context(
                    session_id=session_id,
                    content=content,
                    context_type=ContextType.MEMORY,
                    relevance_score=j/initial_blocks  # Vary relevance
                )

            # Force context saturation to trigger pruning
            self.context_manager.sessions[session_id].metrics.context_saturation = 0.95

            # Measure pruning operation
            start_time = time.perf_counter()  # Monotonic, high-resolution clock for timing
            self.context_manager._prune_context(session_id)
            end_time = time.perf_counter()

            elapsed = end_time - start_time
            times.append(elapsed)

            # Count tokens in pruned context
            token_count = sum(b.token_count for b in
                             self.context_manager.sessions[session_id].blocks)
            token_counts.append(token_count)

        result = BenchmarkResult(
            operation="prune_context",
            times=times,
            token_counts=token_counts
        )

        self.results["prune_context"] = result
        return result

    def plot_results(self, save_path: str = None) -> None:
        """Plot benchmark results"""
        if not self.results:
            raise ValueError("No benchmark results to plot")

        fig, axs = plt.subplots(1, 3, figsize=(18, 6))

        operations = list(self.results.keys())

        # Plot 1: Mean operation time
        mean_times = [self.results[op].mean_time for op in operations]
        axs[0].bar(operations, mean_times)
        axs[0].set_title('Mean Operation Time (s)')
        axs[0].set_ylabel('Seconds')

        # Plot 2: 95th percentile operation time
        p95_times = [self.results[op].p95_time for op in operations]
        axs[1].bar(operations, p95_times)
        axs[1].set_title('P95 Operation Time (s)')
        axs[1].set_ylabel('Seconds')

        # Plot 3: Tokens per second
        tps = [self.results[op].tokens_per_second for op in operations]
        axs[2].bar(operations, tps)
        axs[2].set_title('Throughput (tokens/s)')
        axs[2].set_ylabel('Tokens per Second')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path)

        plt.show()
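plot_results reads mean_time, p95_time and tokens_per_second from each BenchmarkResult. Here is a standalone sketch of one way those statistics can be computed, assuming nearest-rank percentiles and throughput measured as total tokens over total elapsed time. BenchmarkStats is a hypothetical stand-in, and BenchmarkResult itself may use a different percentile method.

```python
from dataclasses import dataclass
from statistics import mean
from typing import List


@dataclass
class BenchmarkStats:
    """Hypothetical stand-in showing how BenchmarkResult-style stats can be derived."""
    times: List[float]         # seconds per operation
    token_counts: List[int]    # tokens handled per operation

    @property
    def mean_time(self) -> float:
        return mean(self.times)

    @property
    def p95_time(self) -> float:
        # Nearest-rank 95th percentile: the smallest sample with at least
        # 95% of all samples at or below it (integer ceil avoids float error).
        ordered = sorted(self.times)
        rank = (95 * len(ordered) + 99) // 100
        return ordered[rank - 1]

    @property
    def tokens_per_second(self) -> float:
        total_time = sum(self.times)
        return sum(self.token_counts) / total_time if total_time > 0 else 0.0


stats = BenchmarkStats(times=[0.01] * 19 + [0.2], token_counts=[100] * 20)
print(stats.p95_time)  # 0.01: one slow outlier in 20 runs sits above the 95th percentile
```

With only 20 samples a single slow run never shows up in the P95, which is a good argument for running at least 100 iterations, as the benchmarks above do by default.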



Running the Context Layer in Production

Running this in production requires careful attention to scaling, memory management, and performance. Here's one way to handle each of these in a real-world system:


Distributed Context Store

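For large-scale deployments, a single in-process context manager won't suffice: sessions must survive restarts and be readable by whichever agent instance handles the next turn. Here's a context store backed by Redis, which lets agent services scale horizontally while sharing session state: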

import redis
import json
from typing import Dict, List, Optional

class DistributedContextStore:
    """Distributed context store using Redis (JSON serialization)"""

    @staticmethod
    def _block_to_dict(block: ContextBlock) -> Dict:
        return {
            "id": block.id,
            "content": block.content,
            "relevance_score": block.relevance_score,
            "type": block.type.name,
            "metadata": block.metadata,
            "timestamp": block.timestamp,
            "token_count": block.token_count,
            "references": block.references,
        }

    @staticmethod
    def _block_from_dict(d: Dict) -> ContextBlock:
        return ContextBlock(
            id=d["id"],
            content=d["content"],
            relevance_score=d["relevance_score"],
            type=ContextType[d["type"]],
            metadata=d["metadata"],
            timestamp=d["timestamp"],
            token_count=d["token_count"],
            references=d["references"],
        )

    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl  # Default TTL in seconds

    def _session_key(self, session_id: str) -> str:
        """Generate Redis key for a session"""
        return f"pyctx:session:{session_id}"

    def _block_key(self, session_id: str, block_id: str) -> str:
        """Generate Redis key for a block"""
        return f"pyctx:block:{session_id}:{block_id}"

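    def store_session(self, session: ContextPackage) -> None:
        """Store session metadata and every block in Redis.

        Keys for blocks dropped since the last call are not deleted here;
        they are no longer listed in block_ids and expire via their TTL."""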
        session_key = self._session_key(session.session_id)

        # Store main session metadata. Avoid pickle: deserializing pickled
        # data from a shared store is a remote-code-execution risk. JSON keeps
        # the payload language-agnostic and safe to read from any service.
        session_meta = {
            "agent_id": session.agent_id,
            "version": str(session.version),
            "trace_id": session.trace_id,
            "metrics": json.dumps({
                "total_tokens": session.metrics.total_tokens,
                "context_saturation": session.metrics.context_saturation,
                "type_distribution": session.metrics.type_distribution,
            }),
            "block_ids": json.dumps([b.id for b in session.blocks]),
        }

        # Store in Redis with a pipeline for performance
        pipe = self.redis.pipeline()
        pipe.hset(session_key, mapping=session_meta)
        pipe.expire(session_key, self.ttl)

        # Store each block separately for efficient partial updates
        for block in session.blocks:
            block_key = self._block_key(session.session_id, block.id)
            pipe.set(block_key, json.dumps(self._block_to_dict(block)), ex=self.ttl)

        pipe.execute()

    def retrieve_session(self, session_id: str) -> Optional[ContextPackage]:
        """Retrieve complete session from Redis"""
        session_key = self._session_key(session_id)

        # Get session metadata
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return None

        # Decode metrics
        metrics_raw = json.loads(session_data[b"metrics"])
        metrics = ContextMetrics(
            total_tokens=metrics_raw["total_tokens"],
            context_saturation=metrics_raw["context_saturation"],
            type_distribution=metrics_raw["type_distribution"],
        )

        # Get block IDs and retrieve blocks
        block_ids = json.loads(session_data[b"block_ids"])

        pipe = self.redis.pipeline()
        for block_id in block_ids:
            pipe.get(self._block_key(session_id, block_id))

        blocks = [
            self._block_from_dict(json.loads(raw))
            for raw in pipe.execute() if raw
        ]

        # Reconstruct session
        return ContextPackage(
            session_id=session_id,
            agent_id=session_data[b"agent_id"].decode(),
            blocks=blocks,
            metrics=metrics,
            version=int(session_data[b"version"]),
            trace_id=session_data[b"trace_id"].decode()
        )

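    def update_block(self, session_id: str, block: ContextBlock) -> None:
        """Overwrite an existing block. New blocks must go through store_session,
        which records their IDs; retrieve_session only loads IDs it knows about."""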
        block_key = self._block_key(session_id, block.id)
        self.redis.set(block_key, json.dumps(self._block_to_dict(block)), ex=self.ttl)

    def delete_session(self, session_id: str) -> None:
        """Delete a session and all its blocks"""
        session_key = self._session_key(session_id)

        # Get block IDs
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return

        block_ids = json.loads(session_data[b"block_ids"])

        # Delete all blocks and session
        pipe = self.redis.pipeline()
        for block_id in block_ids:
            block_key = self._block_key(session_id, block_id)
            pipe.delete(block_key)

        pipe.delete(session_key)
        pipe.execute()
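Because blocks cross process boundaries as JSON, the enum-by-name convention and the key layout are worth checking in isolation. Here is a minimal round-trip sketch that needs no running Redis. Block and ContextType are hypothetical stand-ins for the ContextBlock and ContextType used above, trimmed to a few fields.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict


class ContextType(Enum):
    # Hypothetical subset of the layer's ContextType enum
    SYSTEM = 1
    USER = 2
    MEMORY = 3


@dataclass
class Block:
    """Minimal stand-in for ContextBlock (field set is an assumption)."""
    id: str
    content: str
    type: ContextType
    relevance_score: float = 0.5
    metadata: Dict[str, Any] = field(default_factory=dict)


def block_to_json(block: Block) -> str:
    data = asdict(block)
    data["type"] = block.type.name  # Enums are not JSON-serializable; store the name
    return json.dumps(data)


def block_from_json(raw: bytes) -> Block:
    data = json.loads(raw)  # json.loads accepts bytes, as redis-py returns by default
    data["type"] = ContextType[data["type"]]
    return Block(**data)


def block_key(session_id: str, block_id: str) -> str:
    # Same key layout as DistributedContextStore._block_key
    return f"pyctx:block:{session_id}:{block_id}"


original = Block(id="b1", content="Customer reports no signal",
                 type=ContextType.USER, metadata={"channel": "chat"})
raw = block_to_json(original).encode()  # What Redis would hand back
print(block_from_json(raw) == original)  # True
print(block_key("sess-42", original.id))  # pyctx:block:sess-42:b1
```

Storing enum names rather than values means reordering or renumbering enum members won't corrupt stored blocks, though renaming a member will.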


Real-time Context Optimization

In high-volume systems, every token that survives into a prompt is paid for on every call, so optimizing context continuously, rather than only when the window fills, matters for both cost and latency:

import asyncio
import logging
import time
from collections import deque
from typing import Dict

logger = logging.getLogger(__name__)

class RealTimeContextOptimizer:
    """Real-time context optimization strategies for production systems"""

    def __init__(self,
                context_manager: ContextManager,
                token_budget: int = 4096,
                optimization_interval: float = 0.1,  # seconds
                relevance_decay_factor: float = 0.95):
        self.context_manager = context_manager
        self.token_budget = token_budget
        self.optimization_interval = optimization_interval
        self.relevance_decay_factor = relevance_decay_factor
        self.running = False  # Shared by every session loop on this optimizer
        self.performance_metrics = deque(maxlen=1000)  # Keep last 1000 metrics
        self._last_decay_time: Dict[str, float] = {}  # session_id -> last decay (epoch s)

    async def start_optimization_loop(self, session_id: str) -> None:
        """Start continuous optimization in background"""
        self.running = True
        try:
            while self.running:
                start_time = time.perf_counter()

                # Perform optimization
                await self.optimize_session(session_id)

                # Measure performance
                self.performance_metrics.append(time.perf_counter() - start_time)

                # Sleep until next interval
                await asyncio.sleep(self.optimization_interval)
        except Exception:
            logger.exception("Optimization loop error (session %s)", session_id)
            self.running = False

    def stop_optimization_loop(self) -> None:
        """Stop background optimization for every session using this optimizer"""
        self.running = False

    async def optimize_session(self, session_id: str) -> None:
        """Perform a single optimization cycle"""
        if session_id not in self.context_manager.sessions:
            self._last_decay_time.pop(session_id, None)  # Session ended; drop its state
            return

        session = self.context_manager.sessions[session_id]

        # Apply time-based relevance decay
        self._apply_relevance_decay(session)

        # Give recent conversational turns a relevance floor
        self._update_recency_relevance(session)

        # Enforce token budget
        if session.metrics.total_tokens > self.token_budget:
            self._enforce_token_budget(session)

        # Re-calculate metrics
        session.calculate_metrics()

    def _apply_relevance_decay(self, session: ContextPackage) -> None:
        """Apply time-based decay to relevance scores.

        Only the time elapsed since the previous cycle is decayed, so the loop
        frequency doesn't change how fast scores fall. Re-applying decay for a
        block's full age on every 0.1 s cycle would compound and reach the 0.1
        floor in well under a minute.
        """
        now = time.time()
        last_run = self._last_decay_time.get(session.session_id, 0.0)
        self._last_decay_time[session.session_id] = now

        for block in session.blocks:
            # Don't decay system blocks
            if block.type == ContextType.SYSTEM:
                continue

            # Decay window starts at block creation or the last cycle, whichever is later
            window_start = max(block.timestamp / 1000, last_run)  # Timestamps are in ms
            elapsed_minutes = max(0.0, now - window_start) / 60
            block.relevance_score *= self.relevance_decay_factor ** elapsed_minutes

            # Ensure minimum relevance
            block.relevance_score = max(0.1, block.relevance_score)

    def _update_recency_relevance(self, session: ContextPackage) -> None:
        """Give the most recent conversational turns a relevance floor"""
        # Sort blocks by timestamp, newest first
        recent_blocks = sorted(
            [b for b in session.blocks if b.type in (ContextType.USER, ContextType.AGENT)],
            key=lambda b: b.timestamp,
            reverse=True
        )

        # Raise scores to a recency floor instead of adding a boost: an additive
        # boost re-applied every cycle would pin all of the last 10 turns at 1.0.
        for i, block in enumerate(recent_blocks[:10]):  # Consider last 10 turns
            recency_floor = 0.95 ** i  # 1.0 for the newest turn, ~0.63 for the 10th
            block.relevance_score = max(block.relevance_score, recency_floor)

    def _enforce_token_budget(self, session: ContextPackage) -> None:
        """Ensure token count stays within budget"""
        # SYSTEM blocks are always kept. Everything else is admitted greedily in
        # descending relevance order, so decay and recency floors set priority.
        system_blocks = [b for b in session.blocks if b.type == ContextType.SYSTEM]
        non_system_blocks = sorted(
            (b for b in session.blocks if b.type != ContextType.SYSTEM),
            key=lambda b: b.relevance_score,
            reverse=True
        )

        kept = {id(b) for b in system_blocks}
        used_tokens = sum(b.token_count for b in system_blocks)

        for block in non_system_blocks:
            if used_tokens + block.token_count <= self.token_budget:
                kept.add(id(block))
                used_tokens += block.token_count
            # else discard this block

        # Preserve the original block order so the conversation still reads
        # in sequence when the context is formatted
        session.blocks = [b for b in session.blocks if id(b) in kept]
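Because the loop runs every optimization_interval seconds, how decay is applied per cycle matters a great deal. This standalone simulation (a hypothetical helper, not part of the optimizer) tracks one block's score under two policies: decaying by the block's full age on every cycle, and decaying only by the time elapsed since the previous cycle. The 0.95 per-minute factor, 0.1 s interval and 0.1 floor mirror the optimizer's defaults.

```python
def simulate_decay(factor: float, interval_s: float, duration_s: float,
                   incremental: bool, floor: float = 0.1) -> float:
    """Relevance of one block (created at t=0, initial score 1.0) after the
    optimization loop has run every interval_s seconds for duration_s seconds."""
    score = 1.0
    last_run = 0.0
    for k in range(1, round(duration_s / interval_s) + 1):
        now = k * interval_s
        # incremental=True: decay only the time since the previous cycle.
        # incremental=False: decay by the block's full age on every cycle.
        elapsed = now - last_run if incremental else now
        score = max(floor, score * factor ** (elapsed / 60))  # Per-minute rate
        last_run = now
    return score


print(simulate_decay(0.95, 0.1, 60, incremental=True))   # ~0.95 after one minute
print(simulate_decay(0.95, 0.1, 60, incremental=False))  # 0.1, pinned at the floor
```

Only the incremental policy makes relevance_decay_factor behave as its name suggests, a per-minute decay rate; re-applying the full age compounds so fast that a fresh block falls from 1.0 to the floor in under half a minute at the default interval.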



Hypothetical Implementation Example: Telecom Customer Service

Let’s explore a hypothetical scenario showing how disciplined context engineering could transform a telecom customer service system handling 50,000+ customer interactions daily.

Hypothetical Baseline (Before Optimization)

Consider a telecom company using a conventional LLM-based customer service system with these theoretical performance characteristics:

  1. Average completion time: 12.5 seconds per query
  2. Useful context-window utilization: 32% (the rest is wasted tokens)
  3. Coherence over multi-turn conversations: 68% (measured by user satisfaction)
  4. Agent handoff success rate: 52% (context lost during transfers)
  5. Daily token costs: $4,200 (for 50,000 interactions)
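The baseline implies about $0.084 per interaction. A simple cost model also shows why pruning reduces cost less than it reduces prompt size: it shrinks input tokens, while output tokens are unaffected. In this sketch the token counts and per-million-token prices are placeholder assumptions for illustration, not figures from the scenario or any provider's price list.

```python
def cost_per_interaction(daily_cost_usd: float, daily_interactions: int) -> float:
    """Average spend per interaction."""
    return daily_cost_usd / daily_interactions


def daily_token_cost(interactions: int, input_tokens: int, output_tokens: int,
                     usd_per_m_input: float, usd_per_m_output: float) -> float:
    """Daily spend for a per-interaction token profile (prices per million tokens)."""
    # Multiply before dividing to keep round numbers exact in floating point
    return interactions * (input_tokens * usd_per_m_input +
                           output_tokens * usd_per_m_output) / 1_000_000


print(cost_per_interaction(4_200, 50_000))  # 0.084, the baseline above

# Placeholder profile and prices (assumptions, not measurements or a real rate card)
before = daily_token_cost(50_000, 6_000, 500, usd_per_m_input=1.0, usd_per_m_output=5.0)
after = daily_token_cost(50_000, 4_000, 500, usd_per_m_input=1.0, usd_per_m_output=5.0)
print(before, after)  # 425.0 325.0: a third fewer input tokens, ~24% lower cost
```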



Hypothetical Optimized Approach

In this scenario, we could wire in the context layer and optimize context management with the following approach:

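async def telecom_service_enhancement():
    # anthropic_client, network_api, start_service and MetricsCollector are
    # placeholders for your own clients and service wiring; they are not defined here.

    # Initialize context-layer components
    redis_url = "redis://redis-master.production:6379/0"
    context_store = DistributedContextStore(redis_url)

    # Create optimized context manager
    optimized_manager = ContextManager(max_tokens=8192)

    # Relevance calculator; pass it to whatever scores new blocks
    # (that wiring is not shown in this sketch)
    relevance_calculator = ContextualRelevanceCalculator()

    # Initialize real-time optimizer. Its loop runs per session, e.g.
    # asyncio.create_task(optimizer.start_optimization_loop(session_id))
    optimizer = RealTimeContextOptimizer(
        context_manager=optimized_manager,
        token_budget=4096,
        optimization_interval=0.05,
        relevance_decay_factor=0.98
    )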

    # Create context-aware agents
    intent_agent = ContextAwareIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=optimized_manager
    )

    technical_agent = ContextAwareTechnicalAgent(
        llm_client=anthropic_client,
        network_api=network_api,
        context_manager=optimized_manager
    )

    # Create coordinator with context transfer capabilities
    coordinator = AgentCoordinator(
        agents={
            "intent": intent_agent,
            "technical": technical_agent,
            # Other specialized agents
        },
        context_manager=optimized_manager,
        context_store=context_store
    )

    # Initialize metrics collection
    metrics_collector = MetricsCollector(
        prometheus_endpoint="http://prometheus.monitoring:9090/metrics"
    )

    # Start service
    return await start_service(coordinator, metrics_collector)
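The sketch creates optimizer but leaves its lifecycle open. RealTimeContextOptimizer uses a single running flag, so stop_optimization_loop halts the loop for every session sharing that optimizer. One way to manage per-session loops is to run each as its own asyncio task and cancel it when the session ends. SessionLoopRegistry is a hypothetical helper, and the fake loop in the demo stands in for optimizer.start_optimization_loop. Cancellation passes through that loop's `except Exception` handler because asyncio.CancelledError derives from BaseException since Python 3.8.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict

LoopFactory = Callable[[str], Coroutine[Any, Any, None]]


class SessionLoopRegistry:
    """Hypothetical helper: one background optimization task per session."""

    def __init__(self, loop_factory: LoopFactory):
        # e.g. loop_factory=optimizer.start_optimization_loop
        self._loop_factory = loop_factory
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, session_id: str) -> None:
        """Start a loop for the session unless one is already running."""
        if session_id not in self._tasks:
            self._tasks[session_id] = asyncio.create_task(self._loop_factory(session_id))

    async def stop(self, session_id: str) -> None:
        """Cancel only this session's loop and wait for it to finish."""
        task = self._tasks.pop(session_id, None)
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    def active_sessions(self) -> int:
        return len(self._tasks)


async def demo() -> int:
    async def fake_loop(session_id: str) -> None:
        while True:  # Stands in for optimizer.start_optimization_loop
            await asyncio.sleep(0.01)

    registry = SessionLoopRegistry(fake_loop)
    registry.start("a")
    registry.start("b")
    registry.start("b")  # No duplicate task
    await asyncio.sleep(0.03)
    await registry.stop("a")
    return registry.active_sessions()


print(asyncio.run(demo()))  # 1: session "b" keeps optimizing
```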



Projected Results (Illustrative)

Based on the architecture described above, this hypothetical system could plausibly move each metric in the right direction. I am deliberately not putting hard numbers here: I never ran the controlled benchmark that would justify them, and inventing precise percentages would be exactly the kind of false precision this post should avoid. Directionally, you would expect:

  1. Lower average completion time, from smaller, better-targeted prompts
  2. Higher useful context-window utilization, with fewer wasted tokens
  3. Better multi-turn coherence, from relevance-aware retention
  4. Higher agent-handoff success, from structured context transfer
  5. Lower daily token cost, from pruning and deduplication

These improvements would result from:

  1. Efficient context packaging and transmission between agents
  2. Dynamic relevance scoring to prioritize important information
  3. Structured context exchange enabling seamless agent handoffs
  4. Automatic optimization of context window utilization
  5. Reduced token waste through intelligent pruning



Key Performance Considerations

To run a context layer like this in your own production system, consider these performance best practices:


  • Memory optimization: deduplicating content, trimming metadata, and truncating oversized blocks keeps per-session memory bounded:

    class ContextMemoryOptimizer:
        """Optimizes memory usage for the context layer in production"""

        def __init__(self, context_manager: ContextManager):
            self.context_manager = context_manager

        def optimize_memory_usage(self, session_id: str) -> float:
            """Optimize memory usage and return memory saved in MB"""
            session = self.context_manager.sessions[session_id]

            # Calculate current memory usage
            initial_memory = self._estimate_memory_usage(session)

            # Perform optimizations
            self._deduplicate_content(session)
            self._compress_metadata(session)
            self._truncate_long_blocks(session)

            # Blocks changed, so refresh token totals and saturation
            session.calculate_metrics()

            # Calculate new memory usage
            final_memory = self._estimate_memory_usage(session)

            # Return memory saved in MB
            return (initial_memory - final_memory) / (1024 * 1024)

        def _estimate_memory_usage(self, session: ContextPackage) -> int:
            """Rough estimate of memory usage in bytes (not an exact measurement)"""
            memory_usage = 0

            # Session metadata
            memory_usage += len(session.session_id) + len(session.agent_id) + 16  # Base overhead

            # Blocks
            for block in session.blocks:
                # Content is the main memory user
                memory_usage += len(block.content) * 2  # Assume ~2 bytes per character

                # Metadata (values may be non-strings, so measure their string form)
                if block.metadata:
                    for key, value in block.metadata.items():
                        memory_usage += len(key) + len(str(value))

                # Other fields
                memory_usage += 64  # Base block overhead

            return memory_usage

        def _deduplicate_content(self, session: ContextPackage) -> None:
            """Remove duplicate content in blocks"""
            content_set = set()
            blocks_to_keep = []

            for block in session.blocks:
                # Always keep system blocks
                if block.type == ContextType.SYSTEM:
                    blocks_to_keep.append(block)
                    continue

                # Check for duplicate content
                if block.content in content_set:
                    continue  # Skip duplicate

                content_set.add(block.content)
                blocks_to_keep.append(block)

            session.blocks = blocks_to_keep

        def _compress_metadata(self, session: ContextPackage) -> None:
            """Compress metadata by removing empty fields and truncating long strings"""
            for block in session.blocks:
                if not block.metadata:
                    continue

                # Remove empty values (None or ""), keeping legitimate zeros and False
                block.metadata = {k: v for k, v in block.metadata.items()
                                  if v is not None and v != ""}

                # Truncate long string values
                for key, value in block.metadata.items():
                    if isinstance(value, str) and len(value) > 100:
                        block.metadata[key] = value[:97] + "..."

        def _truncate_long_blocks(self, session: ContextPackage) -> None:
            """Truncate extremely long content blocks"""
            for block in session.blocks:
                # Never cut system instructions mid-way
                if block.type == ContextType.SYSTEM:
                    continue

                # Truncate blocks longer than ~1000 tokens, using words as a rough token proxy
                if block.token_count > 1000:
                    words = block.content.split()
                    truncated_content = " ".join(words[:950]) + " [... content truncated ...]"
                    block.content = truncated_content
                    block.token_count = len(truncated_content.split())  # Word-count approximation

  • For high-throughput systems, managing concurrency is critical:

    import asyncio
    from contextlib import asynccontextmanager
    
    class ContextConcurrencyManager:
        """Manages concurrent access to context resources"""
    
        def __init__(self,
                    max_concurrent_sessions: int = 1000,
                    max_concurrent_contexts: int = 5000):
            self.session_semaphore = asyncio.Semaphore(max_concurrent_sessions)
            self.context_semaphore = asyncio.Semaphore(max_concurrent_contexts)
            self.session_locks = {}
    
        @asynccontextmanager
        async def session_context(self, session_id: str):
            """Manage concurrent access to a session"""
            # Create lock for this session if doesn't exist
            if session_id not in self.session_locks:
                self.session_locks[session_id] = asyncio.Lock()
    
            # Acquire session semaphore and lock
            async with self.session_semaphore:
                async with self.session_locks[session_id]:
                    yield
    
        @asynccontextmanager
        async def context_operation(self):
            """Manage concurrent context operations"""
            async with self.context_semaphore:
                yield
    
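        def cleanup_session(self, session_id: str):
            """Remove the lock for a session once it has ended.

            Only call this when no coroutine holds or is waiting on the lock;
            otherwise a later caller would create a second lock for the same session.
            """
            self.session_locks.pop(session_id, None)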
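The per-session lock in session_context matters because a context update is usually a read-modify-write with an await in the middle (a Redis round trip, an LLM call). This standalone sketch uses a plain dict as a stand-in for the context store and asyncio.sleep(0) as the stand-in round trip; the helper names are hypothetical. Two concurrent writers lose updates without a per-session lock and keep all of them with one.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Dict, List


async def add_turn(store: Dict[str, List[int]], session_id: str, turn: int) -> None:
    """Read-modify-write of a session's block list, with an await in the middle."""
    blocks = list(store.get(session_id, []))  # Read a snapshot
    await asyncio.sleep(0)                    # Stand-in for a store round trip
    blocks.append(turn)
    store[session_id] = blocks                # Write back, overwriting others' writes


async def writer(store: Dict[str, List[int]], locks: DefaultDict[str, asyncio.Lock],
                 session_id: str, turns: int, use_lock: bool) -> None:
    for turn in range(turns):
        if use_lock:
            async with locks[session_id]:
                await add_turn(store, session_id, turn)
        else:
            await add_turn(store, session_id, turn)


async def run_writers(use_lock: bool) -> int:
    store: Dict[str, List[int]] = {}
    locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
    # Two agents write 10 turns each to the same session concurrently
    await asyncio.gather(writer(store, locks, "s1", 10, use_lock),
                         writer(store, locks, "s1", 10, use_lock))
    return len(store["s1"])


print(asyncio.run(run_writers(use_lock=False)))  # Fewer than 20: writers overwrote each other
print(asyncio.run(run_writers(use_lock=True)))   # 20
```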
    



Production Deployment Strategy

To deploy a context layer like this in production, I recommend this phased approach:

  1. Pilot phase: Roll the context layer out to a single agent type with low traffic
  2. Gradual rollout: Extend to specialized agents one by one
  3. A/B testing: Compare performance metrics between the context-managed and baseline systems
  4. Full deployment: Scale horizontally with distributed context stores
  5. Continuous optimization: Implement real-time monitors to tune parameters

The pilot deployment may look like this:

import os

async def pilot_deployment():
    # anthropic_client, ProductionMonitor and start_pilot are placeholders for
    # your own LLM client, monitoring wrapper and traffic-splitting service.

    # Initialize distributed components with lower capacity
    redis_url = "redis://redis-staging:6379/0"
    context_store = DistributedContextStore(redis_url, ttl=1800)  # 30 minute TTL

    # Create context manager with conservative limits
    context_manager = ContextManager(max_tokens=4096)

    # Configure for 5% of traffic
    traffic_ratio = 0.05

    # Create context-aware intent agent
    intent_agent = ContextAwareIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=context_manager
    )

    # Create monitoring; read the API key from the environment, not source code
    monitor = ProductionMonitor(
        datadog_api_key=os.environ["DD_API_KEY"],
        experiment_name="pycontext_pilot",
        sample_rate=0.1  # Sample 10% of interactions for detailed analysis
    )

    # Start pilot with traffic splitting
    return await start_pilot(
        agent=intent_agent,
        context_store=context_store,
        traffic_ratio=traffic_ratio,
        monitor=monitor
    )
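start_pilot is not shown, so how it honours traffic_ratio is left open. A common approach is sticky, hash-based assignment keyed on a stable customer identifier. in_pilot and its salt parameter are hypothetical, a sketch rather than part of the pilot code above.

```python
import hashlib


def in_pilot(customer_id: str, traffic_ratio: float, salt: str = "pycontext_pilot") -> bool:
    """Sticky, hash-based assignment of a customer to the pilot arm.

    Hashing instead of calling random.random() per request keeps every turn
    of a customer's conversation in the same arm, so context never has to
    cross between the pilot and the baseline system mid-conversation.
    """
    digest = hashlib.sha256(f"{salt}:{customer_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big")  # Uniform integer in [0, 2**64)
    return bucket < traffic_ratio * 2**64


share = sum(in_pilot(f"cust-{i}", 0.05) for i in range(10_000)) / 10_000
print(round(share, 2))  # Close to 0.05
```

Changing the salt reshuffles assignments, which is useful when starting a fresh experiment on the same customer base.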



Concluding thoughts

Deliberate context engineering is one of the highest-leverage parts of multi-agent system design. A typed context-packaging layer like the one here addresses context optimization, agent collaboration, and memory management directly, where ad-hoc string passing falls apart at scale.

Key takeaways from our implementation:

  1. A shared format matters: A common context schema enables interoperability between diverse agent systems
  2. Memory optimization is critical: Real-time context management directly impacts cost and performance
  3. Production deployments require careful scaling: Distributed context stores and concurrency management are essential
  4. Relevance scoring drives optimization: Dynamic scoring should improve context-window utilization, though the size of the gain has to be measured in your own system

While this telecom example is hypothetical, the architectural patterns described show how disciplined context management could deliver tangible business benefits through reduced costs, faster responses, and improved customer satisfaction in real-world applications.

As autonomous agent systems continue to evolve, this kind of context discipline pairs naturally with connectivity standards like MCP: MCP brings tools and data to your agents, and a context layer decides what actually makes it into each prompt and across each handoff.

Future directions for this layer include:

  1. Cross-modal context representation: Supporting efficient encoding of multimodal content
  2. Federated context management: Enabling privacy-preserving context sharing across organizations
  3. Self-optimizing context strategies: Using reinforcement learning to dynamically tune context parameters

By adding a context layer like this to your own systems, you can pursue similar performance improvements while establishing a foundation for future enhancements to your agent architecture.


Want to go deeper on context-aware agent architectures? Check out my previous articles on autonomous multi-agent systems and context-aware data pipelines.

Cite this article

Mitra, Subhadip. (2025, March). Context Engineering for Autonomous Multi-Agent Systems - Packaging, Pruning, and Transferring Context at Scale. Subhadip Mitra. Retrieved from https://subhadipmitra.com/blog/2025/implementing-model-context-protocol/

@article{mitra2025context-engineering-for-autonomous-multi-agent-systems-packaging-pruning-and-transferring-context-at-scale,
  title   = {Context Engineering for Autonomous Multi-Agent Systems - Packaging, Pruning, and Transferring Context at Scale},
  author  = {Mitra, Subhadip},
  journal = {Subhadip Mitra},
  year    = {2025},
  month   = {Mar},
  url     = {https://subhadipmitra.com/blog/2025/implementing-model-context-protocol/}
}