第 9 章:实战 -- 核心引擎与记忆系统

上一章我们搭好了 MiniClaw 的骨架:认证模块(auth.py)和配置模块(config.py)。 但那个时候 MiniClaw 还只是个空壳子,什么都干不了。

这一章,我们要让它活过来


1. 本章目标

这一章做三件事:

  1. 对话引擎(engine.py) -- 让 MiniClaw 能和 Claude 聊天
  2. 记忆系统(memory.py) -- 让 MiniClaw 记住聊过的内容,下次打开还在
  3. CLI 交互层(cli.py) -- 一个漂亮的终端界面,输入命令就能用

做完之后,MiniClaw 就是一个能聊天、有记忆的 AI 助手了。

本章文件结构

miniclaw/
  __init__.py          # 包标识
  auth.py              # 认证模块(第 8 章写的)
  config.py            # 配置模块(第 8 章写的)
  engine.py            # [新] 对话引擎
  memory.py            # [新] 记忆系统
  cli.py               # [新] CLI 交互层
  __main__.py           # [更新] 入口文件

2. 对话引擎(engine.py)

2.1 它要干什么?

对话引擎是 MiniClaw 的"嘴巴和耳朵"。它负责:

你可以把它想象成一个翻译官:用户说话 -> 翻译官传给 Claude -> Claude 回话 -> 翻译官传回来。

2.2 为什么不直接用 ClaudeSDKClient?

直接用也行,但每次都要写一堆重复代码:创建 options、处理消息类型、统计费用…… 对话引擎把这些脏活累活封装起来,外面只需要一句 engine.chat("你好") 就搞定。

这就是"封装"的意义:把复杂的东西藏起来,对外只露一个简单的接口。

2.3 核心代码

"""对话引擎 - 基于 ClaudeSDKClient 的多轮对话管理。"""

from collections.abc import AsyncIterator
from claude_agent_sdk import (
    ClaudeSDKClient, ClaudeAgentOptions,
    AssistantMessage, ResultMessage,
    TextBlock, ThinkingBlock, ToolUseBlock,
)
from .config import MiniClawConfig


class ChatEngine:
    """对话引擎:封装 ClaudeSDKClient,提供简洁的对话接口。"""

    def __init__(self, config: MiniClawConfig):
        self.config = config
        self._client: ClaudeSDKClient | None = None
        # 费用统计
        self.last_cost: float = 0.0    # 上一次对话的费用
        self.total_cost: float = 0.0   # 累计总费用

    async def start(self) -> None:
        """启动引擎,连接到 Claude。"""
        options = ClaudeAgentOptions(
            system_prompt=self.config.system_prompt,
            model=self.config.model,
            max_turns=self.config.max_turns,
        )
        self._client = ClaudeSDKClient(options)
        await self._client.connect()

    async def chat(self, prompt: str) -> AsyncIterator[str]:
        """发送消息,流式返回回复文本。"""
        if not self._client:
            raise RuntimeError("引擎未启动,请先调用 start()")

        await self._client.query(prompt)

        async for msg in self._client.receive_response():
            if isinstance(msg, AssistantMessage):
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        yield block.text
                    elif isinstance(block, ThinkingBlock):
                        yield f"\n[思考中...]\n"
                    elif isinstance(block, ToolUseBlock):
                        yield f"\n[使用工具: {block.name}]\n"
            elif isinstance(msg, ResultMessage):
                self.last_cost = msg.total_cost_usd or 0.0
                self.total_cost += self.last_cost

    async def stop(self) -> None:
        """停止引擎,断开连接。"""
        if self._client:
            await self._client.disconnect()
            self._client = None

2.4 逐行拆解

__init__:准备工作

def __init__(self, config: MiniClawConfig):
    self.config = config
    self._client: ClaudeSDKClient | None = None
    self.last_cost: float = 0.0
    self.total_cost: float = 0.0

把配置存下来,客户端先设为 None(还没连接呢),费用初始化为 0。

start():建立连接

async def start(self) -> None:
    options = ClaudeAgentOptions(
        system_prompt=self.config.system_prompt,
        model=self.config.model,
        max_turns=self.config.max_turns,
    )
    self._client = ClaudeSDKClient(options)
    await self._client.connect()

从配置里拿出 system_prompt、model、max_turns,创建 ClaudeAgentOptions, 然后创建客户端并连接。这就是第 4 章学的 ClaudeSDKClient 的手动连接方式。

为什么不用 async with?因为 start()stop() 不在同一个函数里。 引擎启动后要一直保持连接,直到用户主动退出才断开。

chat():核心 -- 发送消息并流式返回

async def chat(self, prompt: str) -> AsyncIterator[str]:
    await self._client.query(prompt)

    async for msg in self._client.receive_response():
        if isinstance(msg, AssistantMessage):
            for block in msg.content:
                if isinstance(block, TextBlock):
                    yield block.text
                elif isinstance(block, ThinkingBlock):
                    yield f"\n[思考中...]\n"
                elif isinstance(block, ToolUseBlock):
                    yield f"\n[使用工具: {block.name}]\n"
        elif isinstance(msg, ResultMessage):
            self.last_cost = msg.total_cost_usd or 0.0
            self.total_cost += self.last_cost

这个函数是一个异步生成器(注意 yield 关键字)。它不是一次性把所有回复都返回, 而是像流水线一样,收到一点就吐出一点。调用方可以边收边打印,实现"打字机"效果。

消息处理逻辑:

消息类型 处理方式
TextBlock 直接 yield 文本内容
ThinkingBlock yield 一个"思考中"的提示
ToolUseBlock yield 工具名称(让用户知道 Claude 在做什么)
ResultMessage 记录费用,不 yield(这是统计信息,不是回复内容)

