Implementing Model Context Protocol in Autonomous Multi-Agent Systems - Technical Architecture and Performance Optimization

After exploring the architecture and implementation of autonomous multi-agent systems for telecom customer service in my previous article, it’s time to address an emerging standard that promises to solve many of the challenges I outlined: Model Context Protocol (MCP).

An initial implementation of a framework for building autonomous multi-agent systems with standardized context management is available here. Contributions are most welcome.


What is Model Context Protocol and Why Should You Care?

Model Context Protocol represents a standardized approach to managing, transmitting, and optimizing context between large language models and agent systems. While proprietary context handling mechanisms abound, MCP offers a unified framework that solves critical challenges:

  1. Standardized context exchange between heterogeneous agent systems
  2. Optimized context transmission with payload minimization
  3. Semantic context tagging for intelligent routing and prioritization
  4. Versioned context management for complex multi-step operations

Unlike ad-hoc solutions, MCP provides a framework designed specifically for the high-throughput, context-sensitive operations required by production-grade autonomous agent systems. Let’s dig into the technical implementation.


Technical Implementation


Core Protocol Definition

At its heart, MCP implements a standard protocol buffer definition that any agent system can adopt. Here’s a simplified version of the core protocol:

syntax = "proto3";

package mcp;

message ContextBlock {
  string id = 1;
  uint64 timestamp = 2;
  string content = 3;
  float relevance_score = 4;
  map<string, string> metadata = 5;
  ContextType type = 6;
  uint32 token_count = 7;
  repeated string references = 8;
}

enum ContextType {
  SYSTEM = 0;
  USER = 1;
  AGENT = 2;
  MEMORY = 3;
  KNOWLEDGE = 4;
  TOOL = 5;
}

message ContextPackage {
  string session_id = 1;
  string agent_id = 2;
  repeated ContextBlock blocks = 3;
  ContextMetrics metrics = 4;
  uint32 version = 5;
  string trace_id = 6;
}

message ContextMetrics {
  uint32 total_tokens = 1;
  float context_saturation = 2;
  map<string, float> type_distribution = 3;
}

This protocol definition enables serialized context transmission across agent boundaries while maintaining critical metadata that informs context utilization decisions.



Python Implementation

Let’s implement a Python client for MCP that can be used in our agent architecture:

import time
import uuid
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum

class ContextType(Enum):
    SYSTEM = 0
    USER = 1
    AGENT = 2
    MEMORY = 3
    KNOWLEDGE = 4
    TOOL = 5

@dataclass
class ContextBlock:
    id: str
    content: str
    relevance_score: float
    type: ContextType
    metadata: Dict[str, str] = None
    timestamp: int = None
    token_count: int = None
    references: List[str] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = int(time.time() * 1000)
        if self.metadata is None:
            self.metadata = {}
        if self.references is None:
            self.references = []
        if self.token_count is None:
            # Approximate token count based on whitespace splitting
            # In production, use a proper tokenizer
            self.token_count = len(self.content.split())

@dataclass
class ContextMetrics:
    total_tokens: int
    context_saturation: float
    type_distribution: Dict[str, float]

@dataclass
class ContextPackage:
    session_id: str
    agent_id: str
    blocks: List[ContextBlock]
    metrics: ContextMetrics
    version: int = 1
    trace_id: str = None
    
    def __post_init__(self):
        if self.trace_id is None:
            self.trace_id = str(uuid.uuid4())
            
    def calculate_metrics(self) -> None:
        """Calculate metrics based on current context blocks"""
        total_tokens = sum(block.token_count for block in self.blocks)
        
        # Assuming 8K context window
        context_saturation = min(1.0, total_tokens / 8192)
        
        # Calculate distribution of context types
        type_counts = {}
        for block in self.blocks:
            type_name = block.type.name
            if type_name not in type_counts:
                type_counts[type_name] = 0
            type_counts[type_name] += block.token_count
        
        type_distribution = {
            k: v / total_tokens if total_tokens > 0 else 0 
            for k, v in type_counts.items()
        }
        
        self.metrics = ContextMetrics(
            total_tokens=total_tokens,
            context_saturation=context_saturation,
            type_distribution=type_distribution
        )
        
    def to_dict(self) -> Dict:
        """Convert to dictionary representation for serialization"""
        return {
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "blocks": [
                {
                    "id": b.id,
                    "content": b.content,
                    "relevance_score": b.relevance_score,
                    "type": b.type.name,
                    "metadata": b.metadata,
                    "timestamp": b.timestamp,
                    "token_count": b.token_count,
                    "references": b.references
                }
                for b in self.blocks
            ],
            "metrics": {
                "total_tokens": self.metrics.total_tokens,
                "context_saturation": self.metrics.context_saturation,
                "type_distribution": self.metrics.type_distribution
            },
            "version": self.version,
            "trace_id": self.trace_id
        }



Now let’s implement a ContextManager class that handles context operations with MCP:


import json
import heapq
from typing import Dict, List, Optional, Callable

