第 3 章 数据流水线(dataset.py)


文档摘要

第 3 章 数据流水线(dataset.py) 本章目标:搞懂文本是怎么变成模型能吃的 张量对的。 3.1 章节地图 dataset.py 干三件事: 3.2 完整源码 3.3 数据从哪来:三级回退策略 鲁棒性设计 实现了一个鲁棒的数据获取链: 核心代码 工程亮点逐个讲 ① 缓存优先 避免每次训练都打网络,断网也能跑。这是「数据获取」的第一原则:网络是奢侈品,本地是必需品。 ② 优雅降级(graceful degradation) 库缺失时不崩溃,只把标志位置 False。后续 if 分支跳过 HF 路径,直接走 GitHub 回退。这叫「可选依赖」处理。 对比反面教材: 写在文件顶部不带 try/except,没装 datasets 的用户 import dataset.

第 3 章 数据流水线(dataset.py)

本章目标:搞懂文本是怎么变成模型能吃的 (x, y) 张量对的。

3.1 章节地图

dataset.py 干三件事:

┌─────────────────────────────────────────────────────────┐ │ 1. get_dataset() 获取文本(三级回退,鲁棒) │ │ 2. tiktoken 编码 文本 → token id 序列 │ │ 3. TextDataset 切片成 (x, y) 样本对 │ │ 4. build_dataloader 批处理 + 工程优化 │ └─────────────────────────────────────────────────────────┘

3.2 完整源码

""" dataset.py ========== 数据获取与处理模块。 主要功能: 1. get_dataset(): 从 HuggingFace Hub 下载微型文本数据集 (tiny_shakespeare), 若下载失败则回退为直接从 GitHub 下载原始文本。 2. TextDataset: 将整段文本用 tiktoken (p50k_base) 编码为 token 序列, 按 block_size 切片,构造自回归训练样本 (x, y)。 3. build_dataloader(): 基于 TextDataset 返回 PyTorch DataLoader。 """ from pathlib import Path import urllib.request import tiktoken import torch from torch.utils.data import Dataset, DataLoader # HuggingFace datasets 仅在 get_dataset 中使用;缺失时优雅降级。 try: from datasets import load_dataset _HAS_DATASETS = True except ImportError: # pragma: no cover _HAS_DATASETS = False # tiny_shakespeare 原始文本的镜像地址(Karpathy 经典数据集)。 _TINY_SHAKESPEARE_URL = ( "karpathy/char-rnn 仓库(GitHub)" "data/tinyshakespeare/input.txt 文件路径" ) # 全局复用同一个 tiktoken 编码器,避免反复初始化(其内部有较大的 BPE 表)。 _ENCODER = tiktoken.get_encoding("p50k_base") def get_encoder() -> "tiktoken.Encoding": """返回全局共享的 tiktoken p50k_base 编码器。""" return _ENCODER def get_dataset(cache_path: str = "data/tiny_shakespeare.txt") -> str: """ 获取微型莎士比亚文本数据集,返回纯文本字符串。 策略: 1) 若本地已有缓存,直接读取,避免重复网络请求。 2) 否则优先尝试 HuggingFace datasets 的 tiny_shakespeare。 3) 若该库不可用或数据集已下线,则回退为直接 HTTP 下载并本地缓存。 参数: cache_path: 本地缓存文件路径。 返回: 完整文本字符串。 """ cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True) # 1) 命中本地缓存。 if cache_path.exists(): print(f"[dataset] 从本地缓存读取: {cache_path}") return cache_path.read_text(encoding="utf-8") text = None # 2) 尝试使用 HuggingFace datasets。 if _HAS_DATASETS: try: print("[dataset] 尝试通过 HuggingFace datasets 下载 tiny_shakespeare ...") ds = load_dataset("tiny_shakespeare", split="train") # 不同版本字段名可能为 'text',统一取第一个字段兜底。 column = "text" if "text" in ds.column_names else ds.column_names[0] text = "\n".join(ds[column]) print("[dataset] HuggingFace 下载成功。") except Exception as e: # noqa: BLE001 print(f"[dataset] HuggingFace 下载失败: {e}") print("[dataset] 回退到直接下载。") # 3) 回退:直接从 GitHub 下载原始文本。 if text is None: print(f"[dataset] 从 {_TINY_SHAKESPEARE_URL} 下载 ...") with urllib.request.urlopen(_TINY_SHAKESPEARE_URL) as resp: # noqa: S310 raw = resp.read() text = raw.decode(encoding="utf-8") print("[dataset] 直接下载成功。") # 4) 写入本地缓存。 cache_path.write_text(text, encoding="utf-8") return text class TextDataset(Dataset): """ 自回归语言建模数据集。 将整段文本编码为长度为 T 的 token 序列后, 取 tokens[i : i+block_size] 作为 x, 取 tokens[i+1 : i+1+block_size] 作为 y(即 x 左移一位)。 这样模型在每一个位置都学习"预测下一个 token"。 """ def __init__(self, text: str, block_size: int): super().__init__() self.block_size = block_size # 编码为 token id 列表(Python int 列表)。 self.tokens = _ENCODER.encode(text) # 每个样本需要 block_size+1 个 token(多出的 1 个作为第一个 y)。 self.n_samples = max(0, len(self.tokens) - block_size - 1) print( f"[dataset] 文本共 {len(self.tokens)} tokens," f"可构造 {self.n_samples} 个长度为 {block_size} 的样本。" ) def __len__(self) -> int: return self.n_samples def __getitem__(self, idx: int): # 截取 block_size+1 长度的片段,前 block_size 为 x,后 block_size 为 y。 chunk = self.tokens[idx : idx + self.block_size + 1] x = torch.tensor(chunk[:-1], dtype=torch.long) y = torch.tensor(chunk[1:], dtype=torch.long) return x, y def build_dataloader( text: str, block_size: int, batch_size: int, num_workers: int = 0, shuffle: bool = True, ) -> DataLoader: """ 构建 PyTorch DataLoader。 参数: text: 原始文本。 block_size: 上下文长度。 batch_size: 每个批次的样本数。 num_workers: 多进程加载的进程数(Windows 建议 0)。 shuffle: 是否打乱。 返回: torch.utils.data.DataLoader """ dataset = TextDataset(text, block_size) return DataLoader( dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, drop_last=True, # 丢弃最后一个不完整的 batch,保持梯度统计稳定 pin_memory=torch.cuda.is_available(), )

