Compare commits

..

7 commits

12 changed files with 261 additions and 528 deletions

15
.gitignore vendored
View file

@ -1,12 +1,5 @@
# Compiled source
__pycache__
# Packages
*.gz
*.rar
*.tar
*.zip
# OS generated files
.DS_Store
notebooks/
/deployment_scripts/create-env.sh
/deployment_scripts/deploy-env.sh
/deployment_scripts/environment.yaml

138
README.md
View file

@ -1,49 +1,67 @@
# Ray: How to launch a Ray Cluster on Hawk?
# Dask: How to execute python workloads using a Dask cluster on Vulcan
This guide shows you how to launch a Ray cluster on HLRS' Hawk system.
This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.
## Table of Contents
- [Ray: How to launch a Ray Cluster on Hawk?](#ray-how-to-launch-a-ray-cluster-on-hawk)
- [Table of Contents](#table-of-contents)
- [Getting Started](#getting-started)
- [Launch a local Ray Cluster in Interactive Mode](#launch-a-local-ray-cluster-in-interactive-mode)
- [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode)
- [Usage](#usage)
- [Notes](#notes)
## Getting Started
**Step 1.** Build and transfer the Conda environment to Hawk:
### 1. Build and transfer the Conda environment to Vulcan:
Only the main and r channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Hawk.
Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Vulcan.
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment to run Ray workflows.
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment.
**Step 2.** Allocate workspace on Hawk:
### 2. Allocate workspace on Vulcan:
Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
```bash
ws_allocate hpda_project 10
ws_find hpda_project # find the path to workspace, which is the destination directory in the next step
ws_allocate dask_workspace 10
ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step
```
**Step 2.** Clone the repository on Hawk to use the deployment scripts and project structure:
### 3. Clone the repository on Vulcan to use the deployment scripts and project structure:
```bash
cd <workspace_directory>
git clone <repository_url>
```
## Launch a local Ray Cluster in Interactive Mode
Using a single node interactively provides opportunities for faster code debugging.
**Step 1.** On the Hawk login node, start an interactive job using:
### 4. Send all the code to the appropriate directory on Vulcan using `scp`:
```bash
qsub -I -l select=1:node_type=rome -l walltime=01:00:00
scp <your_script>.py <destination_host>:<destination_directory>
```
**Step 2.** Activate the Conda environment:
### 5. SSH into Vulcan and start a job interactively using:
```bash
qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
```
Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. Follow section [Multiple Nodes](#multiple-nodes) for more information.
### 6. Go into the directory with all code:
```bash
cd <destination_directory>
```
### 7. Initialize the Dask cluster:
```bash
source deploy-dask.sh "$(pwd)"
```
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
Note: Make sure all permissions are set using `chmod +x` for all scripts.
## Usage
### Single Node
To run the application interactively on a single node, follow points 4, 5, 6 and, 7 from [Getting Started](#getting-started), and execute the following command after all the job has started:
```bash
# Load the Conda module
@ -54,70 +72,56 @@ source activate # activates the base environment
conda env list
# Activate a specific Conda environment.
conda activate ray_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
conda activate dask_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
```
**Step 3.** Initialize the Ray cluster.
You can use a Python interpreter to start a local Ray cluster:
```python
import ray
ray.init()
```
**Step 4.** Connect to the dashboard.
Warning: Do not change the default dashboard host `127.0.0.1` to keep Ray cluster reachable by only you.
Note: We recommend using a dedicated Firefox profile for accessing web-based services on HLRS Compute Platforms. If you haven't created a profile, check out our [guide](https://kb.hlrs.de/platforms/index.php/How_to_use_Web_Based_Services_on_HLRS_Compute_Platforms).
You need the job id and the hostname for your current job. You can obtain this information on the login node using:
After the environment is activated, you can run the python interpretor:
```bash
qstat -anw # get the job id and the hostname
python
```
Then, on your local computer,
Or to run a full script:
```bash
export PBS_JOBID=<job-id> # e.g., 2316419.hawk-pbs5
ssh <compute-host> # e.g., r38c3t8n3
python <your-script>.py
```
Check your SSH config in the first step if this doesn't work.
Then, launch Firefox web browser using the configured profile. Open `localhost:8265` to access the Ray dashboard.
## Launch a Ray Cluster in Batch Mode
Let us [estimate the value of π](https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html) as an example application.
**Step 1.** Add execution permissions to `start-ray-worker.sh`
### Multiple Nodes
To run the application on multiple nodes, you need to write a `.pbs` script and submit it using `qsub`. Follow lines 1-4 from the [Getting Started](#getting-started) section. Write a `submit-dask-job.pbs` script:
```bash
cd deployment_scripts
chmod +x start-ray-worker.sh
#!/bin/bash
#PBS -N dask-job
#PBS -l select=3:node_type=rome
#PBS -l walltime=1:00:00
#Go to the directory where the code is
cd <destination_directory>
#Deploy the Dask cluster
source deploy-dask.sh "$(pwd)"
#Run the python script
python <your-script>.py
```
**Step 2.** Submit a job to launch the head and worker nodes.
A more thorough example is available in the `deployment_scripts` directory under `submit-dask-job.pbs`.
You must modify the following lines in `submit-ray-job.sh`:
- Line 3 changes the cluster size. The default configuration launches a 3 node cluster.
- `export WS_DIR=<workspace_dir>` - set the correct workspace directory.
- `export PROJECT_DIR=$WS_DIR/<project_name>` - set the correct project directory.
Note: The job script `src/monte-carlo-pi.py` waits for all nodes in the Ray cluster to become available. Preserve this pattern in your Python code while using a multiple node Ray cluster.
Launch the job and monitor the progress. As the job starts, its status (S) shifts from Q (Queued) to R (Running). Upon completion, the job will no longer appear in the `qstat -a` display.
And then execute the following commands to submit the job:
```bash
qsub submit-ray-job.pbs
qsub submit-dask-job.pbs
qstat -anw # Q: Queued, R: Running, E: Ending
ls -l # list files after the job finishes
cat ray-job.o... # inspect the output file
cat ray-job.e... # inspect the error file
cat dask-job.o... # inspect the output file
cat dask-job.e... # inspect the error file
```
If you need to delete the job, use `qdel <job-id>`. If this doesn't work, use the `-W force` option: `qdel -W force <job-id>`
## Notes
Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster:
```python
client = Client(..., silence_logs='error')
```
Note: Replace all filenames within `<>` with the actual values applicable to your project.

View file

@ -0,0 +1,31 @@
#!/bin/bash
#Get the current workspace directory and the master node
export CURRENT_WORKSPACE=$1
export DASK_SCHEDULER_HOST=$2
# Path to localscratch
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
# Extract Dask environment in localscratch
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
#tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
#chmod -R 700 $DASK_ENV
# Start the dask environment
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
source $DASK_ENV/bin/activate
conda-unpack
# Start Dask worker
export DASK_SCHEDULER_PORT="8786" # Replace with the port on which the Dask scheduler is running
# Additional Dask worker options can be added here if needed
# Change local directory if memory is an issue
# Change directory to localscratch and start Dask worker
cd $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Starting Dask worker at $DASK_SCHEDULER_HOST on port $DASK_SCHEDULER_PORT"
dask worker $DASK_SCHEDULER_HOST:$DASK_SCHEDULER_PORT

View file

@ -0,0 +1,54 @@
#!/bin/bash
export CURRENT_WORKSPACE=$1
# Check if running in a PBS Job environment
if [ -z ${PBS_NODEFILE+x} ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] ERROR: This script is meant to run as a part of PBS Job. Don't start it at login nodes."
exit 1
fi
export NUM_NODES=$(wc -l < $PBS_NODEFILE)
if [ $NUM_NODES -lt 2 ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] WARNING: You have a single node job running. Dask cluster requires at least 2 nodes."
exit 1
fi
export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
# Path to localscratch
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"
# Extract Dask environment in localscratch
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Setting up Dask environment"
# Start the dask environment
source $DASK_ENV/bin/activate
conda-unpack
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Scheduler at $SCHEDULER_NODE on port $DASK_SCHEDULER_PORT"
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
for ((i=1;i<$NUM_NODES;i++)); do
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
pbsdsh -n $i -o -- bash -l -c "source $CURRENT_WORKSPACE/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Dask cluster ready, wait for workers to connect to the scheduler."
# Optionally, you can provide a script for the workers to execute using ssh, similar to Spark.
# Example: ssh $node "source activate your_conda_env && python your_dask_worker_script.py" &

View file

@ -0,0 +1,51 @@
# Reference Guide: Dask Cluster Deployment Scripts
## Overview
This repository contains a set of bash scripts designed to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment. These scripts facilitate the creation of Conda environments, deployment of the environment to a remote server, and initiation of Dask clusters on distributed systems. Below is a comprehensive guide on how to use and understand each script:
### Note: Permissions
Ensure that execution permissions (`chmod +x`) are granted to these scripts before attempting to run them. This can be done using the following command:
```bash
chmod +x script_name.sh
```
## Prerequisites
Before using these scripts, ensure that the following prerequisites are met:
1. **Conda Installation**: Ensure that Conda is installed on your local system. Follow the [official Conda installation guide](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html) if not already installed.
2. **PBS Job Scheduler**: The deployment scripts (`deploy-dask.sh` and `dask-worker.sh`) are designed for use with the PBS job scheduler. Modify accordingly if using a different job scheduler.
3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
## 1. deploy-dask.sh
### Overview
`deploy-dask.sh` initiates the Dask cluster on an HPC environment using the PBS job scheduler. It extracts the Conda environment, activates it, and starts the Dask scheduler and workers on allocated nodes.
### Usage
```bash
./deploy-dask.sh <current_workspace_directory>
```
### Notes
- This script is designed for an HPC environment with PBS job scheduling.
- Modifications may be necessary for different job schedulers.
## 2. dask-worker.sh
### Overview
`dask-worker.sh` is a worker script designed to be executed on each allocated node. It sets up the Dask environment, extracts the Conda environment, activates it, and starts the Dask worker to connect to the scheduler. This script is not directly executed by the user.
### Notes
- Execute this script on each allocated node to connect them to the Dask scheduler.
- Designed for use with PBS job scheduling.

View file

@ -1,19 +0,0 @@
#!/bin/bash
if [ $# -ne 5 ]; then
echo "Usage: $0 <ws_dir> <env_path> <ray_address> <redis_password> <obj_store_memory>"
exit 1
fi
export WS_DIR=$1
export ENV_PATH=$2
export RAY_ADDRESS=$3
export REDIS_PASSWORD=$4
export OBJECT_STORE_MEMORY=$5
source $ENV_PATH/bin/activate
ray start --address=$RAY_ADDRESS \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY \
--block

View file

@ -0,0 +1,33 @@
#!/bin/bash
#PBS -N dask-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00
export PYTHON_FILE= # Path to the Python file you want to run
export CURRENT_WORKSPACE= # Path to the workspace where you have pulled this repo and the dask-env.tar.gz file
export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV
source $DASK_ENV/bin/activate
conda-unpack
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
for ((i=1;i<$NUM_NODES;i++)); do
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
pbsdsh -n $i -o -- bash -l -c "source /deplyment_scripts/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done
python3 $PYTHON_FILE

View file

@ -1,44 +0,0 @@
#!/bin/bash
#PBS -N ray-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00
export WS_DIR=<workspace_dir>
export PROJECT_DIR=$WS_DIR/<project_name>
export ENV_PATH=<env_path>
export JOB_SCRIPT=monte-carlo-pi.py
export OBJECT_STORE_MEMORY=128000000000
# Environment variables after this line should not change
export SRC_DIR=$PROJECT_DIR/src
export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT
export DEPLOYMENT_SCRIPTS=$PROJECT_DIR/deployment_scripts
source $ENV_PATH/bin/activate
export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'`
export RAY_ADDRESS=$IP_ADDRESS:6379
export REDIS_PASSWORD=$(openssl rand -base64 32)
export NCCL_DEBUG=INFO
ray start --disable-usage-stats \
--head \
--node-ip-address=$IP_ADDRESS \
--port=6379 \
--dashboard-host=127.0.0.1 \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
for ((i=1;i<$NUM_NODES;i++)); do
pbsdsh -n $i -- bash -l -c "'$DEPLOYMENT_SCRIPTS/start-ray-worker.sh' '$WS_DIR' '$ENV_PATH' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" &
done
python3 $PYTHON_FILE
ray stop --grace-period 30

View file

@ -1,54 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ray"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ray.init()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster_resources = ray.available_resources()\n",
"available_cpu_cores = cluster_resources.get('CPU', 0)\n",
"print(cluster_resources)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ray",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

16
src/dask-example-pi.py Normal file
View file

@ -0,0 +1,16 @@
import dask.bag as db
import random
from dask.distributed import Client
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
NUM_SAMPLES=100
def inside(p):
x, y = random.random(), random.random()
return x*x + y*y < 1
def calc_pi():
count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
return 4.0 * count / NUM_SAMPLES
print(calc_pi())

View file

@ -1,89 +0,0 @@
# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html
import ray
import math
import time
import random
import os
# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 100
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
@ray.remote
class ProgressActor:
def __init__(self, total_num_samples: int):
self.total_num_samples = total_num_samples
self.num_samples_completed_per_task = {}
def report_progress(self, task_id: int, num_samples_completed: int) -> None:
self.num_samples_completed_per_task[task_id] = num_samples_completed
def get_progress(self) -> float:
return (
sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
)
@ray.remote
def sampling_task(num_samples: int, task_id: int,
progress_actor: ray.actor.ActorHandle) -> int:
num_inside = 0
for i in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
# Report progress every 1 million samples.
if (i + 1) % 1_000_000 == 0:
# This is async.
progress_actor.report_progress.remote(task_id, i + 1)
# Report the final progress.
progress_actor.report_progress.remote(task_id, num_samples)
return num_inside
def wait_for_nodes(expected_num_nodes: int):
while True:
num_nodes = len(ray.nodes())
if num_nodes >= expected_num_nodes:
break
print(f'Currently {num_nodes} nodes connected. Waiting for more...')
time.sleep(5) # wait for 5 seconds before checking again
if __name__ == "__main__":
num_nodes = int(os.environ["NUM_NODES"])
assert num_nodes > 1, "If the environment variable NUM_NODES is set, it should be greater than 1."
redis_password = os.environ["REDIS_PASSWORD"]
ray.init(address="auto", _redis_password=redis_password)
wait_for_nodes(num_nodes)
cluster_resources = ray.available_resources()
print(cluster_resources)
# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
# Create and execute all sampling tasks in parallel.
results = [
sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
for i in range(NUM_SAMPLING_TASKS)
]
# Query progress periodically.
while True:
progress = ray.get(progress_actor.get_progress.remote())
print(f"Progress: {int(progress * 100)}%")
if progress == 1:
break
time.sleep(1)
# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

View file

@ -1,243 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Train keras CNN on the CIFAR10 small images dataset.
The model comes from: https://zhuanlan.zhihu.com/p/29214791,
and it gets to about 87% validation accuracy in 100 epochs.
Note that the script requires a machine with 4 GPUs. You
can set {"gpu": 0} to use CPUs for training, although
it is less efficient.
"""
from __future__ import print_function
import argparse
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import (
Convolution2D,
Dense,
Dropout,
Flatten,
Input,
MaxPooling2D,
)
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from ray import train, tune
from ray.tune import Trainable
from ray.tune.schedulers import PopulationBasedTraining
num_classes = 10
NUM_SAMPLES = 128
class Cifar10Model(Trainable):
def _read_data(self):
# The data, split between train and test sets:
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# Convert class vectors to binary class matrices.
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
x_train = x_train.astype("float32")
x_train /= 255
x_test = x_test.astype("float32")
x_test /= 255
return (x_train, y_train), (x_test, y_test)
def _build_model(self, input_shape):
x = Input(shape=(32, 32, 3))
y = x
y = Convolution2D(
filters=64,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = Convolution2D(
filters=64,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
y = Convolution2D(
filters=128,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = Convolution2D(
filters=128,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
y = Convolution2D(
filters=256,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = Convolution2D(
filters=256,
kernel_size=3,
strides=1,
padding="same",
activation="relu",
kernel_initializer="he_normal",
)(y)
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
y = Flatten()(y)
y = Dropout(self.config.get("dropout", 0.5))(y)
y = Dense(units=10, activation="softmax", kernel_initializer="he_normal")(y)
model = Model(inputs=x, outputs=y, name="model1")
return model
def setup(self, config):
self.train_data, self.test_data = self._read_data()
x_train = self.train_data[0]
model = self._build_model(x_train.shape[1:])
opt = tf.keras.optimizers.Adadelta(
lr=self.config.get("lr", 1e-4), weight_decay=self.config.get("decay", 1e-4)
)
model.compile(
loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"]
)
self.model = model
def step(self):
x_train, y_train = self.train_data
x_train, y_train = x_train[:NUM_SAMPLES], y_train[:NUM_SAMPLES]
x_test, y_test = self.test_data
x_test, y_test = x_test[:NUM_SAMPLES], y_test[:NUM_SAMPLES]
aug_gen = ImageDataGenerator(
# set input mean to 0 over the dataset
featurewise_center=False,
# set each sample mean to 0
samplewise_center=False,
# divide inputs by dataset std
featurewise_std_normalization=False,
# divide each input by its std
samplewise_std_normalization=False,
# apply ZCA whitening
zca_whitening=False,
# randomly rotate images in the range (degrees, 0 to 180)
rotation_range=0,
# randomly shift images horizontally (fraction of total width)
width_shift_range=0.1,
# randomly shift images vertically (fraction of total height)
height_shift_range=0.1,
# randomly flip images
horizontal_flip=True,
# randomly flip images
vertical_flip=False,
)
aug_gen.fit(x_train)
batch_size = self.config.get("batch_size", 64)
gen = aug_gen.flow(x_train, y_train, batch_size=batch_size)
self.model.fit_generator(
generator=gen, epochs=self.config.get("epochs", 1), validation_data=None
)
# loss, accuracy
_, accuracy = self.model.evaluate(x_test, y_test, verbose=0)
return {"mean_accuracy": accuracy}
def save_checkpoint(self, checkpoint_dir):
file_path = checkpoint_dir + "/model"
self.model.save(file_path)
def load_checkpoint(self, checkpoint_dir):
# See https://stackoverflow.com/a/42763323
del self.model
file_path = checkpoint_dir + "/model"
self.model = load_model(file_path)
def cleanup(self):
# If need, save your model when exit.
# saved_path = self.model.save(self.logdir)
# print("save model at: ", saved_path)
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing"
)
args, _ = parser.parse_known_args()
space = {
"epochs": 1,
"batch_size": 64,
"lr": tune.grid_search([10**-4, 10**-5]),
"decay": tune.sample_from(lambda spec: spec.config.lr / 100.0),
"dropout": tune.grid_search([0.25, 0.5]),
}
if args.smoke_test:
space["lr"] = 10**-4
space["dropout"] = 0.5
perturbation_interval = 10
pbt = PopulationBasedTraining(
time_attr="training_iteration",
perturbation_interval=perturbation_interval,
hyperparam_mutations={
"dropout": lambda _: np.random.uniform(0, 1),
},
)
tuner = tune.Tuner(
tune.with_resources(
Cifar10Model,
resources={"cpu": 1, "gpu": 1},
),
run_config=train.RunConfig(
name="pbt_cifar10",
stop={
"mean_accuracy": 0.80,
"training_iteration": 30,
},
checkpoint_config=train.CheckpointConfig(
checkpoint_frequency=perturbation_interval,
checkpoint_score_attribute="mean_accuracy",
num_to_keep=2,
),
),
tune_config=tune.TuneConfig(
scheduler=pbt,
num_samples=4,
metric="mean_accuracy",
mode="max",
reuse_actors=True,
),
param_space=space,
)
results = tuner.fit()
print("Best hyperparameters found were: ", results.get_best_result().config)