词向量和向量数据库
词向量(Embeddings)是一种将非结构化数据,如单词、句子或者整个文档,转化为实数向量的技术。
词向量搜索和关键词搜索的比较
优势1:词向量可以语义搜索
比如百度搜索,使用的是关键词搜索。而词向量搜索,是对句子的语义进行搜索,他会找到意思相近的前k个句子。
优势2:词向量可以对多模态数据进行搜索
当传统数据库存储文字、声音、图像、视频等多种媒介时,很难去将上述多种媒介构建起关联与跨模态的查询方法;但是词向量却可以通过多种向量模型将多种数据映射成统一的向量形式。
缺点:准确度问题
向量数据库
定义:
向量数据库是一种专门用于存储和检索向量数据(embedding)的数据库系统。
在向量数据库中,数据被表示为向量形式,每个向量代表一个数据项。这些向量可以是数字、文本、图像或其他类型的数据。向量数据库使用高效的索引和查询算法来加速向量数据的存储和检索过程。
原理:
向量数据库中的数据以向量作为基本单位,对向量进行存储、处理及检索。向量数据库通过计算与目标向量的余弦距离、点积等获取与目标向量的相似度。当处理大量甚至海量的向量数据时,向量数据库索引和查询算法的效率明显高于传统数据库。
主流向量数据库
- Chroma:是一个轻量级向量数据库,拥有丰富的功能和简单的 API,具有简单、易用、轻量的优点,但功能相对简单且不支持GPU加速,适合初学者使用。
- Weaviate:是一个开源向量数据库。除了支持相似度搜索和最大边际相关性(MMR,Maximal Marginal Relevance)搜索外还可以支持结合多种搜索算法(基于词法搜索、向量搜索)的混合搜索,从而搜索提高结果的相关性和准确性。
- Qdrant:Qdrant使用 Rust 语言开发,有极高的检索效率和RPS(Requests Per Second),支持本地运行、部署在本地服务器及Qdrant云三种部署模式。且可以通过为页面内容和元数据制定不同的键来复用数据。
使用 OpenAI Embedding API
有三种Embedding模型,性能如下所示:
模型 | 每美元页数 | MTEB得分 | MIRACL得分 |
---|---|---|---|
text-embedding-3-large | 9,615 | 54.9 | 64.6 |
text-embedding-3-small | 62,500 | 62.3 | 44.0 |
text-embedding-ada-002 | 12,500 | 61.0 | 31.4 |
- MTEB得分为embedding model分类、聚类、配对等八个任务的平均得分。
- MIRACL得分为embedding model在检索任务上的平均得分。
text-embedding-3-large性能最好,text-embedding-3-small性价比高,text-embedding-ada-002效果不好,不推荐。
如果没有openai apikey可以使用自己的github账号去这个项目领取一个免费的使用:
chatanywhere/GPT_API_free: Free ChatGPT API Key,免费ChatGPT API,支持GPT4 API(免费),ChatGPT国内可用免费转发API,直连无需代理。可以搭配ChatBox等软件/插件使用,极大降低接口使用成本。国内即可无限制畅快聊天。 (github.com)
import os from openai import OpenAI from dotenv import load_dotenv, find_dotenv # 读取本地/项目的环境变量。 # find_dotenv()寻找并定位.env文件的路径 # load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中 # 如果你设置的是全局的环境变量,这行代码则没有任何作用。 _ = load_dotenv(find_dotenv()) # 如果你需要通过代理端口访问,你需要如下配置 os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890' os.environ["HTTP_PROXY"] = 'http://127.0.0.1:7890' def openai_embedding(text: str, model: str=None): # 获取环境变量 OPENAI_API_KEY api_key=os.environ['OPENAI_API_KEY'] client = OpenAI(api_key=api_key) # embedding model:'text-embedding-3-small', 'text-embedding-3-large', 'text-embedding-ada-002' if model == None: model="text-embedding-3-small" response = client.embeddings.create( input=text, model=model ) return response response = openai_embedding(text='要生成 embedding 的输入文本,字符串形式。')
执行代码:
response = openai_embedding(text='要生成 embedding 的输入文本,字符串形式。') # 返回一个Embedding对象 print(response) # 获取真正的Embedding数据,是一个list print(response.data[0].embedding) # 更多 print(f'本次embedding model为:{response.model}') print(f'本次token使用情况为:{response.usage}') ''' 返回对象: CreateEmbeddingResponse(data=[Embedding(embedding=[0.03884002938866615, 0.013516489416360855, -0.0024250170681625605, ... 中间很长省略 ..., 0.002844922710210085, -0.012999682687222958], index=0, object='embedding')], model='text-embedding-3-small', object='list', usage=Usage(prompt_tokens=12, total_tokens=12, completion_tokens=0)) '''
{ "object": "list", "data": [ { "object": "embedding", "index": 0, "embedding": [ -0.006929283495992422, ... (省略) -4.547132266452536e-05, ], } ], "model": "text-embedding-3-small", "usage": { "prompt_tokens": 5, "total_tokens": 5 } }
文档预处理
使用 langchain text_splitter
使用re处理n
、空格、·
。
使用langchain
RecursiveCharacterTextSplitter
按照分隔符的优先级进行递归的文档分割。
设置 单段文本长度(CHUNK_SIZE)、知识库中相邻文本重合长度(OVERLAP_SIZE)。CHUNK_SIZE = 500
OVERLAP_SIZE = 50
import re from langchain.document_loaders.pdf import PyMuPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter # 创建一个 PyMuPDFLoader Class 实例,输入为待加载的 pdf 文档路径 loader = PyMuPDFLoader("books/1-民法典总则编 理解与适用 上.pdf") # 调用 PyMuPDFLoader Class 的函数 load 对 pdf 文件进行加载 pdf_pages = loader.load() # 载入后的变量类型为:<class 'list'>, 该 PDF 一共包含 540 页 print(f"载入后的变量类型为:{type(pdf_pages)},", f"该 PDF 一共包含 {len(pdf_pages)} 页") pdf_page = pdf_pages[100] print(f"每一个元素的类型:{type(pdf_page)}.", f"该文档的描述性数据:{pdf_page.metadata}", f"查看该文档的内容:n{pdf_page.page_content}", sep="n------n") # 清除字符之间的换行符n pattern = re.compile(r'[^u4e00-u9fff](n)[^u4e00-u9fff]', re.DOTALL) # re.sub(匹配模式,对匹配到的内容使用函数处理,被匹配的字符串),如下就是将匹配到的n替换为空字符串 pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('n', ''), pdf_page.page_content) print("清除换行符之后的内容:", pdf_page.page_content, sep="n------n") # 清除 • 和 空格 pdf_page.page_content = pdf_page.page_content.replace('•', '') pdf_page.page_content = pdf_page.page_content.replace(' ', '') # 替换nn pdf_page.page_content = pdf_page.page_content.replace('nn', 'n') print("继续清洗的结果:", pdf_page.page_content, sep="n------n") # 知识库中单段文本长度(CHUNK_SIZE)、知识库中相邻文本重合长度(OVERLAP_SIZE) CHUNK_SIZE = 500 OVERLAP_SIZE = 50 # 使用递归字符文本分割器 (递归地尝试按不同的分隔符进行分割文本。) ''' * RecursiveCharacterTextSplitter 按不同的字符递归地分割(按照这个优先级["nn", "n", " ", ""]),这样就能尽量把所有和语义相关的内容尽可能长时间地保留在同一位置 RecursiveCharacterTextSplitter需要关注的是4个参数: * separators - 分隔符字符串数组 * chunk_size - 每个文档的字符数量限制 * chunk_overlap - 两份文档重叠区域的长度 * length_function - 长度计算函数 ''' text_splitter = RecursiveCharacterTextSplitter( chunk_size=CHUNK_SIZE, chunk_overlap=OVERLAP_SIZE ) # 将第一页的前 1000 个字符进行分割 chunks = text_splitter.split_text(pdf_page.page_content[0:1000]) for i, chunk in enumerate(chunks, start=1): print(f'第{i}块的内容如下:', chunk, sep="n------n", end="nn") print(f"前1000个字符分块的数量为:{len(chunks)}", sep="n------n") split_docs = text_splitter.split_documents(pdf_pages) print(f"切分后的文件数量:{len(split_docs)}", sep="n------n") print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in split_docs])}")
使用 open-parse
项目地址:Filimoa/open-parse: Improved file parsing for LLM’s (github.com)
open-parse推荐使用语义进行分块,并且预设了一些文档预处理流程,还可以将自己的处理方式添加到管道中,或者是自定义预处理管道。
import os from openparse import processing, DocumentParser basic_doc_path = "books/人生亏钱指南-TEST.pdf" # 使用语义来进行分块,使用 openai embedding 时间上会有点旧 # SemanticIngestionPipeline 是一个预处理管道,中间集成了各种预处理方式,按照顺序执行 semantic_pipeline = processing.SemanticIngestionPipeline( openai_api_key=os.getenv("OPENAI_API_KEY"), model="text-embedding-3-large", min_tokens=64, max_tokens=1024, ) parser = DocumentParser( processing_pipeline=semantic_pipeline, ) parsed_content = parser.parse(basic_doc_path) print('chunk数量:', len(parsed_content.nodes)) for node in parsed_content.nodes: print('chunk:', node.text, end='n') print('---------------------') for data in parsed_content: print(data[0], data[1], end='n')
添加你的处理流程:
from openparse import processing, Node from typing import List class CustomCombineTables(processing.ProcessingStep): """ Let's combine tables that are next to each other """ def process(self, nodes: List[Node]) -> List[Node]: new_nodes = [] print("Combining concurrent tables") for i in range(len(nodes) - 1): if "table" in nodes[i].variant and "table" in nodes[i + 1].variant: new_node = nodes[i] + nodes[i + 1] new_nodes.append(new_node) else: new_nodes.append(nodes[i]) return new_nodes # add a custom processing step to the pipeline custom_pipeline = processing.BasicIngestionPipeline() custom_pipeline.append_transform(CustomCombineTables()) parser = openparse.DocumentParser( table_args={"parsing_algorithm": "pymupdf"}, processing_pipeline=custom_pipeline ) custom_10k = parser.parse(meta10k_path)
自定义整个处理管道:
from openparse import processing, Node from typing import List class BasicIngestionPipeline(processing.IngestionPipeline): """ A basic pipeline for ingesting and processing Nodes. """ def __init__(self): self.transformations = [ processing.RemoveTextInsideTables(), processing.RemoveFullPageStubs(max_area_pct=0.35), ]
更多参考: