最近在做一个电商客服智能体的项目,用到了扣子智能体框架。电商客服这个场景,业务高峰时咨询量巨大,对话还得有上下文,不能答非所问,挑战不小。今天就把从架构设计到性能调优的实战经验梳理一下,希望能给有类似需求的同学一些参考。

1. 背景与痛点:为什么需要智能体?

做这个项目之前,我们复盘了传统客服系统遇到的几个典型问题:

  • 流量洪峰应对乏力:一到促销或秒杀,咨询量瞬间暴涨。原来的规则引擎或简单机器人,要么排队卡死,要么回复千篇一律,用户体验很差。
  • 上下文频繁丢失:用户经常在多轮对话中切换话题,比如从询问发货时间,突然跳到商品材质比较。传统系统很难记住之前的对话状态,导致每个问题都得重新开始,用户得反复描述。
  • 知识更新滞后:商品信息、活动规则、售后政策变化很快。靠人工维护知识库,不仅工作量大,而且很难做到实时同步,经常出现机器人回答的信息已经过时的情况。
  • 意图识别不准:用户的问题五花八门,同一个意思有多种问法。简单的关键词匹配很容易误判,比如用户问“这个红的好看吗?”,系统可能只匹配到“红”这个颜色词,而忽略了用户其实在问“款式”或“搭配”。

正是这些痛点,促使我们转向基于大语言模型的智能体方案,它能够理解自然语言、维护对话状态、并动态地从知识库中检索信息。

2. 架构设计:规则引擎 vs. LLM智能体

在选型初期,我们重点对比了两种方案:

  • 传统规则引擎
    • 优点:规则明确,响应速度极快(毫秒级),可控性强,对简单、高频、固定模式的问题处理效率高。
    • 缺点:维护成本高,规则会随着业务膨胀变得异常复杂;泛化能力差,无法处理未预定义的问法;缺乏真正的语义理解,多轮对话能力弱。
  • LLM智能体框架
    • 优点:强大的自然语言理解和生成能力,能处理开放域问题;通过微调或提示工程,可以较好地理解业务意图;天然支持多轮对话的上下文管理。
    • 缺点:响应延迟相对较高(几百毫秒到秒级);存在“幻觉”风险,可能生成不准确的信息;对知识实时性的依赖强,需要搭配高效的检索系统。

显然,对于追求服务质量和用户体验的电商客服场景,LLM智能体是更优解。但我们也吸收了规则引擎的优点,在架构上做了融合。

我们的智能体分层架构如下:

[用户端] (App/Web/H5)
      |
      v
[接入层] (API Gateway)
      | 负载均衡、限流、鉴权
      v
[对话引擎层] (Core Agent)
      |-----------------------|
      |                       |
[意图识别模块]          [对话状态管理]
      |                       |
[知识检索模块]          [会话上下文存储]
      |                       |
[LLM推理模块]          [超时/异常处理器]
      |-----------------------|
      |
      v
[数据与服务层]
      |-----------------------|
      |           |           |
[向量知识库]   [业务数据库]  [Redis缓存]
      |           |           |
[商品/文章Embedding] [订单/用户数据] [Session/队列]
  • 接入层:负责协议转换、流量管控和安全防护。
  • 对话引擎层:这是核心。它协调各个模块,接收用户输入,先进行意图识别,然后根据意图决定是直接调用LLM生成,还是先去知识库检索,最后结合历史对话状态,生成最终回复。
  • 数据与服务层:为引擎提供数据支撑。向量知识库用于语义检索,业务数据库提供实时订单/用户数据,Redis则用于缓存高频数据和维护分布式会话。

3. 核心实现:代码与逻辑拆解

3.1 异步消息处理流程

高并发下,同步处理会迅速耗尽资源。我们采用 asyncio 配合 Redis Stream 实现了一个生产-消费者模型的消息队列,确保系统弹性。

import asyncio
import json
import aioredis
from typing import Dict, Any

