创建使用MCP服务器的AI Agent(第4部分)
现在到了神奇的部分!在第3部分中,我们构建了一个可以收集和分析反馈的MCP服务器。今天,我们将创建一个使用这个服务器的AI Agent来智能处理客户交互。到最后,您将拥有一个可工作的AI系统!
我们要构建什么
我们将创建一个AI Agent,它:
连接到我们的MCP服务器
使用LLM理解客户需求
自动收集和分析反馈
生成智能报告
建议业务改进
第1步:设置LLM集成
首先,让我们安装所需的内容:
- # 确保您的虚拟环境已激活
- pip install openai python-dotenv aiohttp
创建一个.env文件来存储您的API密钥:
- # .env文件
- OPENAI_API_KEY=your-api-key-here
- # 从以下地址获取您的密钥:https://platform.openai.com/api-keys
第2步:构建我们的AI Agent
创建feedback_agent.py:
- #!/usr/bin/env python3
- """
- AI Agent for Customer Feedback System
- Intelligently handles feedback collection and analysis
- """
- import os
- import json
- import asyncio
- from typing import Dict, Any, List
- from datetime import datetime
- from dotenv import load_dotenv
- # Load environment variables
- load_dotenv()
- # For LLM integration
- from openai import AsyncOpenAI
- # For MCP client
- from mcp import ClientSession, StdioServerParameters
- from mcp.client.stdio import stdio_client
class FeedbackAgent:
    """AI agent for the café customer-feedback system.

    Connects to the MCP feedback server (spawned as a stdio child
    process), uses an OpenAI chat model to understand customers, and
    drives an interactive console loop that collects feedback, answers
    questions, and produces trend analyses.
    """

    def __init__(self):
        # Display name used in console output and the LLM persona prompt.
        self.name = "Feedback Assistant"
        self.llm = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        # Set by connect_to_server(); all MCP tool/resource calls go
        # through this session while the agent loop is running.
        self.mcp_session = None
        # Launch the feedback server as a child process speaking stdio.
        self.server_params = StdioServerParameters(
            command="python",
            args=["feedback_server.py"]
        )

    async def connect_to_server(self):
        """Connect to our MCP server and run the interactive agent loop.

        The session is only valid inside the two context managers, so the
        agent loop is awaited from within them to keep it alive.
        """
        print(f"🔌 Connecting to feedback server...")
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.mcp_session = session
                # Initialize connection (MCP handshake)
                await session.initialize()
                # Get available tools and resources
                tools = await session.list_tools()
                resources = await session.list_resources()
                # FIX: list_tools()/list_resources() return result objects
                # (ListToolsResult / ListResourcesResult), not lists, so
                # len() on them raises TypeError — count the .tools /
                # .resources attributes instead.
                print(f"✅ Connected! Found {len(tools.tools)} tools and {len(resources.resources)} resources")
                # Keep the session active while the user interacts
                await self.run_agent_loop()

    async def think(self, task: str) -> str:
        """Use the LLM to understand a task and produce a friendly reply.

        Args:
            task: Free-form instruction or question for the model.

        Returns:
            The model's reply text.
        """
        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": f"""You are {self.name}, an AI assistant for a café.
You help collect and analyze customer feedback.
Be friendly, professional, and insightful."""
                },
                {
                    "role": "user",
                    "content": task
                }
            ],
            temperature=0.7
        )
        return response.choices[0].message.content

    async def collect_customer_feedback(self, conversation: str) -> Dict[str, Any]:
        """Extract structured feedback from a conversation and store it.

        Uses the LLM (JSON mode) to pull name/feedback/rating out of the
        raw text, then calls the MCP ``collect_feedback`` tool.

        Returns:
            Dict with extraction details and the server's tool response.
        """
        # Use LLM to extract information
        extraction_prompt = f"""
