ModelRouter Docs¶
The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.
Key Features¶
- Dynamic model selection based on task complexity and requirements
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Concurrent and asynchronous execution capabilities
- Batch processing with memory
- Automatic error handling and retries
- Provider-aware routing
- Cost optimization
 
Constructor Arguments¶
| Parameter | Type | Default | Description | 
|---|---|---|---|
| system_prompt | str | model_router_system_prompt | Custom prompt for guiding model selection behavior | 
| max_tokens | int | 4000 | Maximum token limit for model outputs | 
| temperature | float | 0.5 | Control parameter for response randomness (0.0-1.0) | 
| max_workers | int/str | 10 | Maximum concurrent workers ("auto" for CPU count) | 
| api_key | str | None | API key for model access | 
| max_loops | int | 1 | Maximum number of refinement iterations | 
| *args | Any | None | Additional positional arguments | 
| **kwargs | Any | None | Additional keyword arguments | 
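A minimal construction sketch using the parameters above (values are illustrative; API keys are normally picked up from the environment as shown in the Installation section below):

```python
from swarms import ModelRouter

# Illustrative values only; each argument maps to a row in the table above.
router = ModelRouter(
    system_prompt="Favor concise, well-structured answers.",
    max_tokens=4000,
    temperature=0.3,
    max_workers="auto",
    max_loops=1,
)
```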
Core Methods¶
run(task: str) -> str¶
Executes a single task through the model router with memory and refinement capabilities.
Installation¶
- Install the latest version of swarms using pip: `pip install -U swarms`
- Set up your API keys in your `.env` file with the following:
 
```
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
```
```python
from swarms import ModelRouter
router = ModelRouter()
# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")
# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")
```
batch_run(tasks: list) -> list¶
Executes multiple tasks sequentially with result aggregation.
```python
# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations"
]
results = router.batch_run(tasks)
# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")
```
concurrent_run(tasks: list) -> list¶
Parallel execution of multiple tasks using thread pooling.
```python
# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics"
]
# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)
# Process results with error handling (process_analysis, save_to_database, and log_error are user-defined helpers)
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")
```
async_run(task: str) -> asyncio.Task¶
Asynchronous task execution with coroutine support.
```python
import asyncio

from swarms import ModelRouter

async def process_data_stream(router: ModelRouter):
    tasks = []
    # data_stream is a placeholder for any async iterable of incoming records
    async for data in data_stream:
        # Schedule each task without awaiting it here; gather collects the results
        tasks.append(router.async_run(f"Process data: {data}"))
    return await asyncio.gather(*tasks)

# Usage in async context
async def main():
    router = ModelRouter()
    results = await process_data_stream(router)

asyncio.run(main())
```
Advanced Usage Examples¶
Financial Analysis System¶
```python
from swarms import ModelRouter
from typing import Dict
class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2  # Allow for refinement iteration
        )
    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:
        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M
        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations
        Provide detailed insights and actionable recommendations.
        """
        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)
    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass
# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25
}
analysis = analyzer.analyze_company_financials(company_data)
```
Healthcare Data Processing Pipeline¶
```python
import asyncio

from swarms import ModelRouter
from typing import List, Dict
class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis"""
        )
    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []
        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}
            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)
        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        return [self._parse_medical_analysis(r) for r in results]
    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass
# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5'
        }
        # More records...
    ]
    analyses = await processor.process_patient_records(patient_records)

asyncio.run(main())
```
Natural Language Processing Pipeline¶
```python
from swarms import ModelRouter
from typing import List, Dict
class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2
        )
    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]
    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:
        Text: {document}
        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection
        Provide structured analysis with confidence scores.
        """
    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass
# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month."
]
analyses = pipeline.process_documents(documents)
```
Available Models and Use Cases¶
| Model | Provider | Optimal Use Cases | Characteristics | 
|---|---|---|---|
| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff | 
| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs | 
| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities | 
| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance | 
| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities | 
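Model selection is driven by the task description itself, so one practical pattern is to state your requirements (depth, length, latency) directly in the prompt. A minimal sketch of this pattern; the model actually chosen is decided at runtime and is not guaranteed to match the table above:

```python
from swarms import ModelRouter

router = ModelRouter()

# Spelling out constraints in the task text gives the router's
# function-calling step more signal when it picks a model and provider.
result = router.run(
    "Review this Python function for thread-safety issues and suggest fixes. "
    "A careful, step-by-step analysis matters more than a fast response."
)
```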
Provider Capabilities¶
| Provider | Strengths | Best For | Integration Notes | 
|---|---|---|---|
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup | 
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting | 
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available | 
| Groq | High-speed inference | Real-time applications | Optimized for specific models | 
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration | 
| Mistral | Open source flexibility | General applications | Self-hosted options available | 
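Because routing is standardized through litellm, additional providers generally only need their usual API-key environment variables. The variable names below follow litellm's conventions but should be verified against the litellm provider docs; a small setup sketch:

```python
import os

# Assumed litellm-style environment variables for the providers above;
# set them in your .env file or export them before constructing the router.
os.environ.setdefault("GROQ_API_KEY", "your_groq_api_key")
os.environ.setdefault("DEEPSEEK_API_KEY", "your_deepseek_api_key")
os.environ.setdefault("MISTRAL_API_KEY", "your_mistral_api_key")
```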
Performance Optimization Tips¶
- Token Management
    - Set appropriate `max_tokens` based on task complexity
    - Monitor token usage for cost optimization
    - Use streaming for long outputs
- Concurrency Settings
    - Adjust `max_workers` based on system resources
    - Use `"auto"` workers for optimal CPU utilization
    - Monitor memory usage with large batch sizes
- Temperature Tuning
    - Lower (0.1-0.3) for factual/analytical tasks
    - Higher (0.7-0.9) for creative tasks
    - Mid-range (0.4-0.6) for balanced outputs
- System Prompts
    - Customize for specific domains
    - Include relevant context
    - Define clear output formats
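A configuration sketch that puts these tips together for an analytical workload (values are illustrative, not prescriptive):

```python
from swarms import ModelRouter

# Low temperature for factual work, a generous token budget, auto-scaled
# workers, and a domain-specific system prompt with a clear output format.
analytical_router = ModelRouter(
    temperature=0.2,
    max_tokens=6000,
    max_workers="auto",
    system_prompt=(
        "You are a financial reporting analyst. "
        "Return findings as a numbered list followed by a short recommendation."
    ),
)

result = analytical_router.run("Summarize the key risks in this quarterly report excerpt.")
```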
 
Dependencies¶
- `asyncio`: Asynchronous I/O support
- `concurrent.futures`: Thread pool execution
- `pydantic`: Data validation
- `litellm`: LLM interface standardization