stop():断开连接

async def stop(self) -> None:
    if self._client:
        await self._client.disconnect()
        self._client = None

断开连接,把客户端设回 None。加了 if 判断,防止重复断开报错。

2.5 错误恢复

实际使用中,连接可能因为网络问题断开。我们在 engine.py 里加了一个简单的防御:

async def chat(self, prompt: str) -> AsyncIterator[str]:
    if not self._client:
        raise RuntimeError("引擎未启动,请先调用 start()")
    ...

如果引擎没启动就调用 chat(),直接抛异常,而不是让程序莫名其妙地崩溃。 这种"先检查再执行"的模式叫做防御性编程,在写工具类代码时非常有用。


3. 记忆系统(memory.py)

3.1 为什么要记忆?

你可能会问:ClaudeSDKClient 不是已经有上下文记忆了吗?同一个连接里的多轮对话, Claude 都记得啊。

没错,但那是会话内记忆 -- 你关掉程序,记忆就没了。

记忆系统要做的是跨会话记忆: - 今天聊了什么,明天打开还能看到 - 可以浏览历史对话 - 可以开始新对话,也可以回顾旧对话

就好比微信的聊天记录 -- 你关了微信再打开,之前的消息还在。

3.2 用 SQLite 存储

我们用 SQLite 来存对话记录。为什么选 SQLite?

3.3 数据库设计

两张表,很简单:

conversations(对话表)
├── id          TEXT     主键,UUID
├── title       TEXT     对话标题(用第一条消息自动生成)
├── created_at  TEXT     创建时间
└── updated_at  TEXT     最后更新时间

messages(消息表)
├── id               INTEGER  自增主键
├── conversation_id  TEXT     关联 conversations.id
├── role             TEXT     "user" 或 "assistant"
├── content          TEXT     消息内容
└── timestamp        TEXT     时间戳

为什么 conversation id 用 UUID 而不是自增整数?

因为 UUID 是全局唯一的,不会重复。如果以后你要做多设备同步、导出导入之类的功能, UUID 不会出现冲突问题。

3.4 核心代码

"""记忆系统 - 基于 SQLite 的对话历史存储。"""

import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class Conversation:
    """对话记录。"""
    id: str
    title: str
    created_at: str
    updated_at: str


@dataclass
class MemoryMessage:
    """消息记录。"""
    id: int
    conversation_id: str
    role: str        # "user" 或 "assistant"
    content: str
    timestamp: str


