知识库文档入库任务系统设计

作者:old wang 发布时间: 2025-03-22 阅读量:11 评论数:0

文档入库不是上传完文件就结束,后面还有解析、分块、Embedding、向量写入、状态更新这些步骤。这里面解析可能失败,Embedding 可能超时,向量库可能写入失败,用户也可能在任务执行中删除文档或取消任务。所以我不会把它做成一个同步接口,而是设计成 上传接口 + 任务状态机 + MQ 异步消费 + 节点日志 + 幂等恢复

我会这样拆:

用户上传文档后,接口只做三件事:

1.保存原始文件、创建文档记录、创建入库任务,然后发送 MQ 消息。

2.真正的解析、分块、Embedding、向量写入由消费者异步执行。

3.每个节点执行前都检查任务状态,执行后记录节点日志。如果某个节点失败,就把任务停在对应失败状态,并记录失败原因,后续可以从失败节点重试,而不是整篇文档从头开始。

实际使用中容易出现的问题是:如果只用一个 PROCESSING / SUCCESS / FAILED 大状态,失败后不知道卡在哪里;如果没有幂等,MQ 重复消费会重复写 chunk 或重复写向量;如果没有取消逻辑,用户删除知识库后,后台任务可能还在继续写向量。

排查时,看任务表当前状态,再看节点日志定位失败节点,最后结合 MQ 消费日志、文档表、chunk 表和向量库状态判断是解析失败、分块失败、Embedding 失败还是索引写入失败。

优化上,可以从状态机细化、节点日志、幂等键、失败重试、死信队列、取消检查、向量写入补偿几个方向处理。

这里的取舍是:状态机和节点日志会增加表设计和开发复杂度,但能换来失败可恢复和问题可追踪。所以我不会把文档入库做成一个同步大方法,而是拆成异步任务系统。

1. 核心表设计

1)文档表:knowledge_document

文档表记录用户上传的文档本身。

CREATE TABLE knowledge_document (
    id BIGINT PRIMARY KEY,
    knowledge_base_id BIGINT NOT NULL,
    file_name VARCHAR(255) NOT NULL,
    file_type VARCHAR(50),
    file_size BIGINT,
    file_hash VARCHAR(128),
    storage_url VARCHAR(512),
    version INT NOT NULL DEFAULT 1,
    status VARCHAR(50) NOT NULL,
    created_by BIGINT,
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    deleted BOOLEAN DEFAULT FALSE
);

关键字段说明:

knowledge_base_id:属于哪个知识库
file_hash:用于判断重复上传
version:文档版本,重建索引时很重要
status:文档状态,比如 UPLOADED、INDEXING、AVAILABLE、FAILED、DELETED
storage_url:原始文件存储位置

常见索引:

CREATE INDEX idx_doc_kb_status ON knowledge_document(knowledge_base_id, status);
CREATE UNIQUE INDEX uk_doc_kb_hash_version ON knowledge_document(knowledge_base_id, file_hash, version);

2)入库任务表:document_ingestion_task

任务表记录一次文档入库流程。

CREATE TABLE document_ingestion_task (
    id BIGINT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL UNIQUE,
    document_id BIGINT NOT NULL,
    knowledge_base_id BIGINT NOT NULL,
    version INT NOT NULL,
    status VARCHAR(50) NOT NULL,
    current_node VARCHAR(50),
    retry_count INT DEFAULT 0,
    max_retry_count INT DEFAULT 3,
    error_code VARCHAR(100),
    error_message TEXT,
    cancel_reason VARCHAR(255),
    created_by BIGINT,
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    started_at TIMESTAMP,
    finished_at TIMESTAMP
);

关键字段说明:

task_id:任务唯一 ID
document_id:对应文档
version:对应文档版本
status:任务状态
current_node:当前执行到哪个节点
retry_count:当前重试次数
error_message:失败原因

常见索引:

CREATE INDEX idx_task_doc_version ON document_ingestion_task(document_id, version);
CREATE INDEX idx_task_status_updated ON document_ingestion_task(status, updated_at);
CREATE INDEX idx_task_kb_status ON document_ingestion_task(knowledge_base_id, status);

3)分片表:document_chunk

分片表记录解析和切分后的 chunk。

CREATE TABLE document_chunk (
    id BIGINT PRIMARY KEY,
    chunk_id VARCHAR(64) NOT NULL UNIQUE,
    document_id BIGINT NOT NULL,
    knowledge_base_id BIGINT NOT NULL,
    version INT NOT NULL,
    chunk_index INT NOT NULL,
    chunk_hash VARCHAR(128) NOT NULL,
    title_path VARCHAR(512),
    content TEXT NOT NULL,
    token_count INT,
    embedding_status VARCHAR(50),
    vector_status VARCHAR(50),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

关键唯一约束:

CREATE UNIQUE INDEX uk_chunk_doc_version_index
ON document_chunk(document_id, version, chunk_index);

CREATE UNIQUE INDEX uk_chunk_doc_version_hash
ON document_chunk(document_id, version, chunk_hash);

这里的幂等关键点是:

documentId + version + chunkIndex
documentId + version + chunkHash

这样 MQ 重复消费或者任务重试时,不会重复写入相同分片。

4)节点日志表:document_ingestion_node_log

