Naive Bayes Classifier for Text Classification: A Python Implementation

Dataset

The corpus covers 20 categories. Each category contributes 800 files to the training set, and the test set contains 3,993 files.

Preprocessing

Preprocessing consists of reading the files, tokenization, stopword removal, and lowercasing. Lemmatization and stemming were also tried, but testing showed they actually made classification worse, so the final pipeline keeps only tokenization, stopword removal, and lowercasing. Preprocessing is implemented with the nltk toolkit.

import os
import re
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, LancasterStemmer

def gen_cleaned_data(input_filepath, output_file):
    rule = re.compile('[a-z]')
    stop_words = set(stopwords.words('english'))  # build the set once for speed
    wordnet_lematizer = WordNetLemmatizer()
    lancaster_stemmer = LancasterStemmer()

    def tokenizor(s):
        token_words = word_tokenize(s)
        # lowercase conversion
        token_words = [word.lower() for word in token_words]
        # keep only tokens starting with a letter (drops punctuation and numbers)
        token_words = [word for word in token_words if rule.match(word)]
        # lemmatization -- tried, but it hurt accuracy, so left disabled
        # token_words = [wordnet_lematizer.lemmatize(word) for word in token_words]
        # stemming -- tried, but it hurt accuracy, so left disabled
        # token_words = [lancaster_stemmer.stem(word) for word in token_words]
        # stopword removal
        token_words = [word for word in token_words if word not in stop_words]
        return token_words

    # Assumed layout: input_filepath holds one subdirectory per category, each
    # containing raw text files; each document becomes one output line in the
    # "label\tword1,word2,..." format that loadData_train() expects.
    with open(output_file, 'w', encoding='utf-8') as out:
        for label in os.listdir(input_filepath):
            label_dir = os.path.join(input_filepath, label)
            for name in os.listdir(label_dir):
                with open(os.path.join(label_dir, name), encoding='utf-8', errors='ignore') as doc:
                    out.write('%s\t%s\n' % (label, ','.join(tokenizor(doc.read()))))
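A hypothetical invocation: the raw-data directory name below is an assumption, while the output path is the file loadData_train() reads later.

# 'data/train' is a placeholder for the raw training-data directory
gen_cleaned_data('data/train', 'temp/cleaned_train.txt')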

Classification Model and Implementation

This experiment uses a Naive Bayes classifier. Naive Bayes assumes the features are conditionally independent given the class, which greatly simplifies the computation, and predicts a document's category by maximizing the posterior probability. The classification rule is:

$$\hat{c} = \arg\max_{c} P(c \mid d) = \arg\max_{c} P(c) \prod_{i=1}^{n} P(w_i \mid c)$$

where $w_1, \dots, w_n$ are the word tokens of document $d$.

Evaluating this rule requires counting how many times each category appears in the training set and how many times each term appears in each category. From these counts we compute each category's prior probability and the conditional probability of the document under each category, and pick the category with the largest posterior. Since multiplying many small probabilities underflows floating point, the implementation works with sums of log probabilities instead.
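Concretely, with $N$ training documents, $N_c$ of them in category $c$, and $T_{w,c}$ the number of occurrences of term $w$ in category $c$, the unsmoothed count-based estimates used below are:

$$\hat{P}(c) = \frac{N_c}{N}, \qquad \hat{P}(w \mid c) = \frac{T_{w,c}}{\sum_{w'} T_{w',c}}$$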

In addition, to avoid zero probabilities for term–category pairs unseen in training, Naive Bayes adds a smoothing term lambda to the counts; the conventional choice is lambda = 1 (Laplace smoothing).

$$\hat{P}(w \mid c) = \frac{T_{w,c} + \lambda}{\sum_{w'} T_{w',c} + \lambda\,|V|}$$

where $|V|$ is the vocabulary size.

However, cross-validation and tuning showed that accuracy is only about 78% with lambda = 1 but about 85% with lambda = 0.01, so the choice of smoothing term matters a great deal here. A plausible explanation is that with a vocabulary this large, lambda = 1 makes the lambda·|V| term dominate the denominator, washing out the documents' actual term statistics and weakening the model. A small smoothing term, lambda = 0.01, is therefore used.
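A minimal sketch of that tuning, assuming the X_train, Y_train, tagNum, and wordNum values produced by the getDict/getMapping helpers in main.py below; it holds out 20% of the training data and compares validation accuracy across candidate values:

import numpy as np
from sklearn.model_selection import train_test_split
from NBClassifier import NBClassifier

# X_train, Y_train, tagNum, wordNum: outputs of the getDict/getMapping
# pipeline in main.py (assumed already computed here)
X_tr, X_val, Y_tr, Y_val = train_test_split(X_train, Y_train, test_size=0.2, random_state=0)
for lamb in [1, 0.1, 0.01, 0.001]:
    model = NBClassifier(tagNum, wordNum, lamb=lamb)
    model.fit(X_tr, Y_tr)
    acc = np.mean(model.predict(X_val) == Y_val)
    print('lambda=%g  val acc: %.2f%%' % (lamb, acc * 100))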

The implementation is wrapped in a class providing initialization, training, prediction, and model saving/loading:

import numpy as np
import pickle

class NBClassifier(object):
    def __init__(self, n_tag, n_word, lamb = 0):
        self.n_tag = n_tag      # number of categories
        self.n_word = n_word    # vocabulary size
        self.lamb = lamb        # smoothing term (0 disables smoothing)
        self.p_matrix = None    # log conditional probabilities, shape (n_word, n_tag)
        self.prior = None       # log prior probabilities, shape (n_tag,)

    def fit(self, X, Y):
        print('Start fitting...')
        n_sample = len(X)
        assert(n_sample == len(Y))

        self.p_matrix = np.zeros((self.n_word, self.n_tag))
        self.prior = np.zeros(self.n_tag)

        # T[w, c]: number of occurrences of term w in documents of category c
        T = np.zeros((self.n_word, self.n_tag))
        for i in range(n_sample):
            for x in X[i]:
                T[x, Y[i]] += 1
            self.prior[Y[i]] += 1 / n_sample
        self.prior = np.log(self.prior)
        # B[c]: total number of term occurrences in category c
        B = np.zeros(self.n_tag)
        for i in range(self.n_tag):
            B[i] = np.sum(T[:, i])
        # smoothed conditional probabilities, stored in log space
        for i in range(self.n_word):
            for j in range(self.n_tag):
                self.p_matrix[i, j] = (T[i, j] + self.lamb) / (B[j] + self.lamb * self.n_word)
        self.p_matrix = np.log(self.p_matrix)

    def predict(self, X):
        print('Start predicting...')
        n_test = len(X)
        Y = []
        for i in range(n_test):
            # log posterior per category: log prior + sum of log conditionals
            c = self.prior.copy()
            for x in X[i]:
                for j in range(self.n_tag):
                    c[j] += self.p_matrix[x, j]
            Y.append(np.argmax(c))
        Y = np.asarray(Y)
        return Y

    def saveModel(self, modelPath):
        with open(modelPath, "wb") as f:
            f.write(pickle.dumps(self))

def loadModel(modelPath):
    print('Start loading model...')
    with open(modelPath, "rb") as f:
        model = pickle.load(f)
    return model
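As a quick sanity check, the class can be exercised on tiny hand-made data (the word indices below are arbitrary toy values, not from the real corpus):

from NBClassifier import NBClassifier

# four documents over a 4-word vocabulary, two categories
X = [[0, 1, 1], [0, 2, 3], [1, 1, 0], [3, 2, 2]]
Y = [0, 1, 0, 1]
model = NBClassifier(n_tag=2, n_word=4, lamb=0.01)
model.fit(X, Y)
print(model.predict([[1, 1], [2, 3]]))  # expected: [0 1]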

Cross-Validation

Since the test set comes without category labels, the model is evaluated with cross-validation: the training set is split into five folds, and the model is repeatedly trained on four folds and validated on the held-out one.

After some tuning, cross-validation accuracy reaches about 85%, a clear improvement over the initial 78%.
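The cross-validation loop itself is not part of the final code; a minimal sketch using sklearn's KFold, assuming the same X_train, Y_train, tagNum, and wordNum objects from the main.py pipeline:

import numpy as np
from sklearn.model_selection import KFold
from NBClassifier import NBClassifier

# X_train, Y_train, tagNum, wordNum: same objects as in main.py below
accs = []
for train_idx, val_idx in KFold(n_splits=5, shuffle=True, random_state=0).split(X_train):
    model = NBClassifier(tagNum, wordNum, lamb=0.01)
    model.fit(X_train[train_idx], Y_train[train_idx])
    accs.append(np.mean(model.predict(X_train[val_idx]) == Y_train[val_idx]))
print('5-fold mean acc: %.2f%%' % (np.mean(accs) * 100))

The final entry point, shown next, instead trains on the full training set and reports training accuracy before predicting the test set.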

if __name__ == '__main__':
    tagList_train, sentenceList_train = loadData_train()
    nameList_test, sentenceList_test = loadData_test()
    itagDict, tagDict, wordDict, tagNum, wordNum = getDict(tagList_train, sentenceList_train, sentenceList_test)
    X_train, X_test, Y_train = getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test)
    NBModel = trainModel(tagNum, wordNum, X_train, Y_train, save_name = 'nb.model')
    # NBModel = loadModel('nb.model')
    Y_pred = NBModel.predict(X_train)
    print('acc on train: %.6f%%' % (np.mean(Y_train == Y_pred) * 100))

    Y_pred = NBModel.predict(X_test)
    saveResult(nameList_test, Y_pred, itagDict)


Prediction and Output

Predictions for the test set are written out in the required format, one line per file as name:category (see saveResult in main.py below).
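For illustration, lines of the output file look like the following (the file and category names here are hypothetical placeholders, not from the real dataset):

0001:category_a
0002:category_b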

Summary

This experiment builds a Naive Bayes text classifier from scratch: preprocessing the data, implementing the model by hand, evaluating it with cross-validation, and packaging the implementation behind a small class interface. Along the way, details such as which preprocessing steps to keep and how to set the smoothing term were analyzed and tested, which noticeably improved the final prediction accuracy.

Code

main.py

import numpy as np
from collections import defaultdict
from NBClassifier import NBClassifier
from NBClassifier import loadModel

def loadData_train():
    print('Start reading data_train...')
    with open('temp/cleaned_train.txt', mode='r', encoding='utf-8') as f:
        lines = f.readlines()

    tagList = []
    sentenceList = []
    for line in lines:
        # strip the trailing newline so it doesn't stick to the last word
        tag, sentence = line.rstrip('\n').split('\t')
        tagList.append(tag)
        sentenceList.append(sentence.split(','))
    return tagList, sentenceList

def loadData_test():
    print('Start reading data_test...')
    with open('temp/cleaned_test.txt', mode='r', encoding='utf-8') as f:
        lines = f.readlines()

    nameList = []
    sentenceList = []
    for line in lines:
        # strip the trailing newline so it doesn't stick to the last word
        name, sentence = line.rstrip('\n').split('\t')
        nameList.append(name)
        sentenceList.append(sentence.split(','))
    return nameList, sentenceList

def getDict(tagList_train, sentenceList_train, sentenceList_test):
    print('Start getting dictionary...')
    itagDict = defaultdict(str)   # category index -> category name
    tagDict = defaultdict(int)    # category name -> index
    wordDict = defaultdict(int)   # word -> index
    tagNum = 0
    wordNum = 0
    for tag in tagList_train:
        if tag not in tagDict:
            tagDict[tag] = tagNum
            itagDict[tagNum] = tag
            tagNum += 1

    for sentence in sentenceList_train:
        for word in sentence:
            if word not in wordDict:
                wordDict[word] = wordNum
                wordNum += 1

    for sentence in sentenceList_test:
        for word in sentence:
            if word not in wordDict:
                wordDict[word] = wordNum
                wordNum += 1

    return itagDict, tagDict, wordDict, tagNum, wordNum

def getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test):
    print('Start getting data...')
    X_train = []
    Y_train = []
    for tag in tagList_train:
        Y_train.append(tagDict[tag])
    for sentence in sentenceList_train:
        temp = []
        for word in sentence:
            temp.append(wordDict[word])
        X_train.append(temp)

    X_test = []
    for sentence in sentenceList_test:
        temp = []
        for word in sentence:
            temp.append(wordDict[word])
        X_test.append(temp)

    X_train = np.asarray(X_train, dtype=object)
    X_test = np.asarray(X_test, dtype=object)
    Y_train = np.asarray(Y_train, dtype=object)
    return X_train, X_test, Y_train


def trainModel(tagNum, wordNum, X_train, Y_train, save_name = None):
    print('Start training model...')
    NBModel = NBClassifier(tagNum, wordNum, lamb=0.01)
    NBModel.fit(X_train, Y_train)
    if save_name is not None:
        NBModel.saveModel(save_name)
    return NBModel

def saveResult(nameList_test, Y_pred, itagDict):
    print('Start saving result...')
    n_test = len(Y_pred)
    with open('c_10195501416.txt', 'w', encoding='utf-8') as f:
        for i in range(n_test):
            f.write('%s:%s\n' % (nameList_test[i], itagDict[Y_pred[i]]))



if __name__ == '__main__':
    tagList_train, sentenceList_train = loadData_train()
    nameList_test, sentenceList_test = loadData_test()
    itagDict, tagDict, wordDict, tagNum, wordNum = getDict(tagList_train, sentenceList_train, sentenceList_test)
    X_train, X_test, Y_train = getMapping(tagDict, wordDict, tagList_train, sentenceList_train, sentenceList_test)
    NBModel = trainModel(tagNum, wordNum, X_train, Y_train, save_name = 'nb.model')
    # NBModel = loadModel('nb.model')
    Y_pred = NBModel.predict(X_train)
    print('acc on train: %.6f%%' % (np.mean(Y_train == Y_pred) * 100))

    Y_pred = NBModel.predict(X_test)
    saveResult(nameList_test, Y_pred, itagDict)

NBClassifier.py

import numpy as np
import pickle

class NBClassifier(object):
    def __init__(self, n_tag, n_word, lamb = 0):
        self.n_tag = n_tag      # number of categories
        self.n_word = n_word    # vocabulary size
        self.lamb = lamb        # smoothing term (0 disables smoothing)
        self.p_matrix = None    # log conditional probabilities, shape (n_word, n_tag)
        self.prior = None       # log prior probabilities, shape (n_tag,)

    def fit(self, X, Y):
        print('Start fitting...')
        n_sample = len(X)
        assert(n_sample == len(Y))

        self.p_matrix = np.zeros((self.n_word, self.n_tag))
        self.prior = np.zeros(self.n_tag)

        # T[w, c]: number of occurrences of term w in documents of category c
        T = np.zeros((self.n_word, self.n_tag))
        for i in range(n_sample):
            for x in X[i]:
                T[x, Y[i]] += 1
            self.prior[Y[i]] += 1 / n_sample
        self.prior = np.log(self.prior)
        # B[c]: total number of term occurrences in category c
        B = np.zeros(self.n_tag)
        for i in range(self.n_tag):
            B[i] = np.sum(T[:, i])
        # smoothed conditional probabilities, stored in log space
        for i in range(self.n_word):
            for j in range(self.n_tag):
                self.p_matrix[i, j] = (T[i, j] + self.lamb) / (B[j] + self.lamb * self.n_word)
        self.p_matrix = np.log(self.p_matrix)

    def predict(self, X):
        print('Start predicting...')
        n_test = len(X)
        Y = []
        for i in range(n_test):
            # log posterior per category: log prior + sum of log conditionals
            c = self.prior.copy()
            for x in X[i]:
                for j in range(self.n_tag):
                    c[j] += self.p_matrix[x, j]
            Y.append(np.argmax(c))
        Y = np.asarray(Y)
        return Y

    def saveModel(self, modelPath):
        with open(modelPath, "wb") as f:
            f.write(pickle.dumps(self))

def loadModel(modelPath):
    print('Start loading model...')
    with open(modelPath, "rb") as f:
        model = pickle.load(f)
    return model