class ContextManager:
    """Manages context operations using Model Context Protocol"""
    
    def __init__(self, 
                max_tokens: int = 8192,
                relevance_threshold: float = 0.2):
        self.max_tokens = max_tokens
        self.relevance_threshold = relevance_threshold
        self.sessions: Dict[str, ContextPackage] = {}
        
    def create_session(self, agent_id: str) -> str:
        """Create a new context session"""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=agent_id,
            blocks=[],
            metrics=ContextMetrics(
                total_tokens=0,
                context_saturation=0.0,
                type_distribution={}
            )
        )
        return session_id
    
    def add_context(self, 
                   session_id: str,
                   content: str,
                   context_type: ContextType,
                   relevance_score: float = 1.0,
                   metadata: Dict[str, str] = None) -> str:
        """Add context to an existing session"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        
        block_id = str(uuid.uuid4())
        block = ContextBlock(
            id=block_id,
            content=content,
            relevance_score=relevance_score,
            type=context_type,
            metadata=metadata or {}
        )
        
        self.sessions[session_id].blocks.append(block)
        self.sessions[session_id].calculate_metrics()
        
        # If we've exceeded context window, perform context pruning
        if self.sessions[session_id].metrics.context_saturation >= 0.9:
            self._prune_context(session_id)
            
        return block_id
    
    def _prune_context(self, session_id: str) -> None:
        """Prune least relevant context to fit within token limits"""
        session = self.sessions[session_id]
        
        # Don't prune SYSTEM context
        system_blocks = [b for b in session.blocks if b.type == ContextType.SYSTEM]
        other_blocks = [b for b in session.blocks if b.type != ContextType.SYSTEM]
        
        # Sort by relevance score (ascending)
        other_blocks.sort(key=lambda x: x.relevance_score)
        
        # Keep removing blocks until we're under target
        system_tokens = sum(b.token_count for b in system_blocks)
        target_tokens = int(self.max_tokens * 0.8) - system_tokens  # Target 80% usage
        
        current_tokens = sum(b.token_count for b in other_blocks)
        while current_tokens > target_tokens and other_blocks:
            removed_block = other_blocks.pop(0)  # Remove least relevant
            current_tokens -= removed_block.token_count
        
        # Reconstitute the blocks list
        session.blocks = system_blocks + other_blocks
        session.calculate_metrics()
    
    def get_formatted_context(self, 
                             session_id: str,
                             formatter: Callable = None) -> str:
        """Get formatted context for model input"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        
        session = self.sessions[session_id]
        
        # Default formatter concatenates content with block type as separator
        if formatter is None:
            result = []
            for block in session.blocks:
                if block.relevance_score >= self.relevance_threshold:
                    result.append(f"[{block.type.name}]\n{block.content}")
            return "\n\n".join(result)
        
        return formatter(session)
    
    def export_session(self, session_id: str) -> Dict:
        """Export session as serializable dict"""
        if session_id not in self.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        
        return self.sessions[session_id].to_dict()
        
    def import_session(self, session_data: Dict) -> str:
        """Import a session from serialized data"""
        session_id = session_data["session_id"]
        
        blocks = []
        for block_data in session_data["blocks"]:
            blocks.append(ContextBlock(
                id=block_data["id"],
                content=block_data["content"],
                relevance_score=block_data["relevance_score"],
                type=ContextType[block_data["type"]],
                metadata=block_data["metadata"],
                timestamp=block_data["timestamp"],
                token_count=block_data["token_count"],
                references=block_data["references"]
            ))
        
        metrics = ContextMetrics(
            total_tokens=session_data["metrics"]["total_tokens"],
            context_saturation=session_data["metrics"]["context_saturation"],
            type_distribution=session_data["metrics"]["type_distribution"]
        )
        
        self.sessions[session_id] = ContextPackage(
            session_id=session_id,
            agent_id=session_data["agent_id"],
            blocks=blocks,
            metrics=metrics,
            version=session_data["version"],
            trace_id=session_data["trace_id"]
        )
        
        return session_id


Integration with Existing Agent Systems

Now let’s implement the integration of MCP into our multi-agent telecom customer service system from the previous article:

from typing import Dict, List, Optional, Union
import asyncio
import json

class MCPEnabledAgent:
    """Base class for agents that use Model Context Protocol"""
    
    def __init__(self,
                llm_client,
                agent_role: str,
                context_manager: ContextManager = None):
        self.llm = llm_client
        self.role = agent_role
        self.context_manager = context_manager or ContextManager()
        self.session_id = None
    
    async def initialize_session(self) -> str:
        """Initialize a new context session"""
        self.session_id = self.context_manager.create_session(self.role)
        
        # Add system prompt as SYSTEM context
        system_prompt = await self._load_role_prompt()
        self.context_manager.add_context(
            session_id=self.session_id,
            content=system_prompt,
            context_type=ContextType.SYSTEM,
            relevance_score=1.0,  # System prompts always max relevance
            metadata={"type": "system_prompt", "role": self.role}
        )
        
        return self.session_id
    
    async def _load_role_prompt(self) -> str:
        """Load role-specific prompt - implement in subclasses"""
        raise NotImplementedError()
    
    async def add_user_context(self, 
                              content: str, 
                              metadata: Dict = None) -> str:
        """Add user input to context"""
        if self.session_id is None:
            await self.initialize_session()
            
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.USER,
            relevance_score=0.9,  # User context starts with high relevance
            metadata=metadata or {}
        )
    
    async def add_memory_context(self,
                                content: str,
                                relevance_score: float,
                                metadata: Dict = None) -> str:
        """Add memory (from episodic or semantic memory) to context"""
        if self.session_id is None:
            await self.initialize_session()
            
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.MEMORY,
            relevance_score=relevance_score,
            metadata=metadata or {}
        )
    
    async def add_tool_context(self,
                              content: str,
                              tool_name: str,
                              metadata: Dict = None) -> str:
        """Add tool usage results to context"""
        if self.session_id is None:
            await self.initialize_session()
            
        if metadata is None:
            metadata = {}
        metadata["tool_name"] = tool_name
            
        return self.context_manager.add_context(
            session_id=self.session_id,
            content=content,
            context_type=ContextType.TOOL,
            relevance_score=0.8,  # Tool outputs generally have high relevance
            metadata=metadata
        )
    
    async def process_with_llm(self, 
                              prompt: str = None) -> str:
        """Process the current context with LLM"""
        if self.session_id is None:
            await self.initialize_session()
        
        formatted_context = self.context_manager.get_formatted_context(self.session_id)
        
        if prompt:
            # Add additional prompt as temporary context
            formatted_context += f"\n\n[PROMPT]\n{prompt}"
        
        # Call LLM with formatted context
        response = await self.llm.generate(formatted_context)
        
        # Add agent response to context
        self.context_manager.add_context(
            session_id=self.session_id,
            content=response,
            context_type=ContextType.AGENT,
            relevance_score=0.9,  # Agent responses are highly relevant
            metadata={"type": "agent_response"}
        )
        
        return response
    
    def export_context(self) -> Dict:
        """Export current context for transfer to another agent"""
        if self.session_id is None:
            raise ValueError("No active session")
            
        return self.context_manager.export_session(self.session_id)
    
    def import_context(self, context_data: Dict) -> None:
        """Import context from another agent"""
        self.session_id = self.context_manager.import_session(context_data)