3.3 数据从哪来:三级回退策略

鲁棒性设计

get_dataset() 实现了一个鲁棒的数据获取链

┌──────────────────────────────────────────┐ │ 1. 本地缓存 data/tiny_shakespeare.txt │ ──► 命中?直接返回 └──────────────────────────────────────────┘ │ 未命中 ▼ ┌──────────────────────────────────────────┐ │ 2. HuggingFace datasets.tiny_shakespeare │ ──► 成功?缓存并返回 └──────────────────────────────────────────┘ │ 失败(库缺失/下线) ▼ ┌──────────────────────────────────────────┐ │ 3. GitHub 直链下载原始文本 │ ──► 缓存并返回 └──────────────────────────────────────────┘

核心代码

def get_dataset(cache_path: str = "data/tiny_shakespeare.txt") -> str: cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True) # 1) 命中本地缓存 if cache_path.exists(): return cache_path.read_text(encoding="utf-8") text = None # 2) 尝试 HuggingFace datasets if _HAS_DATASETS: try: ds = load_dataset("tiny_shakespeare", split="train") column = "text" if "text" in ds.column_names else ds.column_names[0] text = "\n".join(ds[column]) except Exception as e: print(f"HuggingFace 下载失败: {e}") # 3) 回退:直接从 GitHub 下载 if text is None: with urllib.request.urlopen(_TINY_SHAKESPEARE_URL) as resp: text = resp.read().decode(encoding="utf-8") # 4) 写入本地缓存 cache_path.write_text(text, encoding="utf-8") return text

工程亮点逐个讲

① 缓存优先

if cache_path.exists(): return cache_path.read_text(encoding="utf-8")

避免每次训练都打网络,断网也能跑。这是「数据获取」的第一原则:网络是奢侈品,本地是必需品。

② 优雅降级(graceful degradation)

try: from datasets import load_dataset _HAS_DATASETS = True except ImportError: _HAS_DATASETS = False

datasets 库缺失时不崩溃,只把标志位置 False。后续 if 分支跳过 HF 路径,直接走 GitHub 回退。这叫「可选依赖」处理。