class Memory:
    """记忆系统:管理对话历史的持久化存储。"""

    def __init__(self, db_path: Path | None = None):
        if db_path is None:
            db_path = Path.home() / ".miniclaw" / "memory.db"

        # 确保目录存在
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db_path = db_path
        self._conn: sqlite3.Connection | None = None
        self._init_db()

    def _init_db(self) -> None:
        """初始化数据库,创建表(如果不存在)。"""
        conn = self._get_conn()
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS conversations (
                id         TEXT PRIMARY KEY,
                title      TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS messages (
                id               INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id  TEXT NOT NULL,
                role             TEXT NOT NULL,
                content          TEXT NOT NULL,
                timestamp        TEXT NOT NULL,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            );
        """)
        conn.commit()

3.5 核心方法

创建新对话:

def create_conversation(self, title: str = "新对话") -> Conversation:
    """创建一个新对话。"""
    conv = Conversation(
        id=str(uuid.uuid4()),
        title=title,
        created_at=datetime.now(timezone.utc).isoformat(),
        updated_at=datetime.now(timezone.utc).isoformat(),
    )
    conn = self._get_conn()
    conn.execute(
        "INSERT INTO conversations (id, title, created_at, updated_at) VALUES (?, ?, ?, ?)",
        (conv.id, conv.title, conv.created_at, conv.updated_at),
    )
    conn.commit()
    return conv

保存一条消息:

def add_message(self, conversation_id: str, role: str, content: str) -> MemoryMessage:
    """保存一条消息到指定对话。"""
    now = datetime.now(timezone.utc).isoformat()
    conn = self._get_conn()

    cursor = conn.execute(
        "INSERT INTO messages (conversation_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
        (conversation_id, role, content, now),
    )
    conn.execute(
        "UPDATE conversations SET updated_at = ? WHERE id = ?",
        (now, conversation_id),
    )
    conn.commit()

    return MemoryMessage(
        id=cursor.lastrowid or 0,
        conversation_id=conversation_id,
        role=role,
        content=content,
        timestamp=now,
    )

注意两个细节:

  1. 保存消息的同时更新对话的 updated_at,这样"最近对话"排序就是正确的
  2. cursor.lastrowid 拿到自增 ID,这是 SQLite 的标准做法

获取对话历史:

def get_history(self, conversation_id: str, limit: int = 50) -> list[MemoryMessage]:
    """获取指定对话的消息历史。"""
    conn = self._get_conn()
    rows = conn.execute(
        "SELECT id, conversation_id, role, content, timestamp "
        "FROM messages WHERE conversation_id = ? ORDER BY timestamp ASC LIMIT ?",
        (conversation_id, limit),
    ).fetchall()

    return [MemoryMessage(*row) for row in rows]

ORDER BY timestamp ASC -- 按时间正序排列,最早的消息在前面。 LIMIT ? -- 防止一次查出几万条消息把内存撑爆。

列出所有对话:

def list_conversations(self, limit: int = 20) -> list[Conversation]:
    """列出所有对话,最近的在前面。"""
    conn = self._get_conn()
    rows = conn.execute(
        "SELECT id, title, created_at, updated_at "
        "FROM conversations ORDER BY updated_at DESC LIMIT ?",
        (limit,),
    ).fetchall()

    return [Conversation(*row) for row in rows]

ORDER BY updated_at DESC -- 最近更新的对话排在前面,和微信聊天列表一样。

3.6 自动创建数据库

注意 __init__ 里这一行:

db_path.parent.mkdir(parents=True, exist_ok=True)

第一次运行时,~/.miniclaw/ 目录还不存在。这行代码会自动创建它。 parents=True 表示递归创建父目录,exist_ok=True 表示目录已存在也不报错。

整个过程对用户完全透明 -- 用户不需要手动创建任何目录或数据库文件。


4. CLI 交互层(cli.py)

4.1 它要干什么?

CLI 交互层是用户直接接触的部分。它负责:

4.2 关于 rich 库

我们的 CLI 会尝试用 rich 库来美化输出。 但如果没装 rich,就退回到纯 print不强制依赖第三方库,这是 CLI 工具的好习惯。

try:
    from rich.console import Console
    from rich.markdown import Markdown
    _console = Console()
    HAS_RICH = True
except ImportError:
    HAS_RICH = False

4.3 命令系统

支持的命令:

命令 说明
/help 显示帮助信息
/history 查看当前对话的消息历史
/list 列出所有对话
/new 开始新对话
/cost 查看费用统计
/quit 退出 MiniClaw

命令处理的逻辑很朴素 -- 就是一堆 if/elif

async def _handle_command(self, command: str) -> bool:
    """处理斜杠命令。返回 True 表示应该继续主循环,False 表示应该退出。"""
    cmd = command.strip().lower()

    if cmd == "/help":
        self._print_help()
    elif cmd == "/history":
        self._show_history()
    elif cmd == "/list":
        self._show_conversations()
    elif cmd == "/new":
        self._new_conversation()
    elif cmd == "/cost":
        self._show_cost()
    elif cmd in ("/quit", "/exit", "/q"):
        return False
    else:
        self._print_info(f"未知命令: {command}。输入 /help 查看帮助。")

    return True

4.4 主循环

主循环是 CLI 的心脏:

async def run(self) -> None:
    """运行 CLI 主循环。"""
    self._print_welcome()

    # 启动引擎
    await self.engine.start()

    # 创建初始对话
    conv = self.memory.create_conversation()
    self.current_conversation_id = conv.id

    try:
        while True:
            # 1. 读取输入
            try:
                user_input = input("\n你: ").strip()
            except (EOFError, KeyboardInterrupt):
                break

            if not user_input:
                continue

            # 2. 处理命令
            if user_input.startswith("/"):
                should_continue = await self._handle_command(user_input)
                if not should_continue:
                    break
                continue

            # 3. 保存用户消息到记忆
            self.memory.add_message(
                self.current_conversation_id, "user", user_input
            )

            # 4. 发给引擎,流式接收回复
            print("\nClaude: ", end="", flush=True)
            full_response = []

            async for chunk in self.engine.chat(user_input):
                print(chunk, end="", flush=True)
                full_response.append(chunk)

            print()  # 换行

            # 5. 保存 Claude 的回复到记忆
            response_text = "".join(full_response)
            if response_text:
                self.memory.add_message(
                    self.current_conversation_id, "assistant", response_text
                )

            # 6. 显示费用(如果有的话)
            if self.engine.last_cost > 0:
                self._print_dim(f"  [费用: ${self.engine.last_cost:.4f}]")

    finally:
        await self.engine.stop()

五步流程,清清楚楚:

读取输入 -> 处理命令 -> 保存用户消息 -> 引擎处理 -> 保存回复

注意 finally 块里的 await self.engine.stop() -- 不管程序是正常退出还是出了异常, 都要断开和 Claude 的连接。这和第 4 章讲的 try/finally 模式一样。

4.5 流式输出的关键

async for chunk in self.engine.chat(user_input):
    print(chunk, end="", flush=True)
    full_response.append(chunk)

两个关键参数:

没有 flush=True 的话,Python 会攒一堆内容再一次性输出,就没有"打字机"效果了。

同时我们把每块文本存到 full_response 列表里,最后拼起来保存到记忆系统。 流式输出和完整保存,两不误。


5. 把它们串起来 -- 更新 main.py

上一章的 __main__.py 只做了认证和配置。现在我们把四个模块串起来:

"""MiniClaw 入口文件。"""

import asyncio
import sys

from .auth import detect_auth_mode
from .config import load_config
from .engine import ChatEngine
from .memory import Memory
from .cli import CLI


def main() -> None:
    """MiniClaw 主入口。"""
    # 1. 检测认证方式
    auth = detect_auth_mode()
    if not auth.is_valid:
        print(f"认证失败: {auth.message}")
        print("请设置 ANTHROPIC_API_KEY 或确保 Claude CLI 已登录。")
        sys.exit(1)

    print(f"认证方式: {auth.mode} ({auth.message})")

    # 2. 加载配置
    config = load_config()

    # 3. 创建核心组件
    engine = ChatEngine(config)
    memory = Memory()
    cli = CLI(engine=engine, memory=memory)

    # 4. 启动 CLI 主循环
    asyncio.run(cli.run())


if __name__ == "__main__":
    main()

调用链路:

main()
  ├── detect_auth_mode()    # 检查能不能用
  ├── load_config()         # 读配置
  ├── ChatEngine(config)    # 创建引擎
  ├── Memory()              # 创建记忆
  ├── CLI(engine, memory)   # 创建 CLI
  └── cli.run()             # 开跑!

每个模块各司其职,互不干扰。这就是"单一职责原则"在实际项目中的体现。


6. 运行效果展示

全部写完了,来看看效果:

# 运行 MiniClaw
python -m miniclaw
认证方式: api_key (使用 API Key 认证)

  MiniClaw v0.1.0 - 你的 AI 助手
  输入消息开始聊天,输入 /help 查看命令。

你: 你好!我叫小明,是个 Python 开发者。

Claude: 你好小明!很高兴认识你。作为 Python 开发者,你平时主要做哪方面的
开发呢?Web 后端、数据分析、还是其他方向?
  [费用: $0.0012]

你: 我主要做后端开发。你能记住我的信息吗?

Claude: 当然!我记住了,你是小明,主要做 Python 后端开发。在我们这次对话
期间,我会一直记得这些信息。有什么我能帮你的吗?
  [费用: $0.0008]

你: /cost

  本次费用: $0.0008
  累计费用: $0.0020

你: /history

  === 对话历史 ===
  [2026-02-25 10:30:15] 你: 你好!我叫小明,是个 Python 开发者。
  [2026-02-25 10:30:18] Claude: 你好小明!很高兴认识你...
  [2026-02-25 10:31:02] 你: 我主要做后端开发。你能记住我的信息吗?
  [2026-02-25 10:31:05] Claude: 当然!我记住了...

你: /new

  已开始新对话。

你: /quit

  再见!累计花费: $0.0020

关键体验点

  1. 多轮对话有记忆:Claude 记得你叫小明、做后端开发
  2. 流式输出:回复是逐字打印的,不是等全部生成完才显示
  3. 费用透明:每次对话后显示费用,/cost 查看累计
  4. 对话历史持久化:下次打开,/history 还能看到之前聊的内容
  5. 命令系统/new 开新话题,/quit 退出

7. 三个模块的协作关系

最后来看看这三个模块是怎么配合的:

用户输入 "你好"
    │
    ▼
 CLI.run()
    │
    ├── memory.add_message("user", "你好")     # 存用户消息
    │
    ├── engine.chat("你好")                     # 发给 Claude
    │     │
    │     ├── ClaudeSDKClient.query("你好")     # SDK 发送
    │     │
    │     └── yield "你好小明..."                # 流式返回
    │
    ├── print("你好小明...", flush=True)         # 实时打印
    │
    └── memory.add_message("assistant", "...")  # 存 Claude 回复

三个模块像齿轮一样咬合:

它们之间通过简单的方法调用通信,没有复杂的事件系统或回调。 简单就是好的 -- 在项目初期不要过度设计。


8. 小结

这一章我们实现了 MiniClaw 的三个核心模块:

模块 文件 核心类 职责
对话引擎 engine.py ChatEngine 封装 ClaudeSDKClient,提供简洁的对话接口
记忆系统 memory.py Memory SQLite 持久化存储对话历史
CLI 交互层 cli.py CLI 终端界面、命令处理、流式输出

学到的关键技术:

  1. 异步生成器(async generator) -- chat() 方法用 yield 流式返回文本
  2. SQLite 持久化 -- 零配置数据库,自动创建目录和表
  3. 流式打印 -- print(text, end="", flush=True) 实现打字机效果
  4. 防御性编程 -- 检查引擎状态,优雅处理异常
  5. 模块协作 -- CLI 协调 Engine 和 Memory,各司其职

现在 MiniClaw 已经是一个能用的 AI 助手了。但它还只是个"话痨" -- 只会聊天。 下一章,我们会给它装上工具,让它不仅能说,还能做。


9. 完整代码

以下是本章所有文件的完整可运行代码。在 miniclaw/ 目录的上级目录执行 python -m miniclaw 即可运行。

miniclaw/__init__.py

"""MiniClaw - 迷你版 AI Agent 助手。"""

__version__ = "0.1.0"

miniclaw/auth.py

"""认证模块 - 检测和管理 Claude API 认证方式。

支持两种认证方式:
1. API Key: 通过环境变量 ANTHROPIC_API_KEY 设置
2. Claude CLI 登录: 通过 claude CLI 工具的 OAuth 登录
"""

import os
import shutil
from dataclasses import dataclass


@dataclass
class AuthMode:
    """认证信息。"""

    mode: str  # "api_key" | "cli_login" | "none"
    is_valid: bool  # 是否认证成功
    message: str  # 描述信息


def detect_auth_mode() -> AuthMode:
    """自动检测当前可用的认证方式。

    检测顺序:
    1. 检查环境变量 ANTHROPIC_API_KEY
    2. 检查 Claude CLI 是否已安装并登录

    Returns:
        AuthMode: 包含认证方式、是否有效、描述信息的数据类
    """
    # 方式一: 检查 API Key
    api_key = os.environ.get("ANTHROPIC_API_KEY", "").strip()
    if api_key:
        # 简单校验: API Key 通常以 "sk-ant-" 开头
        if api_key.startswith("sk-ant-"):
            return AuthMode(
                mode="api_key",
                is_valid=True,
                message="使用 API Key 认证",
            )
        else:
            return AuthMode(
                mode="api_key",
                is_valid=True,
                message="使用 API Key 认证(格式可能非标准)",
            )

    # 方式二: 检查 Claude CLI
    claude_path = shutil.which("claude")
    if claude_path:
        return AuthMode(
            mode="cli_login",
            is_valid=True,
            message=f"使用 Claude CLI 认证 ({claude_path})",
        )

    # 都没找到
    return AuthMode(
        mode="none",
        is_valid=False,
        message="未找到认证方式。请设置 ANTHROPIC_API_KEY 或安装 Claude CLI。",
    )

miniclaw/config.py

"""配置模块 - MiniClaw 的配置管理。

配置文件位于 ~/.miniclaw/config.json,首次运行时自动创建。
"""

import json
from dataclasses import asdict, dataclass, field
from pathlib import Path

# 配置文件目录
CONFIG_DIR = Path.home() / ".miniclaw"
CONFIG_FILE = CONFIG_DIR / "config.json"


@dataclass
class MiniClawConfig:
    """MiniClaw 配置。"""

    # Claude 模型名称,None 表示使用默认模型
    model: str | None = None

    # 系统提示词,定义 Claude 的行为风格
    system_prompt: str = (
        "你是 MiniClaw,一个友好的 AI 助手。"
        "你说话简洁、有帮助,善于用通俗易懂的方式解释复杂概念。"
    )

    # 最大对话轮次(防止无限循环),None 表示不限制
    max_turns: int | None = None

    # 额外的自定义设置
    extra: dict = field(default_factory=dict)


def load_config() -> MiniClawConfig:
    """加载配置。如果配置文件不存在,创建默认配置。

    Returns:
        MiniClawConfig: 配置对象
    """
    if CONFIG_FILE.exists():
        try:
            data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
            return MiniClawConfig(**data)
        except (json.JSONDecodeError, TypeError):
            # 配置文件损坏,返回默认配置
            pass

    # 文件不存在或损坏,创建默认配置
    config = MiniClawConfig()
    save_config(config)
    return config


def save_config(config: MiniClawConfig) -> None:
    """保存配置到文件。

    Args:
        config: 要保存的配置对象
    """
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    CONFIG_FILE.write_text(
        json.dumps(asdict(config), ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

miniclaw/engine.py

"""对话引擎 - 基于 ClaudeSDKClient 的多轮对话管理。

封装 ClaudeSDKClient,提供简洁的对话接口:
- start()  -- 连接 Claude
- chat()   -- 发送消息,流式返回回复
- stop()   -- 断开连接
"""

from collections.abc import AsyncIterator

from claude_agent_sdk import (
    AssistantMessage,
    ClaudeAgentOptions,
    ClaudeSDKClient,
    ResultMessage,
    TextBlock,
    ThinkingBlock,
    ToolUseBlock,
)

from .config import MiniClawConfig


class ChatEngine:
    """对话引擎:封装 ClaudeSDKClient,提供简洁的对话接口。

    用法::

        engine = ChatEngine(config)
        await engine.start()

        async for text in engine.chat("你好"):
            print(text, end="", flush=True)

        await engine.stop()
    """

    def __init__(self, config: MiniClawConfig) -> None:
        self.config = config
        self._client: ClaudeSDKClient | None = None

        # 费用统计
        self.last_cost: float = 0.0  # 上一次对话的费用(美元)
        self.total_cost: float = 0.0  # 累计总费用(美元)

    @property
    def is_running(self) -> bool:
        """引擎是否正在运行。"""
        return self._client is not None

    async def start(self) -> None:
        """启动引擎,连接到 Claude。

        从配置中读取 system_prompt、model、max_turns,
        创建 ClaudeSDKClient 并建立连接。

        Raises:
            RuntimeError: 如果引擎已经在运行
        """
        if self._client:
            raise RuntimeError("引擎已在运行,请先调用 stop()")

        # 从配置构建 ClaudeAgentOptions
        options = ClaudeAgentOptions(
            system_prompt=self.config.system_prompt,
            model=self.config.model,
            max_turns=self.config.max_turns,
        )

        # 创建客户端并连接
        # 这里用手动 connect() 而不是 async with,
        # 因为 start() 和 stop() 在不同的时间点调用
        self._client = ClaudeSDKClient(options)
        await self._client.connect()

    async def chat(self, prompt: str) -> AsyncIterator[str]:
        """发送消息,流式返回回复文本。

        这是一个异步生成器。每收到一块文本就 yield 出去,
        调用方可以边收边打印,实现"打字机"效果。

        Args:
            prompt: 用户输入的消息

        Yields:
            str: 回复文本的片段

        Raises:
            RuntimeError: 如果引擎未启动
        """
        if not self._client:
            raise RuntimeError("引擎未启动,请先调用 start()")

        # 发送消息
        await self._client.query(prompt)

        # 流式接收回复
        async for msg in self._client.receive_response():
            if isinstance(msg, AssistantMessage):
                # 遍历消息中的所有内容块
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        # 文本块:直接输出文本内容
                        yield block.text

                    elif isinstance(block, ThinkingBlock):
                        # 思考块:显示一个提示(不暴露思考细节)
                        yield "\n[思考中...]\n"

                    elif isinstance(block, ToolUseBlock):
                        # 工具调用块:显示正在使用的工具名称
                        yield f"\n[使用工具: {block.name}]\n"

            elif isinstance(msg, ResultMessage):
                # 结果消息:记录费用(不 yield,这不是回复内容)
                self.last_cost = msg.total_cost_usd or 0.0
                self.total_cost += self.last_cost

    async def stop(self) -> None:
        """停止引擎,断开与 Claude 的连接。

        可以安全地多次调用,不会报错。
        """
        if self._client:
            await self._client.disconnect()
            self._client = None

miniclaw/memory.py

"""记忆系统 - 基于 SQLite 的对话历史存储。

将对话记录持久化到 ~/.miniclaw/memory.db,实现跨会话记忆。
数据库包含两张表:
- conversations: 对话列表(id, title, 时间戳)
- messages: 消息记录(id, conversation_id, role, content, 时间戳)
"""

import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class Conversation:
    """对话记录。"""

    id: str  # UUID
    title: str  # 对话标题
    created_at: str  # 创建时间 (ISO 格式)
    updated_at: str  # 最后更新时间 (ISO 格式)


@dataclass
class MemoryMessage:
    """消息记录。"""

    id: int  # 自增主键
    conversation_id: str  # 所属对话的 UUID
    role: str  # "user" 或 "assistant"
    content: str  # 消息内容
    timestamp: str  # 时间戳 (ISO 格式)


class Memory:
    """记忆系统:管理对话历史的持久化存储。

    用法::

        memory = Memory()  # 自动创建 ~/.miniclaw/memory.db

        # 创建新对话
        conv = memory.create_conversation("关于 Python 的对话")

        # 保存消息
        memory.add_message(conv.id, "user", "你好")
        memory.add_message(conv.id, "assistant", "你好!有什么可以帮你的?")

        # 查看历史
        messages = memory.get_history(conv.id)
        for msg in messages:
            print(f"[{msg.role}] {msg.content}")

        # 列出所有对话
        conversations = memory.list_conversations()
    """

    def __init__(self, db_path: Path | None = None) -> None:
        """初始化记忆系统。

        Args:
            db_path: 数据库文件路径。默认为 ~/.miniclaw/memory.db。
                     目录不存在时会自动创建。
        """
        if db_path is None:
            db_path = Path.home() / ".miniclaw" / "memory.db"

        # 确保目录存在
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db_path = db_path
        self._conn: sqlite3.Connection | None = None

        # 初始化数据库表
        self._init_db()

    def _get_conn(self) -> sqlite3.Connection:
        """获取数据库连接(懒初始化)。"""
        if self._conn is None:
            self._conn = sqlite3.connect(str(self.db_path))
            # 开启外键约束
            self._conn.execute("PRAGMA foreign_keys = ON")
        return self._conn

    def _init_db(self) -> None:
        """初始化数据库,创建表(如果不存在)。"""
        conn = self._get_conn()
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS conversations (
                id         TEXT PRIMARY KEY,
                title      TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS messages (
                id               INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id  TEXT NOT NULL,
                role             TEXT NOT NULL CHECK(role IN ('user', 'assistant')),
                content          TEXT NOT NULL,
                timestamp        TEXT NOT NULL,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            );

            CREATE INDEX IF NOT EXISTS idx_messages_conversation
                ON messages(conversation_id);
        """
        )
        conn.commit()

    # ---- 对话管理 ----

    def create_conversation(self, title: str = "新对话") -> Conversation:
        """创建一个新对话。

        Args:
            title: 对话标题,默认为"新对话"

        Returns:
            新创建的 Conversation 对象
        """
        now = datetime.now(timezone.utc).isoformat()
        conv = Conversation(
            id=str(uuid.uuid4()),
            title=title,
            created_at=now,
            updated_at=now,
        )
        conn = self._get_conn()
        conn.execute(
            "INSERT INTO conversations (id, title, created_at, updated_at) "
            "VALUES (?, ?, ?, ?)",
            (conv.id, conv.title, conv.created_at, conv.updated_at),
        )
        conn.commit()
        return conv

    def get_conversation(self, conversation_id: str) -> Conversation | None:
        """根据 ID 获取对话。

        Args:
            conversation_id: 对话 UUID

        Returns:
            Conversation 对象,不存在则返回 None
        """
        conn = self._get_conn()
        row = conn.execute(
            "SELECT id, title, created_at, updated_at "
            "FROM conversations WHERE id = ?",
            (conversation_id,),
        ).fetchone()

        if row is None:
            return None
        return Conversation(*row)

    def list_conversations(self, limit: int = 20) -> list[Conversation]:
        """列出所有对话,最近更新的在前面。

        Args:
            limit: 最多返回多少条,默认 20

        Returns:
            Conversation 列表
        """
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT id, title, created_at, updated_at "
            "FROM conversations ORDER BY updated_at DESC LIMIT ?",
            (limit,),
        ).fetchall()

        return [Conversation(*row) for row in rows]

    def update_conversation_title(
        self, conversation_id: str, title: str
    ) -> None:
        """更新对话标题。

        Args:
            conversation_id: 对话 UUID
            title: 新标题
        """
        conn = self._get_conn()
        conn.execute(
            "UPDATE conversations SET title = ? WHERE id = ?",
            (title, conversation_id),
        )
        conn.commit()

    # ---- 消息管理 ----

    def add_message(
        self, conversation_id: str, role: str, content: str
    ) -> MemoryMessage:
        """保存一条消息到指定对话。

        同时更新对话的 updated_at 时间戳。
        如果是该对话的第一条用户消息,会自动将对话标题更新为消息内容的前 30 个字符。

        Args:
            conversation_id: 对话 UUID
            role: "user" 或 "assistant"
            content: 消息内容

        Returns:
            新创建的 MemoryMessage 对象
        """
        now = datetime.now(timezone.utc).isoformat()
        conn = self._get_conn()

        # 插入消息
        cursor = conn.execute(
            "INSERT INTO messages (conversation_id, role, content, timestamp) "
            "VALUES (?, ?, ?, ?)",
            (conversation_id, role, content, now),
        )

        # 更新对话的最后活跃时间
        conn.execute(
            "UPDATE conversations SET updated_at = ? WHERE id = ?",
            (now, conversation_id),
        )

        # 如果是第一条用户消息,用它做对话标题
        if role == "user":
            msg_count = conn.execute(
                "SELECT COUNT(*) FROM messages "
                "WHERE conversation_id = ? AND role = 'user'",
                (conversation_id,),
            ).fetchone()
            if msg_count and msg_count[0] == 1:
                # 截取前 30 个字符作为标题
                short_title = content[:30] + ("..." if len(content) > 30 else "")
                conn.execute(
                    "UPDATE conversations SET title = ? WHERE id = ?",
                    (short_title, conversation_id),
                )

        conn.commit()

        return MemoryMessage(
            id=cursor.lastrowid or 0,
            conversation_id=conversation_id,
            role=role,
            content=content,
            timestamp=now,
        )

    def get_history(
        self, conversation_id: str, limit: int = 50
    ) -> list[MemoryMessage]:
        """获取指定对话的消息历史。

        Args:
            conversation_id: 对话 UUID
            limit: 最多返回多少条消息,默认 50

        Returns:
            MemoryMessage 列表,按时间正序排列
        """
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT id, conversation_id, role, content, timestamp "
            "FROM messages WHERE conversation_id = ? "
            "ORDER BY timestamp ASC LIMIT ?",
            (conversation_id, limit),
        ).fetchall()

        return [MemoryMessage(*row) for row in rows]

    def count_messages(self, conversation_id: str) -> int:
        """统计指定对话的消息数量。

        Args:
            conversation_id: 对话 UUID

        Returns:
            消息总数
        """
        conn = self._get_conn()
        row = conn.execute(
            "SELECT COUNT(*) FROM messages WHERE conversation_id = ?",
            (conversation_id,),
        ).fetchone()
        return row[0] if row else 0

    # ---- 清理 ----

    def close(self) -> None:
        """关闭数据库连接。"""
        if self._conn:
            self._conn.close()
            self._conn = None