Now we can implement our specialized telecom agents with MCP integration:

class MCPEnabledIntentAgent(MCPEnabledAgent):
    """Intent analysis agent with MCP integration"""
    
    async def _load_role_prompt(self) -> str:
        return """You are an Intent Analysis Agent in a telecom customer service system.
Your role is to precisely identify customer intent from queries, detect multiple or
hidden intents, assess intent confidence, and identify required context for resolution."""
    
    async def analyze_intent(self, query: str) -> Dict:
        """Analyze customer intent using MCP for context management"""
        await self.add_user_context(query, {"type": "customer_query"})
        
        analysis_prompt = """Based on the customer query above, provide the following analysis:
1. Primary Intent: The main customer goal
2. Secondary Intents: Additional or implied needs
3. Required Information: What we need to know to resolve this
4. Confidence Score: How certain are you (0-1)

Return your analysis in JSON format.
"""
        
        result = await self.process_with_llm(analysis_prompt)
        
        # Parse JSON response (in production, add error handling)
        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            # Fallback: Extract manually using regex or prompt again
            analysis = {
                "primary_intent": "unknown",
                "secondary_intents": [],
                "required_information": [],
                "confidence": 0.5
            }
            
        return analysis

class MCPEnabledTechnicalAgent(MCPEnabledAgent):
    """Technical support agent with MCP integration"""
    
    def __init__(self, llm_client, network_api, context_manager=None):
        super().__init__(llm_client, "technical_agent", context_manager)
        self.network_api = network_api
    
    async def _load_role_prompt(self) -> str:
        return """You are a Technical Support Agent specializing in telecom issues.
Your role is to diagnose technical issues from symptoms, design step-by-step
troubleshooting plans, interpret diagnostic results, and recommend solutions."""
    
    async def diagnose_issue(self, issue_description: str, customer_id: str) -> Dict:
        """Diagnose technical issue with MCP context management"""
        await self.add_user_context(issue_description, {"customer_id": customer_id})
        
        # Add relevant customer technical data from memory
        network_data = await self.network_api.get_customer_network_data(customer_id)
        await self.add_memory_context(
            content=json.dumps(network_data),
            relevance_score=0.85,
            metadata={"type": "network_data", "customer_id": customer_id}
        )
        
        # Run network diagnostics and add results to context
        diagnostics = await self.network_api.run_diagnostics(customer_id)
        await self.add_tool_context(
            content=json.dumps(diagnostics),
            tool_name="network_diagnostics",
            metadata={"customer_id": customer_id}
        )
        
        analysis_prompt = """Based on the customer issue, network data, and diagnostic results,
provide a technical analysis with:
1. Root Cause: The most likely cause of the issue
2. Recommended Solution: Step-by-step resolution plan
3. Alternative Solutions: Other approaches if the primary solution fails
4. Confidence: How certain are you about this diagnosis (0-1)

Return your analysis in JSON format.
"""
        
        result = await self.process_with_llm(analysis_prompt)
        
        try:
            analysis = json.loads(result)
        except json.JSONDecodeError:
            analysis = {
                "root_cause": "unknown",
                "recommended_solution": [],
                "alternative_solutions": [],
                "confidence": 0.5
            }
            
        return analysis



Context Optimization Strategies

Agent systems operating at scale need advanced optimization strategies to ensure efficient use of context windows. Here are key MCP-enabled optimization techniques:


Context Window Management