对比反面教材:from datasets import load_dataset 写在文件顶部不带 try/except,没装 datasets 的用户 import dataset.py 就直接 ImportError,连回退机会都没有。

③ 字段兜底

column = "text" if "text" in ds.column_names else ds.column_names[0]

HF 数据集不同版本字段名可能是 textcontent 或别的。代码做了兜底:优先 text,否则取第一个字段。这是处理「外部数据源不稳定」的常见手法。

④ Path 替代字符串拼接

cache_path = Path(cache_path) cache_path.parent.mkdir(parents=True, exist_ok=True)

pathlib.Pathos.path.join 更现代。mkdir(parents=True, exist_ok=True) 一行搞定「目录不存在就建、存在不报错」。

3.4 分词:tiktoken 与 BPE

为什么需要分词

模型不认识字符,只认识数字。分词器(tokenizer)把文本切成 token(子词单元),再映射成整数 id:

"To be, or not to be" │ tiktoken.encode ▼ [787, 281, 11, 453, 459, 281, 281] ← token id 序列

三种分词粒度对比

粒度 例子 词表大小 优点 缺点
字符级 ['T','o',' ','b','e'] ~100 词表小、能处理任何词 序列太长,难学长程依赖
词级 ['To','be','or','not'] ~百万 语义清晰 不能处理未登录词
子词级 (BPE) ['To',' be',' or'] ~5万 兼顾两者 实现稍复杂

GPT 系列用子词级(BPE),是当前主流。

BPE 工作原理(科普)

Byte-Pair Encoding:训练时统计语料里最频繁的相邻字节对,把它们合并成一个新 token,重复 N 次。

初始:['T','h','e',' ','c','a','t'] ← 都是单字符 ↓ 合并 'c'+'a'='ca'(出现频繁) ['T','h','e',' ','ca','t'] ↓ 合并 'ca'+'t'='cat' ['T','h','e',' ','cat']

最终词表是「单字符 + 高频合并子词」的集合。新词用已有子词拼:

"unbelievable" → ["un", "believable"] ← "un" 是常见前缀子词 "helloworld" → ["hello", "world"] ← 没学过的词也能拼出来

为什么选 tiktoken + p50k_base

  • tiktoken 是 OpenAI 开源的高性能 BPE 分词器(Rust 实现,比 HuggingFace 快几倍)。
  • p50k_base 是 GPT-2 / text-davinci-002 用的编码,词表大小恰好 50257,与本项目 GPTConfig.vocab_size 对齐。

全局单例编码器

_ENCODER = tiktoken.get_encoding("p50k_base") def get_encoder() -> "tiktoken.Encoding": return _ENCODER

为什么用模块级全局变量?因为 tiktoken 初始化时要加载一张不小的 BPE 合并表(几十 MB),反复 get_encoding 会重复加载。用全局单例一次加载、处处复用,省内存省时间。

💡 这是 Python 模块级变量的经典用法:模块第一次 import 时执行 _ENCODER = ...,之后所有 from dataset import _ENCODER 都拿到同一个实例。

⚠️ 铁律的来源vocab_size=50257p50k_base 决定的,改它会导致 token id 与 embedding 矩阵的行数对不上,模型直接崩。换词表 = 换分词器,两者必须同步。

编码解码 API

enc = tiktoken.get_encoding("p50k_base") # 编码:文本 → id 列表 ids = enc.encode("Hello, world!") # [15496, 11, 995, 0] # 解码:id 列表 → 文本 text = enc.decode([15496, 11, 995, 0]) # "Hello, world!" # 词表大小 print(enc.n_vocab) # 50257

3.5 TextDataset:自回归样本构造(本章核心)

语言模型的训练目标

预测下一个 token。给定 ['T','o',' ','b','e'],模型要预测下一个是 ,

所以要把文本切成 (x, y) 对,其中 yx 左移一位:

class TextDataset(Dataset): def __init__(self, text: str, block_size: int): self.block_size = block_size self.tokens = _ENCODER.encode(text) self.n_samples = max(0, len(self.tokens) - block_size - 1) def __getitem__(self, idx): chunk = self.tokens[idx : idx + self.block_size + 1] x = torch.tensor(chunk[:-1], dtype=torch.long) y = torch.tensor(chunk[1:], dtype=torch.long) return x, y

切片图解