miniclaw/cli.py

"""CLI 交互层 - MiniClaw 的终端用户界面。

提供:
- 欢迎界面
- 斜杠命令系统 (/help, /history, /new, /list, /cost, /quit)
- 流式输出
- 消息的自动记忆存储

可选依赖 rich 库进行终端美化,没装也能正常运行。
"""

from __future__ import annotations

from datetime import datetime

from . import __version__
from .engine import ChatEngine
from .memory import Memory

# 尝试导入 rich 做终端美化,没装就用纯 print
try:
    from rich.console import Console

    _console = Console()
    HAS_RICH = True
except ImportError:
    _console = None  # type: ignore[assignment]
    HAS_RICH = False


class CLI:
    """CLI 交互层:终端界面、命令处理、流式输出。

    用法::

        cli = CLI(engine=engine, memory=memory)
        await cli.run()  # 启动主循环
    """

    def __init__(self, engine: ChatEngine, memory: Memory) -> None:
        self.engine = engine
        self.memory = memory
        self.current_conversation_id: str | None = None

    # ---- 输出辅助方法 ----

    @staticmethod
    def _print_info(text: str) -> None:
        """打印提示信息(带颜色,如果有 rich)。"""
        if HAS_RICH and _console:
            _console.print(f"  [cyan]{text}[/cyan]")
        else:
            print(f"  {text}")

    @staticmethod
    def _print_dim(text: str) -> None:
        """打印灰色/暗淡文本。"""
        if HAS_RICH and _console:
            _console.print(f"[dim]{text}[/dim]")
        else:
            print(text)

    @staticmethod
    def _print_error(text: str) -> None:
        """打印错误信息。"""
        if HAS_RICH and _console:
            _console.print(f"  [red]{text}[/red]")
        else:
            print(f"  [错误] {text}")

    @staticmethod
    def _print_success(text: str) -> None:
        """打印成功信息。"""
        if HAS_RICH and _console:
            _console.print(f"  [green]{text}[/green]")
        else:
            print(f"  {text}")

    # ---- 命令处理 ----

    def _print_welcome(self) -> None:
        """显示欢迎信息。"""
        banner = (
            f"\n  MiniClaw v{__version__} - 你的 AI 助手\n"
            f"  输入消息开始聊天,输入 /help 查看命令。\n"
        )
        if HAS_RICH and _console:
            _console.print(f"[bold blue]{banner}[/bold blue]")
        else:
            print(banner)

    def _print_help(self) -> None:
        """显示帮助信息。"""
        help_text = """
  可用命令:
    /help     显示帮助信息
    /history  查看当前对话的消息历史
    /list     列出所有对话
    /new      开始新对话
    /cost     查看费用统计
    /quit     退出 MiniClaw(也可以用 /exit 或 /q)
"""
        print(help_text)

    def _show_history(self) -> None:
        """显示当前对话的消息历史。"""
        if not self.current_conversation_id:
            self._print_info("当前没有活跃的对话。")
            return

        messages = self.memory.get_history(self.current_conversation_id)
        if not messages:
            self._print_info("当前对话还没有消息。")
            return

        print()
        self._print_info("=== 对话历史 ===")
        for msg in messages:
            # 尝试格式化时间,如果解析失败就显示原始字符串
            try:
                ts = datetime.fromisoformat(msg.timestamp)
                time_str = ts.strftime("%Y-%m-%d %H:%M:%S")
            except ValueError:
                time_str = msg.timestamp

            role_label = "你" if msg.role == "user" else "Claude"
            # 截取内容前 80 个字符
            content_preview = msg.content[:80]
            if len(msg.content) > 80:
                content_preview += "..."

            self._print_dim(f"  [{time_str}] {role_label}: {content_preview}")
        print()

    def _show_conversations(self) -> None:
        """列出所有对话。"""
        conversations = self.memory.list_conversations()
        if not conversations:
            self._print_info("还没有对话记录。")
            return

        print()
        self._print_info("=== 对话列表 ===")
        for i, conv in enumerate(conversations, 1):
            msg_count = self.memory.count_messages(conv.id)
            # 标记当前对话
            marker = " <-- 当前" if conv.id == self.current_conversation_id else ""
            self._print_dim(
                f"  {i}. {conv.title} ({msg_count} 条消息){marker}"
            )
        print()

    def _new_conversation(self) -> None:
        """开始新对话。"""
        conv = self.memory.create_conversation()
        self.current_conversation_id = conv.id
        self._print_success("已开始新对话。")

    def _show_cost(self) -> None:
        """显示费用统计。"""
        print()
        self._print_info(f"本次费用: ${self.engine.last_cost:.4f}")
        self._print_info(f"累计费用: ${self.engine.total_cost:.4f}")
        print()

    async def _handle_command(self, command: str) -> bool:
        """处理斜杠命令。

        Args:
            command: 用户输入的命令字符串(如 "/help")

        Returns:
            True 表示继续主循环,False 表示应该退出
        """
        cmd = command.strip().lower()

        if cmd == "/help":
            self._print_help()
        elif cmd == "/history":
            self._show_history()
        elif cmd == "/list":
            self._show_conversations()
        elif cmd == "/new":
            self._new_conversation()
        elif cmd == "/cost":
            self._show_cost()
        elif cmd in ("/quit", "/exit", "/q"):
            return False
        else:
            self._print_info(
                f"未知命令: {command}。输入 /help 查看帮助。"
            )

        return True

    # ---- 主循环 ----

    async def run(self) -> None:
        """运行 CLI 主循环。

        流程:
        1. 显示欢迎信息
        2. 启动对话引擎
        3. 创建初始对话
        4. 循环读取输入 -> 处理命令 / 发给引擎 -> 保存到记忆
        5. 退出时断开引擎连接
        """
        self._print_welcome()

        # 启动对话引擎
        try:
            await self.engine.start()
        except Exception as e:
            self._print_error(f"引擎启动失败: {e}")
            return

        # 创建初始对话
        conv = self.memory.create_conversation()
        self.current_conversation_id = conv.id

        try:
            while True:
                # 1. 读取用户输入
                try:
                    user_input = input("\n你: ").strip()
                except (EOFError, KeyboardInterrupt):
                    # Ctrl+D 或 Ctrl+C,优雅退出
                    print()
                    break

                # 跳过空输入
                if not user_input:
                    continue

                # 2. 处理斜杠命令
                if user_input.startswith("/"):
                    should_continue = await self._handle_command(user_input)
                    if not should_continue:
                        break
                    continue

                # 3. 保存用户消息到记忆
                if self.current_conversation_id:
                    self.memory.add_message(
                        self.current_conversation_id, "user", user_input
                    )

                # 4. 发给引擎,流式接收并打印回复
                print("\nClaude: ", end="", flush=True)
                full_response: list[str] = []

                try:
                    async for chunk in self.engine.chat(user_input):
                        print(chunk, end="", flush=True)
                        full_response.append(chunk)
                except Exception as e:
                    print()
                    self._print_error(f"对话出错: {e}")
                    continue

                print()  # 换行

                # 5. 保存 Claude 的回复到记忆
                response_text = "".join(full_response)
                if response_text and self.current_conversation_id:
                    self.memory.add_message(
                        self.current_conversation_id,
                        "assistant",
                        response_text,
                    )

                # 6. 显示费用
                if self.engine.last_cost > 0:
                    self._print_dim(
                        f"  [费用: ${self.engine.last_cost:.4f}]"
                    )

        finally:
            # 无论如何都要断开引擎连接
            await self.engine.stop()
            self.memory.close()

            # 打印告别信息
            total = self.engine.total_cost
            if total > 0:
                print(f"\n  再见!累计花费: ${total:.4f}")
            else:
                print("\n  再见!")

