Conversation

@Disservin
Member

No description provided.

@Disservin
Member Author

Disservin commented Sep 24, 2025

@Sopel97 do you think this would work regarding the changes for the multi-GPU data loader?

@vondele
Member

vondele commented Sep 24, 2025

before the force push:

```
/workspace/scratch/packages/trainer/883957633278dc3dcc00c75a1b282295f7c7cfc9_build_dbb89f6f-ba60-4ade-9e85-d51b46a7770d/nnue-pytorch/training_data_loader.cpp: In function ‘int main(int, char**)’:
/workspace/scratch/packages/trainer/883957633278dc3dcc00c75a1b282295f7c7cfc9_build_dbb89f6f-ba60-4ade-9e85-d51b46a7770d/nnue-pytorch/training_data_loader.cpp:1149:45: error: too few arguments to function ‘Stream<SparseBatch>* create_sparse_batch_stream(const char*, int, int, const char* const*, int, bool, DataloaderSkipConfig, DataloaderDDPConfig)’
 1149 |     auto stream = create_sparse_batch_stream("HalfKAv2_hm^", concurrency, file_count, files, batch_size, cyclic, config);
      |                   ~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
```

@Disservin
Member Author

Disservin commented Sep 24, 2025

fixed

@Sopel97
Member

Sopel97 left a comment

Looks fine, though we were thinking this approach has 2 disadvantages:

  1. It requires a lot of seeking at the start
  2. The data being read may eventually converge (in the worst case) to be identical for all ranks

@vondele suggested on Discord:

> [22:39] vondele: i mean, the ith process of N could restrict itself to reading only from chunk k % N == i ..

which fixes both issues. The only issue that comes up, implementation-wise, is when the number of chunks in the file is lower than the number of ranks. We could either just return a failure in that case, or only reset the index once per cycle and allow it to grow past the number of chunks in the file (it still yields some chunk; we don't really care which).

The code as is could be easily adapted: skip rank chunks at the start, advance by world_size chunks after every chunk read, and reinitialize on reaching the end when cyclic; see the sketch below.
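A minimal standalone sketch of that indexing scheme (illustrative only; the chunk and rank counts are assumed and this is not the actual binpack reader code):

```cpp
// Illustrative sketch only, not the actual binpack reader: rank i of
// world_size N restricts itself to chunks whose index satisfies k % N == i.
#include <cstddef>
#include <iostream>

int main() {
    const std::size_t world_size = 4;   // assumed number of DDP ranks
    const std::size_t num_chunks = 10;  // assumed number of chunks in the file

    for (std::size_t rank = 0; rank < world_size; ++rank) {
        std::cout << "rank " << rank << " reads chunks:";
        // Skip `rank` chunks at the start, then advance by `world_size`
        // chunks after every chunk read.
        for (std::size_t k = rank; k < num_chunks; k += world_size)
            std::cout << ' ' << k;
        std::cout << '\n';
    }
}
```

Since the per-rank chunk sets are disjoint by construction, this removes both the initial seeking and any chance of the ranks converging onto the same data.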

@Disservin
Member Author

like this?

@vondele
Member

vondele commented Sep 24, 2025

segfaults for me?

```
 4  /workspace/scratch/packages/trainer/25775c4f832e33f899dde98220c0338a22721bb3/nnue-pytorch/libtraining_data_loader.so(_ZN7binpack41CompressedTrainingDataEntryParallelReader22fetchNextChunkIfNeededERmRSt6vectorIhSaIhEE+0x2a4) [0x40023deba8e4]
 5  /workspace/scratch/packages/trainer/25775c4f832e33f899dde98220c0338a22721bb3/nnue-pytorch/libtraining_data_loader.so(_ZZN7binpack41CompressedTrainingDataEntryParallelReaderC4EiSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaIS7_EESt13_Ios_OpenmodebSt8functionIFbRKNS_17TrainingDataEntryEEEiiENKUlvE_clEv+0x94) [0x40023debb274]
```

@Sopel97
Member

Sopel97 left a comment

The current logic looks like it always skips N chunks, so if the number of chunks in a file is not divisible by N it will read a different set of chunks on the second cycle, which is undesirable.

The simplest solution I see would be to remove the cycle handling from skipChunks and instead check in fetchNextChunkIfNeeded whether we have reached the end of the file; if we have, reinitialize from the start (skipping rank chunks), but at most once, to prevent an infinite loop. Roughly like the sketch below.
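A hedged sketch of that control flow (the names are placeholders modeled on the reader's methods, not the real CompressedTrainingDataEntryParallelReader implementation):

```cpp
// Hedged sketch of the suggested control flow; names are placeholders, not
// the real binpack API. skipChunks() only skips, and fetchNextChunkIfNeeded()
// reinitializes from the start at most once, so a file with fewer chunks than
// ranks cannot loop forever.
#include <cstddef>
#include <iostream>
#include <optional>
#include <utility>
#include <vector>

struct ChunkedReaderSketch {
    std::vector<int> chunks;   // stand-in for the compressed chunks in one file
    std::size_t pos = 0;       // index of the next chunk this rank would read
    std::size_t rank;
    std::size_t world_size;
    bool cyclic;

    ChunkedReaderSketch(std::vector<int> c, std::size_t r, std::size_t n, bool cyc)
        : chunks(std::move(c)), rank(r), world_size(n), cyclic(cyc) {
        skipChunks(rank);                  // skip `rank` chunks at the start
    }

    // Plain skipping; no cycle handling in here.
    void skipChunks(std::size_t n) { pos += n; }

    std::optional<int> fetchNextChunkIfNeeded() {
        if (pos >= chunks.size()) {
            if (!cyclic)
                return std::nullopt;
            // Reinitialize at most once: back to the start, skip `rank` chunks,
            // so the next cycle reads exactly the same set as the previous one.
            pos = 0;
            skipChunks(rank);
            if (pos >= chunks.size())      // fewer chunks than ranks
                return std::nullopt;       // treat as failure rather than loop
        }
        const int chunk = chunks[pos];
        skipChunks(world_size);            // step over the other ranks' chunks
        return chunk;
    }
};

int main() {
    // Example: 10 chunks, rank 1 of 4 reads 1, 5, 9 and then cycles back to 1.
    ChunkedReaderSketch reader({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, /*rank=*/1,
                               /*world_size=*/4, /*cyclic=*/true);
    for (int i = 0; i < 5; ++i)
        std::cout << *reader.fetchNextChunkIfNeeded() << ' ';
    std::cout << '\n';                     // prints: 1 5 9 1 5
}
```

Resetting to rank on wrap keeps the per-rank chunk set identical across cycles even when the chunk count is not divisible by world_size (e.g. with 10 chunks and 4 ranks, rank 1 always gets 1, 5, 9), and the early return avoids an infinite loop when a file has fewer chunks than ranks.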

@Disservin
Member Author

> but at most once to prevent infinite loop

But we are using this m_cyclic with true? So we can just seek to the start, skip chunks, and be done with it; I'm not sure which "at most once" logic you are referring to.

@vondele
Member

vondele commented Sep 24, 2025

segfaults are gone in my tests.

@Sopel97
Member

Sopel97 commented Sep 24, 2025

Pretty sure as it is now it will not cycle the dataset a second time, because m_files_cycled_for_ddp will be true. In fact I don't see how this variable is needed at all.

> But we are using this m_cyclic with true? So we can just seek to the start, skip chunks, and be done with it; I'm not sure which "at most once" logic you are referring to.

Not an issue with this implementation. It would be an issue if there were a loop skipping and resetting to the start until we got to an actual rank-th chunk.

@Disservin
Member Author

Yeah, it's an artifact; I just wanted to clear things up.

@vondele
Member

vondele commented Sep 27, 2025

I think the initialization trick with the environment variables is not really working yet; at least locally I get this:

```
DDP rank: 0, world size: 1
DDP rank: 0, world size: 1
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/4
DDP rank: 2, world size: 4
DDP rank: 3, world size: 4
DDP rank: 2, world size: 4
DDP rank: 1, world size: 4
DDP rank: 3, world size: 4
DDP rank: 1, world size: 4
Initializing distributed: GLOBAL_RANK: 3, MEMBER: 4/4
Initializing distributed: GLOBAL_RANK: 2, MEMBER: 3/4
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/4
distributed_backend=nccl
All distributed processes registered. Starting with 4 processes
```

So rank 0 thinks the world size is 1 at the time of creating the stream?

Edit: I must say I'm not 100% sure the PL autodetection works well here, so results from other systems would be good.

@vondele
Member

vondele commented Sep 27, 2025

So, I have now finished training 2 stages with this branch, and I think this issue shows in the results:

```
No ddp:
Elo: -57.73 +/- 7.90, nElo: -113.99 +/- 15.30
Elo: -13.39 +/- 3.33, nElo: -25.39 +/- 6.31

ddp:
Elo: -81.26 +/- 9.02, nElo: -155.39 +/- 16.63
Elo: -20.74 +/- 4.60, nElo: -38.90 +/- 8.60
```

Reading a bit: torch.distributed is not yet initialized at the point where we create the data loader (that only happens during train.fit), so the skipping-factor initialization should probably be done lazily, since at the time the data loader is created it is not yet known that we're in a distributed setup; the env variables are a bit of a hack.

We should probably base it on:

```python
import torch.distributed as dist

if dist.is_available() and dist.is_initialized():
    print(f"Rank {dist.get_rank()} / {dist.get_world_size()}")
else:
    print("Not distributed")
```

@vondele
Member

vondele commented Sep 29, 2025

I'm looking at the training loss, and I think we're not yet fully equivalent, consistent with the result from training (but looking at the validation loss might be quicker).

[training loss plot]
