Rollout System Walkthrough 文件概述 rollout系统是slime中负责数据生成的核心组件,主要由两个文件组成: : 管理rollout引擎和路由器的生命周期; : 处理rollout数据生成和转换 slime rollout工作流程 核心组件详解 RolloutManager - 协调器 作用 RolloutManager是rollout系统的主控制器,负责协调Router、Controller和Engines之间的交互。 初始化流程 RolloutManager初始化 关键方法 A. 数据生成 asyncgenerate方法 B. 评估 asynceval方法 C.
rollout系统是slime中负责数据生成的核心组件,主要由两个文件组成:
slime/ray/rollout.py:class RolloutManager 管理rollout引擎和路由器的生命周期;slime/ray/buffer.py:class RolloutController 处理rollout数据生成和转换
作用
RolloutManager是rollout系统的主控制器,负责协调Router、Controller和Engines之间的交互。
初始化流程
class RolloutManager: def __init__(self, args, pg, wandb_run_id): self.args = args # 1. 启动Router _start_router(args) # 2. 创建Controller self.controller = RolloutController.options( num_cpus=1, num_gpus=0, ).remote(args, wandb_run_id=wandb_run_id) # 3. 创建Engine池 self.all_rollout_engines = create_rollout_engines(args, pg) # 4. 多节点配置:只向每个engine的node-0发送请求 nodes_per_engine = max(1, args.rollout_num_gpus_per_engine // args.rollout_num_gpus_per_node) self.rollout_engines = self.all_rollout_engines[::nodes_per_engine] # 5. 创建锁 self.rollout_engine_lock = Lock.options( num_cpus=1, num_gpus=0, ).remote()
关键方法
A. 数据生成
def async_generate(self, rollout_id): return self.controller.generate.remote(rollout_id)
B. 评估
def async_eval(self, rollout_id): return self.controller.eval.remote(rollout_id)
C. 内存管理 onload/offload
def async_offload(self): return [engine.release_memory_occupation.remote() for engine in self.rollout_engines] def async_onload(self, tags: List[str] = None): return [engine.resume_memory_occupation.remote(tags=tags) for engine in self.rollout_engines]
作用
创建SGLang引擎池,负责模型推理服务。
核心逻辑
def create_rollout_engines(args, pg): if args.debug_train_only: return [] # 计算引擎配置 num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.rollout_num_gpus_per_node) num_engines = args.rollout_num_gpus // num_gpu_per_engine # 创建Ray Actor RolloutRayActor = ray.remote(SGLangEngine) rollout_engines = [] for i in range(num_engines): num_gpus = 0.2 # 每个引擎使用0.2个GPU num_cpus = num_gpus # 设置调度策略 scheduling_strategy = PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_capture_child_tasks=True, placement_group_bundle_index=reordered_bundle_indices[i * num_gpu_per_engine], ) # 创建引擎 rollout_engines.append( RolloutRayActor.options( num_cpus=num_cpus, num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, runtime_env={"env_vars": {name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST}}, ).remote(args, rank=i) ) # 端口分配和初始化 # ... 端口分配逻辑 ... # 初始化所有引擎 init_handles = [engine.init.remote(**ports) for engine, ports in zip(rollout_engines, addr_and_ports)] ray.get(init_handles) return rollout_engines
关键特点:
作用
启动SGLang路由器,提供负载均衡服务。
实现细节
def _start_router(args): if args.sglang_router_ip is not None: return # 使用外部Router from sglang_router.launch_router import RouterArgs # 自动分配IP和端口 args.sglang_router_ip = get_host_info()[1] args.sglang_router_port = find_available_port(random.randint(3000, 4000)) # 配置Router参数 router_args = RouterArgs( host=args.sglang_router_ip, port=args.sglang_router_port, balance_abs_threshold=0, ) # 设置日志级别和超时 if hasattr(router_args, "log_level"): router_args.log_level = "warn" if hasattr(router_args, "request_timeout_secs"): router_args.request_timeout_secs = args.sglang_router_request_timeout_secs # 启动Router进程 process = multiprocessing.Process( target=run_router, args=(router_args,), ) process.daemon = True process.start() # 等待启动完成 time.sleep(3) assert process.is_alive()
作用
RolloutController是rollout系统的核心执行器,负责数据生成、转换和管理。
初始化
@ray.remote class RolloutController: def __init__(self, args, wandb_run_id): self.args = args init_wandb_secondary(args, wandb_run_id) # 创建数据源 self.data_source = RolloutDataSourceWithBuffer(args) # 动态加载rollout函数 self.generate_rollout = load_function(self.args.rollout_function_path) self.eval_generate_rollout = load_function(self.args.eval_function_path) print(f"import {self.args.rollout_function_path} as generate_rollout function.") print(f"import {self.args.eval_function_path} as eval_generate_rollout function.")
关键特性:
--rollout-function-path可以切换到SFT模式generate方法 - 核心生成流程
def generate(self, rollout_id): self.rollout_id = rollout_id # 1. 调试模式:从磁盘加载数据 if self.args.load_debug_rollout_data: data = torch.load( open(self.args.load_debug_rollout_data.format(rollout_id=rollout_id), "rb"), )["samples"] data = [Sample.from_dict(sample) for sample in data] else: # 2. 正常模式:调用rollout函数生成数据 data = self.generate_rollout(self.args, rollout_id, self.data_source, evaluation=False) # 3. 扁平化数据(如果是嵌套列表) if isinstance(data[0], list): data = sum(data, []) # 4. 可选:保存调试数据 if (path_template := self.args.save_debug_rollout_data) is not None: path = Path(path_template.format(rollout_id=self.rollout_id)) print(f"Save debug rollout data to {path}") path.parent.mkdir(parents=True, exist_ok=True) torch.save( dict( rollout_id=self.rollout_id, samples=[sample.to_dict() for sample in data], ), path, ) # 5. 转换为训练数据格式 data = self._convert_samples_to_train_data(data) # 6. 包装并返回 return Box(ray.put(data))
生成流程:
eval方法 - 评估流程
def eval(self, rollout_id): if self.args.debug_train_only: return # 调试模式不生成评估数据 # 调用评估rollout函数 data = self.eval_generate_rollout(self.args, rollout_id, self.data_source, evaluation=True) # 记录评估数据 log_eval_data(rollout_id, self.args, data)
作用
将生成的Sample对象转换为训练所需的字典格式。
转换逻辑
def _convert_samples_to_train_data(self, samples: Union[list[Sample], list[list[Sample]]]): """ Convert inference generated samples to training data. """ # 基础训练数据 train_data = { "tokens": [sample.tokens for sample in samples], "response_lengths": [sample.response_length for sample in samples], "rewards": [sample.get_reward_value(self.args) for sample in samples], "truncated": [1 if sample.status == Sample.Status.TRUNCATED else 0 for sample in samples], "sample_indices": [sample.index for sample in samples], } # 处理loss mask loss_masks = [] for sample in samples: # 如果没有提供loss_mask,创建默认的 if sample.loss_mask is None: sample.loss_mask = [1] * sample.response_length # 验证loss_mask长度 assert ( len(sample.loss_mask) == sample.response_length ), f"loss mask length {len(sample.loss_mask)} != response length {sample.response_length}" loss_masks.append(sample.loss_mask) train_data["loss_masks"] = loss_masks # 处理raw reward if samples[0].metadata and "raw_reward" in samples[0].metadata: train_data["raw_reward"] = [sample.metadata["raw_reward"] for sample in samples] # 处理round_number(用于rollout buffer) if samples[0].metadata and "round_number" in samples[0].metadata: train_data["round_number"] = [sample.metadata["round_number"] for sample in samples] return train_data
转换内容:
作用
记录评估数据到wandb和控制台。
def log_eval_data(rollout_id, args, data): log_dict = {} for key in data.keys(): rewards = data[key]["rewards"] log_dict[f"eval/{key}"] = sum(rewards) / len(rewards) if "truncated" in data[key]: truncated = data[key]["truncated"] log_dict[f"eval/{key}-truncated_ratio"] = sum(truncated) / len(truncated) print(f"eval {rollout_id}: {log_dict}") if args.use_wandb: log_dict["eval/step"] = ( rollout_id if not args.wandb_always_use_train_step else rollout_id * args.rollout_batch_size * args.n_samples_per_prompt // args.global_batch_size ) wandb.log(log_dict)

上图展示了slime rollout系统中各组件的关系。整个系统采用分层架构,实现了职责分离和高效协作。
A. 生成请求流
训练进程发起生成请求,经过Manager协调、Controller执行、Engine推理的完整流程,最终返回训练数据。
B. 管理操作流
参考Dataset Walkthrough,数据源交互流程:
RolloutDataSourceWithBuffer实例data_source.get_samples()获取prompt样本# RL模式(默认) --rollout-function-path slime.rollout.sglang_rollout.generate_rollout # SFT模式 --rollout-function-path slime.rollout.sft_rollout.generate_rollout # 自定义模式 --rollout-function-path path.to.custom.generate_rollout
def generate_rollout(args, rollout_id, data_source, evaluation=False) -> list[list[Sample]]: """ Args: args: 全局参数 rollout_id: rollout标识 data_source: 数据源 evaluation: 是否为评估模式 Returns: list[list[Sample]]: 生成的样本组 """ # 实现逻辑 return samples
SFT模式通过自定义rollout函数实现:
| 参数 | 说明 | 默认值 |
|---|---|---|
rollout_num_gpus_per_engine |
每个引擎使用的GPU数量 | 0.2 |
rollout_num_gpus |
总GPU数量 | - |
rollout_function_path |
rollout函数路径 | slime.rollout.sglang_rollout.generate_rollout |
eval_function_path |
评估函数路径 | - |
sglang_router_ip |
Router IP地址 | None(自动分配) |
sglang_router_port |
Router端口 | None(自动分配) |
这个架构使得rollout系统既高效又灵活,能够支持各种复杂的强化学习和监督学习训练场景!