Naive Bayes Classifier for Text Classification in Python
Dataset
The corpus covers 20 categories, with 800 files per category as the training set. The test set contains 3993 files.
Preprocessing
Preprocessing consists of reading the files, tokenizing, removing stop words, and lowercasing. Lemmatization and stemming were also tried, but tests showed that both actually hurt classification accuracy, so the final pipeline keeps only tokenization, stop-word removal, and lowercasing. Preprocessing is implemented with nltk.
import re
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem.lancaster import LancasterStemmer

rule = re.compile('[a-z]')                     # keep tokens that start with a letter
wordnet_lematizer = WordNetLemmatizer()
lancaster_stemmer = LancasterStemmer()
stop_words = set(stopwords.words('english'))   # precomputed once for speed

def tokenizor(s):
    token_words = word_tokenize(s)
    # lowercase
    token_words = [word.lower() for word in token_words]
    # drop punctuation and number tokens
    token_words = [word for word in token_words if rule.match(word)]
    # lemmatization (disabled: it hurt accuracy in testing)
    # token_words = [wordnet_lematizer.lemmatize(word) for word in token_words]
    # stemming (disabled: it hurt accuracy in testing)
    # token_words = [lancaster_stemmer.stem(word) for word in token_words]
    # remove stop words
    token_words = [word for word in token_words if word not in stop_words]
    return token_words

def gen_cleaned_data(input_filepath, output_file):
    # reads the raw corpus under input_filepath, tokenizes each document with
    # tokenizor(), and writes one "label\tword1,word2,..." line per document
    # to output_file (the cleaned_*.txt format that main.py reads; body omitted)
    ...
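A quick sanity check of the tokenizer on a made-up sentence (the expected output assumes NLTK's standard 'english' stop-word list, which drops 'the'; ',', '42', and '!' fail the [a-z] filter):

print(tokenizor('The quick brown Fox, 42 jumps!'))
# expected: ['quick', 'brown', 'fox', 'jumps']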
Classification Model and Implementation
This experiment uses a Naive Bayes classifier. Naive Bayes assumes that features are conditionally independent given the class, which greatly simplifies the computation, and it predicts a document's category by maximizing the posterior probability. In log form, the classification rule is

\hat{y} = \arg\max_{c}\Big(\log P(c) + \sum_{i}\log P(w_i \mid c)\Big)

To evaluate this expression, we record how many times each category appears in the training set and how many times each term appears in each category. From these counts we estimate the prior P(c) and the conditional probabilities P(w_i \mid c) of the document's words under each category, and predict the category with the largest posterior.
In addition, to avoid zero probabilities, the Naive Bayes classifier introduces a smoothing term lambda, usually set to 1.
However, cross-validation and tuning showed that accuracy is only about 78% with lambda = 1 but about 85% with lambda = 0.01, so the value of the smoothing term has a large effect. A brief analysis suggests that, because the vocabulary is large, lambda = 1 inflates the denominator and washes out a document's actual features, weakening the model's discriminative power; a small smoothing term such as lambda = 0.01 is therefore used.
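Concretely, the smoothed estimate computed in fit below, where \mathrm{count}(w,c) is the word-class count (T[w, c] in the code), \mathrm{count}(c) is the total token count of class c (B[c]), and |V| is the vocabulary size (n_word), is

P(w \mid c) = \frac{\mathrm{count}(w,c) + \lambda}{\mathrm{count}(c) + \lambda\,|V|}

When |V| is large, the term \lambda|V| dominates the denominator at \lambda = 1, which matches the analysis above.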
The implementation is also wrapped in a class, with methods for initialization, training, prediction, and saving/loading the model.
import numpy as np
import pickle

class NBClassifier(object):
    def __init__(self, n_tag, n_word, lamb=1.0):
        self.n_tag = n_tag      # number of tags (classes)
        self.n_word = n_word    # vocabulary size
        self.lamb = lamb        # smoothing term (1.0 = Laplace smoothing)
        self.p_matrix = None    # log P(word | tag), shape (n_word, n_tag)
        self.prior = None       # log prior probabilities, shape (n_tag,)

    def fit(self, X, Y):
        print('Start fitting...')
        n_sample = len(X)
        assert n_sample == len(Y)
        self.prior = np.zeros(self.n_tag)
        # T[w, c]: number of occurrences of word w in documents of class c
        T = np.zeros((self.n_word, self.n_tag))
        for i in range(n_sample):
            # x_temp = []  # (disabled experiment: count each word at most once per document)
            for x in X[i]:
                # if x not in x_temp:
                #     x_temp.append(x)
                T[x, Y[i]] += 1
            self.prior[Y[i]] += 1 / n_sample
        self.prior = np.log(self.prior)
        # B[c]: total number of word tokens observed in class c
        B = T.sum(axis=0)
        # smoothed conditional probabilities, stored as logs
        self.p_matrix = np.log((T + self.lamb) / (B + self.lamb * self.n_word))

    def predict(self, X):
        print('Start predicting...')
        Y = []
        for i in range(len(X)):
            c = self.prior.copy()      # start from the log priors
            for x in X[i]:
                c += self.p_matrix[x]  # add log P(word | tag) for every tag at once
            Y.append(np.argmax(c))     # tag with the largest log posterior
        return np.asarray(Y)

    def saveModel(self, modelPath):
        with open(modelPath, "wb") as f:
            pickle.dump(self, f)

def loadModel(modelPath):
    # module-level so that main.py can do `from NBClassifier import loadModel`
    print('Start loading model...')
    with open(modelPath, "rb") as f:
        model = pickle.load(f)
    return model
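A minimal sanity check, runnable right after the class definition above (the word ids, labels, and sizes are made up, just to exercise fit/predict):

X = np.asarray([[0, 1], [0, 2], [3]], dtype=object)  # three documents as word-id lists
Y = np.asarray([0, 0, 1])                            # their class labels
model = NBClassifier(n_tag=2, n_word=4, lamb=0.01)
model.fit(X, Y)
print(model.predict(np.asarray([[0, 1], [3]], dtype=object)))  # expected: [0 1]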
Cross-Validation
Since the test set carries no category labels, the model was evaluated with five-fold cross-validation: the training set is split into five folds, and the model is repeatedly trained on four folds and validated on the held-out fold.
After some parameter tuning, cross-validation accuracy reaches about 85%, up from the initial 78%.
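The cross-validation loop itself is not part of the pipeline below; here is a minimal sketch of the five-fold evaluation, assuming X_train, Y_train, tagNum, and wordNum as produced by getDict/getMapping, and using sklearn's KFold for the splits:

import numpy as np
from sklearn.model_selection import KFold
from NBClassifier import NBClassifier

def cross_validate(X, Y, n_tag, n_word, lamb=0.01, n_folds=5):
    accs = []
    for train_idx, valid_idx in KFold(n_splits=n_folds, shuffle=True, random_state=0).split(X):
        model = NBClassifier(n_tag, n_word, lamb=lamb)
        model.fit(X[train_idx], Y[train_idx])
        accs.append(np.mean(Y[valid_idx] == model.predict(X[valid_idx])))
    return float(np.mean(accs))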
if __name__ == '__main__':
    tagList_train, sentenceList_train = loadData_train()
    nameList_test, sentenceList_test = loadData_test()
    itagDict, tagDict, wordDict, tagNum, wordNum = getDict(tagList_train, sentenceList_train, sentenceList_test)
    X_train, X_test, Y_train = getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test)
    NBModel = trainModel(tagNum, wordNum, X_train, Y_train, save_name='nb.model')
    # NBModel = loadModel('nb.model')
    Y_pred = NBModel.predict(X_train)
    print('acc on train: %.6f%%' % (np.mean(Y_train == Y_pred) * 100))
    Y_pred = NBModel.predict(X_test)
    saveResult(nameList_test, Y_pred, itagDict)
Prediction and Output
Finally, the test set is classified and the predictions are written out in the required format:
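The output written by saveResult has one name:predicted_tag line per test file; for example (file names and category names here are made up):

001.txt:sports
002.txt:finance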
Summary
This experiment implemented text classification with a Naive Bayes classifier, covering data preprocessing, a from-scratch implementation of the model, evaluation via cross-validation, and packaging of the model into a reusable class. Along the way, details such as which preprocessing steps to keep and what value to give the smoothing term were explored experimentally, which improved the final prediction accuracy.
Code
main.py
import numpy as np
from collections import defaultdict
from NBClassifier import NBClassifier
from NBClassifier import loadModel
def loadData_train():
    print('Start reading data_train...')
    with open('temp/cleaned_train.txt', mode='r', encoding='utf-8') as f:
        lines = f.readlines()
    tagList = []
    sentenceList = []
    for line in lines:
        # each line is "tag\tword1,word2,..."
        tag, sentence = line.strip('\n').split('\t')
        tagList.append(tag)
        sentenceList.append(sentence.split(','))
    return tagList, sentenceList
def loadData_test():
    print('Start reading data_test...')
    with open('temp/cleaned_test.txt', mode='r', encoding='utf-8') as f:
        lines = f.readlines()
    nameList = []
    sentenceList = []
    for line in lines:
        # each line is "file_name\tword1,word2,..."
        name, sentence = line.strip('\n').split('\t')
        nameList.append(name)
        sentenceList.append(sentence.split(','))
    return nameList, sentenceList
def getDict(tagList_train, sentenceList_train, sentenceList_test):
    print('Start getting dictionary...')
    itagDict = defaultdict(str)   # tag id -> tag name
    tagDict = defaultdict(int)    # tag name -> tag id
    wordDict = defaultdict(int)   # word -> word id
    tagNum = 0
    wordNum = 0
    for tag in tagList_train:
        if tag not in tagDict:
            tagDict[tag] = tagNum
            itagDict[tagNum] = tag
            tagNum += 1
    # the vocabulary covers both the training words and the test words
    for sentence in sentenceList_train:
        for word in sentence:
            if word not in wordDict:
                wordDict[word] = wordNum
                wordNum += 1
    for sentence in sentenceList_test:
        for word in sentence:
            if word not in wordDict:
                wordDict[word] = wordNum
                wordNum += 1
    return itagDict, tagDict, wordDict, tagNum, wordNum
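# Example (hypothetical inputs): getDict(['sports', 'tech', 'sports'],
#     [['win', 'game'], ['cpu', 'game'], ['win']], [['gpu']]) assigns ids in
#     order of first appearance: tagDict {'sports': 0, 'tech': 1} and
#     wordDict {'win': 0, 'game': 1, 'cpu': 2, 'gpu': 3}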
def getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test):
    print('Start getting data...')
    # replace tags and words with their integer ids
    Y_train = [tagDict[tag] for tag in tagList_train]
    X_train = [[wordDict[word] for word in sentence] for sentence in sentenceList_train]
    X_test = [[wordDict[word] for word in sentence] for sentence in sentenceList_test]
    # dtype=object because documents have different lengths
    X_train = np.asarray(X_train, dtype=object)
    X_test = np.asarray(X_test, dtype=object)
    Y_train = np.asarray(Y_train, dtype=object)
    return X_train, X_test, Y_train
def trainModel(tagNum, wordNum, X_train, Y_train, save_name=None):
    print('Start training model...')
    NBModel = NBClassifier(tagNum, wordNum, lamb=0.01)
    NBModel.fit(X_train, Y_train)
    if save_name is not None:
        NBModel.saveModel(save_name)
    return NBModel
def saveResult(nameList_test, Y_pred, itagDict):
    print('Start saving result...')
    with open('c_10195501416.txt', 'w', encoding='utf-8') as f:
        for i in range(len(Y_pred)):
            f.write('%s:%s\n' % (nameList_test[i], itagDict[Y_pred[i]]))
if __name__ == '__main__':
    tagList_train, sentenceList_train = loadData_train()
    nameList_test, sentenceList_test = loadData_test()
    itagDict, tagDict, wordDict, tagNum, wordNum = getDict(tagList_train, sentenceList_train, sentenceList_test)
    X_train, X_test, Y_train = getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test)
    NBModel = trainModel(tagNum, wordNum, X_train, Y_train, save_name='nb.model')
    # NBModel = loadModel('nb.model')
    Y_pred = NBModel.predict(X_train)
    print('acc on train: %.6f%%' % (np.mean(Y_train == Y_pred) * 100))
    Y_pred = NBModel.predict(X_test)
    saveResult(nameList_test, Y_pred, itagDict)
NBClassifier.py
import numpy as np
import pickle

class NBClassifier(object):
    def __init__(self, n_tag, n_word, lamb=1.0):
        self.n_tag = n_tag      # number of tags (classes)
        self.n_word = n_word    # vocabulary size
        self.lamb = lamb        # smoothing term (1.0 = Laplace smoothing)
        self.p_matrix = None    # log P(word | tag), shape (n_word, n_tag)
        self.prior = None       # log prior probabilities, shape (n_tag,)

    def fit(self, X, Y):
        print('Start fitting...')
        n_sample = len(X)
        assert n_sample == len(Y)
        self.prior = np.zeros(self.n_tag)
        # T[w, c]: number of occurrences of word w in documents of class c
        T = np.zeros((self.n_word, self.n_tag))
        for i in range(n_sample):
            # x_temp = []  # (disabled experiment: count each word at most once per document)
            for x in X[i]:
                # if x not in x_temp:
                #     x_temp.append(x)
                T[x, Y[i]] += 1
            self.prior[Y[i]] += 1 / n_sample
        self.prior = np.log(self.prior)
        # B[c]: total number of word tokens observed in class c
        B = T.sum(axis=0)
        # smoothed conditional probabilities, stored as logs
        self.p_matrix = np.log((T + self.lamb) / (B + self.lamb * self.n_word))

    def predict(self, X):
        print('Start predicting...')
        Y = []
        for i in range(len(X)):
            c = self.prior.copy()      # start from the log priors
            for x in X[i]:
                c += self.p_matrix[x]  # add log P(word | tag) for every tag at once
            Y.append(np.argmax(c))     # tag with the largest log posterior
        return np.asarray(Y)

    def saveModel(self, modelPath):
        with open(modelPath, "wb") as f:
            pickle.dump(self, f)

def loadModel(modelPath):
    # module-level so that main.py can do `from NBClassifier import loadModel`
    print('Start loading model...')
    with open(modelPath, "rb") as f:
        model = pickle.load(f)
    return model