第 9 章:实战 -- 核心引擎与记忆系统

上一章我们搭好了 MiniClaw 的骨架:认证模块(auth.py)和配置模块(config.py)。 但那个时候 MiniClaw 还只是个空壳子,什么都干不了。

这一章,我们要让它活过来


1. 本章目标

这一章做三件事:

  1. 对话引擎(engine.py) -- 让 MiniClaw 能和 Claude 聊天
  2. 记忆系统(memory.py) -- 让 MiniClaw 记住聊过的内容,下次打开还在
  3. CLI 交互层(cli.py) -- 一个漂亮的终端界面,输入命令就能用

做完之后,MiniClaw 就是一个能聊天、有记忆的 AI 助手了。

本章文件结构

miniclaw/
  __init__.py          # 包标识
  auth.py              # 认证模块(第 8 章写的)
  config.py            # 配置模块(第 8 章写的)
  engine.py            # [新] 对话引擎
  memory.py            # [新] 记忆系统
  cli.py               # [新] CLI 交互层
  __main__.py           # [更新] 入口文件

2. 对话引擎(engine.py)

2.1 它要干什么?

对话引擎是 MiniClaw 的"嘴巴和耳朵"。它负责:

你可以把它想象成一个翻译官:用户说话 -> 翻译官传给 Claude -> Claude 回话 -> 翻译官传回来。

2.2 为什么不直接用 ClaudeSDKClient?

直接用也行,但每次都要写一堆重复代码:创建 options、处理消息类型、统计费用…… 对话引擎把这些脏活累活封装起来,外面只需要一句 engine.chat("你好") 就搞定。

这就是"封装"的意义:把复杂的东西藏起来,对外只露一个简单的接口。

2.3 核心代码

"""对话引擎 - 基于 ClaudeSDKClient 的多轮对话管理。"""

from collections.abc import AsyncIterator
from claude_agent_sdk import (
    ClaudeSDKClient, ClaudeAgentOptions,
    AssistantMessage, ResultMessage,
    TextBlock, ThinkingBlock, ToolUseBlock,
)
from .config import MiniClawConfig


class ChatEngine:
    """对话引擎:封装 ClaudeSDKClient,提供简洁的对话接口。"""

    def __init__(self, config: MiniClawConfig):
        self.config = config
        self._client: ClaudeSDKClient | None = None
        # 费用统计
        self.last_cost: float = 0.0    # 上一次对话的费用
        self.total_cost: float = 0.0   # 累计总费用

    async def start(self) -> None:
        """启动引擎,连接到 Claude。"""
        options = ClaudeAgentOptions(
            system_prompt=self.config.system_prompt,
            model=self.config.model,
            max_turns=self.config.max_turns,
        )
        self._client = ClaudeSDKClient(options)
        await self._client.connect()

    async def chat(self, prompt: str) -> AsyncIterator[str]:
        """发送消息,流式返回回复文本。"""
        if not self._client:
            raise RuntimeError("引擎未启动,请先调用 start()")

        await self._client.query(prompt)

        async for msg in self._client.receive_response():
            if isinstance(msg, AssistantMessage):
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        yield block.text
                    elif isinstance(block, ThinkingBlock):
                        yield f"\n[思考中...]\n"
                    elif isinstance(block, ToolUseBlock):
                        yield f"\n[使用工具: {block.name}]\n"
            elif isinstance(msg, ResultMessage):
                self.last_cost = msg.total_cost_usd or 0.0
                self.total_cost += self.last_cost

    async def stop(self) -> None:
        """停止引擎,断开连接。"""
        if self._client:
            await self._client.disconnect()
            self._client = None

2.4 逐行拆解

__init__:准备工作

def __init__(self, config: MiniClawConfig):
    self.config = config
    self._client: ClaudeSDKClient | None = None
    self.last_cost: float = 0.0
    self.total_cost: float = 0.0

把配置存下来,客户端先设为 None(还没连接呢),费用初始化为 0。

start():建立连接

async def start(self) -> None:
    options = ClaudeAgentOptions(
        system_prompt=self.config.system_prompt,
        model=self.config.model,
        max_turns=self.config.max_turns,
    )
    self._client = ClaudeSDKClient(options)
    await self._client.connect()