Extract the following from this customer conversation:
1. Customer name (if mentioned)
2. The main feedback points
3. Overall rating (1-5)
4. Key topics mentioned
Conversation:
{conversation}
Return as JSON format.
"""
        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Extract information and return valid JSON only."},
                {"role": "user", "content": extraction_prompt}
            ],
            temperature=0.3,
            # JSON mode guarantees syntactically valid JSON output.
            response_format={"type": "json_object"}
        )
        extracted = json.loads(response.choices[0].message.content)
        # Use MCP tool to persist the feedback.
        # NOTE(review): the JSON keys ("customer_name", "feedback",
        # "rating") depend on how the model names its fields — the .get()
        # defaults cover misses, but confirm against real model output.
        result = await self.mcp_session.call_tool(
            "collect_feedback",
            arguments={
                "customer_name": extracted.get("customer_name", "Anonymous"),
                "feedback": extracted.get("feedback", conversation),
                "rating": extracted.get("rating", 3)
            }
        )
        return {
            "status": "collected",
            "details": extracted,
            "server_response": result
        }

    async def analyze_feedback_trends(self) -> str:
        """Analyze all feedback and generate insights.

        Reads the recent-feedback and summary resources from the MCP
        server and asks the LLM for an actionable analysis.
        """
        # Get recent feedback from MCP server
        recent_feedback = await self.mcp_session.read_resource("feedback://recent")
        summary_data = await self.mcp_session.read_resource("feedback://summary")
        # Use LLM to generate insights
        analysis_prompt = f"""
Analyze this customer feedback data and provide:
1. Key themes and patterns
2. Areas needing immediate attention
3. Positive aspects to maintain
4. Specific recommendations for improvement
Recent Feedback:
{recent_feedback}
Summary Statistics:
{summary_data}
"""
        response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "You are a business analyst. Provide actionable insights."
                },
                {"role": "user", "content": analysis_prompt}
            ],
            temperature=0.5
        )
        return response.choices[0].message.content

    async def handle_customer_interaction(self, message: str) -> str:
        """Main interaction handler.

        Classifies the message intent with the LLM, then routes to
        feedback collection, a status update, or a general reply.
        """
        # Determine intent
        intent_prompt = f"""
