Commit 0a55804

Add quick start docs and examples
1 parent 2ef1c40 commit 0a55804

File tree

4 files changed: +374 -178 lines changed

README.md

Lines changed: 16 additions & 178 deletions
````diff
@@ -1,54 +1,26 @@
-[![Build Status](https://travis-ci.org/alibaba/flink-ai-extended.svg?branch=master)](https://travis-ci.org/alibaba/flink-ai-extended)
+# Deep Learning on Flink
 
-# deep-learning-on-flink
+Deep Learning on Flink aims to integrate Flink and deep learning frameworks
+(e.g. TensorFlow, PyTorch, etc.) to enable distributed deep learning training
+and inference on a Flink cluster.
 
-Deep Learning on Flink aims to integrate Flink and deep learning frameworks (e.g. TensorFlow, PyTorch, etc).
-It runs the deep learning tasks inside a Flink operator, so that Flink can help establish a distributed environment,
-manage the resource, read/write the records and handle the failures.
+It runs the deep learning tasks inside a Flink operator so that Flink can help
+establish a distributed environment, manage the resources, read/write the data
+with the rich connectors in Flink, and handle failures.
 
 Currently, Deep Learning on Flink supports TensorFlow and PyTorch.
 
-**contents**
-
-- [TensorFlow support](#tensorflow-support)
-  * [Support Version](#support-version)
-  * [Quick Start](#quick-start)
-    + [Setup](#setup)
-    + [Build From Source](#build-from-source)
-    + [Build Source in virtual environment](#build-source-in-virtual-environment)
-    + [Example](#example)
-  * [Distributed Running](#distributed-running)
-    + [Deployment](#deployment)
-    + [Running Distributed Programs](#running-distributed-programs)
-  * [Distributed Running Example](#distributed-running-example)
-    + [Setup & Build](#setup---build)
-    + [Start Service](#start-service)
-    + [Prepare data & code](#prepare-data---code)
-    + [Submit train job](#submit-train-job)
-    + [Visit Flink Cluster](#visit-flink-cluster)
-    + [Stop all docker containers](#stop-all-docker-containers)
-    + [Summary](#summary)
-  * [Optional Tools](#optional-tools)
-    + [Build framework and tensorflow python package Independently](#build-framework-and-tensorflow-python-package-independently)
-    + [Build custom virtual environment package](#build-custom-virtual-environment-package)
-- [Structure](#structure)
-- [For More Information](#for-more-information)
-- [License](#license)
-
-# TensorFlow support
-TensorFlow is a deep learning system developed by Google and open source, which is widely used in the field of deep learning. There are many inconveniences in distributed use and resource management of native TensorFlow, but it can not integrate with the existing widely used large data processing framework.
-
-Flink is a data processing framework. It is widely used in data extraction, feature preprocessing and data cleaning.
-
-This project combines TensorFlow with Flink and provides users with more convenient and useful tools.
-**Currently, Flink job code can be written in both java with Flink Java API and in python with PyFlink. The algorithm code is written in python.**
-
 ## Support Version
-TensorFlow: 1.15.0 & 2.3.1
-
-Flink: 1.11.x
+TensorFlow: 1.15.x & 2.3.x
+PyTorch: 1.x
+Flink: 1.14.x
 
-## Quick Start
+## Getting Started
+
+To get your hands dirty, you can follow the [quick start](doc/quick_start.md)
+to submit an example job to a local standalone Flink cluster.
+
+## Build
 
 ### Setup
 
````

````diff
@@ -165,140 +137,6 @@ mvn clean install
 ```shell
 deactivate
 ```
-
-### Example
-
-1. tensorflow add example
-**<p>python code:</p>**
-
-```python
-import tensorflow as tf
-import time
-import sys
-from flink_ml_tensorflow.tensorflow_context import TFContext
-
-def build_graph():
-    global a
-    i = 1
-    a = tf.placeholder(tf.float32, shape=None, name="a")
-    b = tf.reduce_mean(a, name="b")
-    r_list = []
-    v = tf.Variable(dtype=tf.float32, initial_value=tf.constant(1.0), name="v_" + str(i))
-    c = tf.add(b, v, name="c_" + str(i))
-    add = tf.assign(v, c, name="assign_" + str(i))
-    sum = tf.summary.scalar(name="sum_" + str(i), tensor=c)
-    r_list.append(add)
-    global_step = tf.contrib.framework.get_or_create_global_step()
-    global_step_inc = tf.assign_add(global_step, 1)
-    r_list.append(global_step_inc)
-    return r_list
-
-def map_func(context):
-    tf_context = TFContext(context)
-    job_name = tf_context.get_role_name()
-    index = tf_context.get_index()
-    cluster_json = tf_context.get_tf_cluster()
-
-    cluster = tf.train.ClusterSpec(cluster=cluster_json)
-    server = tf.train.Server(cluster, job_name=job_name, task_index=index)
-    sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False,
-                                 device_filters=["/job:ps", "/job:worker/task:%d" % index])
-    t = time.time()
-    if 'ps' == job_name:
-        from time import sleep
-        while True:
-            sleep(1)
-    else:
-        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + str(index), cluster=cluster)):
-            train_ops = build_graph()
-        hooks = [tf.train.StopAtStepHook(last_step=2)]
-        with tf.train.MonitoredTrainingSession(master=server.target, config=sess_config,
-                                               checkpoint_dir="./target/tmp/s1/" + str(t),
-                                               hooks=hooks) as mon_sess:
-            while not mon_sess.should_stop():
-                print(mon_sess.run(train_ops, feed_dict={a: [1.0, 2.0, 3.0]}))
-                sys.stdout.flush()
-```
-
-**<p>java code:</p>**
-add maven dependencies
-```xml
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>org.flinkextended</groupId>
-    <artifactId>flink-ai-extended-examples</artifactId>
-    <version>0.3.0</version>
-    <packaging>jar</packaging>
-    <dependencies>
-        <dependency>
-            <groupId>org.flinkextended</groupId>
-            <artifactId>flink-ml-tensorflow</artifactId>
-            <version>0.3.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>2.7.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-test</artifactId>
-            <version>2.7.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>20.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
-```
-*You can refer to the following POM*
-
-[example pom.xml](flink-ml-examples/pom.xml)
-
-```java
-class Add {
-    public static void main(String[] args) throws Exception {
-        // local ZooKeeper server.
-        TestingServer server = new TestingServer(2181, true);
-        String script = "./add.py";
-        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-        // use these properties if ZooKeeper runs at another address
-        Map<String, String> prop = new HashMap<>();
-        prop.put(MLConstants.CONFIG_STORAGE_TYPE, MLConstants.STORAGE_ZOOKEEPER);
-        prop.put(MLConstants.CONFIG_ZOOKEEPER_CONNECT_STR, "localhost:2181");
-        TFConfig config = new TFConfig(2, 1, prop, script, "map_func", null);
-        TFUtils.train(streamEnv, null, config);
-        JobExecutionResult result = streamEnv.execute();
-        server.stop();
-    }
-}
-```
 
 ## Distributed Running
 ### Deployment
````

doc/quick_start.md

Lines changed: 145 additions & 0 deletions
@@ -0,0 +1,145 @@

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Quick Start

This tutorial provides a quick introduction to using Deep Learning on Flink.
It shows you how to download and install the latest stable release of Deep
Learning on Flink, and then run a simple Flink job locally to train a linear
model.
## Environment Requirements

- Java: 8
- Python: 3.7
- Flink: 1.14
- TensorFlow: 1.15.x or 2.3.x
## Download & Install

### Download Flink

[Download the latest binary release](https://flink.apache.org/downloads.html)
of Flink 1.14, then extract the archive:

```sh
tar -xzf flink-*.tgz
```

Please refer to the [local installation guide](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/local_installation/)
for more detailed steps for downloading and installing Flink.

### Download Deep Learning on Flink

You can download the binary release of Deep Learning on Flink from the
[releases page](https://github.com/flink-extended/dl-on-flink/releases), then
extract the archive:

```sh
tar -xzf flink-ml-*.tgz
```
Navigate to the extracted directory; you should see the following directory
layout:

| Directory | Meaning |
|---|---|
| `lib/` | Directory containing the compiled Deep Learning on Flink JARs. |
| `examples/` | Directory containing examples. |
### Install Python dependencies

To run a Deep Learning on Flink job, we need to install the Python
dependencies.

The Python dependencies should be installed with pip. We strongly recommend
using [virtualenv](https://virtualenv.pypa.io/en/latest/index.html) or a
similar tool for an isolated Python environment.

Use the following command to install `flink-ml-framework`:

```bash
pip install flink-ml-framework
```

Install `flink-ml-tensorflow` if you use TensorFlow 1.15.x:

```bash
pip install flink-ml-tensorflow
```

Install `flink-ml-tensorflow-2.x` if you use TensorFlow 2.3.x:

```bash
pip install flink-ml-tensorflow-2.x
```
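The choice between the two packages above follows directly from the TensorFlow major version. As a minimal sketch (the helper name `package_for_tf` is hypothetical, not part of the project):

```python
# Hypothetical helper (not part of Deep Learning on Flink): pick the pip
# package that matches an installed TensorFlow version, per the rules above.
def package_for_tf(tf_version: str) -> str:
    major = int(tf_version.split(".")[0])
    if major == 1:
        return "flink-ml-tensorflow"
    if major == 2:
        return "flink-ml-tensorflow-2.x"
    raise ValueError(f"Unsupported TensorFlow version: {tf_version}")

print(package_for_tf("1.15.0"))  # flink-ml-tensorflow
print(package_for_tf("2.3.1"))   # flink-ml-tensorflow-2.x
```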
## Starting Local Standalone Cluster

In this example, we use two workers to train the model. Thus, there have to
be at least two task slots available in the Flink cluster. To achieve that,
you can simply set `taskmanager.numberOfTaskSlots` in `conf/flink-conf.yaml`
to 2, for example with the following command.

```sh
# We assume we are in the root directory of the extracted Flink distribution.
sed -i '' 's/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 2/' ./conf/flink-conf.yaml
```

Note that `sed -i ''` is the BSD/macOS form; with GNU sed on Linux, drop the
empty string argument and use `sed -i`.
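If you prefer to avoid the sed portability differences, the same edit can be sketched in plain Python (the function name `set_task_slots` is our own, not part of Flink or this project):

```python
# Portable alternative to the sed one-liner: rewrite the
# taskmanager.numberOfTaskSlots entry in flink-conf.yaml in place.
from pathlib import Path


def set_task_slots(conf_path: str, slots: int) -> None:
    path = Path(conf_path)
    lines = path.read_text().splitlines()
    for i, line in enumerate(lines):
        if line.startswith("taskmanager.numberOfTaskSlots:"):
            lines[i] = f"taskmanager.numberOfTaskSlots: {slots}"
    path.write_text("\n".join(lines) + "\n")


# Example usage from the Flink distribution root:
# set_task_slots("./conf/flink-conf.yaml", 2)
```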
Usually, starting a local Flink cluster by running the following command is
enough for this quick start guide.

**Note: If you are using virtualenv, you should start your local Flink cluster
with virtualenv activated.**

```sh
# We assume we are in the root directory of the extracted Flink distribution.
./bin/start-cluster.sh
```

You should be able to navigate to the web UI at
`http://<job manager ip address>:8081` to view the Flink dashboard and see that
the cluster is up and running.
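Besides checking the dashboard by eye, you can also query the cluster programmatically. The sketch below assumes Flink's REST API `/overview` endpoint and its `slots-available` field; the helper `has_free_slots` is hypothetical and not part of this project:

```python
# Sanity check (assumption: Flink REST API /overview reports slot counts):
# confirm enough task slots are free before submitting the training job.
import json
from urllib.request import urlopen


def has_free_slots(overview: dict, needed: int) -> bool:
    """Return True if the cluster overview reports enough available slots."""
    return overview.get("slots-available", 0) >= needed


# Example usage against a running local cluster:
# overview = json.load(urlopen("http://localhost:8081/overview"))
# print(has_free_slots(overview, 2))
```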
## Submit a Flink Job

The examples are included in the binary release, which you can download from
the [releases page](https://github.com/flink-extended/dl-on-flink/releases).

You can run the following commands to submit the job.

**Note: If you are using virtualenv, you should submit the job
with virtualenv activated.**

```sh
export DL_ON_FLINK_DIR=<root dir of Deep Learning on Flink extracted distribution>

# We assume we are in the root directory of the extracted Flink distribution.

# For TensorFlow 1.15.x
./bin/flink run \
  -py ${DL_ON_FLINK_DIR}/examples/tensorflow-on-flink/linear/flink_job.py \
  --jarfile ${DL_ON_FLINK_DIR}/lib/flink-ml-tensorflow-0.4-SNAPSHOT-jar-with-dependencies.jar

# For TensorFlow 2.3.x
./bin/flink run \
  -py ${DL_ON_FLINK_DIR}/examples/tensorflow-on-flink/linear/flink_job.py \
  --jarfile ${DL_ON_FLINK_DIR}/lib/flink-ml-tensorflow-2.x-0.4-SNAPSHOT-jar-with-dependencies.jar
```

After the job is submitted successfully, you should see the job in the running
state in the Flink web UI.

After the job finishes, you should see the model saved at `/tmp/linear`.
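The two submit commands differ only in the `--jarfile` name. A small sketch that derives the command from the TensorFlow major version (the helper `flink_run_command` is hypothetical; paths and the 0.4-SNAPSHOT jar names are taken from the commands above):

```python
# Hypothetical helper (not part of the project): assemble the `flink run`
# command for the linear example so the jar name matches the TF major version.
def flink_run_command(dl_on_flink_dir: str, tf_major: int) -> list:
    suffix = "" if tf_major == 1 else "-2.x"
    jar = (f"{dl_on_flink_dir}/lib/flink-ml-tensorflow{suffix}"
           "-0.4-SNAPSHOT-jar-with-dependencies.jar")
    script = f"{dl_on_flink_dir}/examples/tensorflow-on-flink/linear/flink_job.py"
    return ["./bin/flink", "run", "-py", script, "--jarfile", jar]


print(" ".join(flink_run_command("/opt/dl-on-flink", 2)))
```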