假设 block_size=4,token 序列为 [A, B, C, D, E, F, G, ...]

位置: 0 1 2 3 4 5 6 token: A B C D E F G ... └────── x ──────┘ └────── y ──────┘ ← y 整体右移一位 样本 idx=0: x=[A,B,C,D] y=[B,C,D,E] 样本 idx=1: x=[B,C,D,E] y=[C,D,E,F] 样本 idx=2: x=[C,D,E,F] y=[D,E,F,G] ...

三个关键细节

细节 1:取 block_size+1 长度的 chunk

chunk = self.tokens[idx : idx + self.block_size + 1] # 长度 block_size+1 x = chunk[:-1] # 前 block_size 个 y = chunk[1:] # 后 block_size 个

为什么 chunk 是 block_size+1 而不是 block_size?因为要同时容纳 x(block_size 个)和 y(block_size 个,与 x 错开一位),总共需要 block_size+1 个 token。

数学验证:chunk[:-1] 长度 = len(chunk) - 1 = block_sizechunk[1:] 长度 = len(chunk) - 1 = block_size ✓。

细节 2:y = chunk[1:] 不是 chunk[:-1]

x 是「输入」,y 是「目标」,y 比 x 滞后一位

位置 i: x[i] → 预测 y[i] = x[i+1]

模型在位置 i 看到 x[:i+1],要预测 y[i] = x[i+1](即下一个 token)。所以 y 是 x 整体右移一位,用 chunk[1:]

细节 3:样本数 = 总 token 数 − block_size − 1

self.n_samples = max(0, len(self.tokens) - block_size - 1)

为什么减 block_size + 1?因为每个样本要取 block_size + 1 长度的 chunk,最后一个完整 chunk 起点最多到 len - block_size - 1

max(0, ...) 是兜底:万一文本特别短(比 block_size 还短),返回 0 而不是负数。

一个 block 同时训练 block_size 个预测任务

注意:虽然一个样本有 block_size 个位置,但不是「一个样本 = 一个预测任务」

由于 GPT 的因果掩码(causal mask,见第 4 章),位置 0 只看自己预测位置 1,位置 1 看 [0,1] 预测位置 2……所有 block_size 个位置的预测任务同时在一个前向里完成,loss 是它们的平均。

x = [A, B, C, D] 位置: 0 1 2 3 │ │ │ │ ▼ ▼ ▼ ▼ 预测: B C D E ← 4 个预测任务同时进行 位置 0 看 [A] 预测 B 位置 1 看 [A,B] 预测 C 位置 2 看 [A,B,C] 预测 D 位置 3 看 [A,B,C,D] 预测 E

这就是 Transformer 训练高效的根本原因——一次前向 = block_size 个训练信号

滑动窗口 vs 不重叠切片

本项目用步长 1 的滑动窗口

样本 0: [0 : block_size+1] 样本 1: [1 : block_size+2] ← 只右移 1 样本 2: [2 : block_size+3] ...

相邻样本高度重叠(共享 block_size-1 个 token)。优点是数据多(33 万 token 能切 33 万样本),缺点是相邻样本相关性强,等于「一份数据反复用」。

工业训练常用步长 = block_size(不重叠)+ 多 epoch,减少冗余。本项目为简单用步长 1,配合 shuffle 缓解相关性。

3.6 build_dataloader:批处理与工程优化

