第 3 章 数据流水线(dataset.py) 本章目标:搞懂文本是怎么变成模型能吃的 张量对的。 3.1 章节地图 dataset.py 干三件事: 3.2 完整源码 3.3 数据从哪来:三级回退策略 鲁棒性设计 实现了一个鲁棒的数据获取链: 核心代码 工程亮点逐个讲 ① 缓存优先 避免每次训练都打网络,断网也能跑。这是「数据获取」的第一原则:网络是奢侈品,本地是必需品。 ② 优雅降级(graceful degradation) 库缺失时不崩溃,只把标志位置 False。后续 if 分支跳过 HF 路径,直接走 GitHub 回退。这叫「可选依赖」处理。 对比反面教材: 写在文件顶部不带 try/except,没装 datasets 的用户 import dataset.
本章目标:搞懂文本是怎么变成模型能吃的
(x, y)张量对的。
dataset.py 干三件事:
┌─────────────────────────────────────────────────────────┐ │ 1. get_dataset() 获取文本(三级回退,鲁棒) │ │ 2. tiktoken 编码 文本 → token id 序列 │ │ 3. TextDataset 切片成 (x, y) 样本对 │ │ 4. build_dataloader 批处理 + 工程优化 │ └─────────────────────────────────────────────────────────┘
""" dataset.py ========== 数据获取与处理模块。 主要功能: 1. get_dataset(): 从 HuggingFace Hub 下载微型文本数据集 (tiny_shakespeare), 若下载失败则回退为直接从 GitHub 下载原始文本。 2. TextDataset: 将整段文本用 tiktoken (p50k_base) 编码为 token 序列, 按 block_size 切片,构造自回归训练样本 (x, y)。 3. build_dataloader(): 基于 TextDataset 返回 PyTorch DataLoader。 """ from pathlib import Path import urllib.request import tiktoken import torch from torch.utils.data import Dataset, DataLoader # HuggingFace datasets 仅在 get_dataset 中使用;缺失时优雅降级。 try: from datasets import load_dataset _HAS_DATASETS = True except ImportError: # pragma: no cover _HAS_DATASETS = False # tiny_shakespeare 原始文本的镜像地址(Karpathy 经典数据集)。 _TINY_SHAKESPEARE_URL = ( "karpathy/char-rnn 仓库(GitHub)" "data/tinyshakespeare/input.txt 文件路径" ) # 全局复用同一个 tiktoken 编码器,避免反复初始化(其内部有较大的 BPE 表)。 _ENCODER = tiktoken.get_encoding("p50k_base") def get_encoder() -> "tiktoken.Encoding": """返回全局共享的 tiktoken p50k_base 编码器。""" return _ENCODER def get_dataset(cache_path: str = "data/tiny_shakespeare.txt") -> str: """ 获取微型莎士比亚文本数据集,返回纯文本字符串。 策略: 1) 若本地已有缓存,直接读取,避免重复网络请求。 2) 否则优先尝试 HuggingFace datasets 的 tiny_shakespeare。 3) 若该库不可用或数据集已下线,则回退为直接 HTTP 下载并本地缓存。 参数: cache_path: 本地缓存文件路径。 返回: 完整文本字符串。 """ cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True) # 1) 命中本地缓存。 if cache_path.exists(): print(f"[dataset] 从本地缓存读取: {cache_path}") return cache_path.read_text(encoding="utf-8") text = None # 2) 尝试使用 HuggingFace datasets。 if _HAS_DATASETS: try: print("[dataset] 尝试通过 HuggingFace datasets 下载 tiny_shakespeare ...") ds = load_dataset("tiny_shakespeare", split="train") # 不同版本字段名可能为 'text',统一取第一个字段兜底。 column = "text" if "text" in ds.column_names else ds.column_names[0] text = "\n".join(ds[column]) print("[dataset] HuggingFace 下载成功。") except Exception as e: # noqa: BLE001 print(f"[dataset] HuggingFace 下载失败: {e}") print("[dataset] 回退到直接下载。") # 3) 回退:直接从 GitHub 下载原始文本。 if text is None: print(f"[dataset] 从 {_TINY_SHAKESPEARE_URL} 下载 ...") with urllib.request.urlopen(_TINY_SHAKESPEARE_URL) as resp: # noqa: S310 raw = resp.read() text = raw.decode(encoding="utf-8") print("[dataset] 直接下载成功。") # 4) 写入本地缓存。 cache_path.write_text(text, encoding="utf-8") return text class TextDataset(Dataset): """ 自回归语言建模数据集。 将整段文本编码为长度为 T 的 token 序列后, 取 tokens[i : i+block_size] 作为 x, 取 tokens[i+1 : i+1+block_size] 作为 y(即 x 左移一位)。 这样模型在每一个位置都学习"预测下一个 token"。 """ def __init__(self, text: str, block_size: int): super().__init__() self.block_size = block_size # 编码为 token id 列表(Python int 列表)。 self.tokens = _ENCODER.encode(text) # 每个样本需要 block_size+1 个 token(多出的 1 个作为第一个 y)。 self.n_samples = max(0, len(self.tokens) - block_size - 1) print( f"[dataset] 文本共 {len(self.tokens)} tokens," f"可构造 {self.n_samples} 个长度为 {block_size} 的样本。" ) def __len__(self) -> int: return self.n_samples def __getitem__(self, idx: int): # 截取 block_size+1 长度的片段,前 block_size 为 x,后 block_size 为 y。 chunk = self.tokens[idx : idx + self.block_size + 1] x = torch.tensor(chunk[:-1], dtype=torch.long) y = torch.tensor(chunk[1:], dtype=torch.long) return x, y def build_dataloader( text: str, block_size: int, batch_size: int, num_workers: int = 0, shuffle: bool = True, ) -> DataLoader: """ 构建 PyTorch DataLoader。 参数: text: 原始文本。 block_size: 上下文长度。 batch_size: 每个批次的样本数。 num_workers: 多进程加载的进程数(Windows 建议 0)。 shuffle: 是否打乱。 返回: torch.utils.data.DataLoader """ dataset = TextDataset(text, block_size) return DataLoader( dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, drop_last=True, # 丢弃最后一个不完整的 batch,保持梯度统计稳定 pin_memory=torch.cuda.is_available(), )
get_dataset() 实现了一个鲁棒的数据获取链:
┌──────────────────────────────────────────┐ │ 1. 本地缓存 data/tiny_shakespeare.txt │ ──► 命中?直接返回 └──────────────────────────────────────────┘ │ 未命中 ▼ ┌──────────────────────────────────────────┐ │ 2. HuggingFace datasets.tiny_shakespeare │ ──► 成功?缓存并返回 └──────────────────────────────────────────┘ │ 失败(库缺失/下线) ▼ ┌──────────────────────────────────────────┐ │ 3. GitHub 直链下载原始文本 │ ──► 缓存并返回 └──────────────────────────────────────────┘
def get_dataset(cache_path: str = "data/tiny_shakespeare.txt") -> str: cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True) # 1) 命中本地缓存 if cache_path.exists(): return cache_path.read_text(encoding="utf-8") text = None # 2) 尝试 HuggingFace datasets if _HAS_DATASETS: try: ds = load_dataset("tiny_shakespeare", split="train") column = "text" if "text" in ds.column_names else ds.column_names[0] text = "\n".join(ds[column]) except Exception as e: print(f"HuggingFace 下载失败: {e}") # 3) 回退:直接从 GitHub 下载 if text is None: with urllib.request.urlopen(_TINY_SHAKESPEARE_URL) as resp: text = resp.read().decode(encoding="utf-8") # 4) 写入本地缓存 cache_path.write_text(text, encoding="utf-8") return text
if cache_path.exists(): return cache_path.read_text(encoding="utf-8")
避免每次训练都打网络,断网也能跑。这是「数据获取」的第一原则:网络是奢侈品,本地是必需品。
try: from datasets import load_dataset _HAS_DATASETS = True except ImportError: _HAS_DATASETS = False
datasets 库缺失时不崩溃,只把标志位置 False。后续 if 分支跳过 HF 路径,直接走 GitHub 回退。这叫「可选依赖」处理。
对比反面教材:
from datasets import load_dataset写在文件顶部不带 try/except,没装 datasets 的用户 import dataset.py 就直接 ImportError,连回退机会都没有。
column = "text" if "text" in ds.column_names else ds.column_names[0]
HF 数据集不同版本字段名可能是 text 或 content 或别的。代码做了兜底:优先 text,否则取第一个字段。这是处理「外部数据源不稳定」的常见手法。
cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True)
pathlib.Path 比 os.path.join 更现代。mkdir(parents=True, exist_ok=True) 一行搞定「目录不存在就建、存在不报错」。
模型不认识字符,只认识数字。分词器(tokenizer)把文本切成 token(子词单元),再映射成整数 id:
"To be, or not to be" │ tiktoken.encode ▼ [787, 281, 11, 453, 459, 281, 281] ← token id 序列
| 粒度 | 例子 | 词表大小 | 优点 | 缺点 |
|---|---|---|---|---|
| 字符级 | ['T','o',' ','b','e'] |
~100 | 词表小、能处理任何词 | 序列太长,难学长程依赖 |
| 词级 | ['To','be','or','not'] |
~百万 | 语义清晰 | 不能处理未登录词 |
| 子词级 (BPE) | ['To',' be',' or'] |
~5万 | 兼顾两者 | 实现稍复杂 |
GPT 系列用子词级(BPE),是当前主流。
Byte-Pair Encoding:训练时统计语料里最频繁的相邻字节对,把它们合并成一个新 token,重复 N 次。
初始:['T','h','e',' ','c','a','t'] ← 都是单字符 ↓ 合并 'c'+'a'='ca'(出现频繁) ['T','h','e',' ','ca','t'] ↓ 合并 'ca'+'t'='cat' ['T','h','e',' ','cat']
最终词表是「单字符 + 高频合并子词」的集合。新词用已有子词拼:
"unbelievable" → ["un", "believable"] ← "un" 是常见前缀子词 "helloworld" → ["hello", "world"] ← 没学过的词也能拼出来
p50k_base 是 GPT-2 / text-davinci-002 用的编码,词表大小恰好 50257,与本项目 GPTConfig.vocab_size 对齐。_ENCODER = tiktoken.get_encoding("p50k_base") def get_encoder() -> "tiktoken.Encoding": return _ENCODER
为什么用模块级全局变量?因为 tiktoken 初始化时要加载一张不小的 BPE 合并表(几十 MB),反复 get_encoding 会重复加载。用全局单例一次加载、处处复用,省内存省时间。
💡 这是 Python 模块级变量的经典用法:模块第一次 import 时执行
_ENCODER = ...,之后所有from dataset import _ENCODER都拿到同一个实例。
⚠️ 铁律的来源:
vocab_size=50257是p50k_base决定的,改它会导致 token id 与 embedding 矩阵的行数对不上,模型直接崩。换词表 = 换分词器,两者必须同步。
enc = tiktoken.get_encoding("p50k_base") # 编码:文本 → id 列表 ids = enc.encode("Hello, world!") # [15496, 11, 995, 0] # 解码:id 列表 → 文本 text = enc.decode([15496, 11, 995, 0]) # "Hello, world!" # 词表大小 print(enc.n_vocab) # 50257
预测下一个 token。给定 ['T','o',' ','b','e'],模型要预测下一个是 ,。
所以要把文本切成 (x, y) 对,其中 y 是 x 左移一位:
class TextDataset(Dataset): def __init__(self, text: str, block_size: int): self.block_size = block_size self.tokens = _ENCODER.encode(text) self.n_samples = max(0, len(self.tokens) - block_size - 1) def __getitem__(self, idx): chunk = self.tokens[idx : idx + self.block_size + 1] x = torch.tensor(chunk[:-1], dtype=torch.long) y = torch.tensor(chunk[1:], dtype=torch.long) return x, y
假设 block_size=4,token 序列为 [A, B, C, D, E, F, G, ...]:
位置: 0 1 2 3 4 5 6 token: A B C D E F G ... └────── x ──────┘ └────── y ──────┘ ← y 整体右移一位 样本 idx=0: x=[A,B,C,D] y=[B,C,D,E] 样本 idx=1: x=[B,C,D,E] y=[C,D,E,F] 样本 idx=2: x=[C,D,E,F] y=[D,E,F,G] ...
block_size+1 长度的 chunkchunk = self.tokens[idx : idx + self.block_size + 1] # 长度 block_size+1 x = chunk[:-1] # 前 block_size 个 y = chunk[1:] # 后 block_size 个
为什么 chunk 是 block_size+1 而不是 block_size?因为要同时容纳 x(block_size 个)和 y(block_size 个,与 x 错开一位),总共需要 block_size+1 个 token。
数学验证:chunk[:-1] 长度 = len(chunk) - 1 = block_size,chunk[1:] 长度 = len(chunk) - 1 = block_size ✓。
y = chunk[1:] 不是 chunk[:-1]x 是「输入」,y 是「目标」,y 比 x 滞后一位:
位置 i: x[i] → 预测 y[i] = x[i+1]
模型在位置 i 看到 x[:i+1],要预测 y[i] = x[i+1](即下一个 token)。所以 y 是 x 整体右移一位,用 chunk[1:]。
self.n_samples = max(0, len(self.tokens) - block_size - 1)
为什么减 block_size + 1?因为每个样本要取 block_size + 1 长度的 chunk,最后一个完整 chunk 起点最多到 len - block_size - 1。
max(0, ...) 是兜底:万一文本特别短(比 block_size 还短),返回 0 而不是负数。
注意:虽然一个样本有 block_size 个位置,但不是「一个样本 = 一个预测任务」。
由于 GPT 的因果掩码(causal mask,见第 4 章),位置 0 只看自己预测位置 1,位置 1 看 [0,1] 预测位置 2……所有 block_size 个位置的预测任务同时在一个前向里完成,loss 是它们的平均。
x = [A, B, C, D] 位置: 0 1 2 3 │ │ │ │ ▼ ▼ ▼ ▼ 预测: B C D E ← 4 个预测任务同时进行 位置 0 看 [A] 预测 B 位置 1 看 [A,B] 预测 C 位置 2 看 [A,B,C] 预测 D 位置 3 看 [A,B,C,D] 预测 E
这就是 Transformer 训练高效的根本原因——一次前向 = block_size 个训练信号。
本项目用步长 1 的滑动窗口:
样本 0: [0 : block_size+1] 样本 1: [1 : block_size+2] ← 只右移 1 样本 2: [2 : block_size+3] ...
相邻样本高度重叠(共享 block_size-1 个 token)。优点是数据多(33 万 token 能切 33 万样本),缺点是相邻样本相关性强,等于「一份数据反复用」。
工业训练常用步长 = block_size(不重叠)+ 多 epoch,减少冗余。本项目为简单用步长 1,配合 shuffle 缓解相关性。
def build_dataloader(text, block_size, batch_size, num_workers=0, shuffle=True): dataset = TextDataset(text, block_size) return DataLoader( dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, drop_last=True, # ① 丢弃不完整 batch pin_memory=torch.cuda.is_available(), # ② 锁页内存加速 H2D 拷贝 )
DataLoader 把 Dataset 包装成可迭代对象,自动做:
batch_size 个样本堆叠成张量)。shuffle=True)。num_workers)。__getitem__ 返回的 (x, y) 会被自动堆成 (B, T) 张量。迭代替换:
for x, y in dataloader: # x.shape = (batch_size, block_size) # y.shape = (batch_size, block_size) ...
drop_last=Truedrop_last=True, # 丢弃最后一个不完整的 batch
最后一个不完整的 batch(比如 batch_size=32 但只剩 5 个样本)会被丢掉。
为什么?不完整 batch 的梯度统计与完整 batch 不同(少了 27 个样本的梯度平均),可能让训练末期出现 loss 抖动。丢掉它保持每个 batch 都是 32 个样本,梯度尺度一致。
pin_memory=Truepin_memory=torch.cuda.is_available(),
锁页内存(page-locked memory):让数据预先放到 CPU 的「不可换页」内存区,CPU→GPU 拷贝能走 DMA 异步通道,比普通内存快。
配合训练循环里的:
x = x.to(device, non_blocking=True) # 异步拷贝
实现「数据传输」与「上一步反向传播计算」的重叠,免费的加速。
num_workers 的取舍num_workers=num_workers, # 默认 0
num_workers=0:主进程加载数据,简单稳定,Windows 友好。num_workers>0:开 N 个子进程并行预取,GPU 不用等数据。取舍:
| 场景 | 推荐 |
|---|---|
| Windows | 0(避免 spawn 陷阱) |
| Linux + 大数据集 + 复杂预处理 | 4-8 |
| Linux + 小数据集(本项目) | 0(数据加载本来就快,多进程反而有启动开销) |
把所有环节串一遍,跟踪一个 batch 的生命周期:
1. get_dataset() 读取 data/tiny_shakespeare.txt → 返回 1MB 的纯文本字符串 2. TextDataset.__init__ 把文本编码为 token: _ENCODER.encode(text) → [787, 281, 11, ..., 338025 个 id] n_samples = 338025 - 128 - 1 = 337896 3. DataLoader 用 batch_size=32 shuffle=True 包装 内部生成打乱的索引 [12345, 892, 337000, ...] 4. 训练循环 for x, y in dataloader: 每次 next() 触发: - 按索引取 32 个样本 - 每个样本 __getitem__ 切出 (x_i, y_i) 各 shape=(128,) - 自动堆叠成 x.shape=(32, 128), y.shape=(32, 128) - pin_memory 把它们放到锁页内存 5. x.to(device, non_blocking=True) 异步拷贝到 GPU 6. model(input_ids=x, labels=y) 开始前向 + 算 loss
体验 tiktoken:在 Python 里跑:
from dataset import get_encoder enc = get_encoder() print(enc.encode("Hello, world!")) # [15496, 11, 995, 0] print(enc.decode([15496, 11, 995, 0])) # "Hello, world!" print(enc.encode("unbelievable")) # 观察子词切分 print(enc.encode("你好世界")) # 中文怎么切?
构造数据集:
from dataset import TextDataset ds = TextDataset("Hello world this is a test", block_size=4) print(len(ds)) # 样本数 x, y = ds[0] print(x, y) # 观察 x 和 y 的关系
边界测试:故意把 block_size 设成比总 token 数还大,观察 n_samples 会变成多少(答:0,会被 max(0, ...) 兜底)。
思考题:如果想让两个相邻样本完全不重叠(步长 = block_size 而非 1),该怎么改 __getitem__?
self.tokens[idx*block_size : idx*block_size + block_size + 1],且 n_samples = len(tokens) // (block_size + 1)。观察 drop_last:构造一个 100 个样本的数据集,batch_size=32,迭代一遍数一下实际 yield 了几次(答:3 次,丢了 4 个)。
get_dataset() 用三级回退(本地缓存 → HF datasets → GitHub 直链)保证数据获取鲁棒,断网也能跑。p50k_base 把文本切成子词 id 序列,词表 50257 与模型 vocab_size 对齐。TextDataset 取 block_size+1 长度的 chunk,切成 (x, y) = (chunk[:-1], chunk[1:]),y 是 x 左移一位。drop_last 保持 batch 尺度一致,pin_memory + non_blocking 实现数据传输与计算重叠。数据准备好了,去 第 4 章 模型构建 看怎么搭一个 GPT。