class ContextWindowOptimizer:
    """Optimizes context window usage using MCP metadata"""
    
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
    
    async def optimize_session(self, session_id: str, max_tokens: int = 4096) -> None:
        """Optimize context window to fit within token limit"""
        if session_id not in self.context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")
            
        session = self.context_manager.sessions[session_id]
        
        # Calculate current usage
        current_tokens = session.metrics.total_tokens
        if current_tokens <= max_tokens:
            return  # Already within limits
            
        # Sort blocks by type and relevance
        typed_blocks = {
            ContextType.SYSTEM: [],
            ContextType.USER: [],
            ContextType.AGENT: [],
            ContextType.MEMORY: [],
            ContextType.KNOWLEDGE: [],
            ContextType.TOOL: []
        }
        
        for block in session.blocks:
            typed_blocks[block.type].append(block)
            
        # Always keep SYSTEM blocks
        optimized_blocks = typed_blocks[ContextType.SYSTEM].copy()
        used_tokens = sum(b.token_count for b in optimized_blocks)
        
        # Keep most recent USER and AGENT blocks
        for block_type in [ContextType.USER, ContextType.AGENT]:
            blocks = sorted(typed_blocks[block_type], key=lambda b: b.timestamp, reverse=True)
            
            for block in blocks:
                if used_tokens + block.token_count <= max_tokens * 0.7:  # Keep 30% for tools/memory
                    optimized_blocks.append(block)
                    used_tokens += block.token_count
                    
        # Use remaining space for TOOL, MEMORY and KNOWLEDGE blocks by relevance
        remaining_types = [ContextType.TOOL, ContextType.MEMORY, ContextType.KNOWLEDGE]
        remaining_blocks = []
        
        for block_type in remaining_types:
            remaining_blocks.extend(typed_blocks[block_type])
            
        # Sort by relevance score
        remaining_blocks.sort(key=lambda b: b.relevance_score, reverse=True)
        
        for block in remaining_blocks:
            if used_tokens + block.token_count <= max_tokens:
                optimized_blocks.append(block)
                used_tokens += block.token_count
                
        # Update session with optimized blocks
        session.blocks = optimized_blocks
        session.calculate_metrics()



Contextual Relevance Scoring

For production systems, simple relevance scoring isn’t sufficient. Let’s implement a more sophisticated relevance calculator:

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

class ContextualRelevanceCalculator:
    """Calculates contextual relevance between blocks using TF-IDF"""
    
    def __init__(self):
        self.vectorizer = TfidfVectorizer(stop_words='english')
        
    def calculate_relevance(self, 
                           query: str, 
                           context_blocks: List[ContextBlock]) -> Dict[str, float]:
        """Calculate relevance scores between query and context blocks"""
        texts = [query] + [block.content for block in context_blocks]
        
        # Handle empty texts
        cleaned_texts = [t if t.strip() else "empty" for t in texts]
        
        try:
            # Transform texts to TF-IDF vectors
            tfidf_matrix = self.vectorizer.fit_transform(cleaned_texts)
            
            # Calculate cosine similarity between query and each block
            query_vector = tfidf_matrix[0:1]
            block_vectors = tfidf_matrix[1:]
            
            similarities = cosine_similarity(query_vector, block_vectors).flatten()
            
            # Create mapping of block IDs to relevance scores
            relevance_scores = {}
            for i, block in enumerate(context_blocks):
                relevance_scores[block.id] = float(similarities[i])
                
            return relevance_scores
        except Exception as e:
            # Fallback to default scores if vectorization fails
            return {block.id: 0.5 for block in context_blocks}
    
    def update_relevance_scores(self, 
                               query: str, 
                               context_manager: ContextManager,
                               session_id: str) -> None:
        """Update relevance scores for all blocks in a session"""
        if session_id not in context_manager.sessions:
            raise ValueError(f"Session {session_id} does not exist")
            
        session = context_manager.sessions[session_id]
        non_system_blocks = [b for b in session.blocks if b.type != ContextType.SYSTEM]
        
        relevance_scores = self.calculate_relevance(query, non_system_blocks)
        
        # Update blocks with new scores
        for block in non_system_blocks:
            if block.id in relevance_scores:
                block.relevance_score = relevance_scores[block.id]
                
        # System blocks always keep max relevance
        for block in session.blocks:
            if block.type == ContextType.SYSTEM:
                block.relevance_score = 1.0



Performance Benchmarking and Optimization

Let’s implement a benchmarking suite for MCP:

import time
import statistics
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class BenchmarkResult:
    operation: str
    times: List[float]
    token_counts: List[int]
    
    @property
    def mean_time(self) -> float:
        return statistics.mean(self.times)
    
    @property
    def p95_time(self) -> float:
        return statistics.quantiles(self.times, n=20)[18]  # 95th percentile
    
    @property
    def tokens_per_second(self) -> float:
        total_tokens = sum(self.token_counts)
        total_time = sum(self.times)
        return total_tokens / total_time if total_time > 0 else 0

