一样是记录一下这周的大数据管理研究课学了什么,以防以后忘了什么叫向前逐步回归。同时,水一篇教程。

为了能一次性输出所有内容,并不加修改直接复制到word中,我在里面添了很多无关紧要的代码,也有很多画蛇添足的操作,希望未来回来查资料的我能发现。

import pandas as pd
from patsy import dmatrices
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# Load the abalone dataset; the file has no header row, so the UCI abalone
# column names are supplied explicitly.
names = 'Sex,Length,Diameter,Height,WholeWeight,ShuckedWeight,VisceraWeight,ShellWeight,Rings'.split(',')
abalone = pd.read_csv('Homework_2_abalone.txt', names=names)

# Partition the data by sex: M = male, F = female, I = infant.
abalone_M = abalone[abalone['Sex']=='M']
abalone_F = abalone[abalone['Sex']=='F']
abalone_I = abalone[abalone['Sex']=='I']

# 80/20 train/test split for the full set and each sex subset.
# Each split_* value is a [train_df, test_df] pair; the fixed
# random_state keeps the splits reproducible across runs.
split_abalone = train_test_split(abalone, test_size=0.2, random_state=3)
split_abalone_M = train_test_split(abalone_M, test_size=0.2, random_state=3)
split_abalone_F = train_test_split(abalone_F, test_size=0.2, random_state=3)
split_abalone_I = train_test_split(abalone_I, test_size=0.2, random_state=3)


#-------------- Single-variable regression --------------#
def Unary_linear_regression(data, key):
    """Fit `Rings ~ <key>` on the training split and score it on the test split.

    Parameters
    ----------
    data : (train_df, test_df) pair as produced by train_test_split.
    key  : column name used as the single predictor.

    Returns
    -------
    (key, slope coefficient, test MSE, test R²)
    """
    train_df, test_df = data[0], data[1]
    formula = 'Rings ~ {}'.format(key)
    # Fit ordinary least squares on the training data.
    y_train, X_train = dmatrices(formula, train_df)
    fitted = sm.OLS(y_train, X_train).fit()
    # params[0] is the intercept; params[1] is the predictor's slope.
    slope = fitted.params[1]
    # Evaluate on the held-out data.
    y_test, X_test = dmatrices(formula, test_df)
    predictions = fitted.predict(X_test)
    return (key,
            slope,
            mean_squared_error(y_test, predictions),
            r2_score(y_test, predictions))
# Report single-predictor regressions for each cohort, ranked by test MSE.
print('单变量回归:', end='')
cohorts = [(split_abalone, 'All'), (split_abalone_M, 'Male'),
           (split_abalone_F, 'Female'), (split_abalone_I, 'Infant')]
for split, label in cohorts:
    # Every numeric column (skip 'Sex' and the target 'Rings') gets its own model.
    rows = [Unary_linear_regression(split, key) for key in abalone.columns[1:-1]]
    table = pd.DataFrame(rows, columns=['Key', 'Coefficient', 'MSE', 'R²'])
    print('\n'+ label + ':')
    print(table.sort_values(by='MSE').reset_index(drop=True))


#-------------- Multiple regression ----------------#
# Model selection via forward stepwise regression.
def forward_selected(data, target):
    """Forward stepwise selection minimizing AIC.

    Starting from an empty model, repeatedly add the candidate column
    whose inclusion gives the lowest AIC, and stop as soon as no
    candidate improves on the current score. 'Sex' and the target
    column are excluded from the candidate pool.

    Returns the selected patsy formula string, e.g. 'Rings ~ A+B'.
    """
    remaining = set(data.columns)
    remaining.remove('Sex')
    remaining.remove(target)
    chosen = []
    best_aic = float('inf')
    while remaining:
        # Score every one-step extension of the current model.
        scored = []
        for cand in remaining:
            trial = '{} ~ {}'.format(target, '+'.join(chosen + [cand]))
            endog, exog = dmatrices(trial, data)
            scored.append((sm.OLS(endog, exog).fit().aic, cand))
        aic, winner = min(scored)
        if aic >= best_aic:
            break  # no candidate lowers the AIC — stop adding terms
        best_aic = aic
        chosen.append(winner)
        remaining.remove(winner)
    return '{} ~ {}'.format(target, '+'.join(chosen))

# Train the forward-selected model and evaluate it on the test split.
# Returns the fitted coefficients plus test-set MSE and R².
def multiple_regression(data, target):
    """Select a model on the training split, fit it, and score the test split.

    Parameters
    ----------
    data   : ((train_df, test_df), label) pair.
    target : name of the response column.

    Returns
    -------
    (formula, coefficients with the intercept dropped, test MSE, test R²)
    Also prints the cohort label and the statsmodels fit summary.
    """
    print('\n\n'+ data[1] + ':')
    train_df, test_df = data[0]
    # Variable selection and fitting both use only the training split.
    formula = forward_selected(train_df, target)
    endog, exog = dmatrices(formula, train_df)
    fitted = sm.OLS(endog, exog).fit()
    # Score the fitted model on the held-out split.
    y_true, X_test = dmatrices(formula, test_df)
    y_pred = fitted.predict(X_test)
    print(fitted.summary())
    # params[0] is the intercept; drop it so the remaining coefficients
    # align one-to-one with the predictors named in the formula.
    return (formula, fitted.params[1:],
            mean_squared_error(y_true, y_pred),
            r2_score(y_true, y_pred))

# Flatten one cohort's regression results into a dict so the cohorts can
# be collected into a single summary DataFrame.
def predict_to_dict(data, target):
    """Fit/evaluate the forward-selected model and return a flat dict.

    Parameters
    ----------
    data   : ((train_df, test_df), label) pair.
    target : name of the response column.

    Returns
    -------
    dict mapping each selected predictor name to its coefficient,
    plus 'MSE' and 'R²' entries for the test-set scores.
    """
    formula, coef, MSE, r2 = multiple_regression(data, target)
    # Predictor names come from the right-hand side of the patsy formula;
    # their order matches `coef` (the intercept is already dropped).
    keys = formula.split('~')[1].strip().split('+') + ['MSE', 'R²']
    values = coef.tolist() + [MSE, r2]
    # dict(zip(...)) pairs keys with values directly — no index bookkeeping.
    return dict(zip(keys, values))

# Summarize the multiple-regression results: one column per cohort,
# rows are the selected predictors plus MSE and R².
print('\n\n多元回归:', end='')
cohorts = [(split_abalone, 'All'), (split_abalone_M, 'Male'),
           (split_abalone_F, 'Female'), (split_abalone_I, 'Infant')]
summary_rows = [predict_to_dict(cohort, 'Rings') for cohort in cohorts]
# Transpose so cohorts become columns and metrics/coefficients become rows.
df = pd.DataFrame(summary_rows).T
df.columns = ['All', 'Male', 'Female', 'Infant']
print('\n\n\n结果汇总:')
print(df)