class AsyncMessageProcessor:
    def __init__(self, redis_url: str, stream_key: str):
        self.redis = aioredis.from_url(redis_url)
        self.stream_key = stream_key
        self.consumer_group = "customer_service_group"
        self.consumer_name = f"consumer_{asyncio.current_task().name}"

    async def produce_message(self, user_id: str, query: str, session_id: str) -> str:
        """生产消息到Redis Stream。时间复杂度: O(1)"""
        message = {
            "user_id": user_id,
            "query": query,
            "session_id": session_id,
            "timestamp": asyncio.get_event_loop().time()
        }
        msg_id = await self.redis.xadd(self.stream_key, message)
        return msg_id

    async def consume_messages(self, processor_func):
        """消费并处理消息。使用消费者组确保负载均衡和消息不丢失。"""
        # 确保消费者组存在
        try:
            await self.redis.xgroup_create(self.stream_key, self.consumer_group, id="0", mkstream=True)
        except aioredis.ResponseError:
            pass  # 组已存在

        while True:
            try:
                # 从Stream中读取消息,阻塞等待,最多5条
                messages = await self.redis.xreadgroup(
                    self.consumer_group, self.consumer_name,
                    {self.stream_key: ">"}, count=5, block=5000
                )
                if not messages:
                    continue

                for stream, message_list in messages:
                    for message_id, message_data in message_list:
                        # 处理消息核心逻辑
                        await processor_func(message_data)
                        # 确认消息处理完成 (ACK)
                        await self.redis.xack(self.stream_key, self.consumer_group, message_id)
            except Exception as e:
                print(f"Error consuming message: {e}")
                await asyncio.sleep(1)

# 示例处理函数
async def handle_customer_query(message_data: Dict[str, Any]):
    """实际处理用户查询的函数"""
    session_id = message_data.get('session_id')
    query = message_data.get('query')
    # 这里调用对话引擎的核心处理逻辑
    response = await dialogue_engine.process(session_id, query)
    # 将响应推回给用户(例如通过WebSocket)
    # await push_to_user(session_id, response)

这个设计将耗时的LLM推理和知识检索过程异步化,接入层快速响应“已收到请求”,实际处理在后台进行,极大提升了系统吞吐量。

3.2 对话状态机实现

多轮对话的核心是状态管理。我们实现了一个基于内存和Redis的混合状态机,支持超时和恢复。

import time
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
import pickle

class DialogueState(Enum):
    GREETING = "greeting"
    QA = "qa"
    ORDER_INQUIRY = "order_inquiry"
    COMPLAINT = "complaint"
    TRANSFER = "transfer_to_human"
    END = "end"

@dataclass
class DialogueSession:
    session_id: str
    user_id: str
    current_state: DialogueState
    context: Dict[str, Any]  # 存储对话中的关键信息,如商品ID、订单号
    history: list  # 最近的对话历史
    created_at: float
    last_active_at: float
    ttl: int = 1800  # 会话存活时间,默认30分钟

