diff --git a/exercises/solutions/.gitignore b/exercises/solutions/.gitignore new file mode 100644 index 0000000..ddaa27d --- /dev/null +++ b/exercises/solutions/.gitignore @@ -0,0 +1,9 @@ +# Ignore .html and .json file generated by ray's ui +*.html +*.json + +# Ignore ipynb checkpoints +.ipynb_checkpoints + +# Ignore temporary files +.*~ diff --git a/exercises/solutions/exercise01.ipynb b/exercises/solutions/exercise01.ipynb new file mode 100644 index 0000000..e7934ad --- /dev/null +++ b/exercises/solutions/exercise01.ipynb @@ -0,0 +1,327 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Exercise 1 - Simple Data Parallel Example\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to run simple tasks in parallel.\n", + "\n", + "This script is too slow, and the computation is embarrassingly parallel. In this exercise, you will use Ray to execute the functions in parallel to speed it up.\n", + "\n", + "### Concept for this Exercise - Remote Functions\n", + "\n", + "The standard way to turn a Python function into a remote function is to add the `@ray.remote` decorator. Here is an example.\n", + "\n", + "```python\n", + "# A regular Python function.\n", + "def regular_function():\n", + " return 1\n", + "\n", + "# A Ray remote function.\n", + "@ray.remote\n", + "def remote_function():\n", + " return 1\n", + "```\n", + "\n", + "The differences are the following:\n", + "\n", + "1. **Invocation:** The regular version is called with `regular_function()`, whereas the remote version is called with `remote_function.remote()`.\n", + "2. **Return values:** `regular_function` immediately executes and returns `1`, whereas `remote_function` immediately returns an object ID (a future) and then creates a task that will be executed on a worker process. The result can be obtained with `ray.get`.\n", + " ```python\n", + " >>> regular_function()\n", + " 1\n", + " \n", + " >>> remote_function.remote()\n", + " ObjectID(1c80d6937802cd7786ad25e50caf2f023c95e350)\n", + " \n", + " >>> ray.get(remote_function.remote())\n", + " 1\n", + " ```\n", + "3. **Parallelism:** Invocations of `regular_function` happen **serially**, for example\n", + " ```python\n", + " # These happen serially.\n", + " for _ in range(4):\n", + " regular_function()\n", + " ```\n", + " whereas invocations of `remote_function` happen in **parallel**, for example\n", + " ```python\n", + " # These happen in parallel.\n", + " for _ in range(4):\n", + " remote_function.remote()\n", + " ```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start Ray. By default, Ray does not schedule more tasks concurrently than there are CPUs. This example requires four tasks to run concurrently, so we tell Ray that there are four CPUs. Usually this is not done and Ray computes the number of CPUs using `psutil.cpu_count()`. The argument `redirect_output=True` just suppresses some logging.\n", + "\n", + "The call to `ray.init` starts a number of processes." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:49263 to respond...\n", + "Waiting for redis server at 127.0.0.1:61954 to respond...\n", + "Starting local scheduler with 4 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8893/notebooks/ray_ui84549.ipynb?token=33d5b878725f4d8f053fe2bdbfac3b2a22fd0a4ccbb3aa20\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler21215428'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store93576587', manager_name='/tmp/plasma_manager5822887', manager_port=40447)],\n", + " 'redis_address': '127.0.0.1:49263',\n", + " 'webui_url': 'http://localhost:8893/notebooks/ray_ui84549.ipynb?token=33d5b878725f4d8f053fe2bdbfac3b2a22fd0a4ccbb3aa20'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** The function below is slow. Turn it into a remote function using the `@ray.remote` decorator." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# This function is a proxy for a more interesting and computationally\n", + "# intensive function.\n", + "def slow_function(i):\n", + " time.sleep(1)\n", + " return i\n", + "@ray.remote\n", + "def remote_function(i):\n", + " time.sleep(1)\n", + " return i" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** The loop below takes too long. The four function calls could be executed in parallel. Instead of four seconds, it should only take one second. Once `slow_function` has been made a remote function, execute these four tasks in parallel by calling `slow_function.remote()`. Then obtain the results by calling `ray.get` on a list of the resulting object IDs." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "# We do this because workers may still be starting up in the background.\n", + "time.sleep(4.0) # Increased the sleep time as 2.0 was small at times\n", + "start_time = time.time()\n", + "\n", + "results = []\n", + "for i in range(4):\n", + " results.append(remote_function.remote(i))\n", + "results = ray.get(results)\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false, + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 1.0059635639190674 seconds.\n" + ] + } + ], + "source": [ + "assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'\n", + "assert duration < 1.1, ('The loop took {} seconds. This is too slow.'\n", + " .format(duration))\n", + "assert duration > 1, ('The loop took {} seconds. This is too fast.'\n", + " .format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Use the UI to view the task timeline and to verify that the four tasks were executed in parallel. After running the cell below, you'll need to click on **View task timeline**\".\n", + "- Using the **second** button, you can click and drag to **move** the timeline.\n", + "- Using the **third** button, you can click and drag to **zoom**. You can also zoom by holding \"alt\" and scrolling.\n", + "\n", + "**NOTE:** Normally our UI is used as a separate Jupyter notebook. However, for simplicity we embedded the relevant feature here in this notebook.\n", + "\n", + "**NOTE:** The first time you click **View task timeline** it may take **several minutes** to start up. This will change.\n", + "\n", + "**NOTE:** If you run more tasks and want to regenerate the UI, you need to move the slider bar a little bit and then click **View task timeline** again.\n", + "\n", + "**NOTE:** The timeline visualization may only work in **Chrome**." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "To view fullscreen, open chrome://tracing in Google Chrome and load `/tmp/tmpxo6zov96.json`\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import ray.experimental.ui as ui\n", + "ui.task_timeline()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + }, + "widgets": { + "state": { + "18e4f7a4f0d14019b433838f43a21dde": { + "views": [ + { + "cell_index": 11 + } + ] + }, + "5629e28d1420403fbe81f0a88fbdeab4": { + "views": [ + { + "cell_index": 11 + } + ] + }, + "5a1628570ea7498580bec45dd430f87c": { + "views": [ + { + "cell_index": 11 + } + ] + }, + "84312b77b729401b9c6e82c457a8385c": { + "views": [ + { + "cell_index": 11 + } + ] + }, + "fcf4d4d5865342bfb66b8efb46f8d9b9": { + "views": [ + { + "cell_index": 11 + } + ] + } + }, + "version": "1.2.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise02.ipynb b/exercises/solutions/exercise02.ipynb new file mode 100644 index 0000000..668e09a --- /dev/null +++ b/exercises/solutions/exercise02.ipynb @@ -0,0 +1,333 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 2 - Parallel Data Processing with Task Dependencies\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.\n", + "\n", + "In this exercise, we construct a sequence of tasks each of which depends on the previous mimicking a data parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.\n", + "\n", + "In this exercise, you will use Ray to parallelize the computation below and speed it up.\n", + "\n", + "### Concept for this Exercise - Task Dependencies\n", + "\n", + "Suppose we have two remote functions defined as follows.\n", + "\n", + "```python\n", + "@ray.remote\n", + "def f(x):\n", + " return x\n", + "```\n", + "\n", + "Arguments can be passed into remote functions as usual.\n", + "\n", + "```python\n", + ">>> x1_id = f.remote(1)\n", + ">>> ray.get(x1_id)\n", + "1\n", + "\n", + ">>> x2_id = f.remote([1, 2, 3])\n", + ">>> ray.get(x2_id)\n", + "[1, 2, 3]\n", + "```\n", + "\n", + "**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**.\n", + "\n", + "```python\n", + ">>> y1_id = f.remote(x1_id)\n", + ">>> ray.get(y1_id)\n", + "1\n", + "\n", + ">>> y2_id = f.remote(x2_id)\n", + ">>> ray.get(y2_id)\n", + "[1, 2, 3]\n", + "```\n", + "\n", + "So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.\n", + "\n", + "**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.\n", + "\n", + "- The second task will not be executed until the first task has finished executing.\n", + "- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:10319 to respond...\n", + "Waiting for redis server at 127.0.0.1:33815 to respond...\n", + "Starting local scheduler with 4 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8894/notebooks/ray_ui56649.ipynb?token=6a1804c2bc65bf39b6f8fff6e9fe26ce2325ee91f5ce0155\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler88514660'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store60048017', manager_name='/tmp/plasma_manager86793328', manager_port=61227)],\n", + " 'redis_address': '127.0.0.1:10319',\n", + " 'webui_url': 'http://localhost:8894/notebooks/ray_ui56649.ipynb?token=6a1804c2bc65bf39b6f8fff6e9fe26ce2325ee91f5ce0155'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "These are some helper functions that mimic an example pattern of a data parallel application.\n", + "\n", + "**EXERCISE:** You will need to turn these functions into remote functions. When you turn these functions into remote function, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "def load_data(filename):\n", + " time.sleep(0.1)\n", + " return np.ones((1000, 100))\n", + "\n", + "@ray.remote\n", + "def normalize_data(data):\n", + " time.sleep(0.1)\n", + " return data - np.mean(data, axis=0)\n", + "\n", + "@ray.remote\n", + "def extract_features(normalized_data):\n", + " time.sleep(0.1)\n", + " return np.hstack([normalized_data, normalized_data ** 2])\n", + "\n", + "@ray.remote\n", + "def compute_loss(features):\n", + " num_data, dim = features.shape\n", + " time.sleep(0.1)\n", + " return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.\n", + "\n", + "**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(2.0)\n", + "start_time = time.time()\n", + "\n", + "losses = []\n", + "for filename in ['file1', 'file2', 'file3', 'file4']:\n", + " data = load_data.remote(filename)\n", + " normalized_data = normalize_data.remote(data)\n", + " features = extract_features.remote(normalized_data)\n", + " loss = compute_loss.remote(features)\n", + " losses.append(loss)\n", + "\n", + "loss = sum(ray.get(losses))\n", + "\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 0.531139612197876 seconds.\n" + ] + } + ], + "source": [ + "assert loss == 4000\n", + "assert duration < 0.8, ('The loop took {} seconds. This is too slow.'\n", + " .format(duration))\n", + "assert duration > 0.4, ('The loop took {} seconds. This is too fast.'\n", + " .format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Use the UI to view the task timeline and to verify that the relevant tasks were executed in parallel. After running the cell below, you'll need to click on **View task timeline**\".\n", + "- Using the **second** button, you can click and drag to **move** the timeline.\n", + "- Using the **third** button, you can click and drag to **zoom**. You can also zoom by holding \"alt\" and scrolling.\n", + "\n", + "In the timeline, click on **View Options** and select **Flow Events** to visualize tasks dependencies." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "To view fullscreen, open chrome://tracing in Google Chrome and load `/tmp/tmp3hirb5nc.json`\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import ray.experimental.ui as ui\n", + "ui.task_timeline()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + }, + "widgets": { + "state": { + "4f23203d7f684d3798936b5f8a561a03": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "6ca363794e3849a6b56ead1503164d3b": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "92ae66e3495744959183a1b37c1a175b": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "c22119c3244a4c2fae37b906d3525e74": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "faaa26c96b0c494cbc8790b5ad0499ed": { + "views": [ + { + "cell_index": 10 + } + ] + } + }, + "version": "1.2.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise03.ipynb b/exercises/solutions/exercise03.ipynb new file mode 100644 index 0000000..536f178 --- /dev/null +++ b/exercises/solutions/exercise03.ipynb @@ -0,0 +1,325 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 3 - Tree Reduce\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to implement a tree reduce in Ray by passing object IDs into remote functions to encode dependencies between tasks.\n", + "\n", + "In this exercise, you will use Ray to implement parallel data generation and a parallel tree reduction." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:15142 to respond...\n", + "Waiting for redis server at 127.0.0.1:64040 to respond...\n", + "Starting local scheduler with 8 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8895/notebooks/ray_ui12814.ipynb?token=c2d9b783979e8bacbd50a6278ac58651787dc16de314c724\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler92566628'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store32359160', manager_name='/tmp/plasma_manager66816245', manager_port=15155)],\n", + " 'redis_address': '127.0.0.1:15142',\n", + " 'webui_url': 'http://localhost:8895/notebooks/ray_ui12814.ipynb?token=c2d9b783979e8bacbd50a6278ac58651787dc16de314c724'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=8, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** These functions will need to be turned into remote functions so that the tree of tasks can be executed in parallel." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# This is a proxy for a function which generates some data.\n", + "@ray.remote\n", + "def create_data(i):\n", + " time.sleep(0.3)\n", + " return i * np.ones(10000)\n", + "\n", + "# This is a proxy for an expensive aggregation step (which is also\n", + "# commutative and associative so it can be used in a tree-reduce).\n", + "@ray.remote\n", + "def aggregate_data(x, y):\n", + " time.sleep(0.3)\n", + " return x * y" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Make the data creation tasks run in parallel. Also aggregate the vectors in parallel. Note that the `aggregate_data` function must be called 7 times. They cannot all run in parallel because some depend on the outputs of others. However, it is possible to first run 4 in parallel, then 2 in parallel, and then 1." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(4.0) # @Praveen.Palanisamy: Observation: time.sleep(2) seems not enough sleep time for improving\n", + " # the accuracy of timing measurements. When time.sleep(2.0) was used, even though the task timeline\n", + " # diagram showed that the executions were completed in 1.21x seconds, the assert statements\n", + " # based on the timing measurements raised errors.\n", + "start_time = time.time()\n", + "\n", + "# EXERCISE: Here we generate some data. Do this part in parallel.\n", + "vectors = [create_data.remote(i + 1) for i in range(8)]\n", + "\n", + "# Here we aggregate all of the data repeatedly calling aggregate_data. This\n", + "# can be sped up using Ray.\n", + "#\n", + "# NOTE: A direct translation of the code below to use Ray will not result in\n", + "# a speedup because each function call uses the output of the previous function\n", + "# call so the function calls must be executed serially.\n", + "#\n", + "# EXERCISE: Speed up the aggregation below by using Ray. Note that this will\n", + "# require restructuring the code to expose more parallelism. First run 4 tasks\n", + "# aggregating the 8 values in pairs. Then run 2 tasks aggregating the resulting\n", + "# 4 intermediate values in pairs. then run 1 task aggregating the two resulting\n", + "# values. Lastly, you will need to call ray.get to retrieve the final result.\n", + "#result = aggregate_data(vectors[0], vectors[1])\n", + "#result = aggregate_data(result, vectors[2])\n", + "#result = aggregate_data(result, vectors[3])\n", + "#result = aggregate_data(result, vectors[4])\n", + "#result = aggregate_data(result, vectors[5])\n", + "#result = aggregate_data(result, vectors[6])\n", + "#result = aggregate_data(result, vectors[7])\n", + "\n", + "# NOTE: For clarity, the aggregation above is written out as 7 separate function\n", + "# calls, but this can be done more easily in a while loop via\n", + "#\n", + "# while len(vectors) > 1:\n", + "# vectors = aggregate_data(vectors[0], vectors[1]) + vectors[2:]\n", + "# result = vectors[0]\n", + "#\n", + "# When expressed this way, the change from serial aggregation to tree-structured\n", + "# aggregation can be made simply by appending the result of aggregate_data to the\n", + "# end of the vectors list as opposed to the beginning.\n", + "#\n", + "# EXERCISE: Think about why this is true.\n", + "\n", + "while len(vectors) > 1:\n", + " vectors = vectors[2:] + list([aggregate_data.remote(vectors[0], vectors[1])])\n", + "result = ray.get(vectors[0])\n", + "\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Use the UI to view the task timeline and to verify that the vectors were aggregated with a tree of tasks.\n", + "\n", + "You should be able to see the 8 `create_data` tasks running in parallel followed by 4 `aggregate_data` tasks running in parallel followed by 2 more `aggregate_data` tasks followed by 1 more `aggregate_data` task.\n", + "\n", + "In the timeline, click on **View Options** and select **Flow Events** to visualize tasks dependencies." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "To view fullscreen, open chrome://tracing in Google Chrome and load `/tmp/tmpvuhjtnpu.json`\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import ray.experimental.ui as ui\n", + "ui.task_timeline()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 1.2234106063842773 seconds.\n" + ] + } + ], + "source": [ + "assert np.all(result == 40320 * np.ones(10000)), ('Did you remember to '\n", + " 'call ray.get?')\n", + "assert duration < 0.3 + 0.9 + 0.3, ('FAILURE: The data generation and '\n", + " 'aggregation took {} seconds. This is '\n", + " 'too slow'.format(duration))\n", + "assert duration > 0.3 + 0.9, ('FAILURE: The data generation and '\n", + " 'aggregation took {} seconds. This is '\n", + " 'too fast'.format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + }, + "widgets": { + "state": { + "1a580876bd224ca5b255d98d15f58fb9": { + "views": [ + { + "cell_index": 8 + } + ] + }, + "73f8209795204c9bbbeb91b01286a727": { + "views": [ + { + "cell_index": 8 + } + ] + }, + "87be49ae95bd4293b3f90a29e25225f2": { + "views": [ + { + "cell_index": 8 + } + ] + }, + "c4b52e74d495499a9ff68269b4e5a56a": { + "views": [ + { + "cell_index": 8 + } + ] + }, + "c7e49cf27cee46c0a67c5295e8a823e6": { + "views": [ + { + "cell_index": 8 + } + ] + } + }, + "version": "1.2.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise04.ipynb b/exercises/solutions/exercise04.ipynb new file mode 100644 index 0000000..f64e49f --- /dev/null +++ b/exercises/solutions/exercise04.ipynb @@ -0,0 +1,286 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 4 - Nested Parallelism\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to create nested tasks by calling a remote function inside of another remote function.\n", + "\n", + "In this exercise, you will implement the structure of a parallel hyperparameter sweep which trains a number of models in parallel. Each model will be trained using parallel gradient computations.\n", + "\n", + "### Concepts for this Exercise - Nested Remote Functions\n", + "\n", + "Remote functions can call other functions. For example, consider the following.\n", + "\n", + "```python\n", + "@ray.remote\n", + "def f():\n", + " return 1\n", + "\n", + "@ray.remote\n", + "def g():\n", + " # Call f 4 times and return the resulting object IDs.\n", + " return [f.remote() for _ in range(4)]\n", + "\n", + "@ray.remote\n", + "def h():\n", + " # Call f 4 times, block until those 4 tasks finish,\n", + " # retrieve the results, and return the values.\n", + " return ray.get([f.remote() for _ in range(4)])\n", + "```\n", + "\n", + "Then calling `g` and `h` produces the following behavior.\n", + "\n", + "```python\n", + ">>> ray.get(g.remote())\n", + "[ObjectID(b1457ba0911ae84989aae86f89409e953dd9a80e),\n", + " ObjectID(7c14a1d13a56d8dc01e800761a66f09201104275),\n", + " ObjectID(99763728ffc1a2c0766a2000ebabded52514e9a6),\n", + " ObjectID(9c2f372e1933b04b2936bb6f58161285829b9914)]\n", + "\n", + ">>> ray.get(h.remote())\n", + "[1, 1, 1, 1]\n", + "```\n", + "\n", + "**One limitation** is that the definition of `f` must come before the definitions of `g` and `h` because as soon as `g` is defined, it will be pickled and shipped to the workers, and so if `f` hasn't been defined yet, the definition will be incomplete." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:21429 to respond...\n", + "Waiting for redis server at 127.0.0.1:27191 to respond...\n", + "Starting local scheduler with 9 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8896/notebooks/ray_ui98647.ipynb?token=76324c0d73bfaf266a16d7fb60a9c067c96db69d7715f3f8\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler92272777'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store68370232', manager_name='/tmp/plasma_manager77547876', manager_port=56153)],\n", + " 'redis_address': '127.0.0.1:21429',\n", + " 'webui_url': 'http://localhost:8896/notebooks/ray_ui98647.ipynb?token=76324c0d73bfaf266a16d7fb60a9c067c96db69d7715f3f8'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=9, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This example represents a hyperparameter sweep in which multiple models are trained in parallel. Each model training task also performs data parallel gradient computations.\n", + "\n", + "**EXERCISE:** Turn `compute_gradient` and `train_model` into remote functions so that they can be executed in parallel. Inside of `train_model`, do the calls to `compute_gradient` in parallel and fetch the results using `ray.get`." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "def compute_gradient(data):\n", + " time.sleep(0.03)\n", + " return 1\n", + "\n", + "@ray.remote\n", + "def train_model(hyperparameters):\n", + " result = 0\n", + " for i in range(10):\n", + " # EXERCISE: After you turn \"compute_gradient\" into a remote function,\n", + " # you will need to call it with \".remote\". The results must be retrieved\n", + " # with \"ray.get\" before \"sum\" is called.\n", + " result += sum(ray.get([compute_gradient.remote(j) for j in range(2)]))\n", + " return result" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** The code below runs 3 hyperparameter experiments. Change this to run the experiments in parallel." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(5.0)# @Praveen.Palanisamy: Observation: time.sleep(2) seems not enough sleep time for improving\n", + " # the accuracy of timing measurements. When time.sleep(2.0) was used, even though the task timeline\n", + " # diagram showed that the executions were completed in 1.21x seconds, the assert statements\n", + " # based on the timing measurements raised errors.\n", + "start_time = time.time()\n", + "\n", + "# Run some hyperparaameter experiments.\n", + "results = []\n", + "for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},\n", + " {'learning_rate': 1e-2, 'batch_size': 100},\n", + " {'learning_rate': 1e-3, 'batch_size': 100}]:\n", + " results.append(train_model.remote(hyperparameters))\n", + "results = ray.get(results)\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 0.3440968990325928 seconds.\n" + ] + } + ], + "source": [ + "assert results == [20, 20, 20]\n", + "assert duration < 0.5, ('The experiments ran in {} seconds. This is too '\n", + " 'slow.'.format(duration))\n", + "assert duration > 0.3, ('The experiments ran in {} seconds. This is too '\n", + " 'fast.'.format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Use the UI to view the task timeline and to verify that the pattern makes sense." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import ray.experimental.ui as ui\n", + "ui.task_timeline()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + }, + "widgets": { + "state": { + "084e5f48481045ef99047148a6528e10": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "089dbf3ad6fe4268be96138b70812137": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "135a480693a34f87840c3067da3ad4c0": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "3ed0900c371b43289bb8b66f408920f0": { + "views": [ + { + "cell_index": 10 + } + ] + }, + "bd0e9d677f3e4ce9a90a2350b40f5ed5": { + "views": [ + { + "cell_index": 10 + } + ] + } + }, + "version": "1.2.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise05.ipynb b/exercises/solutions/exercise05.ipynb new file mode 100644 index 0000000..b923105 --- /dev/null +++ b/exercises/solutions/exercise05.ipynb @@ -0,0 +1,230 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 5 - Handling Slow Tasks\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.\n", + "\n", + "See the documentation for ray.wait at http://ray.readthedocs.io/en/latest/api.html#waiting-for-a-subset-of-tasks-to-finish.\n", + "\n", + "This script starts 6 tasks, each of which takes a random amount of time to complete. We'd like to process the results in two batches (each of size 3). Change the code so that instead of waiting for a fixed set of 3 tasks to finish, we make the first batch consist of the first 3 tasks that complete. The second batch should consist of the 3 remaining tasks. Do this exercise by using `ray.wait`.\n", + "\n", + "### Concepts for this Exercise - ray.wait\n", + "\n", + "After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.\n", + "\n", + "```python\n", + "ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout_ms=None)\n", + "```\n", + "\n", + "**Arguments:**\n", + "- `object_ids`: This is a list of object IDs.\n", + "- `num_returns`: This is maximum number of object IDs to wait for. The default value is `1`.\n", + "- `timeout_ms`: This is the maximum amount of time in milliseconds to wait for. So `ray.wait` will block until either `num_returns` objects are ready or until `timeout_ms` milliseconds have passed.\n", + "\n", + "**Return values:**\n", + "- `ready_ids`: This is a list of object IDs that are available in the object store.\n", + "- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:33768 to respond...\n", + "Waiting for redis server at 127.0.0.1:22364 to respond...\n", + "Starting local scheduler with 6 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8897/notebooks/ray_ui79895.ipynb?token=6f1a6d3d5055567501f93bd9472fdbd9fafd2a8250e8f310\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler91432317'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store70608289', manager_name='/tmp/plasma_manager2803995', manager_port=48820)],\n", + " 'redis_address': '127.0.0.1:33768',\n", + " 'webui_url': 'http://localhost:8897/notebooks/ray_ui79895.ipynb?token=6f1a6d3d5055567501f93bd9472fdbd9fafd2a8250e8f310'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=6, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define a remote function that takes a variable amount of time to run." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "def f(i):\n", + " np.random.seed(5 + i)\n", + " x = np.random.uniform(0, 4)\n", + " time.sleep(x)\n", + " return i, time.time()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(2.0)\n", + "start_time = time.time()\n", + "\n", + "# This launches 6 tasks, each of which takes a random amount of time to\n", + "# complete.\n", + "result_ids = [f.remote(i) for i in range(6)]\n", + "# Get one batch of tasks. Instead of waiting for a fixed subset of tasks, we\n", + "# should instead use the first 3 tasks that finish.\n", + "ready_ids, remaining_ids = ray.wait(result_ids, num_returns= 3)\n", + "initial_results = ray.get(ready_ids)\n", + "\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Wait for the remaining tasks to complete.\n", + "remaining_results = ray.get(remaining_ids)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 1.3953678607940674 seconds.\n" + ] + } + ], + "source": [ + "assert len(initial_results) == 3\n", + "assert len(remaining_results) == 3\n", + "\n", + "initial_indices = [result[0] for result in initial_results]\n", + "initial_times = [result[1] for result in initial_results]\n", + "remaining_indices = [result[0] for result in remaining_results]\n", + "remaining_times = [result[1] for result in remaining_results]\n", + "\n", + "assert set(initial_indices + remaining_indices) == set(range(6))\n", + "\n", + "assert duration < 1.5, ('The initial batch of ten tasks was retrieved in '\n", + " '{} seconds. This is too slow.'.format(duration))\n", + "\n", + "assert duration > 0.8, ('The initial batch of ten tasks was retrieved in '\n", + " '{} seconds. This is too slow.'.format(duration))\n", + "\n", + "# Make sure the initial results actually completed first.\n", + "assert max(initial_times) < min(remaining_times)\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise06.ipynb b/exercises/solutions/exercise06.ipynb new file mode 100644 index 0000000..1f4a1cb --- /dev/null +++ b/exercises/solutions/exercise06.ipynb @@ -0,0 +1,194 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 6 - Process Tasks in Order of Completion\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.\n", + "\n", + "See the documentation for ray.wait at http://ray.readthedocs.io/en/latest/api.html#waiting-for-a-subset-of-tasks-to-finish.\n", + "\n", + "The code below runs 10 tasks and retrieves the results in the order that the tasks were launched. However, since each task takes a random amount of time to finish, we could instead process the tasks in the order that they finish." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:14568 to respond...\n", + "Waiting for redis server at 127.0.0.1:58432 to respond...\n", + "Starting local scheduler with 5 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8898/notebooks/ray_ui22125.ipynb?token=15e82bfc4d2507595d3a038d15eda02d51652612b05a2c7e\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler66481052'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store79155088', manager_name='/tmp/plasma_manager62186845', manager_port=41265)],\n", + " 'redis_address': '127.0.0.1:14568',\n", + " 'webui_url': 'http://localhost:8898/notebooks/ray_ui22125.ipynb?token=15e82bfc4d2507595d3a038d15eda02d51652612b05a2c7e'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=5, redirect_output=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "def f():\n", + " time.sleep(np.random.uniform(0, 5))\n", + " return time.time()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.\n", + "\n", + "**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Processing result which finished after 1.5506346225738525 seconds.\n", + "Processing result which finished after 3.3468384742736816 seconds.\n", + "Processing result which finished after 3.8243601322174072 seconds.\n", + "Processing result which finished after 4.136775016784668 seconds.\n", + "Processing result which finished after 4.304465293884277 seconds.\n", + "Processing result which finished after 5.513854503631592 seconds.\n", + "Processing result which finished after 5.573364734649658 seconds.\n", + "Processing result which finished after 5.708314657211304 seconds.\n", + "Processing result which finished after 7.56557297706604 seconds.\n", + "Processing result which finished after 8.792317390441895 seconds.\n" + ] + } + ], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(2.0)\n", + "start_time = time.time()\n", + "\n", + "result_ids = [f.remote() for _ in range(10)]\n", + "\n", + "# Get the results.\n", + "results = []\n", + "while len(result_ids):\n", + " ready_id, remaining_ids = ray.wait(result_ids, num_returns= 1) # Get the Object ID which is ready in the object store\n", + " result = ray.get(ready_id[0])\n", + " results.append(result)\n", + " result_ids = remaining_ids # Reset the result_ids to contain those IDs which are yet to be fetched \n", + " print('Processing result which finished after {} seconds.'\n", + " .format(result - start_time))\n", + "\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 8.794857740402222 seconds.\n" + ] + } + ], + "source": [ + "assert results == sorted(results), ('The results were not processed in the '\n", + " 'order that they finished.')\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise07.ipynb b/exercises/solutions/exercise07.ipynb new file mode 100644 index 0000000..4c95701 --- /dev/null +++ b/exercises/solutions/exercise07.ipynb @@ -0,0 +1,260 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 7 - Introducing Actors\n", + "\n", + "**Goal:** The goal of this exercise is to show how to create an actor and how to call actor methods.\n", + "\n", + "See the documentation on actors at http://ray.readthedocs.io/en/latest/actors.html.\n", + "\n", + "Sometimes you need a \"worker\" process to have \"state\". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.\n", + "\n", + "Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.\n", + "\n", + "This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.\n", + "\n", + "### Concepts for this Exercise - Actors\n", + "\n", + "To create an actor, decorate Python class with the `@ray.remote` decorator.\n", + "\n", + "```python\n", + "@ray.remote\n", + "class Example(object):\n", + " def __init__(self, x):\n", + " self.x = x\n", + " \n", + " def set(self, x):\n", + " self.x = x\n", + " \n", + " def get(self):\n", + " return self.x\n", + "```\n", + "\n", + "Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.\n", + "\n", + "Actor classes differ from regular Python classes in the following ways.\n", + "1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via\n", + " ```python\n", + " e = Example.remote(1)\n", + " ```\n", + " When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.\n", + "2. **Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.\n", + " ```python\n", + " >>> e.set.remote(2)\n", + " ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)\n", + " \n", + " >>> e.get.remote()\n", + " ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)\n", + " ```\n", + "3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.\n", + " ```python\n", + " >>> ray.get(e.set.remote(2))\n", + " None\n", + " \n", + " >>> ray.get(e.get.remote())\n", + " 2\n", + " ```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:60259 to respond...\n", + "Waiting for redis server at 127.0.0.1:14249 to respond...\n", + "Starting local scheduler with 4 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8896/notebooks/ray_ui72516.ipynb?token=a0e3d1847986925abef24e707e97bc96ea79b2a23b601952\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler28125215'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store39176895', manager_name='/tmp/plasma_manager12644975', manager_port=37399)],\n", + " 'redis_address': '127.0.0.1:60259',\n", + " 'webui_url': 'http://localhost:8896/notebooks/ray_ui72516.ipynb?token=a0e3d1847986925abef24e707e97bc96ea79b2a23b601952'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "class Foo(object):\n", + " def __init__(self):\n", + " self.counter = 0\n", + "\n", + " def reset(self):\n", + " self.counter = 0\n", + "\n", + " def increment(self):\n", + " time.sleep(0.5)\n", + " self.counter += 1\n", + " return self.counter" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Change the intantiations below to create two actors by calling `Foo.remote()`." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Create two Foo objects.\n", + "f1 = Foo.remote()\n", + "f2 = Foo.remote()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time)." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(2.0)\n", + "start_time = time.time()\n", + "\n", + "# Reset the actor state so that we can run this cell multiple times without\n", + "# changing the results.\n", + "f1.reset.remote()\n", + "f2.reset.remote()\n", + "\n", + "# We want to parallelize this code. However, it is not straightforward to\n", + "# make \"increment\" a remote function, because state is shared (the value of\n", + "# \"self.counter\") between subsequent calls to \"increment\". In this case, it\n", + "# makes sense to use actors.\n", + "results = []\n", + "for _ in range(5):\n", + " results.append(f1.increment.remote())\n", + " results.append(f2.increment.remote())\n", + "results = ray.get(results)\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 2.5244545936584473 seconds.\n" + ] + } + ], + "source": [ + "assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]\n", + "\n", + "assert duration < 3, ('The experiments ran in {} seconds. This is too '\n", + " 'slow.'.format(duration))\n", + "assert duration > 2.5, ('The experiments ran in {} seconds. This is too '\n", + " 'fast.'.format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise08.ipynb b/exercises/solutions/exercise08.ipynb new file mode 100644 index 0000000..b07cbed --- /dev/null +++ b/exercises/solutions/exercise08.ipynb @@ -0,0 +1,285 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 8 - Speed up Serialization\n", + "\n", + "**GOAL:** The goal of this exercise is to illustrate how to speed up serialization by using `ray.put`.\n", + "\n", + "### Concepts for this Exercise - ray.put\n", + "\n", + "Object IDs can be created in multiple ways.\n", + "- They are returned by remote function calls.\n", + "- They are returned by actor method calls.\n", + "- They are returned by `ray.put`.\n", + "\n", + "When an object is passed to `ray.put`, the object is serialized using the Apache Arrow format (see https://arrow.apache.org/ for more information about Arrow) and copied into a shared memory object store. This object will then be available to other workers on the same machine via shared memory. If it is needed by workers on another machine, it will be shipped under the hood.\n", + "\n", + "**When objects are passed into a remote function, Ray puts them in the object store under the hood.** That is, if `f` is a remote function, the code\n", + "\n", + "```python\n", + "x = np.zeros(1000)\n", + "f.remote(x)\n", + "```\n", + "\n", + "is essentially transformed under the hood to\n", + "\n", + "```python\n", + "x = np.zeros(1000)\n", + "x_id = ray.put(x)\n", + "f.remote(x_id)\n", + "```\n", + "\n", + "The call to `ray.put` copies the numpy array into the shared-memory object store, from where it can be read by all of the worker processes (without additional copying). However, if you do something like\n", + "\n", + "```python\n", + "for i in range(10):\n", + " f.remote(x)\n", + "```\n", + "\n", + "then 10 copies of the array will be placed into the object store. This takes up more memory in the object store than is necessary, and it also takes time to copy the array into the object store over and over. This can be made more efficient by placing the array in the object store only once as follows.\n", + "\n", + "```python\n", + "x_id = ray.put(x)\n", + "for i in range(10):\n", + " f.remote(x_id)\n", + "```\n", + "\n", + "In this exercise, you will speed up the code below and reduce the memory footprint by calling `ray.put` on the neural net weights before passing them into the remote functions.\n", + "\n", + "**WARNING:** This exercise requires a lot of memory to run. If this notebook is running within a Docker container, then the docker container must be started with a large shared-memory file system. This can be done by starting the docker container with the `--shm-size` flag." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import pickle\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:15512 to respond...\n", + "Waiting for redis server at 127.0.0.1:41291 to respond...\n", + "Starting local scheduler with 4 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8889/notebooks/ray_ui70347.ipynb?token=de93f9223bde678e9185265f74b2fb5baa081e508f9002c2\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler96385156'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store85348469', manager_name='/tmp/plasma_manager34179277', manager_port=55609)],\n", + " 'redis_address': '127.0.0.1:15512',\n", + " 'webui_url': 'http://localhost:8889/notebooks/ray_ui70347.ipynb?token=de93f9223bde678e9185265f74b2fb5baa081e508f9002c2'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define some neural net weights which will be passed into a number of tasks." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "neural_net_weights = {'variable{}'.format(i): np.random.normal(size=1000000)\n", + " for i in range(50)}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Compare the time required to serialize the neural net weights and copy them into the object store using Ray versus the time required to pickle and unpickle the weights. The big win should be with the time required for *deserialization*.\n", + "\n", + "Note that when you call `ray.put`, in addition to serializing the object, we are copying it into shared memory where it can be efficiently accessed by other workers on the same machine.\n", + "\n", + "**NOTE:** You don't actually have to do anything here other than run the cell below and read the output.\n", + "\n", + "**NOTE:** Sometimes `ray.put` can be faster than `pickle.dumps`. This is because `ray.put` leverages multiple threads when serializing large objects. Note that this is not possible with `pickle`." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ray - serializing\n", + "CPU times: user 132 ms, sys: 240 ms, total: 372 ms\n", + "Wall time: 149 ms\n", + "\n", + "Ray - deserializing\n", + "CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n", + "Wall time: 3.29 ms\n", + "\n", + "pickle - serializing\n", + "CPU times: user 88 ms, sys: 84 ms, total: 172 ms\n", + "Wall time: 172 ms\n", + "\n", + "pickle - deserializing\n", + "CPU times: user 60 ms, sys: 28 ms, total: 88 ms\n", + "Wall time: 85.8 ms\n" + ] + } + ], + "source": [ + "print('Ray - serializing')\n", + "%time x_id = ray.put(neural_net_weights)\n", + "print('\\nRay - deserializing')\n", + "%time x_val = ray.get(x_id)\n", + "\n", + "print('\\npickle - serializing')\n", + "%time serialized = pickle.dumps(neural_net_weights)\n", + "print('\\npickle - deserializing')\n", + "%time deserialized = pickle.loads(serialized)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define a remote function which uses the neural net weights." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "def use_weights(weights, i):\n", + " return i" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** In the code below, use `ray.put` to avoid copying the neural net weights to the object store multiple times." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Sleep a little to improve the accuracy of the timing measurements below.\n", + "time.sleep(2.0)\n", + "start_time = time.time()\n", + "nn_weights_id = ray.put(neural_net_weights)\n", + "results = ray.get([use_weights.remote(nn_weights_id, i)\n", + " for i in range(20)])\n", + "\n", + "end_time = time.time()\n", + "duration = end_time - start_time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The example took 0.14200997352600098 seconds.\n" + ] + } + ], + "source": [ + "assert results == list(range(20))\n", + "assert duration < 1, ('The experiments ran in {} seconds. This is too '\n", + " 'slow.'.format(duration))\n", + "\n", + "print('Success! The example took {} seconds.'.format(duration))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise09.ipynb b/exercises/solutions/exercise09.ipynb new file mode 100644 index 0000000..90a98d0 --- /dev/null +++ b/exercises/solutions/exercise09.ipynb @@ -0,0 +1,245 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 9 - Using the GPU API\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to use GPUs with remote functions and actors.\n", + "\n", + "**NOTE:** These exercises are designed to run on a machine without GPUs.\n", + "\n", + "See the documentation on using Ray with GPUs http://ray.readthedocs.io/en/latest/using-ray-with-gpus.html.\n", + "\n", + "### Concepts for this Exercise - Using Ray with GPUs\n", + "\n", + "We can indicate that a remote function or an actor requires some GPUs using the `num_gpus` keyword.\n", + "\n", + "```python\n", + "@ray.remote(num_gpus=1)\n", + "def f():\n", + " # The command ray.get_gpu_ids() returns a list of the indices\n", + " # of the GPUs that this task can use (e.g., [0] or [1]).\n", + " ray.get_gpu_ids()\n", + "\n", + "@ray.remote(num_gpus=2)\n", + "class Foo(object):\n", + " def __init__(self):\n", + " # The command ray.get_gpu_ids() returns a list of the\n", + " # indices of the GPUs that this actor can use\n", + " # (e.g., [0, 1] or [3, 5]).\n", + " ray.get_gpu_ids()\n", + "```\n", + "\n", + "Then inside of the actor constructor and methods, we can get the IDs of the GPUs allocated for that actor with `ray.get_gpu_ids()`." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "\n", + "import numpy as np\n", + "import ray\n", + "import time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start Ray, note that we pass in `num_gpus=4`. Ray will assume this machine has 4 GPUs (even if it does not). When a task or actor requests a GPU, it will be assigned a GPU ID from the set `[0, 1, 2, 3]`. It is then the responsibility of the task or actor to make sure that it only uses that specific GPU (e.g., by setting the `CUDA_VISIBLE_DEVICES` environment variable)." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:11024 to respond...\n", + "Waiting for redis server at 127.0.0.1:41633 to respond...\n", + "Starting local scheduler with 4 CPUs, 2 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8890/notebooks/ray_ui62016.ipynb?token=7211e966d33cf92964d48d4868a298e6b612ff11f8de0e33\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler43381804'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store32689610', manager_name='/tmp/plasma_manager78625912', manager_port=53342)],\n", + " 'redis_address': '127.0.0.1:11024',\n", + " 'webui_url': 'http://localhost:8890/notebooks/ray_ui62016.ipynb?token=7211e966d33cf92964d48d4868a298e6b612ff11f8de0e33'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, num_gpus=2, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Change the remote function below to require one GPU.\n", + "\n", + "**NOTE:** This change does not make the remote function actually **use** the GPU, it simply **reserves** the GPU for use by the remote function. To actually use the GPU, the remote function would use a neural net library like TensorFlow or PyTorch after setting the `CUDA_VISIBLE_DEVICES` environment variable properly. This can be done as follows.\n", + "\n", + "```python\n", + "import os\n", + "os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()])\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote(num_gpus = 1)\n", + "def f():\n", + " time.sleep(0.5)\n", + " return ray.get_gpu_ids()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** This code checks that each task was assigned one GPU and that not more than two tasks are run at the same time (because we told Ray there are only two GPUs)." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sucess! The test passed.\n" + ] + } + ], + "source": [ + "start_time = time.time()\n", + "\n", + "gpu_ids = ray.get([f.remote() for _ in range(3)])\n", + "\n", + "end_time = time.time()\n", + "\n", + "for i in range(len(gpu_ids)):\n", + " assert len(gpu_ids[i]) == 1\n", + "\n", + "assert end_time - start_time > 1\n", + "\n", + "print('Sucess! The test passed.')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** The code below defines an actor. Make it require one GPU." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote(num_gpus = 1)\n", + "class Actor(object):\n", + " def __init__(self):\n", + " pass\n", + "\n", + " def get_gpu_ids(self):\n", + " return ray.get_gpu_ids()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** This code checks that the actor was assigned a GPU." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sucess! The test passed.\n" + ] + } + ], + "source": [ + "actor = Actor.remote()\n", + "\n", + "gpu_ids = ray.get(actor.get_gpu_ids.remote())\n", + "\n", + "assert len(gpu_ids) == 1\n", + "\n", + "print('Sucess! The test passed.')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/exercises/solutions/exercise10.ipynb b/exercises/solutions/exercise10.ipynb new file mode 100644 index 0000000..a845358 --- /dev/null +++ b/exercises/solutions/exercise10.ipynb @@ -0,0 +1,352 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Solution to Exercise 10 - Pass Neural Net Weights Between Processes\n", + "\n", + "**GOAL:** The goal of this exercise is to show how to send neural network weights between workers and the driver.\n", + "\n", + "For more details on using Ray with TensorFlow, see the documentation at http://ray.readthedocs.io/en/latest/using-ray-with-tensorflow.html.\n", + "\n", + "### Concepts for this Exercise - Getting and Setting Neural Net Weights\n", + "\n", + "Since pickling and unpickling a TensorFlow graph can be inefficient or may not work at all, it is most efficient to ship the weights between processes as a dictionary of numpy arrays (or as a flattened numpy array).\n", + "\n", + "We provide the helper class `ray.experimental.TensorFlowVariables` to help with getting and setting weights. Similar techniques should work other neural net libraries.\n", + "\n", + "Consider the following neural net definition.\n", + "\n", + "```python\n", + "import tensorflow as tf\n", + "\n", + "x_data = tf.placeholder(tf.float32, shape=[100])\n", + "y_data = tf.placeholder(tf.float32, shape=[100])\n", + "\n", + "w = tf.Variable(tf.random_uniform([1], -1.0, 1.0))\n", + "b = tf.Variable(tf.zeros([1]))\n", + "y = w * x_data + b\n", + "\n", + "loss = tf.reduce_mean(tf.square(y - y_data))\n", + "optimizer = tf.train.GradientDescentOptimizer(0.5)\n", + "grads = optimizer.compute_gradients(loss)\n", + "train = optimizer.apply_gradients(grads)\n", + "\n", + "init = tf.global_variables_initializer()\n", + "sess = tf.Session()\n", + "sess.run(init)\n", + "```\n", + "\n", + "Then we can use the helper class as follows.\n", + "\n", + "```python\n", + "variables = ray.experimental.TensorFlowVariables(loss, sess)\n", + "# Here 'weights' is a dictionary mapping variable names to the associated\n", + "# weights as a numpy array.\n", + "weights = variables.get_weights()\n", + "variables.set_weights(weights)\n", + "```\n", + "\n", + "Note that there are analogous methods `variables.get_flat` and `variables.set_flat`, which concatenate the weights as a single array insead of a dictionary.\n", + "\n", + "```python\n", + "# Here 'weights' is a numpy array of all of the neural net weights\n", + "# concatenated together.\n", + "weights = variables.get_flat()\n", + "variables.set_flat(weights)\n", + "```\n", + "\n", + "In this exercise, we will use an actor containing a neural network and implement methods to extract and set the neural net weights.\n", + "\n", + "**WARNING:** This exercise is more complex than previous exercises." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from __future__ import absolute_import\n", + "from __future__ import division\n", + "from __future__ import print_function\n", + "import numpy as np\n", + "import ray\n", + "import tensorflow as tf\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for redis server at 127.0.0.1:33609 to respond...\n", + "Waiting for redis server at 127.0.0.1:28636 to respond...\n", + "Starting local scheduler with 4 CPUs, 0 GPUs\n", + "\n", + "======================================================================\n", + "View the web UI at http://localhost:8889/notebooks/ray_ui31983.ipynb?token=b834aeeca704f77f1929908c297e24a26db4b677d8c8ab42\n", + "======================================================================\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "{'local_scheduler_socket_names': ['/tmp/scheduler51901307'],\n", + " 'node_ip_address': '127.0.0.1',\n", + " 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store58908059', manager_name='/tmp/plasma_manager5142334', manager_port=36761)],\n", + " 'redis_address': '127.0.0.1:33609',\n", + " 'webui_url': 'http://localhost:8889/notebooks/ray_ui31983.ipynb?token=b834aeeca704f77f1929908c297e24a26db4b677d8c8ab42'}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(num_cpus=4, redirect_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The code below defines a class containing a simple neural network.\n", + "\n", + "**EXERCISE:** Implement the `set_weights` and `get_weights` methods. This should be done using the `ray.experimental.TensorFlowVariables` helper class." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "@ray.remote\n", + "class SimpleModel(object):\n", + " def __init__(self):\n", + " x_data = tf.placeholder(tf.float32, shape=[100])\n", + " y_data = tf.placeholder(tf.float32, shape=[100])\n", + "\n", + " w = tf.Variable(tf.random_uniform([1], -1.0, 1.0))\n", + " b = tf.Variable(tf.zeros([1]))\n", + " y = w * x_data + b\n", + "\n", + " self.loss = tf.reduce_mean(tf.square(y - y_data))\n", + " optimizer = tf.train.GradientDescentOptimizer(0.5)\n", + " grads = optimizer.compute_gradients(self.loss)\n", + " self.train = optimizer.apply_gradients(grads)\n", + "\n", + " init = tf.global_variables_initializer()\n", + " self.sess = tf.Session()\n", + "\n", + " # Here we create the TensorFlowVariables object to assist with getting\n", + " # and setting weights.\n", + " self.variables = ray.experimental.TensorFlowVariables(self.loss, self.sess)\n", + "\n", + " self.sess.run(init)\n", + "\n", + " def set_weights(self, weights):\n", + " \"\"\"Set the neural net weights.\n", + " \n", + " This method should assign the given weights to the neural net.\n", + " \n", + " Args:\n", + " weights: Either a dict mapping strings (the variable names) to numpy\n", + " arrays or a single flattened numpy array containing all of the\n", + " concatenated weights.\n", + " \"\"\"\n", + " # EXERCISE: You will want to use self.variables here.\n", + " self.variables.set_flat(weights)\n", + "\n", + " def get_weights(self):\n", + " \"\"\"Get the neural net weights.\n", + " \n", + " This method should return the current neural net weights.\n", + " \n", + " Returns:\n", + " Either a dict mapping strings (the variable names) to numpy arrays or\n", + " a single flattened numpy array containing all of the concatenated\n", + " weights.\n", + " \"\"\"\n", + " # EXERCISE: You will want to use self.variables here.\n", + " return self.variables.get_flat()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a few actors." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "actors = [SimpleModel.remote() for _ in range(4)]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Get the neural net weights from all of the actors." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[[-0.23027754 0. ]\n", + " [ 0.32967567 0. ]\n", + " [-0.94012499 0. ]\n", + " [ 0.72108579 0. ]]\n" + ] + } + ], + "source": [ + "weights_from_actors = ray.get([actor.get_weights.remote() for actor in actors])\n", + "print(np.asarray(weights_from_actors))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Average all of the neural net weights.\n", + "\n", + "**NOTE:** This will be easier to do if you chose to use `get_flat`/`set_flat` instead of `get_weights`/`set_weights` in the implementation of `SimpleModel.set_weights` and `SimpleModel.get_weights` above.." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[-0.02991027 0. ]\n" + ] + } + ], + "source": [ + "average_actor_weights = np.mean(np.asarray(weights_from_actors), axis = 0)\n", + "print(average_actor_weights)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**EXERCISE:** Set the average weights on the actors." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/plain": [ + "[ObjectID(89e49b0f06226e9e797cfa005ef837dd416b9006),\n", + " ObjectID(2d795d801bbd190cc4061d8b4c0e2a0654da1d5a),\n", + " ObjectID(d0be81cddda738fef6336ce6a48e0344d81f3697),\n", + " ObjectID(16d01bc18993e8a566dd872b20341b790a0f3e9f)]" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "[actor.set_weights.remote(average_actor_weights) for actor in actors]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**VERIFY:** Check that all of the actors have the same weights." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Success! The test passed.\n" + ] + } + ], + "source": [ + "weights = ray.get([actor.get_weights.remote() for actor in actors])\n", + "\n", + "for i in range(len(weights)):\n", + " np.testing.assert_equal(weights[i], weights[0])\n", + "\n", + "print('Success! The test passed.')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}