节点日志记录每一步执行情况。

CREATE TABLE document_ingestion_node_log (
    id BIGINT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL,
    document_id BIGINT NOT NULL,
    node_name VARCHAR(50) NOT NULL,
    status VARCHAR(50) NOT NULL,
    input_summary TEXT,
    output_summary TEXT,
    error_message TEXT,
    retry_count INT DEFAULT 0,
    started_at TIMESTAMP,
    finished_at TIMESTAMP,
    duration_ms BIGINT
);

节点类型可以是:

FETCHER
PARSER
CHUNKER
EMBEDDING
INDEXER
CLEANUP

常见索引:

CREATE INDEX idx_node_task ON document_ingestion_node_log(task_id);
CREATE INDEX idx_node_doc ON document_ingestion_node_log(document_id);
CREATE INDEX idx_node_status ON document_ingestion_node_log(status);

节点日志的价值是:失败时能知道具体失败在哪一步,不是只看到一个 FAILED

2. 状态机设计

我会把状态分成 任务状态节点状态

任务状态

INIT
PENDING
PROCESSING
SUCCESS
FAILED
CANCELING
CANCELED
DEAD

含义:

INIT:任务刚创建
PENDING:等待 MQ 消费
PROCESSING:处理中
SUCCESS:全部入库成功
FAILED:可重试失败
CANCELING:正在取消
CANCELED:已取消
DEAD:超过最大重试次数,进入人工处理

节点状态

WAITING
RUNNING
SUCCESS
FAILED
SKIPPED
CANCELED

3. 状态流转

正常流程:

INIT
 -> PENDING
 -> PROCESSING
 -> PARSE_SUCCESS
 -> CHUNK_SUCCESS
 -> EMBEDDING_SUCCESS
 -> INDEX_SUCCESS
 -> SUCCESS

更工程化一点,可以把节点状态写到节点日志里,任务表只保留总状态和当前节点:

PENDING
 -> PROCESSING(current_node=PARSER)
 -> PROCESSING(current_node=CHUNKER)
 -> PROCESSING(current_node=EMBEDDING)
 -> PROCESSING(current_node=INDEXER)
 -> SUCCESS

失败流程:

PROCESSING
 -> FAILED
 -> PENDING
 -> PROCESSING

超过最大重试:

FAILED
 -> DEAD

取消流程:

PENDING / PROCESSING
 -> CANCELING
 -> CANCELED

4. MQ Topic / Tag 设计

我会按任务粒度和节点粒度选择。

简单版本:一个 Topic,一个 Tag

Topic: RAG_DOCUMENT_INGESTION
Tag: INGEST_DOCUMENT

消息体:

{
  "taskId": "task_001",
  "documentId": 1001,
  "knowledgeBaseId": 10,
  "version": 1,
  "operator": 123,
  "traceId": "trace_xxx"
}

消费者拿到消息后,按状态机顺序执行 Parser、Chunker、Embedding、Indexer。

更细版本:按节点拆 Tag

Topic: RAG_DOCUMENT_INGESTION

Tag:
PARSE_DOCUMENT
CHUNK_DOCUMENT
EMBEDDING_CHUNK
INDEX_VECTOR
CLEANUP_DOCUMENT

优点是每个节点可以单独扩容和重试;缺点是任务编排更复杂。

第一版我会用一个文档入库 Topic,消费者内部按 Pipeline 执行节点。等到文档量变大、Embedding 或向量写入成为瓶颈后,再把 Embedding 和 Indexer 拆成单独 Tag 或单独消费者组。

5. 幂等键怎么设计?

幂等要覆盖三层。

1)任务幂等

documentId + version

同一个文档版本只允许有一个有效入库任务。

CREATE UNIQUE INDEX uk_task_doc_version
ON document_ingestion_task(document_id, version);

2)chunk 幂等

documentId + version + chunkIndex
documentId + version + chunkHash

避免重复消费导致重复分片。

3)向量幂等

向量 ID 必须稳定,不能每次随机生成:

vectorId = documentId + "_" + version + "_" + chunkIndex

或者:

vectorId = chunkId

这样重复写入时可以 upsert,也可以先查后写,不会生成多份向量。

6. 消费幂等怎么做?

消费者执行前先做状态判断:

如果任务是 SUCCESS:直接跳过
如果任务是 CANCELED:直接跳过
如果知识库已删除:标记 CANCELED
如果任务是 PROCESSING 但更新时间很新:说明可能有其他消费者在处理,跳过
如果任务是 FAILED 且允许重试:继续执行

更新状态时用条件更新:

UPDATE document_ingestion_task
SET status = 'PROCESSING',
    current_node = 'PARSER',
    started_at = now()
WHERE task_id = ?
  AND status IN ('PENDING', 'FAILED');

只有更新成功的消费者才有执行权。

7. 重试策略

我会区分 临时失败永久失败

临时失败:可以重试

Embedding 服务超时
向量库连接失败
数据库短暂异常
网络抖动
模型限流