class DialogueStateManager:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.local_cache = {}  # 本地缓存,减少Redis读取
        self.state_handlers = self._register_handlers()

    def _register_handlers(self) -> Dict[DialogueState, callable]:
        """注册每个状态对应的处理函数。"""
        return {
            DialogueState.GREETING: self._handle_greeting,
            DialogueState.QA: self._handle_qa,
            DialogueState.ORDER_INQUIRY: self._handle_order_inquiry,
            # ... 其他状态处理函数
        }

    async def get_or_create_session(self, session_id: str, user_id: str) -> DialogueSession:
        """获取或创建会话。空间复杂度: O(n), n为会话中存储的数据量。"""
        # 1. 检查本地缓存
        if session_id in self.local_cache:
            session = self.local_cache[session_id]
            if time.time() - session.last_active_at < session.ttl:
                session.last_active_at = time.time()
                return session

        # 2. 从Redis获取
        redis_key = f"dialogue:session:{session_id}"
        session_data = await self.redis.get(redis_key)
        if session_data:
            session = pickle.loads(session_data)
            if time.time() - session.last_active_at < session.ttl:
                session.last_active_at = time.time()
                self.local_cache[session_id] = session
                # 更新Redis中的存活时间
                await self.redis.setex(redis_key, session.ttl, pickle.dumps(session))
                return session
            else:
                # 会话已超时,删除旧会话
                await self.redis.delete(redis_key)
                del self.local_cache[session_id]

        # 3. 创建新会话
        new_session = DialogueSession(
            session_id=session_id,
            user_id=user_id,
            current_state=DialogueState.GREETING,
            context={},
            history=[],
            created_at=time.time(),
            last_active_at=time.time()
        )
        self.local_cache[session_id] = new_session
        await self.redis.setex(redis_key, new_session.ttl, pickle.dumps(new_session))
        return new_session

    async def process_input(self, session_id: str, user_input: str) -> tuple[DialogueState, str]:
        """处理用户输入,驱动状态转移。"""
        session = await self.get_or_create_session(session_id, "default_user")
        handler = self.state_handlers.get(session.current_state)

        if not handler:
            return DialogueState.END, "系统状态错误。"

        # 执行当前状态处理逻辑,并可能返回下一个状态
        next_state, response, updated_context = await handler(session, user_input)

        # 更新会话状态和上下文
        session.current_state = next_state
        session.context.update(updated_context or {})
        session.history.append((user_input, response))
        # 保持最近10轮历史
        session.history = session.history[-10:]
        session.last_active_at = time.time()

        # 持久化更新
        redis_key = f"dialogue:session:{session_id}"
        self.local_cache[session_id] = session
        await self.redis.setex(redis_key, session.ttl, pickle.dumps(session))

        return next_state, response

    async def _handle_qa(self, session: DialogueSession, user_input: str) -> tuple[DialogueState, str, dict]:
        """处理问答状态。"""
        # 1. 意图识别 (可调用单独的NLU模块)
        intent = await self._classify_intent(user_input)
        # 2. 根据意图,决定是检索知识库还是继续对话
        if intent == "product_query":
            # 从用户输入或上下文中提取商品信息
            product_info = await self._extract_entity(user_input, session.context)
            # 调用知识检索模块
            answer = await knowledge_base.retrieve(product_info)
            # 保持在QA状态,除非用户明确切换话题
            next_state = DialogueState.QA
            updated_context = {"last_product": product_info}
        else:
            # 其他意图处理...
            next_state = DialogueState.QA
            answer = "我还在学习这个问题,您可以换个方式问问吗?"
            updated_context = {}
        return next_state, answer, updated_context

    # ... 其他状态处理函数的具体实现

这个状态机将会话状态、上下文和历史记录封装在一起,并通过TTL机制自动清理僵尸会话,利用本地缓存提升读取性能,通过Redis保证分布式环境下的数据持久化。

4. 生产环境考量

4.1 压力测试与性能调优

系统上线前,我们进行了全面的压力测试。使用 locust 模拟用户并发请求,主要关注两个指标:QPS(每秒查询率)P99响应延迟

测试环境:4核8G容器,连接单独的Redis和向量数据库服务。

  • 观察结果
    1. 在QPS达到约50之前,响应延迟(P99)基本稳定在800ms-1.2s(主要耗时在LLM调用和检索)。
    2. 当QPS超过50后,延迟开始明显上升,系统资源(CPU、内存)成为瓶颈。
    3. 达到80 QPS时,部分请求因超时(设置3s)失败。
  • 调优措施
    • 垂直扩容:将容器规格提升至8核16G,单实例QPS承载能力提升至约100。
    • 水平扩容:基于K8s的HPA,根据CPU利用率和QPS指标自动伸缩实例数。
    • 缓存优化:对高频且不变的知识库内容(如品牌介绍、通用政策)进行应用层缓存,减少向量检索次数。
    • LLM调用批处理:对于非实时性要求极高的场景,将短时间内的多个用户查询合并后批量调用LLM API,显著降低了平均token的调用成本和时间。