class MCPBenchmark:
    """Benchmark performance of MCP operations"""
    
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.results = {}
        
    async def run_add_context_benchmark(self, 
                                      iterations: int = 100, 
                                      content_size: int = 500) -> BenchmarkResult:
        """Benchmark context addition operations"""
        session_id = self.context_manager.create_session("benchmark")
        
        times = []
        token_counts = []
        
        for i in range(iterations):
            # Generate content of specified size
            content = f"Benchmark content iteration {i} " + "word " * content_size
            
            start_time = time.time()
            block_id = self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )
            end_time = time.time()
            
            elapsed = end_time - start_time
            times.append(elapsed)
            
            # Get token count
            block = next(b for b in self.context_manager.sessions[session_id].blocks 
                        if b.id == block_id)
            token_counts.append(block.token_count)
            
        result = BenchmarkResult(
            operation="add_context",
            times=times,
            token_counts=token_counts
        )
        
        self.results["add_context"] = result
        return result
    
    async def run_format_context_benchmark(self, 
                                        iterations: int = 100,
                                        block_count: int = 20) -> BenchmarkResult:
        """Benchmark context formatting operations"""
        session_id = self.context_manager.create_session("benchmark")
        
        # Prepare session with specified number of blocks
        for i in range(block_count):
            content = f"Benchmark content block {i} " + "word " * 100
            self.context_manager.add_context(
                session_id=session_id,
                content=content,
                context_type=ContextType.MEMORY,
                relevance_score=0.5
            )
            
        times = []
        token_counts = []
        
        for i in range(iterations):
            start_time = time.time()
            formatted = self.context_manager.get_formatted_context(session_id)
            end_time = time.time()
            
            elapsed = end_time - start_time
            times.append(elapsed)
            
            # Approximate token count of formatted output
            token_counts.append(len(formatted.split()))
            
        result = BenchmarkResult(
            operation="format_context",
            times=times,
            token_counts=token_counts
        )
        
        self.results["format_context"] = result
        return result
    
    async def run_prune_context_benchmark(self,
                                       iterations: int = 100,
                                       initial_blocks: int = 100) -> BenchmarkResult:
        """Benchmark context pruning operations"""
        times = []
        token_counts = []
        
        for i in range(iterations):
            # Create new session for each iteration
            session_id = self.context_manager.create_session("benchmark")
            
            # Add initial blocks
            for j in range(initial_blocks):
                content = f"Benchmark content block {j} " + "word " * 50
                self.context_manager.add_context(
                    session_id=session_id,
                    content=content,
                    context_type=ContextType.MEMORY,
                    relevance_score=j/initial_blocks  # Vary relevance
                )
                
            # Force context saturation to trigger pruning
            self.context_manager.sessions[session_id].metrics.context_saturation = 0.95
            
            # Measure pruning operation
            start_time = time.time()
            self.context_manager._prune_context(session_id)
            end_time = time.time()
            
            elapsed = end_time - start_time
            times.append(elapsed)
            
            # Count tokens in pruned context
            token_count = sum(b.token_count for b in 
                             self.context_manager.sessions[session_id].blocks)
            token_counts.append(token_count)
            
        result = BenchmarkResult(
            operation="prune_context",
            times=times,
            token_counts=token_counts
        )
        
        self.results["prune_context"] = result
        return result
    
    def plot_results(self, save_path: str = None) -> None:
        """Plot benchmark results"""
        if not self.results:
            raise ValueError("No benchmark results to plot")
            
        fig, axs = plt.subplots(1, 3, figsize=(18, 6))
        
        operations = list(self.results.keys())
        
        # Plot 1: Mean operation time
        mean_times = [self.results[op].mean_time for op in operations]
        axs[0].bar(operations, mean_times)
        axs[0].set_title('Mean Operation Time (s)')
        axs[0].set_ylabel('Seconds')
        
        # Plot 2: 95th percentile operation time
        p95_times = [self.results[op].p95_time for op in operations]
        axs[1].bar(operations, p95_times)
        axs[1].set_title('P95 Operation Time (s)')
        axs[1].set_ylabel('Seconds')
        
        # Plot 3: Tokens per second
        tps = [self.results[op].tokens_per_second for op in operations]
        axs[2].bar(operations, tps)
        axs[2].set_title('Throughput (tokens/s)')
        axs[2].set_ylabel('Tokens per Second')
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path)
        
        plt.show()



Context-Aware MCP Implementation in Production Systems

Implementing MCP in production requires careful consideration of scaling, memory management, and performance optimization. Here’s how to implement these features in real-world systems:


Distributed Context Store

For large-scale deployments, a centralized context manager won’t suffice. Here’s a distributed MCP store implementation that scales horizontally:

import redis
import pickle
from typing import Dict, List, Optional, Union