def build_dataloader(text, block_size, batch_size, num_workers=0, shuffle=True): dataset = TextDataset(text, block_size) return DataLoader( dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, drop_last=True, # ① 丢弃不完整 batch pin_memory=torch.cuda.is_available(), # ② 锁页内存加速 H2D 拷贝 )

PyTorch DataLoader 基础

DataLoaderDataset 包装成可迭代对象,自动做:

  • 批处理:每次 yield 一个 batch(batch_size 个样本堆叠成张量)。
  • 打乱:每个 epoch 开始时 shuffle 索引(shuffle=True)。
  • 多进程加载:用 N 个子进程预取数据(num_workers)。
  • 自动拼张量__getitem__ 返回的 (x, y) 会被自动堆成 (B, T) 张量。

迭代替换:

for x, y in dataloader: # x.shape = (batch_size, block_size) # y.shape = (batch_size, block_size) ...

工程细节 1:drop_last=True

drop_last=True, # 丢弃最后一个不完整的 batch

最后一个不完整的 batch(比如 batch_size=32 但只剩 5 个样本)会被丢掉。

为什么?不完整 batch 的梯度统计与完整 batch 不同(少了 27 个样本的梯度平均),可能让训练末期出现 loss 抖动。丢掉它保持每个 batch 都是 32 个样本,梯度尺度一致。

工程细节 2:pin_memory=True

pin_memory=torch.cuda.is_available(),

锁页内存(page-locked memory):让数据预先放到 CPU 的「不可换页」内存区,CPU→GPU 拷贝能走 DMA 异步通道,比普通内存快。

配合训练循环里的:

x = x.to(device, non_blocking=True) # 异步拷贝

实现「数据传输」与「上一步反向传播计算」的重叠,免费的加速

工程细节 3:num_workers 的取舍

num_workers=num_workers, # 默认 0
  • num_workers=0:主进程加载数据,简单稳定,Windows 友好。
  • num_workers>0:开 N 个子进程并行预取,GPU 不用等数据。

取舍:

场景 推荐
Windows 0(避免 spawn 陷阱)
Linux + 大数据集 + 复杂预处理 4-8
Linux + 小数据集(本项目) 0(数据加载本来就快,多进程反而有启动开销)

3.7 数据流串起来

把所有环节串一遍,跟踪一个 batch 的生命周期:

1. get_dataset() 读取 data/tiny_shakespeare.txt → 返回 1MB 的纯文本字符串 2. TextDataset.__init__ 把文本编码为 token: _ENCODER.encode(text) → [787, 281, 11, ..., 338025 个 id] n_samples = 338025 - 128 - 1 = 337896 3. DataLoader 用 batch_size=32 shuffle=True 包装 内部生成打乱的索引 [12345, 892, 337000, ...] 4. 训练循环 for x, y in dataloader: 每次 next() 触发: - 按索引取 32 个样本 - 每个样本 __getitem__ 切出 (x_i, y_i) 各 shape=(128,) - 自动堆叠成 x.shape=(32, 128), y.shape=(32, 128) - pin_memory 把它们放到锁页内存 5. x.to(device, non_blocking=True) 异步拷贝到 GPU 6. model(input_ids=x, labels=y) 开始前向 + 算 loss

3.8 动手实验

  1. 体验 tiktoken:在 Python 里跑:

    from dataset import get_encoder enc = get_encoder() print(enc.encode("Hello, world!")) # [15496, 11, 995, 0] print(enc.decode([15496, 11, 995, 0])) # "Hello, world!" print(enc.encode("unbelievable")) # 观察子词切分 print(enc.encode("你好世界")) # 中文怎么切?
  2. 构造数据集

    from dataset import TextDataset ds = TextDataset("Hello world this is a test", block_size=4) print(len(ds)) # 样本数 x, y = ds[0] print(x, y) # 观察 x 和 y 的关系
  3. 边界测试:故意把 block_size 设成比总 token 数还大,观察 n_samples 会变成多少(答:0,会被 max(0, ...) 兜底)。

  4. 思考题:如果想让两个相邻样本完全不重叠(步长 = block_size 而非 1),该怎么改 __getitem__

    • 提示:返回 self.tokens[idx*block_size : idx*block_size + block_size + 1],且 n_samples = len(tokens) // (block_size + 1)
  5. 观察 drop_last:构造一个 100 个样本的数据集,batch_size=32,迭代一遍数一下实际 yield 了几次(答:3 次,丢了 4 个)。

3.9 小结

  • get_dataset()三级回退(本地缓存 → HF datasets → GitHub 直链)保证数据获取鲁棒,断网也能跑。
  • tiktoken 的 p50k_base 把文本切成子词 id 序列,词表 50257 与模型 vocab_size 对齐。
  • TextDatasetblock_size+1 长度的 chunk,切成 (x, y) = (chunk[:-1], chunk[1:]),y 是 x 左移一位。
  • 一个 block 同时训练 block_size 个预测任务(因果掩码保证只看过去)。
  • DataLoader 的 drop_last 保持 batch 尺度一致,pin_memory + non_blocking 实现数据传输与计算重叠。

3.10 下一章

数据准备好了,去 第 4 章 模型构建 看怎么搭一个 GPT。


发布者: 作者: 转发
评论区 (0)
U