4.2 安全与审计:敏感词与日志

电商客服涉及用户隐私和交易信息,安全至关重要。

  • 敏感词过滤:在对话引擎的最终输出层之前,加入一个过滤模块。我们使用 DFA算法 构建敏感词树,实现O(n)时间复杂度的多模式匹配。不仅过滤政治、暴恐、违禁品等通用敏感词,还结合电商场景定制了“刷单”、“套现”、“违禁品代称”等业务敏感词。匹配到的内容会被替换为预定义的安全回复或直接触发转人工。
    class SensitiveFilter:
        def __init__(self):
            self.dfa_root = {}
            self._build_tree(["坏词1", "敏感词2"]) # 从文件加载
    
        def _build_tree(self, keywords):
            for word in keywords:
                node = self.dfa_root
                for char in word:
                    node = node.setdefault(char, {})
                node['is_end'] = True
    
        def filter(self, text: str) -> (str, bool):
            """过滤文本,返回过滤后的文本和是否包含敏感词的布尔值。时间复杂度: O(n)"""
            # ... DFA遍历实现
            pass
    
  • 审计日志:所有对话请求和响应,在脱敏(移除手机号、身份证号等)后,都会异步写入到Elasticsearch。日志包含:会话ID、用户ID(哈希处理)、时间戳、原始query、AI回复、调用的知识片段ID、耗时、敏感词命中情况等。这既满足了合规要求,也为后续分析对话质量、优化知识库和模型提供了数据基础。

5. 避坑指南与经验总结

  1. 冷启动优化:项目初期,知识库向量化加载慢。我们将全量知识库的向量构建和索引创建放在独立的初始化服务中,主服务启动时,只需从该服务拉取最新的索引文件或连接已建好的向量库,将启动时间从分钟级降到秒级。
  2. 分布式会话一致性:当智能体部署多个实例时,用户请求可能打到不同机器。我们采用 “Redis中心存储 + 本地缓存” 的方案。如上文代码所示,会话数据以Session ID为Key存储在Redis中,并设置TTL。每个服务实例维护一个短时间的本地缓存(如5秒)。写入时,先更新本地缓存再异步写回Redis;读取时,先读本地缓存,未命中或过期则读Redis。这在高并发读场景下大幅减轻了Redis压力,并保证了数据的最终一致性。对于强一致性要求的场景(如支付确认),则在关键状态转移时直接读写Redis。
  3. LLM的“幻觉”控制:这是LLM应用的共性问题。我们的策略是 “检索增强生成” 。即,让LLM的回复尽可能基于我们检索到的知识片段。在提示词中明确指令:“请严格根据以下提供的信息回答问题,如果信息中不包含答案,请直接说‘根据现有资料无法回答该问题’,不要编造信息。” 同时,在知识检索模块,我们提高了召回率,并设计了一个重排序模型,将最相关的片段排在前面提供给LLM。
  4. 错误降级与兜底:必须为每一个可能失败的环节设计降级方案。例如:LLM服务超时 -> 返回预置的通用话术;向量数据库故障 -> 降级到基于关键词的全文检索;状态机异常 -> 重置会话到初始状态,并提示用户重新描述问题。

结尾

通过这一套架构和实现,我们的客服智能体在核心场景下实现了超过95%的自动应答覆盖率,人工客服介入率大幅下降,高峰期服务承载力提升显著。当然,系统仍在持续迭代中。

最后抛出一个我们正在思考的开放性问题,也欢迎大家讨论:在一个用户可能同时通过App、网页、小程序等多个渠道发起咨询的场景下,如何设计一个高效的“跨渠道会话粘性保持机制”? 即,如何让用户在不同渠道间切换时,客服智能体能无缝地延续之前的对话上下文?是建立一个全局统一的用户会话ID映射体系,还是通过更复杂的用户身份识别和状态同步来实现?这里面涉及的技术和产品设计挑战都很有意思。

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