# Design Doc: Fluid Data Pipeline

This document is about how Fluid training and inference programs read data.

## Standard Data Format

### Case 1: Data from Files

Consider a Fluid training program, `resnet50.py`, that needs to read data from disk:

```bash
cat data | python resnet50.py
```

> **Review comment (@reyoung, Apr 25, 2018):** A note about pipes.
>
> Fluid can read data not only from stdin but also from regular files; a pipe and stdin are just special files. We can even use `mkfifo` to create a named pipe, which acts like a real file. Currently, Fluid supports an operator named `open_files`, which can open multiple files and read them concurrently, so we can create several named pipes and use multiple processes to generate data:
>
> ```bash
> mkfifo data1
> mkfifo data2
> ./data_generator > data1 &  # run this generator in the background
> ./data_generator > data2 &  # data1 and data2 are generated concurrently
>
> # in train.py:
> #   file_obj = fluid.open_files(['data1', 'data2'])
> #   img, label = read(file_obj)
>
> python train.py
> ```

> **Author reply (@wangkuiyi, Apr 25, 2018):** Thanks for pointing this out!
>
> I agree it is a good idea to improve data generation performance by reading from multiple data generators via pipes.
>
> I also want to note that `OpenFiles` is not a good design; instead, what we need are finer-grained APIs:
>
> 1. `OpenFile` -- File, not Files, which opens a file as a byte stream,
> 2. `CreateRecordIOReader` -- which extracts records, instead of bytes, from the stream,
> 3. `CreateMultiplexReader` -- which monitors multiple readers and reads from whichever one has a record ready.
>
> This provides usage like the following:
>
> ```
> r = CreateMultiplexReader(
>       OpenFile("/afs/resnet50/training-data-00000-of-00005"),
>       OpenFile("/dev/stdin"),
>       OpenFile("a_named_pipe"));
> ```
>
> I am adding the above discussion to the document.

> **Review comment:** @JiayiFeng `CreateMultiplexReader` is an alternative way to implement `open_files`. Please take a look.

Since the person who collects the data might be different from the person who wrote `resnet50.py`:

1. Fluid operators used in `resnet50.py` must be able to recognize the file format of `data`; in other words, we need a standard data format.
1. These operators need to be able to read from the standard input.

### Case 2: Data from Online Generators

Instead of coming from files, data might arrive from an online source. For example:

- Data generator for performance benchmarking.
- Data generator for training special models like [GAN](https://en.wikipedia.org/wiki/Generative_adversarial_network).
- The online data stream in production systems, e.g., online advertising.

Given that

1. data generators could crash and be restarted (by Kubernetes or other cluster management systems), and
1. the network/pipe connection between the generator and the trainer may break,

we need

1. the data format to be fault-tolerant.
> **Review comment:** We also need Fluid to read from multiple files concurrently. Suppose there are many data generators; they should write into multiple files to avoid interfering with one another.

> **Author reply (@wangkuiyi):** Yes, I agree. The solution in my mind is the one in #10179 (comment).

### A Choice: RecordIO

The [RecordIO file format](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/recordio/README.md) is a container of records and is fault-tolerant. It groups records into *chunks*; each chunk starts with a magic number and includes the MD5 hash of its content, so that readers can check consistency and skip over damaged chunks, which might be left behind by generators that crashed unexpectedly or by broken network connections.
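
To make the fault-tolerance property concrete, below is a minimal Python sketch of chunk-level integrity checking. It is an illustration only, not Fluid's actual RecordIO implementation: the 4-byte marker, the header layout, and the function name are assumptions made for this example.

```python
import hashlib
import struct

MAGIC = b"RIO1"  # hypothetical chunk marker, not RecordIO's real magic number


def read_chunks(path):
    """Yield the payload of every intact chunk, skipping damaged ones."""
    with open(path, "rb") as f:
        data = f.read()
    pos = 0
    while True:
        pos = data.find(MAGIC, pos)                 # resynchronize on the marker
        if pos < 0 or pos + 24 > len(data):
            return                                  # no more complete headers
        md5 = data[pos + 4:pos + 20]                # 16-byte MD5 of the payload
        (length,) = struct.unpack("<I", data[pos + 20:pos + 24])
        payload = data[pos + 24:pos + 24 + length]
        if len(payload) == length and hashlib.md5(payload).digest() == md5:
            yield payload                           # chunk is consistent
            pos += 24 + length                      # move to the next chunk
        else:
            pos += 4                                # damaged/truncated chunk: scan forward
```

A generator that crashes mid-write leaves at most a truncated trailing chunk, which a reader like this simply skips; the records in all earlier chunks remain readable.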

## Discussions

### Other Data Formats

We also considered other data formats, e.g., [SSTable](https://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/). While RecordIO is a container of records, SSTable is a container of key-value pairs. Because training and testing data in machine learning are records rather than key-value pairs, we chose RecordIO over SSTable.

### Record Format

Usually, each instance contains multiple fields. For example, each ResNet instance includes an image and one or more text labels; in text classification, each instance contains a piece of text and its labels.

Data reading operators of Fluid must understand not only the file format but also the record format, so that they can map fields to Fluid variables of various types. For this purpose, we propose to standardize the record format as a protobuf message:

```protobuf
message Instance {
  enum Type {
    IMAGE = 0;
    // ...
  }
  message Field {
    required Type type = 1;
    repeated bytes image = 2;  // PNG image
  }
  repeated Field fields = 1;
}
```

> **Review comment:** A protobuf message has a storage size limit -- 64 MB by default, if I remember correctly. We should take this into consideration, although a single instance seems unlikely to be larger than 64 MB.

> **Author reply (@wangkuiyi):** Thanks for the reminder. Yes, I assume that each training instance should not exceed 64 MB -- actually 32 MB, because protobuf's serialization/deserialization mechanism warns when a message exceeds 32 MB and errors when it exceeds 64 MB.

> **Review comment:** I think we should not support so many types. For simplicity, the RecordIO file should only store LoDTensors. Files of other types, like PNG images, should be preprocessed into LoDTensors first and then saved into the RecordIO file. If we support many kinds of binary formats inside RecordIO, the format becomes too flexible and we cannot reuse data augmentation programs.

> **Author reply (@wangkuiyi, Apr 25, 2018):** Good point. It looks to me that this is the alternative I described in the Open Discussion section below, isn't it? If so, I agree that it would be convenient to save Fluid variables instead of raw data in records. I am updating the document accordingly.
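
To make the mapping from this message to bytes concrete, here is a minimal Python sketch of how a data generator could serialize one instance. It assumes the message above has been compiled with `protoc --python_out` into a (hypothetical) module named `instance_pb2`; the resulting bytes would then be written as a single record into a RecordIO file by a writer that is not shown here.

```python
import instance_pb2  # hypothetical module generated from the Instance message above


def make_record(png_bytes):
    """Pack one PNG image into the serialized bytes of a single RecordIO record."""
    instance = instance_pb2.Instance()
    field = instance.fields.add()              # `fields` is a repeated Field
    field.type = instance_pb2.Instance.IMAGE   # nested enum value
    field.image.append(png_bytes)              # PNG-encoded image bytes
    return instance.SerializeToString()        # bytes for one RecordIO record
```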

***Open Discussion:*** Should we reuse [`VarDesc.Type`](https://github.com/PaddlePaddle/Paddle/blob/72ee737f3f3e539e1f4b2a5e819e0d62f362c7b0/paddle/fluid/framework/framework.proto#L95) instead of reinventing `Instance.Type`?

### Data Augmentation

A typical kind of data augmentation duplicates each training instance with various types of noise added, so as to train a noise-tolerant model.

It is far from trivial to implement the many augmentation operations as Fluid operators, so we adopt a more flexible approach -- writing data augmentation programs in arbitrary languages and piping them together. For example:

```bash
cat data | go run add_noise.go | python resnet50.py
```

> **Review comment:** Data augmentation can be the performance bottleneck of a training process, especially when using many GPUs. When augmentation runs in a separate program, it is hard to use GPUs for it, which is necessary for image models. Maybe we should provide some common data augmentation algorithms as operators, while also keeping the flexible approach of piping multiple processes together.
>
> Reference: TensorFlow's "High-Performance Models" guide; half of that article describes how to feed data.

> **Author reply (@wangkuiyi):** I agree that we should provide some augmentation operators. The ability to use pipelines is also critical; I am afraid it is intractable to implement all augmentation operations as Fluid operators.

As we use a standard data format, `add_noise.go` must be able to decode `data` and encode its output in the RecordIO format. We could provide a Go binding of the RecordIO API.
> **Review comment (@JiayiFeng, Apr 25, 2018):** What is the format of `data`? If it is RecordIO, do we need to provide users with tools that convert their own data into the RecordIO format? If it is a user-defined format, how do we make sure `add_noise.go` can be applied to it?

> **Author reply (@wangkuiyi):** I should have written `data.recordio` instead of `data` to make it clear that the format is RecordIO. And yes, we need to provide at least one of the two kinds of tools mentioned in this design:
>
> 1. a library that data augmentation programs can call, and
> 2. a pair of encoding/decoding executables -- `recordio_encode` and `recordio_decode`.


For quick-and-dirty experiments, we might want to write data augmentation programs as Bash/Awk scripts. In that case, we want the script to read decoded records from stdin and write them to stdout, without having to encode or decode the RecordIO format itself. We can achieve this by

1. providing programs that encode/decode records to/from RecordIO files, and
1. base64-encoding each record, so that we can use `\n` as the record separator.

The usage would be

```bash
cat data | recordio_decode | awk -f change_label.awk | recordio_encode | python resnet50.py
```
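
To illustrate the base64 framing (a sketch only; the real `recordio_decode` and `recordio_encode` would be standalone executables, and the record source here is left abstract), each record becomes exactly one base64 line, so `\n` can never appear inside a record:

```python
import base64
import sys


def records_to_lines(records):
    """recordio_decode side: emit one base64 line per raw record."""
    for record in records:                    # `records`: raw bytes of each decoded record
        sys.stdout.write(base64.b64encode(record).decode("ascii") + "\n")


def lines_to_records(lines):
    """recordio_encode side: turn base64 lines (e.g., from stdin) back into raw records."""
    for line in lines:
        yield base64.b64decode(line.strip())
```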

Please be aware that base64 increases the data size by roughly one third, but for quick-and-dirty experiments this should be acceptable. For high-performance or production usage, feel free to write the data augmentation programs in a language like C++ or Go, and don't use base64.