创建使用MCP服务器的AI Agent(第4部分)

现在到了神奇的部分!在第3部分中,我们构建了一个可以收集和分析反馈的MCP服务器。今天,我们将创建一个使用这个服务器的AI Agent来智能处理客户交互。到最后,您将拥有一个可工作的AI系统!
我们要构建什么
我们将创建一个AI Agent,它:
连接到我们的MCP服务器
使用LLM理解客户需求
自动收集和分析反馈
生成智能报告
建议业务改进
第1步:设置LLM集成
首先,让我们安装所需的内容:
  1. # 确保您的虚拟环境已激活
  2. pip install openai python-dotenv aiohttp
bash
创建一个.env文件来存储您的API密钥:
  1. # .env文件
  2. OPENAI_API_KEY=your-api-key-here
  3. # 从以下地址获取您的密钥:https://platform.openai.com/api-keys
第2步:构建我们的AI Agent
创建feedback_agent.py:
  1. #!/usr/bin/env python3
  2. """
  3. AI Agent for Customer Feedback System
  4. Intelligently handles feedback collection and analysis
  5. """

  6. import os
  7. import json
  8. import asyncio
  9. from typing import Dict, Any, List
  10. from datetime import datetime
  11. from dotenv import load_dotenv

  12. # Load environment variables
  13. load_dotenv()

  14. # For LLM integration
  15. from openai import AsyncOpenAI

  16. # For MCP client
  17. from mcp import ClientSession, StdioServerParameters
  18. from mcp.client.stdio import stdio_client

  19. class FeedbackAgent:
  20. def __init__(self):
  21. self.name = "Feedback Assistant"
  22. self.llm = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
  23. self.mcp_session = None
  24. self.server_params = StdioServerParameters(
  25. command="python",
  26. args=["feedback_server.py"]
  27. )

  28. async def connect_to_server(self):
  29. """Connect to our MCP server"""
  30. print(f"🔌 Connecting to feedback server...")

  31. async with stdio_client(self.server_params) as (read, write):
  32. async with ClientSession(read, write) as session:
  33. self.mcp_session = session

  34. # Initialize connection
  35. await session.initialize()

  36. # Get available tools and resources
  37. tools = await session.list_tools()
  38. resources = await session.list_resources()

  39. print(f"✅ Connected! Found {len(tools)} tools and {len(resources)} resources")

  40. # Keep the session active
  41. await self.run_agent_loop()

  42. async def think(self, task: str) -> str:
  43. """Use LLM to understand and plan actions"""
  44. response = await self.llm.chat.completions.create(
  45. model="gpt-3.5-turbo",
  46. messages=[
  47. {
  48. "role": "system",
  49. "content": f"""You are {self.name}, an AI assistant for a café.
  50. You help collect and analyze customer feedback.
  51. Be friendly, professional, and insightful."""
  52. },
  53. {
  54. "role": "user",
  55. "content": task
  56. }
  57. ],
  58. temperature=0.7
  59. )

  60. return response.choices[0].message.content

  61. async def collect_customer_feedback(self, conversation: str) -> Dict[str, Any]:
  62. """Intelligently extract feedback from conversation"""

  63. # Use LLM to extract information
  64. extraction_prompt = f"""
  65. Extract the following from this customer conversation:
  66. 1. Customer name (if mentioned)
  67. 2. The main feedback points
  68. 3. Overall rating (1-5)
  69. 4. Key topics mentioned

  70. Conversation:
  71. {conversation}

  72. Return as JSON format.
  73. """

  74. response = await self.llm.chat.completions.create(
  75. model="gpt-3.5-turbo",
  76. messages=[
  77. {"role": "system", "content": "Extract information and return valid JSON only."},
  78. {"role": "user", "content": extraction_prompt}
  79. ],
  80. temperature=0.3,
  81. response_format={"type": "json_object"}
  82. )

  83. extracted = json.loads(response.choices[0].message.content)

  84. # Use MCP tool to collect feedback
  85. result = await self.mcp_session.call_tool(
  86. "collect_feedback",
  87. arguments={
  88. "customer_name": extracted.get("customer_name", "Anonymous"),
  89. "feedback": extracted.get("feedback", conversation),
  90. "rating": extracted.get("rating", 3)
  91. }
  92. )

  93. return {
  94. "status": "collected",
  95. "details": extracted,
  96. "server_response": result
  97. }

  98. async def analyze_feedback_trends(self) -> str:
  99. """Analyze all feedback and generate insights"""

  100. # Get recent feedback from MCP server
  101. recent_feedback = await self.mcp_session.read_resource("feedback://recent")
  102. summary_data = await self.mcp_session.read_resource("feedback://summary")

  103. # Use LLM to generate insights
  104. analysis_prompt = f"""
  105. Analyze this customer feedback data and provide:
  106. 1. Key themes and patterns
  107. 2. Areas needing immediate attention
  108. 3. Positive aspects to maintain
  109. 4. Specific recommendations for improvement

  110. Recent Feedback:
  111. {recent_feedback}

  112. Summary Statistics:
  113. {summary_data}
  114. """

  115. response = await self.llm.chat.completions.create(
  116. model="gpt-3.5-turbo",
  117. messages=[
  118. {
  119. "role": "system",
  120. "content": "You are a business analyst. Provide actionable insights."
  121. },
  122. {"role": "user", "content": analysis_prompt}
  123. ],
  124. temperature=0.5
  125. )

  126. return response.choices[0].message.content

  127. async def handle_customer_interaction(self, message: str) -> str:
  128. """Main interaction handler"""

  129. # Determine intent
  130. intent_prompt = f"""
  131. Classify this message intent:
  132. - 'give_feedback': Customer wants to share feedback
  133. - 'check_status': Customer asking about their previous feedback
  134. - 'general_question': Other questions

  135. Message: {message}

  136. Return only the intent classification.
  137. """

  138. intent_response = await self.llm.chat.completions.create(
  139. model="gpt-3.5-turbo",
  140. messages=[
  141. {"role": "system", "content": "Classify intent. Return only the classification."},
  142. {"role": "user", "content": intent_prompt}
  143. ],
  144. temperature=0.1
  145. )

  146. intent = intent_response.choices[0].message.content.strip().lower()

  147. if "give_feedback" in intent:
  148. # Collect feedback
  149. feedback_result = await self.collect_customer_feedback(message)

  150. # Generate friendly response
  151. response = await self.think(
  152. f"Customer gave feedback. Details: {feedback_result}. "
  153. "Thank them and mention any immediate actions we'll take."
  154. )

  155. return response

  156. elif "check_status" in intent:
  157. # Get feedback summary
  158. summary = await self.mcp_session.read_resource("feedback://summary")

  159. response = await self.think(
  160. f"Customer asking about feedback status. Our summary: {summary}. "
  161. "Provide a helpful update."
  162. )

  163. return response

  164. else:
  165. # General response
  166. return await self.think(f"Respond helpfully to: {message}")

  167. async def run_agent_loop(self):
  168. """Main agent interaction loop"""
  169. print(f"\n🤖 {self.name} is ready! Type 'quit' to exit.")
  170. print("Try: 'Hi, I'm Sarah. I loved the new latte recipe! 5 stars!'")

  171. while True:
  172. try:
  173. # Get user input
  174. user_input = input("\n👤 You: ").strip()

  175. if user_input.lower() in ['quit', 'exit', 'bye']:
  176. print(f"👋 {self.name}: Goodbye! Have a great day!")
  177. break

  178. if user_input.lower() == 'analyze':
  179. # Run analysis
  180. print(f"\n📊 {self.name}: Analyzing feedback trends...")
  181. analysis = await self.analyze_feedback_trends()
  182. print(f"📈 Analysis:\n{analysis}")
  183. continue

  184. # Handle regular interaction
  185. print(f"\n🤖 {self.name}: Processing...")
  186. response = await self.handle_customer_interaction(user_input)
  187. print(f"🤖 {self.name}: {response}")

  188. except KeyboardInterrupt:
  189. print(f"\n👋 {self.name}: Goodbye!")
  190. break
  191. except Exception as e:
  192. print(f"❌ Error: {e}")