处理方式:

retry_count + 1
状态改为 FAILED
记录失败节点和原因
重新投递 MQ 或等待定时任务扫描重试

永久失败:不应该反复重试

文件损坏
文件格式不支持
文件不存在
知识库已删除
用户无权限

处理方式:

状态改为 FAILED 或 CANCELED
记录明确失败原因
不再自动重试

最大重试次数可以配置,例如:

maxRetryCount = 3

超过后:

状态改为 DEAD
进入死信处理或人工处理

8. 如果 Embedding 成功但向量写入失败怎么办?

我的处理方式是:状态停在向量写入失败节点,重试时从 Indexer 节点恢复

前提是 chunk 已经落库,Embedding 结果或者可重算文本还在。

如果 embedding 向量已经持久化:

直接读取 embedding 结果,重新写向量库

如果 embedding 没有持久化:

读取 chunk 文本,重新计算 embedding,再写向量库

关键是不能把任务直接标记成功,也不能每次都从解析开始。

节点日志记录:

PARSER SUCCESS
CHUNKER SUCCESS
EMBEDDING SUCCESS
INDEXER FAILED

下次恢复时:

从 INDEXER 开始

9. 任务取消逻辑

任务取消主要发生在:

用户取消入库
用户删除文档
用户删除知识库
管理员下架知识库

取消流程:

第一,把任务状态改为 CANCELING

UPDATE document_ingestion_task
SET status = 'CANCELING',
    cancel_reason = 'USER_DELETE_DOCUMENT'
WHERE document_id = ?
  AND status IN ('PENDING', 'PROCESSING', 'FAILED');

第二,消费者每个节点执行前检查任务状态。

如果 status = CANCELING / CANCELED,则停止执行

第三,清理已经产生的数据。

删除 chunk
删除向量
删除临时文件
更新文档状态

第四,状态改为 CANCELED

CANCELING -> CANCELED

注意:已经在执行中的 Embedding 请求可能无法立刻取消,但执行完后不能继续写向量,必须再次检查任务状态。

10. 用户删除知识库时,还有任务在跑怎么办?

我会这样处理:

第一,知识库先标记为 DELETING,不直接物理删除。

第二,禁止新的入库任务进入。

第三,把该知识库下所有未完成任务标记为 CANCELING

第四,消费者执行前检查 knowledgeBase 状态。

如果 knowledgeBase = DELETING / DELETED
直接停止任务

第五,异步清理该知识库下的:

document
chunk
vector
task
node_log
原始文件
缓存

第六,清理完成后标记知识库为 DELETED

这里的重点是:删除是一个状态流转,不是简单 delete 一行数据

11. 查询接口怎么设计?

1)查询文档入库任务状态

GET /api/knowledge/documents/{documentId}/ingestion-task

返回:

{
  "taskId": "task_001",
  "documentId": 1001,
  "status": "PROCESSING",
  "currentNode": "EMBEDDING",
  "progress": 65,
  "retryCount": 1,
  "errorMessage": null
}

2)查询节点日志

GET /api/knowledge/ingestion-tasks/{taskId}/nodes

返回:

[
  {
    "nodeName": "PARSER",
    "status": "SUCCESS",
    "durationMs": 1200,
    "outputSummary": "pages=10, chars=23000"
  },
  {
    "nodeName": "CHUNKER",
    "status": "SUCCESS",
    "durationMs": 800,
    "outputSummary": "chunks=45"
  },
  {
    "nodeName": "EMBEDDING",
    "status": "RUNNING",
    "durationMs": 5000
  }
]

3)手动重试任务

POST /api/knowledge/ingestion-tasks/{taskId}/retry

限制:

只有 FAILED / DEAD 状态可以重试
知识库不能是 DELETING / DELETED
文档不能是 DELETED

4)取消任务

POST /api/knowledge/ingestion-tasks/{taskId}/cancel

5)查询知识库下任务列表

GET /api/knowledge-bases/{kbId}/ingestion-tasks?status=FAILED&page=1&pageSize=20

12. 流程图

用户上传文档
    |
    v
保存原始文件
    |
    v
写 document 表
    |
    v
写 ingestion_task 表:INIT / PENDING
    |
    v
发送 MQ:RAG_DOCUMENT_INGESTION
    |
    v
Consumer 消费
    |
    v
检查任务状态 / 知识库状态
    |
    v
Parser 解析文档
    |
    v
写节点日志:PARSER SUCCESS
    |
    v
Chunker 分块
    |
    v
写 chunk 表,唯一键保证幂等
    |
    v
Embedding 生成向量
    |
    v
Indexer 写向量库
    |
    v
更新任务 SUCCESS

失败分支:

任意节点失败
    |
    v
写节点日志 FAILED
    |
    v
更新 task FAILED + errorMessage
    |
    v
retryCount < maxRetry ? 重新投递 MQ : DEAD

取消分支:

用户删除文档 / 知识库
    |
    v
任务状态 CANCELING
    |
    v
消费者执行前检查到取消
    |
    v
停止执行
    |
    v
清理 chunk / vector / 临时文件
    |
    v
任务状态 CANCELED

评论