Classify this message intent:
- 'give_feedback': Customer wants to share feedback
- 'check_status': Customer asking about their previous feedback
- 'general_question': Other questions
Message: {message}
Return only the intent classification.
"""
        intent_response = await self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify intent. Return only the classification."},
                {"role": "user", "content": intent_prompt}
            ],
            # Low temperature keeps the classification near-deterministic.
            temperature=0.1
        )
        intent = intent_response.choices[0].message.content.strip().lower()
        # Substring match tolerates extra words around the intent label.
        if "give_feedback" in intent:
            # Collect feedback, then thank the customer
            feedback_result = await self.collect_customer_feedback(message)
            response = await self.think(
                f"Customer gave feedback. Details: {feedback_result}. "
                "Thank them and mention any immediate actions we'll take."
            )
            return response
        elif "check_status" in intent:
            # Get feedback summary and report back
            summary = await self.mcp_session.read_resource("feedback://summary")
            response = await self.think(
                f"Customer asking about feedback status. Our summary: {summary}. "
                "Provide a helpful update."
            )
            return response
        else:
            # General response
            return await self.think(f"Respond helpfully to: {message}")

    async def run_agent_loop(self):
        """Main agent interaction loop (console REPL)."""
        print(f"\n🤖 {self.name} is ready! Type 'quit' to exit.")
        print("Try: 'Hi, I'm Sarah. I loved the new latte recipe! 5 stars!'")
        while True:
            try:
                # FIX: a bare input() call blocks the event loop (and the
                # live MCP session) — run it in a worker thread instead.
                user_input = (await asyncio.to_thread(input, "\n👤 You: ")).strip()
                if user_input.lower() in ['quit', 'exit', 'bye']:
                    print(f"👋 {self.name}: Goodbye! Have a great day!")
                    break
                if user_input.lower() == 'analyze':
                    # Run the trend analysis instead of a chat turn
                    print(f"\n📊 {self.name}: Analyzing feedback trends...")
                    analysis = await self.analyze_feedback_trends()
                    print(f"📈 Analysis:\n{analysis}")
                    continue
                # Handle regular interaction
                print(f"\n🤖 {self.name}: Processing...")
                response = await self.handle_customer_interaction(user_input)
                print(f"🤖 {self.name}: {response}")
            except KeyboardInterrupt:
                print(f"\n👋 {self.name}: Goodbye!")
                break
            except Exception as e:
                # Keep the loop alive on per-turn failures (API errors etc.)
                print(f"❌ Error: {e}")
# Main entry point
async def main():
    """Build the agent and hand control to its server-connection loop."""
    await FeedbackAgent().connect_to_server()


if __name__ == "__main__":
    asyncio.run(main())
第3步:添加工作流引擎
创建workflow_engine.py来管理自动化任务:
- #!/usr/bin/env python3
- """
- Workflow Engine for Feedback System
- Manages automated tasks and scheduled operations
- """
- import asyncio
- from typing import Dict, Any
- from datetime import datetime, timedelta
class FeedbackWorkflow:
    """Automated feedback workflows driven by a FeedbackAgent.

    Each workflow is registered by name so callers can dispatch through
    run_workflow().
    """

    def __init__(self, agent):
        # The agent supplies the LLM (think/analyze) and the MCP session.
        self.agent = agent
        # Name -> bound coroutine method registry used by run_workflow().
        self.workflows = {
            "daily_analysis": self.daily_analysis_workflow,
            "negative_feedback_alert": self.negative_feedback_alert,
            "weekly_report": self.weekly_report_workflow
        }

    async def daily_analysis_workflow(self):
        """Run daily feedback analysis"""
        print(f"\n📊 Running daily analysis...")
        # Pull the current summary, then let the agent produce insights.
        current_summary = await self.agent.mcp_session.read_resource("feedback://summary")
        insights = await self.agent.analyze_feedback_trends()
        print(f"📈 Daily Analysis Complete:\n{insights}")
        # Escalate when the summary text flags heavy negative feedback.
        lowered = current_summary.lower()
        if "negative" in lowered and "50" in current_summary:
            print("⚠️ High negative feedback detected!")
            await self.negative_feedback_alert()

    async def negative_feedback_alert(self, feedback_data: Dict[str, Any] = None):
        """Handle negative feedback alerts"""
        print(f"\n🚨 Negative feedback alert!")
        # Without concrete feedback there is nothing to plan around.
        if not feedback_data:
            return None
        # Ask the agent for a concrete action plan.
        plan = await self.agent.think(
            f"Create action plan for this negative feedback: {feedback_data['feedback']}. "
            "Include: immediate response, follow-up actions, and prevention measures."
        )
        print(f"📋 Action Plan:\n{plan}")
        # In production:
        # - Send alert to manager
        # - Create follow-up task
        # - Track resolution
        return plan

    async def weekly_report_workflow(self):
        """Generate comprehensive weekly report"""
        print(f"\n📊 Generating weekly report...")
        # Gather everything the report needs from the MCP server.
        stats = await self.agent.mcp_session.read_resource("feedback://summary")
        examples = await self.agent.mcp_session.read_resource("feedback://recent")
        # Ask the LLM for an executive summary over the collected data.
        exec_summary = await self.agent.think(
            f"Create executive summary for this week's feedback. "
            f"Data: {stats}\nRecent examples: {examples}\n"
            "Include: key metrics, trends, recommendations, and success stories."
        )
        print(f"📄 Weekly Report:\n{exec_summary}")
        return exec_summary

    async def run_workflow(self, workflow_name: str, *args, **kwargs):
        """Execute a specific workflow"""
        handler = self.workflows.get(workflow_name)
        if handler is None:
            raise ValueError(f"Unknown workflow: {workflow_name}")
        return await handler(*args, **kwargs)
class WorkflowScheduler:
    """Registry of scheduled workflows plus a short demo run loop."""

    def __init__(self, workflow_engine):
        self.engine = workflow_engine
        # Each entry describes one scheduled workflow (daily or event).
        self.scheduled_tasks = []

    def schedule_daily(self, hour: int, minute: int, workflow: str):
        """Schedule daily workflow"""
        entry = {
            "type": "daily",
            "time": {"hour": hour, "minute": minute},
            "workflow": workflow,
        }
        self.scheduled_tasks.append(entry)

    def schedule_on_event(self, event_type: str, workflow: str):
        """Schedule event-triggered workflow"""
        entry = {
            "type": "event",
            "trigger": event_type,
            "workflow": workflow,
        }
        self.scheduled_tasks.append(entry)

    async def run(self):
        """Run the scheduler"""
        print("⏰ Workflow scheduler started")
        # For demo: simulate some workflows
        await asyncio.sleep(2)
        await self.engine.run_workflow("daily_analysis")
        # Simulate negative feedback
        await asyncio.sleep(3)
        demo_feedback = {
            "customer_name": "Demo User",
            "feedback": "The coffee was cold and service was slow",
            "rating": 2,
        }
        await self.engine.run_workflow("negative_feedback_alert", demo_feedback)
第4步:整合所有组件
创建run_system.py来运行完整系统:
- #!/usr/bin/env python3
- """
- Run the Complete AI Feedback System
- """
- import asyncio
- import subprocess
- import time
- from feedback_agent import FeedbackAgent
- from workflow_engine import FeedbackWorkflow, WorkflowScheduler
async def run_complete_system():
    """Run server and agent together.

    Starts the MCP feedback server as a background process, runs the
    interactive AI agent until the user quits, then shuts the server
    process down.
    """
    # Start the MCP server in background
    print("🚀 Starting MCP Feedback Server...")
    # NOTE(review): FeedbackAgent.connect_to_server() spawns its own
    # feedback_server.py over stdio, so this process appears to be a
    # second, independent server instance — confirm it is really needed.
    server_process = subprocess.Popen(
        ["python", "feedback_server.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    # Give server time to start.
    # FIX: time.sleep() would block the event loop; use the
    # asyncio-aware sleep inside a coroutine.
    await asyncio.sleep(2)
    try:
        # Create and connect agent
        print("🤖 Starting AI Agent...")
        agent = FeedbackAgent()
        # Create workflow engine
        workflow = FeedbackWorkflow(agent)
        scheduler = WorkflowScheduler(workflow)
        # Schedule some workflows
        scheduler.schedule_daily(9, 0, "daily_analysis")
        scheduler.schedule_on_event("negative_feedback", "negative_feedback_alert")
        scheduler.schedule_daily(17, 0, "weekly_report")
        # NOTE(review): scheduler.run() is never awaited, so the scheduled
        # workflows do not fire — presumably intentional for this
        # interactive demo; confirm.
        # Run the agent (blocks until the user exits the agent loop)
        await agent.connect_to_server()
    finally:
        # Clean up: stop the background server and reap the child so it
        # does not linger as a zombie process.
        server_process.terminate()
        server_process.wait()
        print("\n👋 System shutdown complete")


if __name__ == "__main__":
    print("🎯 Complete AI Feedback System Starting...\n")
    asyncio.run(run_complete_system())
第5步:测试您的AI系统
让我们测试完整系统:
- # 确保您的.env文件包含您的OpenAI API密钥
- python run_system.py
尝试这些交互:
提供反馈:
"Hi, I'm Sarah. I loved the new latte recipe! 5 stars!"
"The service was terrible today. My order took 30 minutes."
检查分析:
输入:analyze
提问:
"What's the overall feedback trend?"
"How many people gave feedback today?"
我们实现的关键概念
1. Agent-服务器通信
Agent连接到MCP服务器并可以:
读取资源(反馈数据)
调用工具(收集反馈、分析情感)
智能处理结果
2. LLM集成
我们使用GPT-3.5来:
理解自然语言
从对话中提取信息
生成洞察和建议
创建个性化响应
3. 自动化工作流
我们的系统可以:
运行定时分析
响应事件(负面反馈)
自动生成报告
4. 智能决策制定
Agent:
分类用户意图
选择适当的行动
提供上下文响应
下一步是什么?
在我们的最后一部分(第5部分)中,我们将:
使用真实数据库添加数据持久性
实现高级功能(多语言支持、语音输入)
为生产使用部署系统
探索扩展策略
您现在已经构建了一个功能性的AI系统!您将如何扩展它以满足您的业务需求?哪些其他工作流会有帮助?
挑战:扩展您的系统
尝试添加这些功能:
负面反馈的电子邮件通知
情感趋势可视化
客户响应模板
多位置支持
在评论中分享您的扩展!
技术要点总结
AI Agent的核心功能
智能交互:使用LLM理解用户意图
自动反馈收集:从对话中提取结构化信息
趋势分析:生成业务洞察和建议
工作流自动化:定时任务和事件触发
系统架构特点
模块化设计:Agent、服务器、工作流引擎分离
异步处理:支持并发操作和实时响应
智能决策:基于LLM的意图识别和响应生成
可扩展性:易于添加新功能和工作流
实际应用价值
客户服务自动化:24/7智能客服支持
反馈管理:自动收集、分析和响应客户反馈
业务洞察:实时趋势分析和改进建议
工作流优化:自动化重复任务和报告生成
开发最佳实践
环境配置:使用.env文件管理API密钥
错误处理:完善的异常处理机制
日志记录:详细的系统运行日志
测试验证:完整的系统集成测试
通过这个教程,您已经掌握了构建完整AI系统的核心技能,可以开始创建自己的智能应用了!