# Main entry point
async def main():
    """Create the agent and connect it to the MCP server (blocks until the user quits)."""
    agent = FeedbackAgent()
    await agent.connect_to_server()

if __name__ == "__main__":
    asyncio.run(main())
python
第3步:添加工作流引擎
创建workflow_engine.py来管理自动化任务:
  1. #!/usr/bin/env python3
  2. """
  3. Workflow Engine for Feedback System
  4. Manages automated tasks and scheduled operations
  5. """

  6. import asyncio
  7. from typing import Dict, Any
  8. from datetime import datetime, timedelta

  9. class FeedbackWorkflow:
  10. def __init__(self, agent):
  11. self.agent = agent
  12. self.workflows = {
  13. "daily_analysis": self.daily_analysis_workflow,
  14. "negative_feedback_alert": self.negative_feedback_alert,
  15. "weekly_report": self.weekly_report_workflow
  16. }

  17. async def daily_analysis_workflow(self):
  18. """Run daily feedback analysis"""
  19. print(f"\n📊 Running daily analysis...")

  20. # Get current feedback summary
  21. summary = await self.agent.mcp_session.read_resource("feedback://summary")
  22. # Analyze trends
  23. analysis = await self.agent.analyze_feedback_trends()
  24. print(f"📈 Daily Analysis Complete:\n{analysis}")
  25. # Check for urgent issues
  26. if "negative" in summary.lower() and "50" in summary:
  27. print("⚠️ High negative feedback detected!")
  28. await self.negative_feedback_alert()

  29. async def negative_feedback_alert(self, feedback_data: Dict[str, Any] = None):
  30. """Handle negative feedback alerts"""
  31. print(f"\n🚨 Negative feedback alert!")

  32. if feedback_data:
  33. # Create action plan
  34. response_plan = await self.agent.think(
  35. f"Create action plan for this negative feedback: {feedback_data['feedback']}. "
  36. "Include: immediate response, follow-up actions, and prevention measures."
  37. )

  38. print(f"📋 Action Plan:\n{response_plan}")

  39. # In production:
  40. # - Send alert to manager
  41. # - Create follow-up task
  42. # - Track resolution

  43. return response_plan

  44. async def weekly_report_workflow(self):
  45. """Generate comprehensive weekly report"""
  46. print(f"\n📊 Generating weekly report...")

  47. # Get all data
  48. summary = await self.agent.mcp_session.read_resource("feedback://summary")
  49. recent = await self.agent.mcp_session.read_resource("feedback://recent")

  50. # Generate executive summary
  51. report = await self.agent.think(
  52. f"Create executive summary for this week's feedback. "
  53. f"Data: {summary}\nRecent examples: {recent}\n"
  54. "Include: key metrics, trends, recommendations, and success stories."
  55. )

  56. print(f"📄 Weekly Report:\n{report}")
  57. return report

  58. async def run_workflow(self, workflow_name: str, *args, **kwargs):
  59. """Execute a specific workflow"""
  60. if workflow_name in self.workflows:
  61. return await self.workflows[workflow_name](*args, **kwargs)
  62. else:
  63. raise ValueError(f"Unknown workflow: {workflow_name}")

  64. class WorkflowScheduler:
  65. def __init__(self, workflow_engine):
  66. self.engine = workflow_engine
  67. self.scheduled_tasks = []

  68. def schedule_daily(self, hour: int, minute: int, workflow: str):
  69. """Schedule daily workflow"""
  70. self.scheduled_tasks.append({
  71. "type": "daily",
  72. "time": {"hour": hour, "minute": minute},
  73. "workflow": workflow
  74. })

  75. def schedule_on_event(self, event_type: str, workflow: str):
  76. """Schedule event-triggered workflow"""
  77. self.scheduled_tasks.append({
  78. "type": "event",
  79. "trigger": event_type,
  80. "workflow": workflow
  81. })

  82. async def run(self):
  83. """Run the scheduler"""
  84. print("⏰ Workflow scheduler started")

  85. # For demo: simulate some workflows
  86. await asyncio.sleep(2)
  87. await self.engine.run_workflow("daily_analysis")

  88. # Simulate negative feedback
  89. await asyncio.sleep(3)
  90. await self.engine.run_workflow("negative_feedback_alert", {
  91. "customer_name": "Demo User",
  92. "feedback": "The coffee was cold and service was slow",
  93. "rating": 2
  94. })
