Machine Learning
The science of getting computers to learn from data
Chapter 1: Overview of Machine Learning
1.1 What Is Machine Learning
Machine learning is a branch of artificial intelligence that enables computers to learn from data and improve their performance without being explicitly programmed.
Arthur Samuel (1959): machine learning is the field of study that gives computers the ability to learn.
Tom Mitchell (1998): a computer program is said to learn from experience E if its performance on task T, as measured by P, improves with experience E.
1.2 Categories of Machine Learning
1.2.1 By Learning Paradigm
| Type | Description | Examples |
|---|---|---|
| Supervised learning | Labeled data | Classification, regression |
| Unsupervised learning | Unlabeled data | Clustering, dimensionality reduction |
| Semi-supervised learning | Partially labeled data | Self-training, label propagation |
| Reinforcement learning | Trial-and-error learning | Games, robotics |
1.2.2 By Task
- Classification: predict discrete labels
  - Binary classification: spam detection
  - Multi-class classification: image recognition
- Regression: predict continuous values
  - House price prediction
  - Temperature prediction
- Clustering: group similar samples
  - User segmentation
  - Document clustering
- Dimensionality reduction: reduce the number of features
  - PCA
  - t-SNE
- Recommendation: personalized recommendations
  - Product recommendation
  - Content recommendation
1.3 The Machine Learning Workflow
1. Define the problem
↓
2. Collect data
↓
3. Preprocess the data
↓
4. Engineer features
↓
5. Select a model
↓
6. Train the model
↓
7. Evaluate the model
↓
8. Tune the model
↓
9. Deploy the model
Chapter 2: Supervised Learning
2.1 Linear Regression
Linear regression is one of the simplest and most important regression algorithms.
2.1.1 Basic Idea
Linear regression looks for a line (or hyperplane) that minimizes the error between predicted and true values.
Hypothesis:
h(x) = θ₀ + θ₁x
Cost function (mean squared error):
J(θ) = 1/(2m) Σᵢ (h(x⁽ⁱ⁾) − y⁽ⁱ⁾)²
2.1.2 Gradient Descent
Update rule:
θⱼ := θⱼ − α ∂J/∂θⱼ
where α is the learning rate, which controls the step size of each update.
Batch gradient descent:
def gradient_descent(X, y, theta, alpha, num_iterations):
    m = len(y)
    for i in range(num_iterations):
        h = X @ theta                  # predictions
        error = h - y
        gradient = X.T @ error / m     # gradient of the MSE cost
        theta = theta - alpha * gradient
    return theta
Choosing the learning rate:
- Too small: convergence is slow
- Too large: the iteration may fail to converge, or may diverge
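To make the trade-off concrete, here is a small NumPy sketch (toy data and step sizes chosen purely for illustration) fitting y = 2x with three different learning rates:

```python
import numpy as np

# Toy 1-D problem: the true relationship is y = 2x.
X = np.array([[1.0], [2.0], [3.0], [4.0]])
y = np.array([2.0, 4.0, 6.0, 8.0])

def run_gd(alpha, num_iterations=100):
    """Run batch gradient descent and return the fitted slope."""
    theta = np.zeros(1)
    m = len(y)
    for _ in range(num_iterations):
        gradient = X.T @ (X @ theta - y) / m
        theta = theta - alpha * gradient
    return theta[0]

small = run_gd(alpha=0.001)    # too small: still far from 2.0 after 100 steps
good = run_gd(alpha=0.1)       # converges quickly to ~2.0
diverged = run_gd(alpha=0.3)   # too large: the iterates blow up
```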
2.1.3 Multivariate Linear Regression
With multiple features:
Hypothesis:
h(x) = Xθ
where X is an m×n matrix (m samples, n features) and θ is an n×1 parameter vector.
Vectorized implementation:
import numpy as np

class LinearRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations

    def fit(self, X, y):
        m, n = X.shape
        self.theta = np.zeros(n)
        for _ in range(self.n_iterations):
            predictions = X @ self.theta
            errors = predictions - y
            gradient = X.T @ errors / m
            self.theta -= self.learning_rate * gradient

    def predict(self, X):
        return X @ self.theta
2.1.4 Extensions of Linear Regression
Polynomial regression: add higher-order terms
from sklearn.preprocessing import PolynomialFeatures
poly = PolynomialFeatures(degree=2)
X_poly = poly.fit_transform(X)
Ridge regression (L2 regularization):
J(θ) = 1/(2m) Σᵢ (h(x⁽ⁱ⁾) − y⁽ⁱ⁾)² + λ Σⱼ θⱼ²
Lasso regression (L1 regularization):
J(θ) = 1/(2m) Σᵢ (h(x⁽ⁱ⁾) − y⁽ⁱ⁾)² + λ Σⱼ |θⱼ|
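As a sketch of how the two penalties behave in practice, scikit-learn's Ridge and Lasso can be compared on synthetic data (the feature count, alpha values, and noise level here are illustrative assumptions): L2 shrinks all coefficients, while L1 can drive uninformative ones exactly to zero.

```python
import numpy as np
from sklearn.linear_model import Ridge, Lasso

# Synthetic data: only the first two of five features carry signal.
rng = np.random.RandomState(0)
X = rng.randn(100, 5)
y = 3.0 * X[:, 0] + 0.5 * X[:, 1] + rng.randn(100) * 0.1

ridge = Ridge(alpha=1.0).fit(X, y)   # L2: shrinks coefficients toward zero
lasso = Lasso(alpha=0.1).fit(X, y)   # L1: zeroes out the uninformative ones
```

With these settings the Lasso sets the three noise coefficients to exactly zero, while Ridge keeps small nonzero values for them.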
2.2 Logistic Regression
Despite the word "regression" in its name, logistic regression is actually a classification algorithm.
2.2.1 Basic Idea
Logistic regression is a supervised learning algorithm for binary classification.
Sigmoid function:
g(z) = 1/(1 + e⁻ᶻ)
Properties of the sigmoid:
- The output lies in (0, 1)
- At z = 0 the output is 0.5
- As z → ∞ the output approaches 1
- As z → −∞ the output approaches 0
Hypothesis:
h(x) = g(θᵀx) = 1/(1 + e^(−θᵀx))
Decision boundary:
- If h(x) ≥ 0.5, predict the positive class (y = 1)
- If h(x) < 0.5, predict the negative class (y = 0)
- Equivalently, predict y = 1 when θᵀx ≥ 0
2.2.2 Cost Function
J(θ) = −1/m Σᵢ [y⁽ⁱ⁾ log(h(x⁽ⁱ⁾)) + (1 − y⁽ⁱ⁾) log(1 − h(x⁽ⁱ⁾))]
This cost function is convex, so it can be optimized with gradient descent.
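A quick numeric check of the cost above (the design matrix and step size are toy values assumed for illustration): at θ = 0 every prediction is 0.5, so the cost is log 2, and one gradient step lowers it, as convexity suggests.

```python
import numpy as np

# Toy data: first column is the bias term, second is a single feature.
X = np.array([[1.0, 0.5], [1.0, -1.0], [1.0, 2.0], [1.0, -0.5]])
y = np.array([1, 0, 1, 0])

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def cost(theta):
    h = sigmoid(X @ theta)
    return -np.mean(y * np.log(h) + (1 - y) * np.log(1 - h))

theta = np.zeros(2)
before = cost(theta)                                # log(2) ~ 0.693 at theta = 0
grad = X.T @ (sigmoid(X @ theta) - y) / len(y)      # gradient of the cost
theta -= 0.5 * grad
after = cost(theta)                                 # strictly smaller than `before`
```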
2.2.3 Multi-class Classification: One-vs-All
For multi-class problems, use the One-vs-All (OvA) or One-vs-One (OvO) strategy:
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
model = OneVsRestClassifier(LogisticRegression())
model.fit(X, y)
2.2.4 Implementation
import numpy as np

class LogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.lr = learning_rate
        self.n_iterations = n_iterations

    def sigmoid(self, z):
        return 1 / (1 + np.exp(-z))

    def fit(self, X, y):
        m, n = X.shape
        self.theta = np.zeros(n)
        for _ in range(self.n_iterations):
            h = self.sigmoid(X @ self.theta)
            gradient = X.T @ (h - y) / m
            self.theta -= self.lr * gradient

    def predict_proba(self, X):
        return self.sigmoid(X @ self.theta)

    def predict(self, X):
        return (self.predict_proba(X) >= 0.5).astype(int)
2.3 Decision Trees
A decision tree makes decisions by building a tree-structured model.
2.3.1 Basic Concepts
Core concepts:
- Node: a point in the tree
- Branch: an outgoing connection from a node
- Leaf: a terminal node holding the final decision
Tree terminology:
- Root node: the top node of the tree
- Parent/child nodes: the relationship between adjacent levels
- Depth: the distance from the root node
2.3.2 Split Criteria
Information gain:
Entropy:
H(S) = −Σᵢ pᵢ log₂(pᵢ)
Information gain:
IG(S, A) = H(S) − Σᵥ (|Sᵥ|/|S|) H(Sᵥ)
where Sᵥ is the subset of samples whose value of attribute A is v.
Gain ratio (C4.5):
GainRatio(S, A) = IG(S, A) / SplitInfo(A), where SplitInfo(A) = −Σᵥ (|Sᵥ|/|S|) log₂(|Sᵥ|/|S|)
Gini index:
Gini(S) = 1 − Σᵢ pᵢ²
Gini impurity of a split:
Gini(S, A) = Σᵥ (|Sᵥ|/|S|) Gini(Sᵥ)
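The entropy, Gini, and information-gain formulas above, worked on a toy parent node (4 positives, 4 negatives) with one hypothetical candidate split:

```python
import numpy as np

def entropy(labels):
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))

def gini(labels):
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return 1 - np.sum(p ** 2)

parent = np.array([1, 1, 1, 1, 0, 0, 0, 0])   # H = 1.0, Gini = 0.5
left   = np.array([1, 1, 1, 0])               # one candidate split...
right  = np.array([1, 0, 0, 0])               # ...and its complement

# IG(S, A) = H(S) - sum over children of (|S_v|/|S|) * H(S_v)
ig = entropy(parent) - (len(left) / len(parent)) * entropy(left) \
                     - (len(right) / len(parent)) * entropy(right)
```

For this split the gain works out to about 0.189 bits.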
2.3.3 Decision Tree Algorithms
ID3:
- Uses information gain to select features
- Works with discrete features
- Prone to overfitting
C4.5:
- Uses the gain ratio
- Can handle continuous features
- Includes pruning
CART:
- Uses the Gini index
- Works for both classification and regression
- Builds binary trees
2.3.4 Decision Tree Implementation
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def fit(self, X, y):
        self.tree = self._build_tree(X, y)

    def _build_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))
        # stopping conditions: max depth reached, too few samples, or a pure node
        if (self.max_depth and depth >= self.max_depth) or n_samples < self.min_samples_split or n_classes == 1:
            return np.bincount(y).argmax()
        # find the best split
        best_gain = -1
        best_feature = None
        best_threshold = None
        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask
                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue
                gain = self._information_gain(y, left_mask, right_mask)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold
        # split
        if best_gain <= 0:
            return np.bincount(y).argmax()
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        left_tree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return {
            'feature': best_feature,
            'threshold': best_threshold,
            'left': left_tree,
            'right': right_tree
        }

    def _information_gain(self, y, left_mask, right_mask):
        n = len(y)
        if n == 0:
            return 0
        parent_entropy = self._entropy(y)
        n_left, n_right = np.sum(left_mask), np.sum(right_mask)
        if n_left == 0 or n_right == 0:
            return 0
        child_entropy = (n_left / n) * self._entropy(y[left_mask]) + (n_right / n) * self._entropy(y[right_mask])
        return parent_entropy - child_entropy

    def _entropy(self, y):
        if len(y) == 0:
            return 0
        counts = np.bincount(y)
        probs = counts / len(y)
        return -np.sum([p * np.log2(p) for p in probs if p > 0])

    def predict(self, X):
        return np.array([self._predict_one(x, self.tree) for x in X])

    def _predict_one(self, x, node):
        # internal nodes are dicts; leaves store a class label
        if isinstance(node, dict):
            if x[node['feature']] <= node['threshold']:
                return self._predict_one(x, node['left'])
            else:
                return self._predict_one(x, node['right'])
        else:
            return node
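In practice one would usually reach for scikit-learn's DecisionTreeClassifier, which implements the same threshold-splitting idea (CART, with Gini impurity by default); a minimal sketch on assumed toy data:

```python
import numpy as np
from sklearn.tree import DecisionTreeClassifier

# One feature, perfectly separable at x = 1.5.
X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

clf = DecisionTreeClassifier(max_depth=2, random_state=0).fit(X, y)
preds = clf.predict(np.array([[0.5], [2.5]]))   # -> one sample per side of the split
```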
2.4 Support Vector Machines (SVM)
A support vector machine finds the optimal hyperplane that maximizes the margin between the two classes.
2.4.1 Basic Idea
Hyperplane:
w·x + b = 0
Margin:
γ = 2/||w||
Optimization objective:
min 1/2||w||²
s.t. y⁽ⁱ⁾(w·x⁽ⁱ⁾ + b) ≥ 1
2.4.2 Kernel Functions
When the data are not linearly separable, a kernel maps them into a higher-dimensional space.
| Kernel | Formula | Notes |
|---|---|---|
| Linear | K(x,z) = x·z | For linearly separable data |
| Polynomial | K(x,z) = (γx·z + r)ᵈ | Handles nonlinearity |
| RBF (Gaussian) | K(x,z) = exp(−γ‖x−z‖²) | The most common default |
| Sigmoid | K(x,z) = tanh(γx·z + r) | Resembles a neural network |
RBF kernel parameter:
- Larger γ: a more complex decision boundary, prone to overfitting
- Smaller γ: a simpler decision boundary, prone to underfitting
2.4.3 SVM in Practice
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler

# preprocessing (important for SVMs, which are sensitive to feature scales)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# train
svm = SVC(kernel='rbf', C=1.0, gamma='scale')
svm.fit(X_scaled, y)
# predict
predictions = svm.predict(X_scaled)
2.5 Naive Bayes
A classification algorithm based on Bayes' theorem.
2.5.1 Bayes' Theorem
P(y|x) = P(x|y)P(y) / P(x)
2.5.2 The Naive Assumption
Assume all features are conditionally independent given the class:
P(x₁,x₂,...,xₙ|y) = P(x₁|y)P(x₂|y)...P(xₙ|y)
2.5.3 Classifier Variants
- Gaussian naive Bayes: assumes features follow a Gaussian distribution
- Multinomial naive Bayes: suited to text classification
- Bernoulli naive Bayes: suited to binary features
from sklearn.naive_bayes import GaussianNB, MultinomialNB
gnb = GaussianNB()
gnb.fit(X, y)
mnb = MultinomialNB()
mnb.fit(X, y)
2.6 K-Nearest Neighbors (KNN)
One of the simplest classification algorithms.
2.6.1 Algorithm Steps
- Choose K
- Compute distances (Euclidean, Manhattan, etc.)
- Find the K nearest neighbors
- Take a majority vote to decide the class
2.6.2 Distance Measures
Euclidean distance:
d(x,y) = √Σᵢ(xᵢ − yᵢ)²
Manhattan distance:
d(x,y) = Σᵢ|xᵢ − yᵢ|
Cosine similarity:
cos θ = (x·y) / (||x|| ||y||)
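The three measures above, computed directly in NumPy on two assumed example vectors:

```python
import numpy as np

x = np.array([1.0, 2.0, 3.0])
y = np.array([4.0, 6.0, 3.0])

euclidean = np.sqrt(np.sum((x - y) ** 2))   # sqrt(9 + 16 + 0) = 5.0
manhattan = np.sum(np.abs(x - y))           # 3 + 4 + 0 = 7.0
cosine = (x @ y) / (np.linalg.norm(x) * np.linalg.norm(y))  # ~0.856
```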
2.6.3 KNN in Practice
from sklearn.neighbors import KNeighborsClassifier
knn = KNeighborsClassifier(n_neighbors=5, metric='euclidean')
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)
2.7 Ensemble Learning
Ensemble learning improves performance by combining multiple models.
2.7.1 Bagging
Random forest:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    min_samples_split=5,
    random_state=42
)
rf.fit(X_train, y_train)
Characteristics of random forests:
- Bootstrap sampling
- Random feature selection at each split
- Voting across many trees
- Trees can be trained in parallel
2.7.2 Boosting
AdaBoost:
from sklearn.ensemble import AdaBoostClassifier
ada = AdaBoostClassifier(
    n_estimators=100,
    learning_rate=1.0,
    random_state=42
)
Gradient Boosting:
from sklearn.ensemble import GradientBoostingClassifier
gb = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)
XGBoost:
import xgboost as xgb
xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)
LightGBM:
import lightgbm as lgb
lgb_model = lgb.LGBMClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42
)
Chapter 3: Unsupervised Learning
3.1 Clustering
Clustering is an unsupervised learning task that groups similar samples together.
3.1.1 K-Means
Algorithm steps:
1. Randomly choose K centers
2. Assign each sample to its nearest center
3. Update each center to the mean of its cluster
4. Repeat steps 2-3 until convergence
Cost function:
J = Σᵢ ||x⁽ⁱ⁾ − μ_c⁽ⁱ⁾||²
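The four steps above can be sketched directly in NumPy (toy 1-D data with two well-separated clusters; a fixed iteration count stands in for a proper convergence test):

```python
import numpy as np

rng = np.random.RandomState(42)
X = np.array([[1.0], [1.2], [0.8], [8.0], [8.2], [7.8]])
k = 2

centers = X[rng.choice(len(X), k, replace=False)]   # 1. random initialization
for _ in range(10):
    # 2. assign each sample to its nearest center
    dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
    labels = dists.argmin(axis=1)
    # 3. move each center to the mean of its cluster
    centers = np.array([X[labels == j].mean(axis=0) for j in range(k)])
    # 4. repeat (here: a fixed number of iterations)
```

On this data the centers settle at 1.0 and 8.0, the two cluster means.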
Implementation:
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=3, random_state=42)
kmeans.fit(X)
labels = kmeans.labels_
centers = kmeans.cluster_centers_
Choosing K:
- Elbow method
- Silhouette score
from sklearn.metrics import silhouette_score
# try different values of K
silhouette_scores = []
for k in range(2, 11):
    kmeans = KMeans(n_clusters=k, random_state=42)
    labels = kmeans.fit_predict(X)
    score = silhouette_score(X, labels)
    silhouette_scores.append(score)
3.1.2 Hierarchical Clustering
Agglomerative clustering:
from sklearn.cluster import AgglomerativeClustering
agg = AgglomerativeClustering(n_clusters=3)
labels = agg.fit_predict(X)
Linkage criteria:
- Single linkage (minimum distance)
- Complete linkage (maximum distance)
- Average linkage
- Ward linkage (minimizes variance)
3.1.3 DBSCAN
A density-based clustering algorithm.
Core concepts:
- Core point: has at least MinPts points in its neighborhood
- Border point: lies in a core point's neighborhood but is not itself a core point
- Noise point: neither a core point nor a border point
from sklearn.cluster import DBSCAN
dbscan = DBSCAN(eps=0.5, min_samples=5)
labels = dbscan.fit_predict(X)
3.2 Dimensionality Reduction
3.2.1 PCA (Principal Component Analysis)
Goal: find the orthogonal directions of maximum variance.
Steps:
1. Center the data
2. Compute the covariance matrix
3. Perform an eigendecomposition
4. Keep the top K eigenvectors
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
print(f"Explained variance ratio: {pca.explained_variance_ratio_}")
Choosing the number of components:
# keep 95% of the variance
pca = PCA(n_components=0.95)
X_reduced = pca.fit_transform(X)
3.2.2 t-SNE
A nonlinear dimensionality-reduction method used for visualization.
from sklearn.manifold import TSNE
tsne = TSNE(n_components=2, random_state=42)
X_tsne = tsne.fit_transform(X)
3.2.3 LDA (Linear Discriminant Analysis)
A supervised dimensionality-reduction method that maximizes between-class distance while minimizing within-class distance.
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
lda = LDA(n_components=2)
X_lda = lda.fit_transform(X, y)
Chapter 4: Deep Learning
4.1 Neural Network Basics
4.1.1 The Perceptron
A single-layer neural network:
y = f(w·x + b)
Activation functions:
import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def relu(z):
    return np.maximum(0, z)

def tanh(z):
    return np.tanh(z)
4.1.2 The Multilayer Perceptron
Network structure:
- Input layer
- Hidden layers
- Output layer
from sklearn.neural_network import MLPClassifier
mlp = MLPClassifier(
    hidden_layer_sizes=(100, 50),  # two hidden layers
    activation='relu',
    solver='adam',
    max_iter=500,
    random_state=42
)
mlp.fit(X_train, y_train)
4.2 PyTorch Basics
import torch
import torch.nn as nn
import torch.optim as optim

# define the network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# create the model
model = SimpleNet(784, 256, 10)
# loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# training loop (assumes num_epochs and dataloader are defined elsewhere)
for epoch in range(num_epochs):
    for batch_X, batch_y in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
4.3 TensorFlow Basics
import tensorflow as tf
from tensorflow import keras

# define the model
model = keras.Sequential([
    keras.layers.Dense(256, activation='relu', input_shape=(784,)),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
])
# compile
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
# train
model.fit(X_train, y_train, epochs=10, batch_size=32)
Chapter 5: Model Evaluation and Optimization
5.1 Evaluation Metrics
5.1.1 Classification Metrics
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report
)
# accuracy
accuracy = accuracy_score(y_true, y_pred)
# precision
precision = precision_score(y_true, y_pred)
# recall
recall = recall_score(y_true, y_pred)
# F1 score
f1 = f1_score(y_true, y_pred)
# confusion matrix
cm = confusion_matrix(y_true, y_pred)
# detailed report
print(classification_report(y_true, y_pred))
5.1.2 Regression Metrics
import numpy as np
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score
)
# mean squared error
mse = mean_squared_error(y_true, y_pred)
# root mean squared error
rmse = np.sqrt(mse)
# mean absolute error
mae = mean_absolute_error(y_true, y_pred)
# R² score
r2 = r2_score(y_true, y_pred)
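A quick sanity check of these metrics on assumed toy values, chosen so the results are easy to verify by hand (MSE = 0.375, MAE = 0.5):

```python
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score

y_true = np.array([3.0, -0.5, 2.0, 7.0])
y_pred = np.array([2.5, 0.0, 2.0, 8.0])

mse = mean_squared_error(y_true, y_pred)     # mean of (0.25, 0.25, 0, 1) = 0.375
rmse = np.sqrt(mse)
mae = np.mean(np.abs(y_true - y_pred))       # mean of (0.5, 0.5, 0, 1) = 0.5
r2 = r2_score(y_true, y_pred)                # 1 - SS_res/SS_tot, ~0.9486 here
```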
5.2 Overfitting and Underfitting
5.2.1 Diagnosis
Use learning curves:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve

train_sizes, train_scores, test_scores = learning_curve(
    model, X, y, cv=5, n_jobs=-1
)
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
test_std = np.std(test_scores, axis=1)

plt.plot(train_sizes, train_mean, 'o-', label='Training score')
plt.plot(train_sizes, test_mean, 'o-', label='Cross-validation score')
plt.xlabel('Training Size')
plt.ylabel('Score')
plt.legend()
plt.show()
5.2.2 Remedies
Overfitting:
- Collect more data
- Regularization (L1, L2)
- Dropout
- Early stopping
Underfitting:
- Increase model complexity
- Add features
- Reduce regularization
5.3 Cross-Validation
from sklearn.model_selection import cross_val_score
scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {scores}")
print(f"Mean: {scores.mean():.4f}, Std: {scores.std():.4f}")
Chapter 6: Feature Engineering
6.1 Data Preprocessing
6.1.1 Handling Missing Values
from sklearn.impute import SimpleImputer
# mean imputation
imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X)
# median imputation
imputer = SimpleImputer(strategy='median')
# mode imputation
imputer = SimpleImputer(strategy='most_frequent')
# KNN imputation
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=5)
6.1.2 Standardization and Normalization
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# z-score standardization
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# min-max normalization
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
# RobustScaler (more robust to outliers)
scaler = RobustScaler()
X_robust = scaler.fit_transform(X)
6.1.3 Encoding
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd
# label encoding
le = LabelEncoder()
y_encoded = le.fit_transform(y)
# one-hot encoding (pandas)
df = pd.get_dummies(df, columns=['category'])
# one-hot encoding (sklearn; sparse_output requires sklearn >= 1.2,
# older versions use sparse=False)
ohe = OneHotEncoder(sparse_output=False)
X_encoded = ohe.fit_transform(X_cat.reshape(-1, 1))
6.2 Feature Selection
from sklearn.feature_selection import (
    SelectKBest,
    f_classif,
    RFE,
    SelectFromModel
)
# univariate feature selection
selector = SelectKBest(score_func=f_classif, k=10)
X_selected = selector.fit_transform(X, y)
# recursive feature elimination
from sklearn.ensemble import RandomForestClassifier
rfe = RFE(estimator=RandomForestClassifier(), n_features_to_select=10)
rfe.fit(X, y)
# model-based feature selection
selector = SelectFromModel(RandomForestClassifier())
X_selected = selector.fit_transform(X, y)
Chapter 7: Machine Learning in Practice
7.1 A Complete Workflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# 1. load data
data = pd.read_csv('data.csv')
# 2. explore the data
print(data.head())
print(data.info())
print(data.describe())
# 3. preprocess
X = data.drop('target', axis=1)
y = data['target']
# fill missing values (to avoid leakage, it is better to compute the
# fill statistics on the training split only)
X = X.fillna(X.mean())
# split the dataset
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# 4. build a Pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
# 5. train
pipeline.fit(X_train, y_train)
# 6. evaluate
y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.4f}")
# 7. predict on new data
new_data = pd.read_csv('new_data.csv')
predictions = pipeline.predict(new_data)
7.2 Grid Search
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

param_grid = {
    'C': [0.1, 1, 10],
    'gamma': [0.01, 0.1, 1],
    'kernel': ['rbf', 'linear']
}
grid_search = GridSearchCV(
    SVC(),
    param_grid,
    cv=5,
    n_jobs=-1,
    scoring='accuracy'
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_}")
# use the best model
best_model = grid_search.best_estimator_
7.3 Pipelines
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import LogisticRegression

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('poly', PolynomialFeatures(degree=2)),
    ('classifier', LogisticRegression(max_iter=1000))
])
pipeline.fit(X_train, y_train)
score = pipeline.score(X_test, y_test)
Appendix A: Quick Code Reference
A.1 Loading Data
import pandas as pd
import numpy as np
# CSV
df = pd.read_csv('file.csv')
# Excel
df = pd.read_excel('file.xlsx')
# JSON
df = pd.read_json('file.json')
# Parquet
df = pd.read_parquet('file.parquet')
A.2 Data Exploration
# basic information
df.info()
df.describe()
# missing values
df.isnull().sum()
# distributions
df['column'].value_counts()
# correlations
df.corr()
A.3 Visualization
import matplotlib.pyplot as plt
import seaborn as sns
# line plot
plt.plot(x, y)
plt.show()
# scatter plot
plt.scatter(x, y)
plt.show()
# heatmap
sns.heatmap(df.corr(), annot=True)
plt.show()
Notes compiled by: AI assistant
Last updated: 2026-03-19
Chapter 8: Advanced Deep Learning
8.1 Convolutional Neural Networks (CNN)
8.1.1 CNN Architecture
import torch
import torch.nn as nn

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 10)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x
8.1.2 Classic CNN Architectures
LeNet-5: an early CNN, used for handwritten digit recognition
AlexNet: winner of ImageNet 2012, with an 8-layer architecture
VGGNet: deeper networks (16-19 layers)
ResNet: introduced residual connections to combat vanishing gradients
8.2 Recurrent Neural Networks (RNN)
8.2.1 Basic RNN
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleRNN, self).__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, hidden = self.rnn(x)
        out = self.fc(out[:, -1, :])  # use the last time step
        return out
8.2.2 LSTM
Long short-term memory networks address the gradient problems of long sequences.
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out
8.2.3 GRU
The gated recurrent unit is a simplified variant of the LSTM.
8.3 Transformer
8.3.1 Self-Attention
class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads
        self.values = nn.Linear(embed_size, embed_size)
        self.keys = nn.Linear(embed_size, embed_size)
        self.queries = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]
        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(query)
        # split the embedding into multiple heads
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = queries.reshape(N, query_len, self.heads, self.head_dim)
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))
        attention = torch.softmax(energy / (self.embed_size ** (1/2)), dim=3)
        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )
        return self.fc_out(out)
8.4 Generative Adversarial Networks (GAN)
class Generator(nn.Module):
    def __init__(self, latent_dim, img_shape):
        super(Generator, self).__init__()
        self.img_shape = img_shape
        self.model = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, int(torch.prod(torch.tensor(img_shape)))),
            nn.Tanh()
        )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.size(0), *self.img_shape)
        return img

class Discriminator(nn.Module):
    def __init__(self, img_shape):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(int(torch.prod(torch.tensor(img_shape))), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, img):
        img_flat = img.view(img.size(0), -1)
        validity = self.model(img_flat)
        return validity
8.5 Reinforcement Learning
8.5.1 Q-Learning
import numpy as np

class QLearningAgent:
    def __init__(self, state_size, action_size, learning_rate=0.1, gamma=0.95):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.q_table = np.zeros((state_size, action_size))

    def choose_action(self, state):
        # epsilon-greedy: explore with probability epsilon, else exploit
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_size)
        return np.argmax(self.q_table[state])

    def learn(self, state, action, reward, next_state):
        current_q = self.q_table[state, action]
        max_next_q = np.max(self.q_table[next_state])
        new_q = current_q + self.learning_rate * (reward + self.gamma * max_next_q - current_q)
        self.q_table[state, action] = new_q
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
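The Q-update inside learn() can be exercised on a minimal two-state, two-action toy problem (all values here are assumed for illustration): taking action 1 in state 0 yields reward 1 and reaches a terminal state, so repeated updates drive that Q-value toward 1.

```python
import numpy as np

lr, gamma = 0.5, 0.9
q = np.zeros((2, 2))   # q[state, action]

for _ in range(50):
    # one-step episode: state 0, action 1, reward 1, terminal state 1
    state, action, reward, next_state = 0, 1, 1.0, 1
    # the same temporal-difference update as in learn() above
    q[state, action] += lr * (reward + gamma * q[next_state].max() - q[state, action])
```

Since state 1 is never rewarded, q[1] stays at zero and q[0, 1] converges to the immediate reward 1.0.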
Chapter 9: Model Deployment
9.1 Saving and Loading Models
import torch
# save the whole model
torch.save(model, 'model.pth')
# load it
model = torch.load('model.pth')
# save only the parameters (recommended)
torch.save(model.state_dict(), 'model_weights.pth')
# load the parameters
model = Model()
model.load_state_dict(torch.load('model_weights.pth'))
9.2 ONNX Export
import torch.onnx
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)
9.3 Flask Deployment
from flask import Flask, request, jsonify
import torch

app = Flask(__name__)
model = Model()
model.load_state_dict(torch.load('model.pth'))
model.eval()

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    input_tensor = torch.tensor(data['input'])
    output = model(input_tensor)
    return jsonify({'prediction': output.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
9.4 Docker Deployment
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY model.pth .
COPY app.py .
EXPOSE 5000
CMD ["python", "app.py"]
Chapter 10: AutoML
10.1 Auto-sklearn
import autosklearn.classification
import sklearn.model_selection
import sklearn.datasets
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
X, y, random_state=1
)
automl = autosklearn.classification.AutoSklearnClassifier()
automl.fit(X_train, y_train)
predictions = automl.predict(X_test)
10.2 TPOT
from tpot import TPOTClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2
)
tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))
Appendix B: Common Code Snippets
B.1 Data Augmentation
from torchvision import transforms
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
B.2 Learning-Rate Scheduling
# step decay
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
# cosine annealing
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
# ReduceLROnPlateau (stepped on the validation loss)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5)
for epoch in range(num_epochs):
    train()
    val_loss = validate()
    scheduler.step(val_loss)
B.3 Early Stopping
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1          # no meaningful improvement
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss  # improvement: reset the counter
            self.counter = 0
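A usage sketch for an early-stopping helper of this shape, driven by an assumed validation-loss curve (the class is restated here so the snippet runs standalone):

```python
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1          # no improvement this epoch
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss  # improvement: reset the counter
            self.counter = 0

# Hypothetical validation losses: improving for 3 epochs, then plateauing.
losses = [1.0, 0.8, 0.7, 0.71, 0.72, 0.73, 0.74]
stopper = EarlyStopping(patience=3)
stopped_at = None
for epoch, loss in enumerate(losses):
    stopper(loss)
    if stopper.early_stop:
        stopped_at = epoch   # triggers at epoch 5, after 3 epochs without improvement
        break
```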