[recipe] Add recipe demo to use StreamingDataset & StreamingDataLoader #93
Jixixi2020 wants to merge 4 commits into Ascend:main
Conversation
CLA Signature Guide
@Jixixi2020, thanks for your pull request. The following commit(s) are not associated with a signed Contributor License Agreement (CLA).
To sign the CLA, click here. To check whether your email is configured correctly, refer to the FAQs. Once you've signed the CLA or updated your email, please comment.
@NINGBENZHE Please help review this recipe~
Pull request overview
Adds a new recipe-style demo (streaming_dataloader_demo.py) showing how to connect multiple asynchronous RL-like pipeline stages via StreamingDataset + StreamingDataLoader, where each stage reads only the fields it needs and writes derived fields back into the same partition.
Changes:
- Introduces a Ray-based, decentralized “worker-per-stage” pipeline demo (rollout/ref/actor/reward/update).
- Demonstrates field-level streaming reads with StreamingDataset(data_fields=...) and writes back via tq_client.put(..., metadata=batch_meta) (a sketch follows this list).
- Adds a small driver loop that inserts prompts, waits for stage completion, simulates weight sync, and clears partitions.
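To make the field-level pattern concrete, here is a minimal sketch of what one stage could look like. BaseStageWorker and the field names are illustrative assumptions, not taken from the diff:

```python
import torch


class BaseStageWorker:  # stub standing in for the demo's actual worker base class
    pass


class RolloutWorker(BaseStageWorker):
    stage_name = "rollout"

    def input_fields(self) -> list[str]:
        # This stage reads only the prompt tokens and their sample ids.
        return ["prompt", "sample_id"]

    def compute(self, batch: dict, batch_meta) -> tuple[dict, list[str]]:
        # Fake generation: derive a "response" field from the prompt.
        output = {"response": torch.zeros_like(batch["prompt"])}
        # Downstream stages (ref/reward/...) read "response" back out of
        # the same partition through their own data_fields lists.
        return output, list(output.keys())
```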
Excerpts from the diff of streaming_dataloader_demo.py:

@@ -0,0 +1,428 @@

```python
import argparse
```

```python
num_steps: int
pipeline_depth: int
global_batch_size: int
micro_batch_size: int
prompt_length: int
response_length: int
weight_sync_seconds: float
empty_poll_log_interval: int
num_data_storage_units: int
```
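These fields read like the demo's step/batch configuration. A minimal sketch of how they might be grouped, assuming a dataclass; the name and default values below are illustrative, not from the diff:

```python
from dataclasses import dataclass


@dataclass
class DemoConfig:
    # Pipeline shape: how many global steps to run and how many steps
    # may be in flight at once (relevant for off-policy variants).
    num_steps: int = 4
    pipeline_depth: int = 2
    # Batch geometry for the streaming dataset/dataloader.
    global_batch_size: int = 8
    micro_batch_size: int = 2
    # Token-length knobs for the fake rollout data.
    prompt_length: int = 16
    response_length: int = 32
    # Simulated weight-sync latency and logging cadence for empty polls.
    weight_sync_seconds: float = 1.0
    empty_poll_log_interval: int = 10
    # Number of TransferQueue storage units backing the demo.
    num_data_storage_units: int = 1
```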
```python
def _run_step(self, step: int) -> None:
    partition_id = f"{self.cfg_demo.partition_prefix}_{step}"
    dataloader = self._build_dataloader(partition_id)

    for batch, batch_meta in dataloader:
        sample_ids = batch["sample_id"].view(-1).tolist()
        logger.info(f"[{self.worker_name}] step={step} consumed sample_ids={sample_ids}")

        output, written_fields = self.compute(batch, batch_meta)
        self.tq_client.put(output, metadata=batch_meta)

        count = ray.get(self.tracker.record.remote(self.stage_name, step, len(sample_ids)))
        logger.info(
            f"[{self.worker_name}] step={step} done -> written_fields={written_fields}, "
            f"{self.stage_name}_count={count}/{self.cfg_demo.global_batch_size}"
        )

    ray.get(self.tracker.record_done.remote(self.stage_name, step))
    logger.info(f"[{self.worker_name}] step={step} worker_done recorded")
```
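Note that passing batch_meta back into tq_client.put is what writes the derived fields back to the same samples in the same partition, so downstream stages can pick them up through their own data_fields (per the overview above).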
```python
def _build_dataloader(self, partition_id: str) -> StreamingDataLoader:
    dataset = StreamingDataset(
        config=self.cfg,
        batch_size=self.cfg_demo.micro_batch_size,
        micro_batch_size=self.cfg_demo.micro_batch_size,
        data_fields=self.input_fields(),
        partition_id=partition_id,
        task_name=f"{self.cfg_demo.task_name_prefix}_{self.stage_name}",
        dp_rank=self.worker_id,
        should_check_consumption_status=True,
    )
    return StreamingDataLoader(dataset=dataset, num_workers=0, prefetch_factor=None)
```
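A note on the loader arguments: with num_workers=0, batches are fetched in the worker process itself, and (as with the standard PyTorch DataLoader, which rejects a prefetch_factor when there are no loader worker processes) prefetch_factor is left as None. Assuming StreamingDataLoader follows that convention, this keeps the demo single-process and easy to trace.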
```python
ray.get(refs)
logger.info("demo done!")
return []
```

```python
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
```
```python
for step in range(self.config.num_steps):
    self._put_prompt(step)
    self._wait_complete(step)
```
The data flow here is largely fine, but as written the demo is an on-policy scenario and doesn't exercise any off-policy logic. Consider enriching it.
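Picking up this reviewer's point, here is a minimal sketch of an off-policy variant of the driver loop above, assuming the pipeline_depth config field bounds the number of in-flight steps (_put_prompt/_wait_complete are the demo's own helpers; the restructuring itself is only a suggestion):

```python
# Hypothetical off-policy driver: allow up to pipeline_depth steps in
# flight instead of blocking on each step before starting the next.
in_flight: list[int] = []
for step in range(self.config.num_steps):
    self._put_prompt(step)
    in_flight.append(step)
    if len(in_flight) >= self.config.pipeline_depth:
        # Only wait for the oldest outstanding step once the pipeline is
        # full, so later steps consume data produced under stale weights.
        self._wait_complete(in_flight.pop(0))
# Drain whatever is still in flight at the end of training.
for step in in_flight:
    self._wait_complete(step)
```

With pipeline_depth=1 this degenerates to the original on-policy loop, so one knob could cover both scenarios.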
```python
ray.init()
try:
    demo = DecentralizedInheritedWorkerPipelineDemo(cfg, build_tq_config(cfg))
```
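The hunk cuts off inside the try block. A plausible completion, assuming the entry point just runs fit() and shuts Ray down afterwards (parse_args is a hypothetical helper; build_tq_config is taken from the snippet):

```python
def main() -> None:
    cfg = parse_args()  # hypothetical: builds the demo config from argparse
    ray.init()
    try:
        demo = DecentralizedInheritedWorkerPipelineDemo(cfg, build_tq_config(cfg))
        demo.fit()
    finally:
        # Assumption: tear down the local Ray cluster even if the demo fails.
        ray.shutdown()
```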
```python
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```
Suggest referencing Relax here.
And we could call it relax_demo.py directly?
```python
def fit(self) -> list[dict]:
    logger.info("=" * 72)
    logger.info("TransferQueue StreamingDataLoader Decentralized Inherited Worker Pipeline Demo")
```
Remember to modify this banner when changing the file name.
```python
if __name__ == "__main__":
    main()
```
Please refer to recipe-check.yml to add this file to the workflow.
Summary
Adds a new demo to illustrate how to use StreamingDataset and StreamingDataLoader in a simple data-centric, asynchronous RL-style workflow. The demo shows a decentralized worker-per-stage pipeline where each stage independently consumes the fields it needs from the queue and writes its outputs back for downstream stages. It is designed as a readable example of how streaming data access can connect multiple RL pipeline stages without tightly coupling execution to a centralized stage-by-stage scheduler.
Changes
- StreamingDataset and StreamingDataLoader usage