class DistributedContextStore:
    """Distributed context store using Redis"""
    
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl  # Default TTL in seconds
    
    def _session_key(self, session_id: str) -> str:
        """Generate Redis key for a session"""
        return f"mcp:session:{session_id}"
    
    def _block_key(self, session_id: str, block_id: str) -> str:
        """Generate Redis key for a block"""
        return f"mcp:block:{session_id}:{block_id}"
    
    def store_session(self, session: ContextPackage) -> None:
        """Store session metadata in Redis"""
        session_key = self._session_key(session.session_id)
        
        # Store main session metadata
        session_data = {
            "agent_id": session.agent_id,
            "version": session.version,
            "trace_id": session.trace_id,
            "metrics": pickle.dumps(session.metrics),
            "block_ids": [b.id for b in session.blocks]
        }
        
        # Store in Redis with pipeline for performance
        pipe = self.redis.pipeline()
        pipe.hmset(session_key, session_data)
        pipe.expire(session_key, self.ttl)
        
        # Store each block separately for efficient partial updates
        for block in session.blocks:
            block_key = self._block_key(session.session_id, block.id)
            block_data = pickle.dumps(block)
            pipe.set(block_key, block_data)
            pipe.expire(block_key, self.ttl)
            
        pipe.execute()
    
    def retrieve_session(self, session_id: str) -> Optional[ContextPackage]:
        """Retrieve complete session from Redis"""
        session_key = self._session_key(session_id)
        
        # Get session metadata
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return None
            
        # Decode metrics
        metrics = pickle.loads(session_data[b"metrics"])
        
        # Get block IDs and retrieve blocks
        block_ids = session_data[b"block_ids"].decode().split(",")
        blocks = []
        
        pipe = self.redis.pipeline()
        for block_id in block_ids:
            block_key = self._block_key(session_id, block_id)
            pipe.get(block_key)
            
        block_data_list = pipe.execute()
        
        for block_data in block_data_list:
            if block_data:
                block = pickle.loads(block_data)
                blocks.append(block)
                
        # Reconstruct session
        return ContextPackage(
            session_id=session_id,
            agent_id=session_data[b"agent_id"].decode(),
            blocks=blocks,
            metrics=metrics,
            version=int(session_data[b"version"]),
            trace_id=session_data[b"trace_id"].decode()
        )
    
    def update_block(self, session_id: str, block: ContextBlock) -> None:
        """Update a specific block in a session"""
        block_key = self._block_key(session_id, block.id)
        block_data = pickle.dumps(block)
        
        pipe = self.redis.pipeline()
        pipe.set(block_key, block_data)
        pipe.expire(block_key, self.ttl)
        pipe.execute()
    
    def delete_session(self, session_id: str) -> None:
        """Delete a session and all its blocks"""
        session_key = self._session_key(session_id)
        
        # Get block IDs
        session_data = self.redis.hgetall(session_key)
        if not session_data:
            return
            
        block_ids = session_data[b"block_ids"].decode().split(",")
        
        # Delete all blocks and session
        pipe = self.redis.pipeline()
        for block_id in block_ids:
            block_key = self._block_key(session_id, block_id)
            pipe.delete(block_key)
            
        pipe.delete(session_key)
        pipe.execute()


Real-time Context Optimization

In high-volume systems, real-time context optimization becomes critical for cost and performance reasons:

import asyncio
import numpy as np
from collections import deque

class RealTimeContextOptimizer:
    """Real-time context optimization strategies for production systems"""
    
    def __init__(self, 
                context_manager: ContextManager,
                token_budget: int = 4096,
                optimization_interval: float = 0.1,  # seconds
                relevance_decay_factor: float = 0.95):
        self.context_manager = context_manager
        self.token_budget = token_budget
        self.optimization_interval = optimization_interval
        self.relevance_decay_factor = relevance_decay_factor
        self.running = False
        self.performance_metrics = deque(maxlen=1000)  # Keep last 1000 metrics
    
    async def start_optimization_loop(self, session_id: str) -> None:
        """Start continuous optimization in background"""
        self.running = True
        try:
            while self.running:
                start_time = time.time()
                
                # Perform optimization
                await self.optimize_session(session_id)
                
                # Measure performance
                end_time = time.time()
                self.performance_metrics.append(end_time - start_time)
                
                # Sleep until next interval
                await asyncio.sleep(self.optimization_interval)
        except Exception as e:
            print(f"Optimization loop error: {e}")
            self.running = False
    
    def stop_optimization_loop(self) -> None:
        """Stop background optimization"""
        self.running = False
    
    async def optimize_session(self, session_id: str) -> None:
        """Perform a single optimization cycle"""
        if session_id not in self.context_manager.sessions:
            return
            
        session = self.context_manager.sessions[session_id]
        
        # Apply time-based relevance decay
        self._apply_relevance_decay(session)
        
        # Update relevance based on recency
        self._update_recency_relevance(session)
        
        # Enforce token budget
        if session.metrics.total_tokens > self.token_budget:
            self._enforce_token_budget(session)
            
        # Re-calculate metrics
        session.calculate_metrics()
    
    def _apply_relevance_decay(self, session: ContextPackage) -> None:
        """Apply time-based decay to relevance scores"""
        current_time = int(time.time() * 1000)
        
        for block in session.blocks:
            # Don't decay system blocks
            if block.type == ContextType.SYSTEM:
                continue
                
            # Calculate age in seconds
            age_seconds = (current_time - block.timestamp) / 1000
            
            # Apply exponential decay
            decay = self.relevance_decay_factor ** (age_seconds / 60)  # Decay per minute
            block.relevance_score *= decay
            
            # Ensure minimum relevance
            block.relevance_score = max(0.1, block.relevance_score)
    
    def _update_recency_relevance(self, session: ContextPackage) -> None:
        """Boost relevance of recent conversational turns"""
        # Sort blocks by timestamp
        recent_blocks = sorted(
            [b for b in session.blocks if b.type in (ContextType.USER, ContextType.AGENT)],
            key=lambda b: b.timestamp,
            reverse=True
        )
        
        # Boost most recent conversation turns
        for i, block in enumerate(recent_blocks[:10]):  # Consider last 10 turns
            recency_boost = 0.95 ** i  # Exponential decay based on recency
            block.relevance_score = min(1.0, block.relevance_score + recency_boost)
    
    def _enforce_token_budget(self, session: ContextPackage) -> None:
        """Ensure token count stays within budget"""
        # Priority order: SYSTEM > recent USER/AGENT > TOOL > MEMORY > KNOWLEDGE
        system_blocks = [b for b in session.blocks if b.type == ContextType.SYSTEM]
        
        # Get non-system blocks, sorted by relevance
        non_system_blocks = [b for b in session.blocks if b.type != ContextType.SYSTEM]
        non_system_blocks.sort(key=lambda b: b.relevance_score, reverse=True)
        
        # Calculate tokens in system blocks
        system_tokens = sum(b.token_count for b in system_blocks)
        
        # Calculate how many tokens we can use for non-system blocks
        available_tokens = self.token_budget - system_tokens
        
        # Keep adding blocks until we hit the limit
        kept_blocks = system_blocks.copy()
        used_tokens = system_tokens
        
        for block in non_system_blocks:
            if used_tokens + block.token_count <= self.token_budget:
                kept_blocks.append(block)
                used_tokens += block.token_count
            # else discard this block
                
        # Update session blocks
        session.blocks = kept_blocks