从配置里拿出 system_prompt、model、max_turns,创建 ClaudeAgentOptions, 然后创建客户端并连接。这就是第 4 章学的 ClaudeSDKClient 的手动连接方式。

为什么不用 async with?因为 start()stop() 不在同一个函数里。 引擎启动后要一直保持连接,直到用户主动退出才断开。

chat():核心 -- 发送消息并流式返回

async def chat(self, prompt: str) -> AsyncIterator[str]:
    await self._client.query(prompt)

    async for msg in self._client.receive_response():
        if isinstance(msg, AssistantMessage):
            for block in msg.content:
                if isinstance(block, TextBlock):
                    yield block.text
                elif isinstance(block, ThinkingBlock):
                    yield f"\n[思考中...]\n"
                elif isinstance(block, ToolUseBlock):
                    yield f"\n[使用工具: {block.name}]\n"
        elif isinstance(msg, ResultMessage):
            self.last_cost = msg.total_cost_usd or 0.0
            self.total_cost += self.last_cost

这个函数是一个异步生成器(注意 yield 关键字)。它不是一次性把所有回复都返回, 而是像流水线一样,收到一点就吐出一点。调用方可以边收边打印,实现"打字机"效果。

消息处理逻辑:

消息类型 处理方式
TextBlock 直接 yield 文本内容
ThinkingBlock yield 一个"思考中"的提示
ToolUseBlock yield 工具名称(让用户知道 Claude 在做什么)
ResultMessage 记录费用,不 yield(这是统计信息,不是回复内容)

stop():断开连接

async def stop(self) -> None:
    if self._client:
        await self._client.disconnect()
        self._client = None

断开连接,把客户端设回 None。加了 if 判断,防止重复断开报错。

2.5 错误恢复

实际使用中,连接可能因为网络问题断开。我们在 engine.py 里加了一个简单的防御:

async def chat(self, prompt: str) -> AsyncIterator[str]:
    if not self._client:
        raise RuntimeError("引擎未启动,请先调用 start()")
    ...

如果引擎没启动就调用 chat(),直接抛异常,而不是让程序莫名其妙地崩溃。 这种"先检查再执行"的模式叫做防御性编程,在写工具类代码时非常有用。


3. 记忆系统(memory.py)

3.1 为什么要记忆?

你可能会问:ClaudeSDKClient 不是已经有上下文记忆了吗?同一个连接里的多轮对话, Claude 都记得啊。

没错,但那是会话内记忆 -- 你关掉程序,记忆就没了。

记忆系统要做的是跨会话记忆: - 今天聊了什么,明天打开还能看到 - 可以浏览历史对话 - 可以开始新对话,也可以回顾旧对话

就好比微信的聊天记录 -- 你关了微信再打开,之前的消息还在。

3.2 用 SQLite 存储

我们用 SQLite 来存对话记录。为什么选 SQLite?

3.3 数据库设计

两张表,很简单:

conversations(对话表)
├── id          TEXT     主键,UUID
├── title       TEXT     对话标题(用第一条消息自动生成)
├── created_at  TEXT     创建时间
└── updated_at  TEXT     最后更新时间

messages(消息表)
├── id               INTEGER  自增主键
├── conversation_id  TEXT     关联 conversations.id
├── role             TEXT     "user" 或 "assistant"
├── content          TEXT     消息内容
└── timestamp        TEXT     时间戳

为什么 conversation id 用 UUID 而不是自增整数?

因为 UUID 是全局唯一的,不会重复。如果以后你要做多设备同步、导出导入之类的功能, UUID 不会出现冲突问题。

3.4 核心代码

"""记忆系统 - 基于 SQLite 的对话历史存储。"""

import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class Conversation:
    """对话记录。"""
    id: str
    title: str
    created_at: str
    updated_at: str


@dataclass
class MemoryMessage:
    """消息记录。"""
    id: int
    conversation_id: str
    role: str        # "user" 或 "assistant"
    content: str
    timestamp: str