miniclaw/__main__.py

"""MiniClaw 入口文件。

用法:
    python -m miniclaw
"""

import asyncio
import sys

from .auth import detect_auth_mode
from .cli import CLI
from .config import load_config
from .engine import ChatEngine
from .memory import Memory


def main() -> None:
    """MiniClaw 主入口。

    执行流程:
    1. 检测认证方式(API Key / CLI 登录)
    2. 加载配置文件(~/.miniclaw/config.json)
    3. 创建核心组件(引擎 + 记忆 + CLI)
    4. 启动 CLI 主循环
    """
    # 1. 检测认证方式
    auth = detect_auth_mode()
    if not auth.is_valid:
        print(f"认证失败: {auth.message}")
        print("请设置 ANTHROPIC_API_KEY 或确保 Claude CLI 已登录。")
        sys.exit(1)

    print(f"认证方式: {auth.mode} ({auth.message})")

    # 2. 加载配置
    config = load_config()

    # 3. 创建核心组件
    engine = ChatEngine(config)
    memory = Memory()
    cli = CLI(engine=engine, memory=memory)

    # 4. 启动 CLI 主循环
    try:
        asyncio.run(cli.run())
    except KeyboardInterrupt:
        # 再兜一层,防止 asyncio.run 被 Ctrl+C 中断时打印丑陋的堆栈
        print("\n  再见!")


# 支持 python -m miniclaw 运行
if __name__ == "__main__":
    main()

本章文件清单

09-实战-核心引擎与记忆/
  README.md                    # 你正在读的这个文件
  miniclaw/
    __init__.py                # 包标识
    auth.py                    # 认证模块(第 8 章)
    config.py                  # 配置模块(第 8 章)
    engine.py                  # 对话引擎(本章新增)
    memory.py                  # 记忆系统(本章新增)
    cli.py                     # CLI 交互层(本章新增)
    __main__.py                # 入口文件(本章更新)