Hypothetical Implementation Example: Telecom Customer Service

Let’s explore a hypothetical scenario showing how MCP implementation could transform a telecom customer service system handling 50,000+ customer interactions daily: (I am working on a more realistic experimental setup, the result from which I would share in the coming weeks, stay tuned!)

Hypothetical Baseline (Before MCP)

Consider a telecom company using a conventional LLM-based customer service system with these theoretical performance characteristics:

  1. Average completion time: 12.5 seconds per query
  2. Context window utilization: 32% (wasted tokens)
  3. Coherence over multi-turn conversations: 68% (measured by user satisfaction)
  4. Agent handoff success rate: 52% (context lost during transfers)
  5. Daily token costs: $4,200 (for 50,000 interactions)



Hypothetical MCP Implementation Approach

In this scenario, we could implement MCP and optimize context management with the following approach:

async def telecom_service_enhancement():
    # Initialize MCP components
    redis_url = "redis://redis-master.production:6379/0"
    context_store = DistributedContextStore(redis_url)
    
    # Create optimized context manager
    optimized_manager = ContextManager(max_tokens=8192)
    
    # Inject relevance calculator
    relevance_calculator = ContextualRelevanceCalculator()
    
    # Initialize real-time optimizer
    optimizer = RealTimeContextOptimizer(
        context_manager=optimized_manager,
        token_budget=4096,
        optimization_interval=0.05,
        relevance_decay_factor=0.98
    )
    
    # Create agents with MCP
    intent_agent = MCPEnabledIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=optimized_manager
    )
    
    technical_agent = MCPEnabledTechnicalAgent(
        llm_client=anthropic_client,
        network_api=network_api,
        context_manager=optimized_manager
    )
    
    # Create coordinator with context transfer capabilities
    coordinator = AgentCoordinator(
        agents={
            "intent": intent_agent,
            "technical": technical_agent,
            # Other specialized agents
        },
        context_manager=optimized_manager,
        context_store=context_store
    )
    
    # Initialize metrics collection
    metrics_collector = MetricsCollector(
        prometheus_endpoint="http://prometheus.monitoring:9090/metrics"
    )
    
    # Start service
    return await start_service(coordinator, metrics_collector)



Projected MCP Implementation Results

Based on the architecture described above, this hypothetical MCP-enabled system could potentially achieve these performance improvements:

  1. Average completion time: Potentially reduced seconds per query
  2. Context window utilization: Could increase to X%
  3. Coherence over multi-turn conversations: Might improve to Y%
  4. Agent handoff success rate: Could increase to Z%
  5. Daily token costs: Potentially reduced to $P (Q% cost savings)

These theoretical improvements would result from:

  1. Efficient context packaging and transmission between agents
  2. Dynamic relevance scoring to prioritize important information
  3. Standardized context exchange protocols enabling seamless agent handoffs
  4. Automatic optimization of context window utilization
  5. Reduced token waste through intelligent pruning



Key Performance Considerations

To implement MCP in your own production system, consider these performance best practices:


Memory Management

class MCPMemoryOptimizer:
    """Optimizes memory usage for MCP in production"""
    
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        
    def optimize_memory_usage(self, session_id: str) -> float:
        """Optimize memory usage and return memory saved in MB"""
        session = self.context_manager.sessions[session_id]
        
        # Calculate current memory usage
        initial_memory = self._estimate_memory_usage(session)
        
        # Perform optimizations
        self._deduplicate_content(session)
        self._compress_metadata(session)
        self._truncate_long_blocks(session)
        
        # Calculate new memory usage
        final_memory = self._estimate_memory_usage(session)
        
        # Return memory saved in MB
        return (initial_memory - final_memory) / (1024 * 1024)
    
    def _estimate_memory_usage(self, session: ContextPackage) -> int:
        """Estimate memory usage in bytes"""
        memory_usage = 0
        
        # Session metadata
        memory_usage += len(session.session_id) + len(session.agent_id) + 16  # Base overhead
        
        # Blocks
        for block in session.blocks:
            # Content is the main memory user
            memory_usage += len(block.content) * 2  # Unicode overhead
            
            # Metadata
            if block.metadata:
                for key, value in block.metadata.items():
                    memory_usage += len(key) + len(value)
                    
            # Other fields
            memory_usage += 64  # Base block overhead
            
        return memory_usage
    
    def _deduplicate_content(self, session: ContextPackage) -> None:
        """Remove duplicate content in blocks"""
        content_set = set()
        blocks_to_keep = []
        
        for block in session.blocks:
            # Always keep system blocks
            if block.type == ContextType.SYSTEM:
                blocks_to_keep.append(block)
                continue
                
            # Check for duplicate content
            if block.content in content_set:
                continue  # Skip duplicate
                
            content_set.add(block.content)
            blocks_to_keep.append(block)
            
        session.blocks = blocks_to_keep
    
    def _compress_metadata(self, session: ContextPackage) -> None:
        """Compress metadata by removing unnecessary fields"""
        for block in session.blocks:
            if not block.metadata:
                continue
                
            # Remove empty values
            block.metadata = {k: v for k, v in block.metadata.items() if v}
            
            # Truncate long values
            for key, value in block.metadata.items():
                if len(value) > 100:
                    block.metadata[key] = value[:97] + "..."
    
    def _truncate_long_blocks(self, session: ContextPackage) -> None:
        """Truncate extremely long content blocks"""
        for block in session.blocks:
            # Truncate blocks longer than 1000 tokens (approximately)
            if block.token_count > 1000:
                words = block.content.split()
                truncated_content = " ".join(words[:950]) + " [... content truncated ...]"
                block.content = truncated_content
                block.token_count = len(truncated_content.split())



