Machine Learning

The science of getting computers to learn from data


Chapter 1: Machine Learning Overview

1.1 What Is Machine Learning

Machine learning is a branch of artificial intelligence that enables computers to learn from data and improve their performance without being explicitly programmed.

Arthur Samuel (1959): machine learning is the field of study that gives computers the ability to learn.

Tom Mitchell (1998): a computer program is said to learn from experience E with respect to task T and performance measure P if its performance on T, as measured by P, improves with experience E.

1.2 Types of Machine Learning

1.2.1 By Learning Style

Type                      Description          Examples
Supervised learning       Labeled data         Classification, regression
Unsupervised learning     Unlabeled data       Clustering, dimensionality reduction
Semi-supervised learning  Partially labeled    Self-training, label propagation
Reinforcement learning    Trial and error      Games, robotics

1.2.2 By Task

  • Classification: predict discrete labels
    • Binary: spam detection
    • Multi-class: image recognition
  • Regression: predict continuous values
    • House price prediction
    • Temperature prediction
  • Clustering: group similar samples
    • User segmentation
    • Document clustering
  • Dimensionality reduction: reduce the number of features
    • PCA
    • t-SNE
  • Recommendation: personalized suggestions
    • Product recommendation
    • Content recommendation

1.3 The Machine Learning Workflow

1. Define the problem
   ↓
2. Collect data
   ↓
3. Preprocess data
   ↓
4. Engineer features
   ↓
5. Select a model
   ↓
6. Train the model
   ↓
7. Evaluate the model
   ↓
8. Tune the model
   ↓
9. Deploy the model

Chapter 2: Supervised Learning

2.1 Linear Regression

Linear regression is one of the simplest and most important regression algorithms.

2.1.1 Basic Idea

Linear regression looks for a line (or hyperplane) that minimizes the error between predictions and true values.

Hypothesis

h(x) = θ₀ + θ₁x

Cost function (mean squared error)

J(θ) = 1/(2m) Σ(h(x⁽ⁱ⁾) - y⁽ⁱ⁾)²

2.1.2 Gradient Descent

Update rule

θⱼ := θⱼ - α ∂J/∂θⱼ

where α is the learning rate, which controls the step size of each update.

Batch gradient descent

def gradient_descent(X, y, theta, alpha, num_iterations):
    m = len(y)
    for i in range(num_iterations):
        h = X @ theta
        error = h - y
        gradient = X.T @ error / m
        theta = theta - alpha * gradient
    return theta

Choosing the learning rate
- Too small: convergence is slow
- Too large: may fail to converge or even diverge
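To make the trade-off concrete, here is a small synthetic run of the batch update above with two different learning rates (the data and the specific alpha values are illustrative choices):

```python
import numpy as np

def gradient_descent(X, y, theta, alpha, num_iterations):
    # same batch update as above: theta := theta - alpha * X^T (X theta - y) / m
    m = len(y)
    for _ in range(num_iterations):
        gradient = X.T @ (X @ theta - y) / m
        theta = theta - alpha * gradient
    return theta

# y = 2x with a bias column of ones; the true parameters are [0, 2]
X = np.c_[np.ones(50), np.linspace(0, 1, 50)]
y = 2 * X[:, 1]

good = gradient_descent(X, y, np.zeros(2), alpha=0.5, num_iterations=2000)
slow = gradient_descent(X, y, np.zeros(2), alpha=0.001, num_iterations=2000)

print(good)  # close to [0, 2]
print(slow)  # still far from [0, 2]: the step size is too small to converge in time
```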

2.1.3 Multiple Linear Regression

With more than one feature:

Hypothesis

h(x) = Xθ

where X is an m×n matrix (m samples, n features) and θ is an n×1 parameter vector.

Vectorized implementation

import numpy as np

class LinearRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations

    def fit(self, X, y):
        m, n = X.shape
        self.theta = np.zeros(n)

        for _ in range(self.n_iterations):
            predictions = X @ self.theta
            errors = predictions - y
            gradient = X.T @ errors / m
            self.theta -= self.learning_rate * gradient

    def predict(self, X):
        return X @ self.theta

2.1.4 Extensions of Linear Regression

Polynomial regression: add higher-order terms

from sklearn.preprocessing import PolynomialFeatures
poly = PolynomialFeatures(degree=2)
X_poly = poly.fit_transform(X)

Ridge regression (L2 regularization)

J(θ) = 1/(2m) Σ(h(x⁽ⁱ⁾) - y⁽ⁱ⁾)² + λΣθⱼ²

Lasso regression (L1 regularization)

J(θ) = 1/(2m) Σ(h(x⁽ⁱ⁾) - y⁽ⁱ⁾)² + λΣ|θⱼ|
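A quick comparison on synthetic data using scikit-learn's Ridge and Lasso (the alpha values here are arbitrary choices for illustration): L2 shrinks all coefficients toward zero, while L1 drives some of them exactly to zero.

```python
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge, Lasso

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 10))
# only the first two features matter; the other eight are noise
y = 3 * X[:, 0] - 2 * X[:, 1] + rng.normal(scale=0.1, size=100)

ols = LinearRegression().fit(X, y)
ridge = Ridge(alpha=1.0).fit(X, y)   # L2: shrinks coefficients toward 0
lasso = Lasso(alpha=0.1).fit(X, y)   # L1: sets some coefficients exactly to 0

print(np.sum(lasso.coef_ == 0))  # most of the irrelevant coefficients are exactly zero
```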

2.2 Logistic Regression

Despite the "regression" in its name, logistic regression is actually a classification algorithm.

2.2.1 Basic Idea

Logistic regression is a supervised learning algorithm for binary classification problems.

Sigmoid function

g(z) = 1/(1 + e⁻ᶻ)

Properties of the sigmoid:
- The output lies in (0, 1)
- At z = 0, the output is 0.5
- As z → ∞, the output approaches 1
- As z → -∞, the output approaches 0
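These properties are easy to verify numerically:

```python
import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

print(sigmoid(0))    # 0.5
print(sigmoid(10))   # very close to 1
print(sigmoid(-10))  # very close to 0
```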

Hypothesis

h(x) = g(θᵀx) = 1/(1 + e^(-θᵀx))

Decision boundary
- If h(x) ≥ 0.5, predict the positive class (y = 1)
- If h(x) < 0.5, predict the negative class (y = 0)
- Equivalently: predict positive when θᵀx ≥ 0

2.2.2 Cost Function

J(θ) = -1/m Σ[y⁽ⁱ⁾log(h(x⁽ⁱ⁾)) + (1-y⁽ⁱ⁾)log(1-h(x⁽ⁱ⁾))]

This cost function is convex, so gradient descent can optimize it.

2.2.3 Multi-class: One-vs-All

For multi-class problems, use a One-vs-All (OvA) or One-vs-One (OvO) strategy:

from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression

model = OneVsRestClassifier(LogisticRegression())
model.fit(X, y)

2.2.4 Implementation

import numpy as np

class LogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.lr = learning_rate
        self.n_iterations = n_iterations

    def sigmoid(self, z):
        return 1 / (1 + np.exp(-z))

    def fit(self, X, y):
        m, n = X.shape
        self.theta = np.zeros(n)

        for _ in range(self.n_iterations):
            h = self.sigmoid(X @ self.theta)
            gradient = X.T @ (h - y) / m
            self.theta -= self.lr * gradient

    def predict_proba(self, X):
        return self.sigmoid(X @ self.theta)

    def predict(self, X):
        return (self.predict_proba(X) >= 0.5).astype(int)

2.3 Decision Trees

A decision tree makes decisions by building a tree-shaped model.

2.3.1 Basic Concepts

Core concepts
- Node: a point in the tree
- Branch: an outgoing connection from a node
- Leaf: a terminal decision node

Tree terminology
- Root: the top node of the tree
- Parent/child: the relationship between adjacent levels
- Depth: the distance from the root

2.3.2 Split Criteria

Information gain

Entropy:

H(S) = -Σ pᵢ log₂(pᵢ)

Information gain:

IG(S, A) = H(S) - Σ (|Sᵥ|/|S|)H(Sᵥ)

where Sᵥ is the subset of samples with value v for attribute A.

Gain ratio (C4.5)

GainRatio(S, A) = IG(S, A) / SplitInfo(S, A)

where SplitInfo(S, A) = -Σ (|Sᵥ|/|S|) log₂(|Sᵥ|/|S|) is the entropy of the split itself.

Gini index

Gini(S) = 1 - Σ pᵢ²

Gini impurity of a split:

Gini(S, A) = Σ (|Sᵥ|/|S|)Gini(Sᵥ)
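Entropy and Gini can each be computed in a few lines, which also shows they agree on what "pure" and "maximally mixed" mean:

```python
import numpy as np

def entropy(y):
    # H(S) = -sum p_i * log2(p_i)
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))

def gini(y):
    # Gini(S) = 1 - sum p_i^2
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return 1 - np.sum(p ** 2)

y = np.array([0, 0, 1, 1])           # evenly mixed: maximum impurity for 2 classes
print(entropy(y))                     # 1.0
print(gini(y))                        # 0.5
print(entropy(np.array([1, 1, 1])))   # 0.0: a pure node
```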

2.3.3 Decision Tree Algorithms

ID3
- Uses information gain to select features
- Works with discrete features
- Prone to overfitting

C4.5
- Uses gain ratio
- Handles continuous features
- Includes pruning

CART
- Uses the Gini index
- Works for both classification and regression
- Builds binary trees

2.3.4 Decision Tree Implementation

import numpy as np

class DecisionTree:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def fit(self, X, y):
        self.tree = self._build_tree(X, y)

    def _build_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))

        # stopping conditions
        if ((self.max_depth and depth >= self.max_depth)
                or n_samples < self.min_samples_split
                or n_classes == 1):
            return np.bincount(y).argmax()

        # find the best split
        best_gain = -1
        best_feature = None
        best_threshold = None

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask

                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue

                gain = self._information_gain(y, left_mask, right_mask)

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        # split
        if best_gain <= 0:
            return np.bincount(y).argmax()

        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask

        left_tree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self._build_tree(X[right_mask], y[right_mask], depth + 1)

        return {
            'feature': best_feature,
            'threshold': best_threshold,
            'left': left_tree,
            'right': right_tree
        }

    def _information_gain(self, y, left_mask, right_mask):
        n = len(y)
        if n == 0:
            return 0

        parent_entropy = self._entropy(y)
        n_left, n_right = np.sum(left_mask), np.sum(right_mask)

        if n_left == 0 or n_right == 0:
            return 0

        child_entropy = ((n_left / n) * self._entropy(y[left_mask])
                         + (n_right / n) * self._entropy(y[right_mask]))

        return parent_entropy - child_entropy

    def _entropy(self, y):
        if len(y) == 0:
            return 0
        counts = np.bincount(y)
        probs = counts / len(y)
        return -np.sum([p * np.log2(p) for p in probs if p > 0])

    def predict(self, X):
        return np.array([self._predict_one(x, self.tree) for x in X])

    def _predict_one(self, x, node):
        if isinstance(node, dict):
            if x[node['feature']] <= node['threshold']:
                return self._predict_one(x, node['left'])
            else:
                return self._predict_one(x, node['right'])
        else:
            return node

2.4 Support Vector Machines (SVM)

A support vector machine finds the optimal hyperplane that maximizes the margin between the two classes.

2.4.1 Basic Idea

Hyperplane

w·x + b = 0

Margin

γ = 2/||w||

Optimization objective

min 1/2||w||²
s.t. y⁽ⁱ⁾(w·x⁽ⁱ⁾ + b) ≥ 1

2.4.2 Kernel Functions

When the data is not linearly separable, a kernel function maps it into a higher-dimensional space.

Kernel          Formula                      Notes
Linear          K(x,z) = x·z                 For linearly separable data
Polynomial      K(x,z) = (γx·z + r)ᵈ         Handles non-linearity
RBF (Gaussian)  K(x,z) = exp(-γ||x - z||²)   Common default choice
Sigmoid         K(x,z) = tanh(γx·z + r)      Resembles a neural network

RBF kernel parameter
- Larger γ: more complex decision boundary, prone to overfitting
- Smaller γ: simpler decision boundary, prone to underfitting

2.4.3 SVM Implementation

from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler

# preprocessing (important for SVMs)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# train
svm = SVC(kernel='rbf', C=1.0, gamma='scale')
svm.fit(X_scaled, y)

# predict
predictions = svm.predict(X_scaled)

2.5 Naive Bayes

A classification algorithm based on Bayes' theorem.

2.5.1 Bayes' Theorem

P(y|x) = P(x|y)P(y) / P(x)

2.5.2 The Naive Assumption

Assume all features are conditionally independent:

P(x₁,x₂,...,xₙ|y) = P(x₁|y)P(x₂|y)...P(xₙ|y)

2.5.3 Classifier Variants

  • Gaussian naive Bayes: assumes features follow a Gaussian distribution
  • Multinomial naive Bayes: suited to text classification
  • Bernoulli naive Bayes: suited to binary features

from sklearn.naive_bayes import GaussianNB, MultinomialNB

gnb = GaussianNB()
gnb.fit(X, y)

mnb = MultinomialNB()
mnb.fit(X, y)

2.6 K-Nearest Neighbors (KNN)

One of the simplest classification algorithms.

2.6.1 Algorithm Steps

  1. Choose K
  2. Compute distances (Euclidean, Manhattan, etc.)
  3. Find the K nearest neighbors
  4. Take a majority vote to decide the class

2.6.2 Distance Metrics

Euclidean distance

d(x,y) = √Σ(xᵢ - yᵢ)²

Manhattan distance

d(x,y) = Σ|xᵢ - yᵢ|

Cosine similarity

cosθ = (x·y) / (||x|| ||y||)
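The three metrics on a concrete pair of vectors:

```python
import numpy as np

x = np.array([1.0, 2.0, 3.0])
y = np.array([4.0, 6.0, 3.0])

euclidean = np.sqrt(np.sum((x - y) ** 2))  # sqrt(9 + 16 + 0) = 5.0
manhattan = np.sum(np.abs(x - y))          # 3 + 4 + 0 = 7.0
cosine = x @ y / (np.linalg.norm(x) * np.linalg.norm(y))

print(euclidean, manhattan, round(cosine, 3))
```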

2.6.3 KNN Implementation

from sklearn.neighbors import KNeighborsClassifier

knn = KNeighborsClassifier(n_neighbors=5, metric='euclidean')
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)

2.7 Ensemble Learning

Ensemble learning combines multiple models to improve performance.

2.7.1 Bagging

Random forest

from sklearn.ensemble import RandomForestClassifier

rf = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    min_samples_split=5,
    random_state=42
)
rf.fit(X_train, y_train)

Random forest characteristics
- Bootstrap sampling
- Random feature selection
- Voting across many trees
- Trains in parallel

2.7.2 Boosting

AdaBoost

from sklearn.ensemble import AdaBoostClassifier

ada = AdaBoostClassifier(
    n_estimators=100,
    learning_rate=1.0,
    random_state=42
)

Gradient Boosting

from sklearn.ensemble import GradientBoostingClassifier

gb = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)

XGBoost

import xgboost as xgb

xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)

LightGBM

import lightgbm as lgb

lgb_model = lgb.LGBMClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)

Chapter 3: Unsupervised Learning

3.1 Clustering

Clustering is unsupervised learning that groups similar samples together.

3.1.1 K-Means

Algorithm steps
1. Randomly pick K centers
2. Assign each sample to the nearest center
3. Update each center to its cluster's mean
4. Repeat steps 2-3 until convergence

Cost function

J = Σᵢ ||x⁽ⁱ⁾ - μc⁽ⁱ⁾||², where μc⁽ⁱ⁾ is the centroid of the cluster assigned to sample i
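The steps above fit in a few lines of NumPy (a minimal sketch with random initialization from the data, without the k-means++ seeding that sklearn uses):

```python
import numpy as np

def kmeans(X, k, n_iters=100, seed=0):
    rng = np.random.default_rng(seed)
    # 1. pick k distinct samples as the initial centers
    centers = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iters):
        # 2. assign every sample to its nearest center
        dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # 3. move each center to the mean of its cluster
        new_centers = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        # 4. stop when the centers no longer move
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return labels, centers

# two tight pairs of points: any initialization ends with one pair per cluster
X = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 0.0], [5.1, 0.0]])
labels, centers = kmeans(X, k=2)
print(labels)
```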

Implementation

from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=3, random_state=42)
kmeans.fit(X)
labels = kmeans.labels_
centers = kmeans.cluster_centers_

Choosing K
- Elbow method
- Silhouette score

from sklearn.metrics import silhouette_score

# try different values of K
silhouette_scores = []
for k in range(2, 11):
    kmeans = KMeans(n_clusters=k, random_state=42)
    labels = kmeans.fit_predict(X)
    score = silhouette_score(X, labels)
    silhouette_scores.append(score)

3.1.2 Hierarchical Clustering

Agglomerative clustering

from sklearn.cluster import AgglomerativeClustering

agg = AgglomerativeClustering(n_clusters=3)
labels = agg.fit_predict(X)

Linkage criteria
- Single linkage (minimum distance)
- Complete linkage (maximum distance)
- Average linkage
- Ward linkage (minimizes variance)
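To see the full agglomerative process, SciPy's hierarchy module can build the linkage matrix and cut it into flat clusters (a small sketch; `linkage` and `fcluster` come from `scipy.cluster.hierarchy`):

```python
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster

# two well-separated pairs of points
X = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]])

Z = linkage(X, method='ward')  # also: 'single', 'complete', 'average'
labels = fcluster(Z, t=2, criterion='maxclust')
print(labels)  # each nearby pair lands in the same cluster
```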

3.1.3 DBSCAN

A density-based clustering algorithm.

Core concepts
- Core point: has at least MinPts points in its neighborhood
- Border point: lies in a core point's neighborhood but is not itself a core point
- Noise point: neither a core point nor a border point

from sklearn.cluster import DBSCAN

dbscan = DBSCAN(eps=0.5, min_samples=5)
labels = dbscan.fit_predict(X)

3.2 Dimensionality Reduction

3.2.1 PCA (Principal Component Analysis)

Goal: find the orthogonal directions of maximum variance.

Steps
1. Center the data
2. Compute the covariance matrix
3. Perform an eigendecomposition
4. Keep the top K eigenvectors
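The four steps map directly onto NumPy (a minimal sketch using eigendecomposition of the covariance matrix; sklearn's PCA uses an SVD internally but yields the same components):

```python
import numpy as np

def pca(X, k):
    # 1. center the data
    Xc = X - X.mean(axis=0)
    # 2. covariance matrix
    cov = Xc.T @ Xc / (len(X) - 1)
    # 3. eigendecomposition (eigh: the covariance matrix is symmetric)
    eigvals, eigvecs = np.linalg.eigh(cov)
    # 4. project onto the k eigenvectors with the largest eigenvalues
    order = np.argsort(eigvals)[::-1][:k]
    return Xc @ eigvecs[:, order]

rng = np.random.default_rng(0)
t = rng.normal(size=200)
X = np.c_[t, 2 * t + rng.normal(scale=0.1, size=200)]  # almost 1-D data
X1 = pca(X, k=1)  # the first component captures nearly all the variance
```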

from sklearn.decomposition import PCA

pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
print(f"Explained variance ratio: {pca.explained_variance_ratio_}")

Choosing the number of components

# keep 95% of the variance
pca = PCA(n_components=0.95)
X_reduced = pca.fit_transform(X)

3.2.2 t-SNE

A non-linear dimensionality reduction method used for visualization.

from sklearn.manifold import TSNE

tsne = TSNE(n_components=2, random_state=42)
X_tsne = tsne.fit_transform(X)

3.2.3 LDA (Linear Discriminant Analysis)

A supervised dimensionality reduction method that maximizes between-class distance while minimizing within-class distance.

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

lda = LDA(n_components=2)
X_lda = lda.fit_transform(X, y)

Chapter 4: Deep Learning

4.1 Neural Network Basics

4.1.1 The Perceptron

A single-layer neural network:

y = f(w·x + b)

Activation functions:

import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def relu(z):
    return np.maximum(0, z)

def tanh(z):
    return np.tanh(z)

4.1.2 Multi-Layer Perceptron

Network structure
- Input layer
- Hidden layer(s)
- Output layer

from sklearn.neural_network import MLPClassifier

mlp = MLPClassifier(
    hidden_layer_sizes=(100, 50),  # two hidden layers
    activation='relu',
    solver='adam',
    max_iter=500,
    random_state=42
)
mlp.fit(X_train, y_train)

4.2 PyTorch Basics

import torch
import torch.nn as nn
import torch.optim as optim

# define the network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# create the model
model = SimpleNet(784, 256, 10)

# loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train
for epoch in range(num_epochs):
    for batch_X, batch_y in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

4.3 TensorFlow Basics

import tensorflow as tf
from tensorflow import keras

# define the model
model = keras.Sequential([
    keras.layers.Dense(256, activation='relu', input_shape=(784,)),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
])

# compile
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# train
model.fit(X_train, y_train, epochs=10, batch_size=32)

Chapter 5: Model Evaluation and Optimization

5.1 Evaluation Metrics

5.1.1 Classification Metrics

from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report
)

# accuracy
accuracy = accuracy_score(y_true, y_pred)

# precision
precision = precision_score(y_true, y_pred)

# recall
recall = recall_score(y_true, y_pred)

# F1 score
f1 = f1_score(y_true, y_pred)

# confusion matrix
cm = confusion_matrix(y_true, y_pred)

# detailed report
print(classification_report(y_true, y_pred))

5.1.2 Regression Metrics

from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score
)

# mean squared error
mse = mean_squared_error(y_true, y_pred)

# root mean squared error
rmse = np.sqrt(mse)

# mean absolute error
mae = mean_absolute_error(y_true, y_pred)

# R² score
r2 = r2_score(y_true, y_pred)

5.2 Overfitting and Underfitting

5.2.1 Diagnosis

Use learning curves to diagnose:

import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve

train_sizes, train_scores, test_scores = learning_curve(
    model, X, y, cv=5, n_jobs=-1
)

train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
test_std = np.std(test_scores, axis=1)

plt.plot(train_sizes, train_mean, 'o-', label='Training score')
plt.plot(train_sizes, test_mean, 'o-', label='Cross-validation score')
plt.xlabel('Training Size')
plt.ylabel('Score')
plt.legend()
plt.show()

5.2.2 Remedies

Overfitting
- Collect more data
- Regularization (L1, L2)
- Dropout
- Early stopping

Underfitting
- Increase model complexity
- Add features
- Reduce regularization
5.3 Cross-Validation

from sklearn.model_selection import cross_val_score

scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {scores}")
print(f"Mean: {scores.mean():.4f}, Std: {scores.std():.4f}")

Chapter 6: Feature Engineering

6.1 Data Preprocessing

6.1.1 Handling Missing Values

from sklearn.impute import SimpleImputer

# fill with the mean
imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X)

# fill with the median
imputer = SimpleImputer(strategy='median')

# fill with the mode
imputer = SimpleImputer(strategy='most_frequent')

# KNN imputation
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=5)

6.1.2 Standardization and Normalization

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

# Z-score standardization
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# min-max normalization
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)

# RobustScaler (more robust to outliers)
scaler = RobustScaler()
X_robust = scaler.fit_transform(X)

6.1.3 Encoding

from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd

# label encoding
le = LabelEncoder()
y_encoded = le.fit_transform(y)

# one-hot encoding
df = pd.get_dummies(df, columns=['category'])

# one-hot encoding (sklearn; `sparse` was renamed to `sparse_output` in sklearn 1.2)
ohe = OneHotEncoder(sparse_output=False)
X_encoded = ohe.fit_transform(X_cat.reshape(-1, 1))

6.2 Feature Selection

from sklearn.feature_selection import (
    SelectKBest, 
    f_classif, 
    RFE,
    SelectFromModel
)

# univariate feature selection
selector = SelectKBest(score_func=f_classif, k=10)
X_selected = selector.fit_transform(X, y)

# recursive feature elimination
from sklearn.ensemble import RandomForestClassifier
rfe = RFE(estimator=RandomForestClassifier(), n_features_to_select=10)
rfe.fit(X, y)

# model-based feature selection
selector = SelectFromModel(RandomForestClassifier())
X_selected = selector.fit_transform(X, y)

Chapter 7: Machine Learning in Practice

7.1 The Complete Workflow

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# 1. load the data
data = pd.read_csv('data.csv')

# 2. explore the data
print(data.head())
print(data.info())
print(data.describe())

# 3. preprocess the data
X = data.drop('target', axis=1)
y = data['target']

# handle missing values
X = X.fillna(X.mean())

# split the dataset
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 4. build a Pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# 5. train
pipeline.fit(X_train, y_train)

# 6. evaluate
y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.4f}")

# 7. predict on new data
new_data = pd.read_csv('new_data.csv')
predictions = pipeline.predict(new_data)

7.2 Grid Search

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

param_grid = {
    'C': [0.1, 1, 10],
    'gamma': [0.01, 0.1, 1],
    'kernel': ['rbf', 'linear']
}

grid_search = GridSearchCV(
    SVC(),
    param_grid,
    cv=5,
    n_jobs=-1,
    scoring='accuracy'
)

grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_}")

# use the best model
best_model = grid_search.best_estimator_

7.3 Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import LogisticRegression

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('poly', PolynomialFeatures(degree=2)),
    ('classifier', LogisticRegression(max_iter=1000))
])

pipeline.fit(X_train, y_train)
score = pipeline.score(X_test, y_test)

Appendix: Quick Code Reference

A.1 Loading Data

import pandas as pd
import numpy as np

# CSV
df = pd.read_csv('file.csv')

# Excel
df = pd.read_excel('file.xlsx')

# JSON
df = pd.read_json('file.json')

# Parquet
df = pd.read_parquet('file.parquet')

A.2 Data Exploration

# basic information
df.info()
df.describe()

# missing values
df.isnull().sum()

# distributions
df['column'].value_counts()

# correlations
df.corr()

A.3 Visualization

import matplotlib.pyplot as plt
import seaborn as sns

# line plot
plt.plot(x, y)
plt.show()

# scatter plot
plt.scatter(x, y)
plt.show()

# heatmap
sns.heatmap(df.corr(), annot=True)
plt.show()

Notes compiled by: AI assistant
Last updated: 2026-03-19


Chapter 8: Advanced Deep Learning

8.1 Convolutional Neural Networks (CNN)

8.1.1 CNN Structure

import torch
import torch.nn as nn

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 10)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

8.1.2 Classic CNN Architectures

LeNet-5: an early CNN, used for handwritten digit recognition

AlexNet: the 2012 ImageNet winner, with an 8-layer structure

VGGNet: deeper networks (16-19 layers)

ResNet: introduced residual connections to address vanishing gradients

8.2 Recurrent Neural Networks (RNN)

8.2.1 Basic RNN

class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleRNN, self).__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, hidden = self.rnn(x)
        out = self.fc(out[:, -1, :])
        return out

8.2.2 LSTM

Long short-term memory networks address the gradient problems of long sequences.

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

8.2.3 GRU

The gated recurrent unit is a simplified variant of the LSTM.
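A GRU version of the LSTM model above, assuming PyTorch as elsewhere in this chapter; since a GRU merges the cell state into the hidden state, only h0 is needed (and it defaults to zeros):

```python
import torch
import torch.nn as nn

class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, _ = self.gru(x)           # h0 defaults to zeros, no c0 needed
        return self.fc(out[:, -1, :])  # use the last time step

model = GRUModel(input_size=8, hidden_size=16, num_layers=2, output_size=3)
y = model(torch.randn(4, 10, 8))       # (batch, seq_len, features)
print(y.shape)  # torch.Size([4, 3])
```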

8.3 Transformer

8.3.1 Self-Attention

class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        self.values = nn.Linear(embed_size, embed_size)
        self.keys = nn.Linear(embed_size, embed_size)
        self.queries = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(query)

        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = queries.reshape(N, query_len, self.heads, self.head_dim)

        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        # scale by sqrt(d_k), the per-head dimension, as in the original Transformer
        attention = torch.softmax(energy / (self.head_dim ** (1/2)), dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )

        return self.fc_out(out)

8.4 Generative Adversarial Networks (GAN)

class Generator(nn.Module):
    def __init__(self, latent_dim, img_shape):
        super(Generator, self).__init__()
        self.img_shape = img_shape

        self.model = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, int(torch.prod(torch.tensor(img_shape)))),
            nn.Tanh()
        )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.size(0), *self.img_shape)
        return img

class Discriminator(nn.Module):
    def __init__(self, img_shape):
        super(Discriminator, self).__init__()

        self.model = nn.Sequential(
            nn.Linear(int(torch.prod(torch.tensor(img_shape))), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, img):
        img_flat = img.view(img.size(0), -1)
        validity = self.model(img_flat)
        return validity

8.5 Reinforcement Learning

8.5.1 Q-Learning

import numpy as np

class QLearningAgent:
    def __init__(self, state_size, action_size, learning_rate=0.1, gamma=0.95):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.q_table = np.zeros((state_size, action_size))

    def choose_action(self, state):
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_size)
        return np.argmax(self.q_table[state])

    def learn(self, state, action, reward, next_state):
        current_q = self.q_table[state, action]
        max_next_q = np.max(self.q_table[next_state])
        new_q = current_q + self.learning_rate * (reward + self.gamma * max_next_q - current_q)
        self.q_table[state, action] = new_q

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

Chapter 9: Model Deployment

9.1 Saving and Loading Models

import torch

# save the entire model
torch.save(model, 'model.pth')

# load it back
model = torch.load('model.pth')

# save only the parameters (recommended)
torch.save(model.state_dict(), 'model_weights.pth')

# load the parameters
model = Model()
model.load_state_dict(torch.load('model_weights.pth'))

9.2 Exporting to ONNX

import torch.onnx

torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)

9.3 Deploying with Flask

from flask import Flask, request, jsonify
import torch
import json

app = Flask(__name__)
model = Model()
model.load_state_dict(torch.load('model.pth'))
model.eval()

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    input_tensor = torch.tensor(data['input'])
    output = model(input_tensor)
    return jsonify({'prediction': output.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

9.4 Deploying with Docker

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY model.pth .
COPY app.py .

EXPOSE 5000

CMD ["python", "app.py"]

Chapter 10: AutoML

10.1 Auto-sklearn

import autosklearn.classification
import sklearn.model_selection
import sklearn.datasets

X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X, y, random_state=1
)

automl = autosklearn.classification.AutoSklearnClassifier()
automl.fit(X_train, y_train)
predictions = automl.predict(X_test)

10.2 TPOT

from tpot import TPOTClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2
)

tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))

Appendix: Common Code Snippets

A.1 Data Augmentation

from torchvision import transforms

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

A.2 Learning Rate Scheduling

# step decay
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# cosine annealing
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

# ReduceLROnPlateau
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5)

for epoch in range(num_epochs):
    train()
    val_loss = validate()
    scheduler.step(val_loss)

A.3 Early Stopping

class EarlyStopping:
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0
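A usage sketch with a fabricated validation-loss curve (the class is restated verbatim so the snippet runs standalone): with patience=3, training stops after three consecutive epochs without improvement.

```python
class EarlyStopping:
    # same logic as the class above
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

# validation loss improves for three epochs, then keeps getting worse
losses = [1.0, 0.8, 0.6, 0.61, 0.62, 0.63, 0.64]
stopper = EarlyStopping(patience=3)
stopped_at = None
for epoch, loss in enumerate(losses):
    stopper(loss)
    if stopper.early_stop:
        stopped_at = epoch
        break
print(stopped_at)  # 5: the third bad epoch after the best loss at epoch 2
```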