class Memory:
    """记忆系统:管理对话历史的持久化存储。"""

    def __init__(self, db_path: Path | None = None):
        if db_path is None:
            db_path = Path.home() / ".miniclaw" / "memory.db"

        # 确保目录存在
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db_path = db_path
        self._conn: sqlite3.Connection | None = None
        self._init_db()

    def _init_db(self) -> None:
        """初始化数据库,创建表(如果不存在)。"""
        conn = self._get_conn()
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS conversations (
                id         TEXT PRIMARY KEY,
                title      TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS messages (
                id               INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id  TEXT NOT NULL,
                role             TEXT NOT NULL,
                content          TEXT NOT NULL,
                timestamp        TEXT NOT NULL,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            );
        """)
        conn.commit()

3.5 核心方法

创建新对话:

def create_conversation(self, title: str = "新对话") -> Conversation:
    """创建一个新对话。"""
    conv = Conversation(
        id=str(uuid.uuid4()),
        title=title,
        created_at=datetime.now(timezone.utc).isoformat(),
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
    conn = self._get_conn()
    conn.execute(
        "INSERT INTO conversations (id, title, created_at, updated_at) VALUES (?, ?, ?, ?)",
        (conv.id, conv.title, conv.created_at, conv.updated_at),
    )
    conn.commit()
    return conv

保存一条消息:

def add_message(self, conversation_id: str, role: str, content: str) -> MemoryMessage:
    """保存一条消息到指定对话。"""
    now = datetime.now(timezone.utc).isoformat()
    conn = self._get_conn()

    cursor = conn.execute(
        "INSERT INTO messages (conversation_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
        (conversation_id, role, content, now),
    )
    conn.execute(
        "UPDATE conversations SET updated_at = ? WHERE id = ?",
        (now, conversation_id),
    )
    conn.commit()

    return MemoryMessage(
        id=cursor.lastrowid or 0,
        conversation_id=conversation_id,
        role=role,
        content=content,
        timestamp=now,
    )

注意两个细节:

  1. 保存消息的同时更新对话的 updated_at,这样"最近对话"排序就是正确的
  2. cursor.lastrowid 拿到自增 ID,这是 SQLite 的标准做法

获取对话历史:

def get_history(self, conversation_id: str, limit: int = 50) -> list[MemoryMessage]:
    """获取指定对话的消息历史。"""
    conn = self._get_conn()
    rows = conn.execute(
        "SELECT id, conversation_id, role, content, timestamp "
        "FROM messages WHERE conversation_id = ? ORDER BY timestamp ASC LIMIT ?",
        (conversation_id, limit),
    ).fetchall()

    return [MemoryMessage(*row) for row in rows]

ORDER BY timestamp ASC -- 按时间正序排列,最早的消息在前面。 LIMIT ? -- 防止一次查出几万条消息把内存撑爆。

列出所有对话:

def list_conversations(self, limit: int = 20) -> list[Conversation]:
    """列出所有对话,最近的在前面。"""
    conn = self._get_conn()
    rows = conn.execute(
        "SELECT id, title, created_at, updated_at "
        "FROM conversations ORDER BY updated_at DESC LIMIT ?",
        (limit,),
    ).fetchall()

    return [Conversation(*row) for row in rows]

ORDER BY updated_at DESC -- 最近更新的对话排在前面,和微信聊天列表一样。

3.6 自动创建数据库

注意 __init__ 里这一行:

db_path.parent.mkdir(parents=True, exist_ok=True)

第一次运行时,~/.miniclaw/ 目录还不存在。这行代码会自动创建它。 parents=True 表示递归创建父目录,exist_ok=True 表示目录已存在也不报错。

整个过程对用户完全透明 -- 用户不需要手动创建任何目录或数据库文件。


4. CLI 交互层(cli.py)

4.1 它要干什么?

CLI 交互层是用户直接接触的部分。它负责:

4.2 关于 rich 库

我们的 CLI 会尝试用 rich 库来美化输出。 但如果没装 rich,就退回到纯 print不强制依赖第三方库,这是 CLI 工具的好习惯。

try:
    from rich.console import Console
    from rich.markdown import Markdown
    _console = Console()
    HAS_RICH = True
except ImportError:
    HAS_RICH = False

4.3 命令系统

支持的命令:

命令 说明
/help 显示帮助信息
/history 查看当前对话的消息历史
/list 列出所有对话
/new 开始新对话
/cost 查看费用统计
/quit 退出 MiniClaw

命令处理的逻辑很朴素 -- 就是一堆 if/elif

async def _handle_command(self, command: str) -> bool:
    """处理斜杠命令。返回 True 表示应该继续主循环,False 表示应该退出。"""
    cmd = command.strip().lower()

    if cmd == "/help":
        self._print_help()
    elif cmd == "/history":
        self._show_history()
    elif cmd == "/list":
        self._show_conversations()
    elif cmd == "/new":
        self._new_conversation()
    elif cmd == "/cost":
        self._show_cost()
    elif cmd in ("/quit", "/exit", "/q"):
        return False
    else:
        self._print_info(f"未知命令: {command}。输入 /help 查看帮助。")

    return True

4.4 主循环

主循环是 CLI 的心脏:

async def run(self) -> None:
    """运行 CLI 主循环。"""
    self._print_welcome()

    # 启动引擎
    await self.engine.start()

    # 创建初始对话
    conv = self.memory.create_conversation()
    self.current_conversation_id = conv.id

    try:
        while True:
            # 1. 读取输入
            try:
                user_input = input("\n你: ").strip()
            except (EOFError, KeyboardInterrupt):
                break

            if not user_input:
                continue

            # 2. 处理命令
            if user_input.startswith("/"):
                should_continue = await self._handle_command(user_input)
                if not should_continue:
                    break
                continue

            # 3. 保存用户消息到记忆
            self.memory.add_message(
                self.current_conversation_id, "user", user_input
            )

            # 4. 发给引擎,流式接收回复
            print("\nClaude: ", end="", flush=True)
            full_response = []

            async for chunk in self.engine.chat(user_input):
                print(chunk, end="", flush=True)
                full_response.append(chunk)

            print()  # 换行

            # 5. 保存 Claude 的回复到记忆
            response_text = "".join(full_response)
            if response_text:
                self.memory.add_message(
                    self.current_conversation_id, "assistant", response_text
                )

            # 6. 显示费用(如果有的话)
            if self.engine.last_cost > 0:
                self._print_dim(f"  [费用: ${self.engine.last_cost:.4f}]")

    finally:
        await self.engine.stop()

五步流程,清清楚楚:

读取输入 -> 处理命令 -> 保存用户消息 -> 引擎处理 -> 保存回复

注意 finally 块里的 await self.engine.stop() -- 不管程序是正常退出还是出了异常, 都要断开和 Claude 的连接。这和第 4 章讲的 try/finally 模式一样。

4.5 流式输出的关键

async for chunk in self.engine.chat(user_input):
    print(chunk, end="", flush=True)
    full_response.append(chunk)

两个关键参数:

没有 flush=True 的话,Python 会攒一堆内容再一次性输出,就没有"打字机"效果了。

同时我们把每块文本存到 full_response 列表里,最后拼起来保存到记忆系统。 流式输出和完整保存,两不误。


5. 把它们串起来 -- 更新 main.py

上一章的 __main__.py 只做了认证和配置。现在我们把四个模块串起来:

"""MiniClaw 入口文件。"""

import asyncio
import sys

from .auth import detect_auth_mode
from .config import load_config
from .engine import ChatEngine
from .memory import Memory
from .cli import CLI


def main() -> None:
    """MiniClaw 主入口。"""
    # 1. 检测认证方式
    auth = detect_auth_mode()
    if not auth.is_valid:
        print(f"认证失败: {auth.message}")
        print("请设置 ANTHROPIC_API_KEY 或确保 Claude CLI 已登录。")
        sys.exit(1)

    print(f"认证方式: {auth.mode} ({auth.message})")

    # 2. 加载配置
    config = load_config()

    # 3. 创建核心组件
    engine = ChatEngine(config)
    memory = Memory()
    cli = CLI(engine=engine, memory=memory)

    # 4. 启动 CLI 主循环
    asyncio.run(cli.run())


if __name__ == "__main__":
    main()

调用链路:

main()
  ├── detect_auth_mode()    # 检查能不能用
  ├── load_config()         # 读配置
  ├── ChatEngine(config)    # 创建引擎
  ├── Memory()              # 创建记忆
  ├── CLI(engine, memory)   # 创建 CLI
  └── cli.run()             # 开跑!

每个模块各司其职,互不干扰。这就是"单一职责原则"在实际项目中的体现。


6. 运行效果展示

全部写完了,来看看效果:

# 运行 MiniClaw
python -m miniclaw
认证方式: api_key (使用 API Key 认证)

  MiniClaw v0.1.0 - 你的 AI 助手
  输入消息开始聊天,输入 /help 查看命令。

你: 你好!我叫小明,是个 Python 开发者。

Claude: 你好小明!很高兴认识你。作为 Python 开发者,你平时主要做哪方面的
开发呢?Web 后端、数据分析、还是其他方向?
  [费用: $0.0012]

你: 我主要做后端开发。你能记住我的信息吗?

Claude: 当然!我记住了,你是小明,主要做 Python 后端开发。在我们这次对话
期间,我会一直记得这些信息。有什么我能帮你的吗?
  [费用: $0.0008]

你: /cost

  本次费用: $0.0008
  累计费用: $0.0020

你: /history

  === 对话历史 ===
  [2026-02-25 10:30:15] 你: 你好!我叫小明,是个 Python 开发者。
  [2026-02-25 10:30:18] Claude: 你好小明!很高兴认识你...
  [2026-02-25 10:31:02] 你: 我主要做后端开发。你能记住我的信息吗?
  [2026-02-25 10:31:05] Claude: 当然!我记住了...

你: /new

  已开始新对话。

你: /quit

  再见!累计花费: $0.0020

关键体验点

  1. 多轮对话有记忆:Claude 记得你叫小明、做后端开发
  2. 流式输出:回复是逐字打印的,不是等全部生成完才显示
  3. 费用透明:每次对话后显示费用,/cost 查看累计
  4. 对话历史持久化:下次打开,/history 还能看到之前聊的内容
  5. 命令系统/new 开新话题,/quit 退出

7. 三个模块的协作关系

最后来看看这三个模块是怎么配合的:

用户输入 "你好"
    │
    ▼
 CLI.run()
    │
    ├── memory.add_message("user", "你好")     # 存用户消息
    │
    ├── engine.chat("你好")                     # 发给 Claude
    │     │
    │     ├── ClaudeSDKClient.query("你好")     # SDK 发送
    │     │
    │     └── yield "你好小明..."                # 流式返回
    │
    ├── print("你好小明...", flush=True)         # 实时打印
    │
    └── memory.add_message("assistant", "...")  # 存 Claude 回复

三个模块像齿轮一样咬合:

它们之间通过简单的方法调用通信,没有复杂的事件系统或回调。 简单就是好的 -- 在项目初期不要过度设计。


8. 小结

这一章我们实现了 MiniClaw 的三个核心模块:

模块 文件 核心类 职责
对话引擎 engine.py ChatEngine 封装 ClaudeSDKClient,提供简洁的对话接口
记忆系统 memory.py Memory SQLite 持久化存储对话历史
CLI 交互层 cli.py CLI 终端界面、命令处理、流式输出

学到的关键技术:

  1. 异步生成器(async generator) -- chat() 方法用 yield 流式返回文本
  2. SQLite 持久化 -- 零配置数据库,自动创建目录和表
  3. 流式打印 -- print(text, end="", flush=True) 实现打字机效果
  4. 防御性编程 -- 检查引擎状态,优雅处理异常
  5. 模块协作 -- CLI 协调 Engine 和 Memory,各司其职

现在 MiniClaw 已经是一个能用的 AI 助手了。但它还只是个"话痨" -- 只会聊天。 下一章,我们会给它装上工具,让它不仅能说,还能做。


本章文件清单

09-实战-核心引擎与记忆/
  README.md                    # 你正在读的这个文件
  miniclaw/
    __init__.py                # 包标识
    auth.py                    # 认证模块(第 8 章)
    config.py                  # 配置模块(第 8 章)
    engine.py                  # 对话引擎(本章新增)
    memory.py                  # 记忆系统(本章新增)
    cli.py                     # CLI 交互层(本章新增)
    __main__.py                # 入口文件(本章更新)