Building an Inverted Index with the SPIMI Algorithm: A Python Implementation
Building the inverted index file dict.index.txt
Preparation
Set the block size and configure the upper limit of the thread pool.
# Configure the thread pool parameters
max_worker = 8 # upper limit of the thread pool
filenum = 100000 # total number of files
blocksize = 5000 # number of files per block
pool = ThreadPoolExecutor(max_workers = max_worker)
threads = []
blocknum = (filenum - 1) // blocksize + 1 # number of blocks needed
begin_time = time()
print("max_worker:%d\tFilenum:%d\tBlocksize:%d\tBlocknum:%d" %(max_worker, filenum, blocksize, blocknum))
print("Start building the per-block (local) indexes...")
# Assign the documents to blocks
for i in range(filenum // blocksize):
    thread = pool.submit(job, i * blocksize + 1, i * blocksize + blocksize, i + 1)
    threads.append(thread)
# Put the remaining documents into the last block
if filenum % blocksize != 0:
    thread = pool.submit(job, filenum - (filenum % blocksize) + 1, filenum, filenum // blocksize + 1)
    threads.append(thread)
Process the blocks with multiple threads
- Text preprocessing (tokenize on spaces and tabs, convert to lowercase, remove the top-100 stop words)
- Build the postings list in memory and compress it with delta encoding, storing only the gaps between docIDs; for example, a term occurring in documents 3, 7 and 12 is stored as "3,4,5"
- Sort the entries by term in lexicographic order
# Preprocess and read the input documents
def job(docl, docr, blockid):
    wordsDict = {}
    lastDocId = {}
    stopWords = []
    for docid in range(docl, docr + 1):
        f = open("data/" + str(docid) + ".txt", 'r', encoding = 'utf-8')
        lines = f.readlines()
        f.close()
        tempDict = {}
        for line in lines:
            # Split on spaces and tabs
            words = re.split('[ \t]', line.strip())
            for word in words:
                # Skip stop words
                if (word in stopWords): continue
                # Convert to lowercase
                word = word.lower()
                if (word in tempDict):
                    continue
                tempDict[word] = 1
                if not (word in wordsDict):
                    wordsDict[word] = [str(docid), 1]
                    lastDocId[word] = docid
                else:
                    # Postings compression (delta encoding: store the gap to the previous docID)
                    wordsDict[word][0] += ',' + str(docid - lastDocId[word])
                    lastDocId[word] = docid
                    wordsDict[word][1] += 1
                # df[word] += 1
    # Sort the terms in ascending lexicographic order
    wordsDict = {key: wordsDict[key] for key in sorted(wordsDict)}
    outWordsDict(wordsDict, blockid)
- Write the block's records back to a file on disk
# Write the local (per-block) index out to disk
def outWordsDict(wordsDict, blockid):
    f = open("target/block" + str(blockid) + ".txt", 'w', encoding = 'utf-8')
    for key in wordsDict.keys():
        # print(key)
        f.write(key + '\t' + str(wordsDict[key][1]) + '\t' + wordsDict[key][0] + '\n')
    f.flush()
    f.close()
The resulting blocks (shown in the figure) are stored in the target folder as intermediate index files.
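For reference, each line of a block file target/block<i>.txt holds one term entry in the form term<TAB>df<TAB>delta-encoded docIDs. The line below is purely illustrative (the values are not taken from the actual dataset):

apple	3	12,5,83

i.e. the term apple occurs in 3 documents of this block, with docIDs 12, 12+5 = 17 and 17+83 = 100.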
Merging the index blocks
1) Open all block index files and read the first entry of each block. Maintain the entries in a min-heap keyed by term, so that the entry with the lexicographically smallest term is always at the top.
2) Pop the entry with the smallest term from the heap; if the block file it came from has not been fully read yet, push that file's next entry onto the heap.
3) Use the popped entry to update the in-memory docList of the current term (decode the delta encoding and record the actual docIDs).
4) When all entries of the current term have been processed (the term at the top of the heap is lexicographically greater than the current term), delta-encode the docList again and write the resulting postings record back to disk.
5) Repeat steps 2), 3) and 4) until every entry of every block file has been processed, i.e. until the heap has been emptied. At that point the inverted index file dict.index.txt is complete.
# Merge the index blocks, using a heap to keep the entries ordered
def mergeBlocks(blocknum):
    files = []
    heap = []
    word_now = ''
    df_now = 0
    result_now = []
    f_out = open("dict.index.txt", 'w', encoding = 'utf-8') # output file of the merged index
    for i in range(blocknum):
        f = open("target/block" + str(i + 1) + ".txt", 'r', encoding = 'utf-8', buffering = 100)
        files.append(f)
        line = f.readline()
        item = line.strip().split('\t')
        item.append(i)
        heappush(heap, item)
    while(len(heap) > 0):
        # Pop the entry at the top of the min-heap and read the next entry from its block
        item_now = heappop(heap)
        doc = item_now[3]
        line = files[doc].readline()
        # If that block file is not exhausted, push its next entry onto the heap
        if line:
            item = line.strip().split('\t')
            item.append(doc)
            heappush(heap, item)
        if (item_now[0] != word_now):
            # The popped term differs from the current one, so the current term is complete: write it back to disk
            if (word_now != ''):
                # heapify(result_now)
                writeBackFile(f_out, word_now, df_now, result_now)
            word_now = item_now[0]
            df_now = int(item_now[1])
            result_now = []
            # Maintain the current term's documents in another heap
            accmulatedDocId = 0
            for docid in item_now[2].split(','):
                accmulatedDocId += int(docid)
                heappush(result_now, accmulatedDocId)
        else:
            df_now += int(item_now[1])
            # Maintain the current term's documents in another heap
            accmulatedDocId = 0
            for docid in item_now[2].split(','):
                # Recover the actual docID (running sum of the gaps)
                accmulatedDocId += int(docid)
                heappush(result_now, accmulatedDocId)
    # Write the last term back to disk
    if (word_now != ''):
        writeBackFile(f_out, word_now, df_now, result_now)
    f_out.close()
    for f in files:
        f.close()

def writeBackFile(f_out, word_now, df_now, result_now):
    # Pop the docIDs from result_now one by one, then re-compress them with delta encoding
    docList = []
    while(len(result_now) != 0):
        docList.append(heappop(result_now))
    for i in range(len(docList) - 1, 0, -1):
        # print(i)
        docList[i] -= docList[i - 1]
    docstr = ''
    for doc in docList:
        docstr += ',' + str(doc)
    docstr = docstr[1:]
    f_out.write(word_now + '\t' + str(df_now) + '\t' + docstr + '\n')
The resulting inverted index records are shown in the figure:
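Each line of dict.index.txt has the same three-column layout as the block files: term, df, and the delta-encoded docID list, with df now accumulated over all blocks. As a purely illustrative example (the values are invented, not taken from the real index), the line

retrieval	4	7,2,91,150

decodes to docIDs 7, 9, 100 and 250, since each stored value after the first is the gap to the previous docID.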
Note: this merge method guarantees that memory only ever holds a heap whose size does not exceed the number of blocks, plus the entries of the current term. Because the index inside each block is sorted by term, the merge only needs to read the front entry of each block at any time, and the record of each term can be finalized in ascending lexicographic order, one term at a time. As soon as a term's record is finalized it is written back to disk and no longer occupies memory. Consequently, even if the final inverted index is larger than the memory limit, the data needed at any point while generating it still fits in memory.
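As a side note, the same bounded-memory k-way merge can also be expressed with the standard library's heapq.merge, which lazily keeps at most one pending entry per input in memory. The sketch below is only an alternative formulation for illustration (the helper names iter_block and merge_sketch are mine, not part of this report), not the implementation used above:

from heapq import merge
from itertools import groupby

def iter_block(path):
    # Yield (term, df, absolute docIDs) for one sorted block file.
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            word, df, postings = line.rstrip('\n').split('\t')
            docids, acc = [], 0
            for gap in postings.split(','):   # undo the per-block delta encoding
                acc += int(gap)
                docids.append(acc)
            yield word, int(df), docids

def merge_sketch(block_paths, out_path):
    streams = [iter_block(p) for p in block_paths]
    with open(out_path, 'w', encoding='utf-8') as out:
        merged = merge(*streams, key=lambda e: e[0])   # k-way merge by term
        for word, group in groupby(merged, key=lambda e: e[0]):
            df, docids = 0, []
            for _, d, ids in group:
                df += d
                docids.extend(ids)
            docids.sort()
            gaps = [docids[0]] + [b - a for a, b in zip(docids, docids[1:])]
            out.write(word + '\t' + str(df) + '\t' + ','.join(map(str, gaps)) + '\n')

Functionally this mirrors mergeBlocks: one pending entry per block, postings decoded, merged in term order, re-delta-encoded and written out per term.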
Boolean queries
1) Read the previously built inverted index file dict.index.txt.
2) Record the line number of each term's entry in the file.
import linecache
def getLineDict(filename):
    f = open(filename, 'r', encoding='utf-8')
    lines = f.readlines()
    f.close()
    lineDict = {}
    # print(len(lines))
    cnt = 0
    for line in lines:
        word, df, res = line.strip().split('\t')
        cnt += 1
        lineDict[word] = cnt
    return lineDict
3) Read user input and parse the Boolean query. Three query forms are supported:
- A
- A and B
- A or B
4) Look up the line number of each query term, read that line from the inverted index file, and obtain the docID list in which each term occurs.
def getIndex(lineDict, word):
    if not (word in lineDict):
        return []
    line = linecache.getline('dict.index.txt', lineDict[word])
    # print(line)
    word, df, res = line.strip().split('\t')
    res = res.split(',')
    for i in range(len(res)):
        res[i] = int(res[i])
    # Recover the original postings list from the delta-encoded one
    for i in range(1, len(res)):
        res[i] += res[i - 1]
    return res
5) Convert the lists to sets and evaluate the Boolean operator by set intersection (and) or set union (or).
6) Print the result of the set operation; this is the query result.
while(query[0] != 'exit()'):
    if (len(query) == 1):
        docListA = getIndex(lineDict, query[0])
        if (len(docListA) == 0):
            print('no result.', end='')
        for docId in docListA:
            print(docId, end=' ')
        print()
    else:
        docListA = getIndex(lineDict, query[0])
        docListB = getIndex(lineDict, query[2])
        op = query[1]
        if (op == 'and'):
            result = set(docListA).intersection(set(docListB))
        else:
            result = set(docListA).union(set(docListB))
        result = sorted(list(result))
        if (len(result) == 0):
            print('no result.', end='')
        for docId in result:
            print(docId, end=' ')
        print()
    query = input().split(' ')
7) Repeat steps 3, 4, 5 and 6 until the user terminates the query loop (by entering exit()).
Summary
This lab implements inverted index construction with the SPIMI algorithm. When building the block indexes, the blocks are independent of each other, so multiple threads are used to speed up their construction, and the postings lists are compressed with delta encoding to reduce their size. When merging the index blocks, since the index inside each block is already sorted by term, memory only has to hold the current front entry of each block, and these entries are maintained in a heap. As soon as a term's record is finalized it is written straight back to disk and no longer occupies memory. Therefore, even if the final inverted index exceeds the memory limit, the data needed while generating it always fits in memory, and an inverted index of arbitrary size can be produced.
When executing Boolean queries, to cope with an index that may be too large for memory, the whole index file is not read into memory directly; only the (term, line number) pairs are kept in memory, and the docIDs of a term are fetched, after the user's query has been parsed, by reading the corresponding line of the file. Memory then only needs to hold the (term, line number) mapping plus the docLists of the terms involved in the current query, so queries can be run against an inverted index of arbitrary size. This is also faster in practice: the vast majority of terms are irrelevant to a given query and their docIDs never need to be decoded, so this design avoids those unnecessary decoding steps.
The experiment involves several tunable parameters, such as the number of files per block and the thread pool limit. Efficiency tests showed that blocking actually lowers overall performance, and the more blocks there are, the slower the program generally runs.
This is probably because the input data is not especially large (about 50 MB): however it is split, memory can hold all the data at once, while blocking adds extra I/O cost that grows with the number of blocks, which explains the observed behaviour. For truly large text collections (say 100 GB or more), however, not splitting into blocks would cause thrashing because memory could not hold all the data at once, hurting overall performance badly; the benefit of blocking is therefore likely to show only on sufficiently large datasets.
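A related point one could test: ThreadPoolExecutor and ProcessPoolExecutor share the same submit()/result() interface, so a process pool can be dropped in to compare timings against the thread pool used in main.py. The following is only a sketch under that assumption (the file name swap_pool.py is hypothetical, and this run was not part of the measured experiment); it reuses job from main.py, which is safe to import because main.py guards its driver code with if __name__ == '__main__':

# swap_pool.py -- illustrative timing comparison, not part of the measured experiment
from concurrent.futures import ProcessPoolExecutor
from main import job

if __name__ == '__main__':
    max_worker, filenum, blocksize = 8, 100000, 5000
    blocknum = (filenum - 1) // blocksize + 1
    with ProcessPoolExecutor(max_workers=max_worker) as pool:
        # Same block assignment as main.py; min() handles the last, possibly partial block
        futures = [pool.submit(job, i * blocksize + 1, min((i + 1) * blocksize, filenum), i + 1)
                   for i in range(blocknum)]
        for fut in futures:
            fut.result()   # propagate any exception raised in a worker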
Code
main.py
import re
from concurrent.futures import ThreadPoolExecutor
from heapq import heappush, heappop
from time import time
# Preprocess and read the input documents
def job(docl, docr, blockid):
    wordsDict = {}
    lastDocId = {}
    stopWords = []
    for docid in range(docl, docr + 1):
        f = open("data/" + str(docid) + ".txt", 'r', encoding = 'utf-8')
        lines = f.readlines()
        f.close()
        tempDict = {}
        for line in lines:
            # Split on spaces and tabs
            words = re.split('[ \t]', line.strip())
            for word in words:
                # Skip stop words
                if (word in stopWords): continue
                # Convert to lowercase
                word = word.lower()
                if (word in tempDict):
                    continue
                tempDict[word] = 1
                if not (word in wordsDict):
                    wordsDict[word] = [str(docid), 1]
                    lastDocId[word] = docid
                else:
                    # Postings compression (delta encoding: store the gap to the previous docID)
                    wordsDict[word][0] += ',' + str(docid - lastDocId[word])
                    lastDocId[word] = docid
                    wordsDict[word][1] += 1
                # df[word] += 1
    # Sort the terms in ascending lexicographic order
    wordsDict = {key: wordsDict[key] for key in sorted(wordsDict)}
    outWordsDict(wordsDict, blockid)
# Write the local (per-block) index out to disk
def outWordsDict(wordsDict, blockid):
    f = open("target/block" + str(blockid) + ".txt", 'w', encoding = 'utf-8')
    for key in wordsDict.keys():
        # print(key)
        f.write(key + '\t' + str(wordsDict[key][1]) + '\t' + wordsDict[key][0] + '\n')
    f.flush()
    f.close()
# Merge the index blocks, using a heap to keep the entries ordered
def mergeBlocks(blocknum):
    files = []
    heap = []
    word_now = ''
    df_now = 0
    result_now = []
    f_out = open("dict.index.txt", 'w', encoding = 'utf-8') # output file of the merged index
    for i in range(blocknum):
        f = open("target/block" + str(i + 1) + ".txt", 'r', encoding = 'utf-8', buffering = 100)
        files.append(f)
        line = f.readline()
        item = line.strip().split('\t')
        item.append(i)
        heappush(heap, item)
    while(len(heap) > 0):
        # Pop the entry at the top of the min-heap and read the next entry from its block
        item_now = heappop(heap)
        doc = item_now[3]
        line = files[doc].readline()
        # If that block file is not exhausted, push its next entry onto the heap
        if line:
            item = line.strip().split('\t')
            item.append(doc)
            heappush(heap, item)
        if (item_now[0] != word_now):
            # The popped term differs from the current one, so the current term is complete: write it back to disk
            if (word_now != ''):
                # heapify(result_now)
                writeBackFile(f_out, word_now, df_now, result_now)
            word_now = item_now[0]
            df_now = int(item_now[1])
            result_now = []
            # Maintain the current term's documents in another heap
            accmulatedDocId = 0
            for docid in item_now[2].split(','):
                accmulatedDocId += int(docid)
                heappush(result_now, accmulatedDocId)
        else:
            df_now += int(item_now[1])
            # Maintain the current term's documents in another heap
            accmulatedDocId = 0
            for docid in item_now[2].split(','):
                # Recover the actual docID (running sum of the gaps)
                accmulatedDocId += int(docid)
                heappush(result_now, accmulatedDocId)
    # Write the last term back to disk
    if (word_now != ''):
        writeBackFile(f_out, word_now, df_now, result_now)
    f_out.close()
    for f in files:
        f.close()

def writeBackFile(f_out, word_now, df_now, result_now):
    # Pop the docIDs from result_now one by one, then re-compress them with delta encoding
    docList = []
    while(len(result_now) != 0):
        docList.append(heappop(result_now))
    for i in range(len(docList) - 1, 0, -1):
        # print(i)
        docList[i] -= docList[i - 1]
    docstr = ''
    for doc in docList:
        docstr += ',' + str(doc)
    docstr = docstr[1:]
    f_out.write(word_now + '\t' + str(df_now) + '\t' + docstr + '\n')
if __name__ == '__main__':
    # Configure the thread pool parameters
    max_worker = 8 # upper limit of the thread pool
    filenum = 100000 # total number of files
    blocksize = 5000 # number of files per block
    pool = ThreadPoolExecutor(max_workers = max_worker)
    threads = []
    blocknum = (filenum - 1) // blocksize + 1 # number of blocks needed
    begin_time = time()
    print("max_worker:%d\tFilenum:%d\tBlocksize:%d\tBlocknum:%d" %(max_worker, filenum, blocksize, blocknum))
    print("Start building the per-block (local) indexes...")
    # Assign the documents to blocks
    for i in range(filenum // blocksize):
        thread = pool.submit(job, i * blocksize + 1, i * blocksize + blocksize, i + 1)
        threads.append(thread)
    # Put the remaining documents into the last block
    if filenum % blocksize != 0:
        thread = pool.submit(job, filenum - (filenum % blocksize) + 1, filenum, filenum // blocksize + 1)
        threads.append(thread)
    # Wait for all threads to finish
    for thread in threads:
        thread.result()
    print("Local index blocks built, merging the blocks... (%.4fs)" %(time() - begin_time))
    # Merge all local block indexes into the final inverted index
    mergeBlocks(blocknum)
    print("Merge finished, inverted index dict.index.txt has been generated. (%.4fs)" % (time() - begin_time))
query.py
# Process Boolean queries against the generated inverted index
# Three query forms are supported:
# 1. A
# 2. A and B
# 3. A or B
import linecache
def getLineDict(filename):
    f = open(filename, 'r', encoding='utf-8')
    lines = f.readlines()
    f.close()
    lineDict = {}
    # print(len(lines))
    cnt = 0
    for line in lines:
        word, df, res = line.strip().split('\t')
        cnt += 1
        lineDict[word] = cnt
    return lineDict
def getIndex(lineDict, word):
    if not (word in lineDict):
        return []
    line = linecache.getline('dict.index.txt', lineDict[word])
    # print(line)
    word, df, res = line.strip().split('\t')
    res = res.split(',')
    for i in range(len(res)):
        res[i] = int(res[i])
    # Recover the original postings list from the delta-encoded one
    for i in range(1, len(res)):
        res[i] += res[i - 1]
    return res
if __name__ == '__main__':
    lineDict = getLineDict('dict.index.txt')
    query = input().split(' ')
    # Enter exit() to stop the program; otherwise keep answering queries in a loop
    while(query[0] != 'exit()'):
        if (len(query) == 1):
            docListA = getIndex(lineDict, query[0])
            if (len(docListA) == 0):
                print('no result.', end='')
            for docId in docListA:
                print(docId, end=' ')
            print()
        else:
            docListA = getIndex(lineDict, query[0])
            docListB = getIndex(lineDict, query[2])
            op = query[1]
            if (op == 'and'):
                result = set(docListA).intersection(set(docListB))
            else:
                result = set(docListA).union(set(docListB))
            result = sorted(list(result))
            if (len(result) == 0):
                print('no result.', end='')
            for docId in result:
                print(docId, end=' ')
            print()
        query = input().split(' ')