python
第4步:整合所有组件
创建run_system.py来运行完整系统:
  1. #!/usr/bin/env python3
  2. """
  3. Run the Complete AI Feedback System
  4. """

  5. import asyncio
  6. import subprocess
  7. import time
  8. from feedback_agent import FeedbackAgent
  9. from workflow_engine import FeedbackWorkflow, WorkflowScheduler

  10. async def run_complete_system():
  11. """Run server and agent together"""

  12. # Start the MCP server in background
  13. print("🚀 Starting MCP Feedback Server...")
  14. server_process = subprocess.Popen(
  15. ["python", "feedback_server.py"],
  16. stdout=subprocess.PIPE,
  17. stderr=subprocess.PIPE
  18. )

  19. # Give server time to start
  20. time.sleep(2)

  21. try:
  22. # Create and connect agent
  23. print("🤖 Starting AI Agent...")
  24. agent = FeedbackAgent()

  25. # Create workflow engine
  26. workflow = FeedbackWorkflow(agent)
  27. scheduler = WorkflowScheduler(workflow)

  28. # Schedule some workflows
  29. scheduler.schedule_daily(9, 0, "daily_analysis")
  30. scheduler.schedule_on_event("negative_feedback", "negative_feedback_alert")
  31. scheduler.schedule_daily(17, 0, "weekly_report")

  32. # Run the agent
  33. await agent.connect_to_server()

  34. finally:
  35. # Clean up
  36. server_process.terminate()
  37. print("\n👋 System shutdown complete")

  38. if __name__ == "__main__":
  39. print("🎯 Complete AI Feedback System Starting...\n")
  40. asyncio.run(run_complete_system())
python
第5步:测试您的AI系统
让我们测试完整系统:
  1. # 确保您的.env文件包含您的OpenAI API密钥
  2. python run_system.py
bash
尝试这些交互:
提供反馈:
"Hi, I'm Sarah. I loved the new latte recipe! 5 stars!"
"The service was terrible today. My order took 30 minutes."
检查分析:
输入:analyze
提问:
"What's the overall feedback trend?"
"How many people gave feedback today?"
我们实现的关键概念
1. Agent-服务器通信
Agent连接到MCP服务器并可以:
读取资源(反馈数据)
调用工具(收集反馈、分析情感)
智能处理结果
2. LLM集成
我们使用GPT-3.5来:
理解自然语言
从对话中提取信息
生成洞察和建议
创建个性化响应
3. 自动化工作流
我们的系统可以:
运行定时分析
响应事件(负面反馈)
自动生成报告
4. 智能决策制定
Agent:
分类用户意图
选择适当的行动
提供上下文响应
下一步是什么?
在我们的最后一部分(第5部分)中,我们将:
使用真实数据库添加数据持久性
实现高级功能(多语言支持、语音输入)
为生产使用部署系统
探索扩展策略
您现在已经构建了一个功能性的AI系统!您将如何扩展它以满足您的业务需求?哪些其他工作流会有帮助?
挑战:扩展您的系统
尝试添加这些功能:
负面反馈的电子邮件通知
情感趋势可视化
客户响应模板
多位置支持
在评论中分享您的扩展!
技术要点总结
AI Agent的核心功能
智能交互:使用LLM理解用户意图
自动反馈收集:从对话中提取结构化信息
趋势分析:生成业务洞察和建议
工作流自动化:定时任务和事件触发
系统架构特点
模块化设计:Agent、服务器、工作流引擎分离
异步处理:支持并发操作和实时响应
智能决策:基于LLM的意图识别和响应生成
可扩展性:易于添加新功能和工作流
实际应用价值
客户服务自动化:24/7智能客服支持
反馈管理:自动收集、分析和响应客户反馈
业务洞察:实时趋势分析和改进建议
工作流优化:自动化重复任务和报告生成
开发最佳实践
环境配置:使用.env文件管理API密钥
错误处理:完善的异常处理机制
日志记录:详细的系统运行日志
测试验证:完整的系统集成测试
通过这个教程,您已经掌握了构建完整AI系统的核心技能,可以开始创建自己的智能应用了!