Concurrency Management

For high-throughput systems, managing concurrency is critical:

import asyncio
from contextlib import asynccontextmanager

class MCPConcurrencyManager:
    """Manages concurrent access to MCP resources"""
    
    def __init__(self, 
                max_concurrent_sessions: int = 1000,
                max_concurrent_contexts: int = 5000):
        self.session_semaphore = asyncio.Semaphore(max_concurrent_sessions)
        self.context_semaphore = asyncio.Semaphore(max_concurrent_contexts)
        self.session_locks = {}
        
    @asynccontextmanager
    async def session_context(self, session_id: str):
        """Manage concurrent access to a session"""
        # Create lock for this session if doesn't exist
        if session_id not in self.session_locks:
            self.session_locks[session_id] = asyncio.Lock()
            
        # Acquire session semaphore and lock
        async with self.session_semaphore:
            async with self.session_locks[session_id]:
                yield
                
    @asynccontextmanager
    async def context_operation(self):
        """Manage concurrent context operations"""
        async with self.context_semaphore:
            yield
            
    def cleanup_session(self, session_id: str):
        """Remove locks for a session when it's no longer needed"""
        if session_id in self.session_locks:
            del self.session_locks[session_id]



Production Deployment Strategy

To deploy MCP in production, we recommend this phased approach:

  1. Pilot phase: Implement MCP for a single agent type with low traffic
  2. Gradual rollout: Extend to specialized agents one by one
  3. A/B testing: Compare performance metrics between MCP and non-MCP systems
  4. Full deployment: Scale horizontally with distributed context stores
  5. Continuous optimization: Implement real-time monitors to tune parameters

The pilot deployment may look like this:

async def pilot_deployment():
    # Initialize distributed components with lower capacity
    redis_url = "redis://redis-staging:6379/0"
    context_store = DistributedContextStore(redis_url, ttl=1800)  # 30 minute TTL
    
    # Create context manager with conservative limits
    context_manager = ContextManager(max_tokens=4096)
    
    # Configure for 5% of traffic
    traffic_ratio = 0.05
    
    # Create MCP-enabled intent agent
    intent_agent = MCPEnabledIntentAgent(
        llm_client=anthropic_client,
        agent_role="intent_agent",
        context_manager=context_manager
    )
    
    # Create monitoring
    monitor = ProductionMonitor(
        datadog_api_key="YOUR_API_KEY",
        experiment_name="mcp_pilot",
        sample_rate=0.1  # Sample 10% of interactions for detailed analysis
    )
    
    # Start pilot with traffic splitting
    return await start_pilot(
        agent=intent_agent,
        context_store=context_store,
        traffic_ratio=traffic_ratio,
        monitor=monitor
    )



Concluding thoughts

Model Context Protocol represents a significant advance in autonomous multi-agent system architecture. By standardizing context management, MCP solves critical challenges around context optimization, agent collaboration, and memory management.

Key takeaways from our implementation:

  1. Standardization matters: Unified context protocols enable seamless interoperability between diverse agent systems
  2. Memory optimization is critical: Real-time context management directly impacts cost and performance
  3. Production deployments require careful scaling: Distributed context stores and concurrency management are essential
  4. Relevance scoring drives optimization: Dynamic scoring algorithms significantly improve context window utilization

While this telecom example is hypothetical, the architectural patterns and implementation strategies described show how MCP could deliver tangible business benefits through reduced costs, faster responses, and improved customer satisfaction in real-world applications.

As autonomous agent systems continue to evolve, Model Context Protocol will likely become a standard component of production agent architectures, enabling more sophisticated agent interactions and improved performance characteristics.

Future directions for MCP development include:

  1. Cross-modal context representation: Supporting efficient encoding of multimodal content
  2. Federated context management: Enabling privacy-preserving context sharing across organizations
  3. Self-optimizing context strategies: Using reinforcement learning to dynamically tune context parameters

By implementing MCP in your own systems, you can achieve similar performance improvements while establishing a foundation for future enhancements to your agent architecture.


Want to learn more about implementing MCP in your organization? Check out my previous articles on autonomous multi-agent systems and context-aware data pipelines.




    Enjoy Reading This Article?

    Here are some more articles you might like to read next:

  • Engineering Autonomous Multi-Agent Systems - A Technical Deep Dive into Telecom Customer Service
  • Engineering Multi-Agent Systems - A Retail Banking Case Study
  • ETLC 2.0 - Building Context-Aware Data Pipelines
  • The End of Data Warehouses? Enter the Age of Dynamic Context Engines
  • (Part 3/3) - Reimagining ETL with Large Language Models—The Path to Intelligent Pipelines