forked from SiVeGCS/dask_template
Compare commits
7 commits
Author | SHA1 | Date | |
---|---|---|---|
26cb3ed722 | |||
aa3d6b6799 | |||
1eb6024fd0 | |||
1e56496f56 | |||
3ceddcaed1 | |||
48328cc85f | |||
251f190e11 |
16 changed files with 217 additions and 551 deletions
15
.gitignore
vendored
15
.gitignore
vendored
|
@ -1,12 +1,5 @@
|
||||||
# Compiled source
|
|
||||||
__pycache__
|
__pycache__
|
||||||
|
notebooks/
|
||||||
|
/deployment_scripts/create-env.sh
|
||||||
# Packages
|
/deployment_scripts/deploy-env.sh
|
||||||
*.gz
|
/deployment_scripts/environment.yaml
|
||||||
*.rar
|
|
||||||
*.tar
|
|
||||||
*.zip
|
|
||||||
|
|
||||||
# OS generated files
|
|
||||||
.DS_Store
|
|
185
README.md
185
README.md
|
@ -1,162 +1,127 @@
|
||||||
# Ray: How to launch a Ray Cluster on Hawk?
|
# Dask: How to execute python workloads using a Dask cluster on Vulcan
|
||||||
|
|
||||||
This guide shows you how to launch a Ray cluster on HLRS' Hawk system.
|
This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.
|
||||||
|
|
||||||
## Table of Contents
|
## Table of Contents
|
||||||
- [Ray: How to launch a Ray Cluster on Hawk?](#ray-how-to-launch-a-ray-cluster-on-hawk)
|
|
||||||
- [Table of Contents](#table-of-contents)
|
|
||||||
- [Prerequisites](#prerequisites)
|
|
||||||
- [Getting Started](#getting-started)
|
- [Getting Started](#getting-started)
|
||||||
- [Launch a local Ray Cluster in Interactive Mode](#launch-a-local-ray-cluster-in-interactive-mode)
|
- [Usage](#usage)
|
||||||
- [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode)
|
- [Notes](#notes)
|
||||||
|
|
||||||
## Prerequisites
|
|
||||||
|
|
||||||
Before building the environment, make sure you have the following prerequisites:
|
|
||||||
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system.
|
|
||||||
- [Conda-Pack](https://conda.github.io/conda-pack/) installed in the base environment: Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to the target system.
|
|
||||||
- `linux-64` platform for installing the Conda packages because Conda/pip downloads and installs precompiled binaries suitable to the architecture and OS of the local environment.
|
|
||||||
|
|
||||||
For more information, look at the documentation for [Conda on HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters)
|
|
||||||
|
|
||||||
## Getting Started
|
## Getting Started
|
||||||
|
|
||||||
Only the main and r channels are available using the conda module on the clusters. To use custom packages, we need to move the local conda environment to Hawk.
|
### 1. Build and transfer the Conda environment to Vulcan:
|
||||||
|
|
||||||
**Step 1.** Clone this repository to your local machine:
|
Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Vulcan.
|
||||||
|
|
||||||
|
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment.
|
||||||
|
|
||||||
|
### 2. Allocate workspace on Vulcan:
|
||||||
|
|
||||||
|
Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
git clone <repository_url>
|
ws_allocate dask_workspace 10
|
||||||
|
ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step
|
||||||
```
|
```
|
||||||
|
|
||||||
**Step 2.** Go into the directory and create an environment using Conda and environment.yaml.
|
### 3. Clone the repository on Vulcan to use the deployment scripts and project structure:
|
||||||
|
|
||||||
Note: Be sure to add the necessary packages in `deployment_scripts/environment.yaml`:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd deployment_scripts
|
|
||||||
./create-env.sh <your-env>
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 3.** Package the environment and transfer the archive to the target system:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
(base) $ conda pack -n <your-env> -o ray_env.tar.gz # conda-pack must be installed in the base environment
|
|
||||||
```
|
|
||||||
|
|
||||||
A workspace is suitable to store the compressed Conda environment archive on Hawk. Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
ws_allocate hpda_project 10
|
|
||||||
ws_find hpda_project # find the path to workspace, which is the destination directory in the next step
|
|
||||||
```
|
|
||||||
|
|
||||||
You can send your data to an existing workspace using:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
scp ray_env.tar.gz <username>@hawk.hww.hlrs.de:<workspace_directory>
|
|
||||||
rm ray_env.tar.gz # We don't need the archive locally anymore.
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 4.** Clone the repository on Hawk to use the deployment scripts and project structure:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd <workspace_directory>
|
cd <workspace_directory>
|
||||||
git clone <repository_url>
|
git clone <repository_url>
|
||||||
```
|
```
|
||||||
|
|
||||||
## Launch a local Ray Cluster in Interactive Mode
|
### 4. Send all the code to the appropriate directory on Vulcan using `scp`:
|
||||||
|
|
||||||
Using a single node interactively provides opportunities for faster code debugging.
|
|
||||||
|
|
||||||
**Step 1.** On the Hawk login node, start an interactive job using:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
qsub -I -l select=1:node_type=rome -l walltime=01:00:00
|
scp <your_script>.py <destination_host>:<destination_directory>
|
||||||
```
|
```
|
||||||
|
|
||||||
**Step 2.** Go into the project directory:
|
### 5. SSH into Vulcan and start a job interactively using:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd <project_directory>/deployment_scripts
|
qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
|
||||||
```
|
```
|
||||||
|
Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. Follow section [Multiple Nodes](#multiple-nodes) for more information.
|
||||||
|
|
||||||
**Step 3.** Deploy the conda environment to the ram disk:
|
### 6. Go into the directory with all code:
|
||||||
|
|
||||||
Change the following line by editing `deploy-env.sh`:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
export WS_DIR=<workspace_dir>
|
cd <destination_directory>
|
||||||
```
|
```
|
||||||
|
|
||||||
Then, use the following command to deploy and activate the environment:
|
### 7. Initialize the Dask cluster:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
source deploy-env.sh
|
source deploy-dask.sh "$(pwd)"
|
||||||
```
|
```
|
||||||
Note: Make sure all permissions are set using `chmod +x`.
|
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
|
||||||
|
Note: Make sure all permissions are set using `chmod +x` for all scripts.
|
||||||
|
|
||||||
**Step 4.** Initialize the Ray cluster.
|
## Usage
|
||||||
|
|
||||||
You can use a Python interpreter to start a local Ray cluster:
|
### Single Node
|
||||||
|
To run the application interactively on a single node, follow points 4, 5, 6 and, 7 from [Getting Started](#getting-started), and execute the following command after all the job has started:
|
||||||
```python
|
|
||||||
import ray
|
|
||||||
|
|
||||||
ray.init()
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 5.** Connect to the dashboard.
|
|
||||||
|
|
||||||
Warning: Do not change the default dashboard host `127.0.0.1` to keep Ray cluster reachable by only you.
|
|
||||||
|
|
||||||
Note: We recommend using a dedicated Firefox profile for accessing web-based services on HLRS Compute Platforms. If you haven't created a profile, check out our [guide](https://kb.hlrs.de/platforms/index.php/How_to_use_Web_Based_Services_on_HLRS_Compute_Platforms).
|
|
||||||
|
|
||||||
You need the job id and the hostname for your current job. You can obtain this information on the login node using:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
qstat -anw # get the job id and the hostname
|
# Load the Conda module
|
||||||
|
module load bigdata/conda
|
||||||
|
source activate # activates the base environment
|
||||||
|
|
||||||
|
# List available Conda environments for verification purposes
|
||||||
|
conda env list
|
||||||
|
|
||||||
|
# Activate a specific Conda environment.
|
||||||
|
conda activate dask_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
|
||||||
```
|
```
|
||||||
|
|
||||||
Then, on your local computer,
|
After the environment is activated, you can run the python interpretor:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
export PBS_JOBID=<job-id> # e.g., 2316419.hawk-pbs5
|
python
|
||||||
ssh <compute-host> # e.g., r38c3t8n3
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Check your SSH config in the first step if this doesn't work.
|
Or to run a full script:
|
||||||
|
|
||||||
Then, launch Firefox web browser using the configured profile. Open `localhost:8265` to access the Ray dashboard.
|
|
||||||
|
|
||||||
## Launch a Ray Cluster in Batch Mode
|
|
||||||
|
|
||||||
Let us [estimate the value of π](https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html) as an example application.
|
|
||||||
|
|
||||||
**Step 1.** Add execution permissions to `start-ray-worker.sh`
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd deployment_scripts
|
python <your-script>.py
|
||||||
chmod +x start-ray-worker.sh
|
|
||||||
```
|
```
|
||||||
|
|
||||||
**Step 2.** Submit a job to launch the head and worker nodes.
|
### Multiple Nodes
|
||||||
|
To run the application on multiple nodes, you need to write a `.pbs` script and submit it using `qsub`. Follow lines 1-4 from the [Getting Started](#getting-started) section. Write a `submit-dask-job.pbs` script:
|
||||||
You must modify the following lines in `submit-ray-job.sh`:
|
|
||||||
- Line 3 changes the cluster size. The default configuration launches a 3 node cluster.
|
|
||||||
- `export WS_DIR=<workspace_dir>` - set the correct workspace directory.
|
|
||||||
- `export PROJECT_DIR=$WS_DIR/<project_name>` - set the correct project directory.
|
|
||||||
|
|
||||||
Note: The job script `src/monte-carlo-pi.py` waits for all nodes in the Ray cluster to become available. Preserve this pattern in your Python code while using a multiple node Ray cluster.
|
|
||||||
|
|
||||||
Launch the job and monitor the progress. As the job starts, its status (S) shifts from Q (Queued) to R (Running). Upon completion, the job will no longer appear in the `qstat -a` display.
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
qsub submit-ray-job.pbs
|
#!/bin/bash
|
||||||
|
#PBS -N dask-job
|
||||||
|
#PBS -l select=3:node_type=rome
|
||||||
|
#PBS -l walltime=1:00:00
|
||||||
|
|
||||||
|
#Go to the directory where the code is
|
||||||
|
cd <destination_directory>
|
||||||
|
|
||||||
|
#Deploy the Dask cluster
|
||||||
|
source deploy-dask.sh "$(pwd)"
|
||||||
|
|
||||||
|
#Run the python script
|
||||||
|
python <your-script>.py
|
||||||
|
```
|
||||||
|
|
||||||
|
A more thorough example is available in the `deployment_scripts` directory under `submit-dask-job.pbs`.
|
||||||
|
|
||||||
|
And then execute the following commands to submit the job:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
qsub submit-dask-job.pbs
|
||||||
qstat -anw # Q: Queued, R: Running, E: Ending
|
qstat -anw # Q: Queued, R: Running, E: Ending
|
||||||
ls -l # list files after the job finishes
|
ls -l # list files after the job finishes
|
||||||
cat ray-job.o... # inspect the output file
|
cat dask-job.o... # inspect the output file
|
||||||
cat ray-job.e... # inspect the error file
|
cat dask-job.e... # inspect the error file
|
||||||
```
|
```
|
||||||
|
|
||||||
If you need to delete the job, use `qdel <job-id>`. If this doesn't work, use the `-W force` option: `qdel -W force <job-id>`
|
## Notes
|
||||||
|
|
||||||
|
Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster:
|
||||||
|
```python
|
||||||
|
client = Client(..., silence_logs='error')
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: Replace all filenames within `<>` with the actual values applicable to your project.
|
|
@ -1,23 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
# Display usage
|
|
||||||
if [ "$#" -ne 1 ]; then
|
|
||||||
echo "Usage: $0 <conda_environment_name>"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Name of the Conda environment
|
|
||||||
CONDA_ENV_NAME=$1
|
|
||||||
|
|
||||||
# Check if the Conda environment already exists
|
|
||||||
if conda env list | grep -q "$CONDA_ENV_NAME"; then
|
|
||||||
|
|
||||||
echo "Environment '$CONDA_ENV_NAME' already exists."
|
|
||||||
|
|
||||||
else
|
|
||||||
|
|
||||||
echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
|
|
||||||
|
|
||||||
# Create Conda environment
|
|
||||||
CONDA_SUBDIR=linux-64 conda env create --name $CONDA_ENV_NAME -f environment.yaml
|
|
||||||
fi
|
|
31
deployment_scripts/dask-worker.sh
Normal file
31
deployment_scripts/dask-worker.sh
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
#Get the current workspace directory and the master node
|
||||||
|
export CURRENT_WORKSPACE=$1
|
||||||
|
export DASK_SCHEDULER_HOST=$2
|
||||||
|
|
||||||
|
# Path to localscratch
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
|
||||||
|
export DASK_ENV="$HOME/dask"
|
||||||
|
mkdir -p $DASK_ENV
|
||||||
|
|
||||||
|
# Extract Dask environment in localscratch
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
|
||||||
|
#tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
|
||||||
|
#chmod -R 700 $DASK_ENV
|
||||||
|
|
||||||
|
# Start the dask environment
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
|
||||||
|
source $DASK_ENV/bin/activate
|
||||||
|
conda-unpack
|
||||||
|
|
||||||
|
# Start Dask worker
|
||||||
|
export DASK_SCHEDULER_PORT="8786" # Replace with the port on which the Dask scheduler is running
|
||||||
|
|
||||||
|
# Additional Dask worker options can be added here if needed
|
||||||
|
# Change local directory if memory is an issue
|
||||||
|
|
||||||
|
# Change directory to localscratch and start Dask worker
|
||||||
|
cd $DASK_ENV
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Starting Dask worker at $DASK_SCHEDULER_HOST on port $DASK_SCHEDULER_PORT"
|
||||||
|
dask worker $DASK_SCHEDULER_HOST:$DASK_SCHEDULER_PORT
|
54
deployment_scripts/deploy-dask.sh
Normal file
54
deployment_scripts/deploy-dask.sh
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export CURRENT_WORKSPACE=$1
|
||||||
|
|
||||||
|
# Check if running in a PBS Job environment
|
||||||
|
if [ -z ${PBS_NODEFILE+x} ]; then
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] ERROR: This script is meant to run as a part of PBS Job. Don't start it at login nodes."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
export NUM_NODES=$(wc -l < $PBS_NODEFILE)
|
||||||
|
|
||||||
|
if [ $NUM_NODES -lt 2 ]; then
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] WARNING: You have a single node job running. Dask cluster requires at least 2 nodes."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
export ALL_NODES=$(cat $PBS_NODEFILE)
|
||||||
|
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
|
||||||
|
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
|
||||||
|
|
||||||
|
export DASK_SCHEDULER_PORT=8786
|
||||||
|
export DASK_UI_PORT=8787
|
||||||
|
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
|
||||||
|
# Path to localscratch
|
||||||
|
export DASK_ENV="$HOME/dask"
|
||||||
|
mkdir -p $DASK_ENV
|
||||||
|
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"
|
||||||
|
# Extract Dask environment in localscratch
|
||||||
|
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
|
||||||
|
chmod -R 700 $DASK_ENV
|
||||||
|
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Setting up Dask environment"
|
||||||
|
# Start the dask environment
|
||||||
|
source $DASK_ENV/bin/activate
|
||||||
|
conda-unpack
|
||||||
|
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Scheduler at $SCHEDULER_NODE on port $DASK_SCHEDULER_PORT"
|
||||||
|
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
|
||||||
|
|
||||||
|
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
|
||||||
|
|
||||||
|
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
|
||||||
|
for ((i=1;i<$NUM_NODES;i++)); do
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
|
||||||
|
pbsdsh -n $i -o -- bash -l -c "source $CURRENT_WORKSPACE/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Dask cluster ready, wait for workers to connect to the scheduler."
|
||||||
|
|
||||||
|
# Optionally, you can provide a script for the workers to execute using ssh, similar to Spark.
|
||||||
|
# Example: ssh $node "source activate your_conda_env && python your_dask_worker_script.py" &
|
|
@ -1,43 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
export WS_DIR=<workspace_dir>
|
|
||||||
|
|
||||||
# Get the first character of the hostname
|
|
||||||
first_char=$(hostname | cut -c1)
|
|
||||||
|
|
||||||
# Check if the first character is not "r"
|
|
||||||
if [[ $first_char != "r" ]]; then
|
|
||||||
# it's not a cpu node.
|
|
||||||
echo "Hostname does not start with 'r'."
|
|
||||||
# Get the first seven characters of the hostname
|
|
||||||
first_seven_chars=$(hostname | cut -c1,2,3,4,5,6,7)
|
|
||||||
# Check if it is an ai node
|
|
||||||
if [[ $first_seven_chars != "hawk-ai" ]]; then
|
|
||||||
echo "Hostname does not start with 'hawk-ai' too. Exiting."
|
|
||||||
return 1
|
|
||||||
else
|
|
||||||
echo "GPU node detected."
|
|
||||||
export OBJ_STR_MEMORY=350000000000
|
|
||||||
export TEMP_CHECKPOINT_DIR=/localscratch/$PBS_JOBID/model_checkpoints/
|
|
||||||
mkdir -p $TEMP_CHECKPOINT_DIR
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
echo "CPU node detected."
|
|
||||||
fi
|
|
||||||
|
|
||||||
module load bigdata/conda
|
|
||||||
|
|
||||||
export RAY_DEDUP_LOGS=0
|
|
||||||
|
|
||||||
export ENV_ARCHIVE=ray_env.tar.gz
|
|
||||||
export CONDA_ENVS=/run/user/$PBS_JOBID/envs
|
|
||||||
export ENV_NAME=ray_env
|
|
||||||
export ENV_PATH=$CONDA_ENVS/$ENV_NAME
|
|
||||||
|
|
||||||
mkdir -p $ENV_PATH
|
|
||||||
|
|
||||||
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
|
|
||||||
|
|
||||||
source $ENV_PATH/bin/activate
|
|
||||||
|
|
||||||
export CONDA_ENVS_PATH=CONDA_ENVS
|
|
|
@ -1,18 +1,4 @@
|
||||||
# Reference: Cluster Deployment Scripts
|
# Reference Guide: Dask Cluster Deployment Scripts
|
||||||
|
|
||||||
Wiki link:
|
|
||||||
|
|
||||||
Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment.
|
|
||||||
|
|
||||||
Structure:
|
|
||||||
- [ ] [Tutorial](https://diataxis.fr/tutorials/)
|
|
||||||
- [ ] [How-to guide](https://diataxis.fr/how-to-guides/)
|
|
||||||
- [x] [Reference](https://diataxis.fr/reference/)
|
|
||||||
- [ ] [Explanation](https://diataxis.fr/explanation/)
|
|
||||||
|
|
||||||
To do:
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
|
@ -36,40 +22,7 @@ Before using these scripts, ensure that the following prerequisites are met:
|
||||||
|
|
||||||
3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
|
3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
|
||||||
|
|
||||||
## 1. create-env.sh
|
## 1. deploy-dask.sh
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`create-env.sh` is designed to create a Conda environment. It checks for the existence of the specified environment and either creates it or notifies the user if it already exists.
|
|
||||||
Note: Define your Conda environment in `environment.yaml` before running this script.
|
|
||||||
|
|
||||||
### Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
./create-env.sh <conda_environment_name>
|
|
||||||
```
|
|
||||||
|
|
||||||
### Note
|
|
||||||
|
|
||||||
- This script is intended to run on a local system where Conda is installed.
|
|
||||||
|
|
||||||
## 2. deploy-env.sh
|
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`deploy-env.sh` is responsible for deploying the Conda environment to a remote server. If the tar.gz file already exists, it is copied; otherwise, it is created before being transferred.
|
|
||||||
|
|
||||||
### Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
./deploy-env.sh <environment_name> <destination_directory>
|
|
||||||
```
|
|
||||||
|
|
||||||
### Note
|
|
||||||
|
|
||||||
- This script is intended to run on a local system.
|
|
||||||
|
|
||||||
## 3. deploy-dask.sh
|
|
||||||
|
|
||||||
### Overview
|
### Overview
|
||||||
|
|
||||||
|
@ -86,7 +39,7 @@ Note: Define your Conda environment in `environment.yaml` before running this sc
|
||||||
- This script is designed for an HPC environment with PBS job scheduling.
|
- This script is designed for an HPC environment with PBS job scheduling.
|
||||||
- Modifications may be necessary for different job schedulers.
|
- Modifications may be necessary for different job schedulers.
|
||||||
|
|
||||||
## 4. dask-worker.sh
|
## 2. dask-worker.sh
|
||||||
|
|
||||||
### Overview
|
### Overview
|
||||||
|
|
||||||
|
@ -96,9 +49,3 @@ Note: Define your Conda environment in `environment.yaml` before running this sc
|
||||||
|
|
||||||
- Execute this script on each allocated node to connect them to the Dask scheduler.
|
- Execute this script on each allocated node to connect them to the Dask scheduler.
|
||||||
- Designed for use with PBS job scheduling.
|
- Designed for use with PBS job scheduling.
|
||||||
|
|
||||||
## Workflow
|
|
||||||
|
|
||||||
1. **Create Conda Environment**: Execute `create-env.sh` to create a Conda environment locally.
|
|
||||||
2. **Deploy Conda Environment**: Execute `deploy-env.sh` to deploy the Conda environment to a remote server.
|
|
||||||
3. **Deploy Dask Cluster**: Execute `deploy-dask.sh` to start the Dask cluster on an HPC environment.
|
|
|
@ -1,23 +0,0 @@
|
||||||
name: ray
|
|
||||||
channels:
|
|
||||||
- defaults
|
|
||||||
dependencies:
|
|
||||||
- python=3.10
|
|
||||||
- pip
|
|
||||||
- pip:
|
|
||||||
- ray==2.8.0
|
|
||||||
- "ray[default]==2.8.0"
|
|
||||||
- dask==2022.10.1
|
|
||||||
- torch
|
|
||||||
- pydantic<2
|
|
||||||
- six
|
|
||||||
- torch
|
|
||||||
- tqdm
|
|
||||||
- pandas<2
|
|
||||||
- scikit-learn
|
|
||||||
- matplotlib
|
|
||||||
- optuna
|
|
||||||
- seaborn
|
|
||||||
- tabulate
|
|
||||||
- jupyterlab
|
|
||||||
- autopep8
|
|
|
@ -1,27 +0,0 @@
|
||||||
FROM python:3.9
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------
|
|
||||||
# Install Ray and essential packages.
|
|
||||||
# For more information on Ray installation, see:
|
|
||||||
# https://docs.ray.io/en/latest/ray-overview/installation.html
|
|
||||||
# Install the latest Dask versions that are compatible with
|
|
||||||
# Ray nightly. For more information, see:
|
|
||||||
# https://docs.ray.io/en/latest/data/dask-on-ray.html
|
|
||||||
# -------------------------------------------------------------------
|
|
||||||
|
|
||||||
RUN pip install --no-cache-dir \
|
|
||||||
"ray==2.8.0" \
|
|
||||||
"ray[default]==2.8.0" \
|
|
||||||
"dask==2022.10.1" \
|
|
||||||
torch \
|
|
||||||
"pydantic<2" \
|
|
||||||
six \
|
|
||||||
"tqdm<2" \
|
|
||||||
"pandas<2" \
|
|
||||||
scikit-learn \
|
|
||||||
matplotlib \
|
|
||||||
optuna \
|
|
||||||
seaborn \
|
|
||||||
tabulate \
|
|
||||||
jupyterlab \
|
|
||||||
autopep8
|
|
|
@ -1,26 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
if [ $# -ne 5 ]; then
|
|
||||||
echo "Usage: $0 <ws_dir> <env_archive> <ray_address> <redis_password> <obj_store_memory>"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
export WS_DIR=$1
|
|
||||||
export ENV_ARCHIVE=$2
|
|
||||||
export RAY_ADDRESS=$3
|
|
||||||
export REDIS_PASSWORD=$4
|
|
||||||
export OBJECT_STORE_MEMORY=$5
|
|
||||||
|
|
||||||
export ENV_PATH=/run/user/$PBS_JOBID/ray_env # We use the ram disk to extract the environment packages since a large number of files decreases the performance of the parallel file system.
|
|
||||||
|
|
||||||
mkdir -p $ENV_PATH
|
|
||||||
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
|
|
||||||
source $ENV_PATH/bin/activate
|
|
||||||
conda-unpack
|
|
||||||
|
|
||||||
ray start --address=$RAY_ADDRESS \
|
|
||||||
--redis-password=$REDIS_PASSWORD \
|
|
||||||
--object-store-memory=$OBJECT_STORE_MEMORY \
|
|
||||||
--block
|
|
||||||
|
|
||||||
rm -rf $ENV_PATH # It's nice to clean up before you terminate the job
|
|
33
deployment_scripts/submit-dask-job.pbs
Normal file
33
deployment_scripts/submit-dask-job.pbs
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#PBS -N dask-job
|
||||||
|
#PBS -l select=2:node_type=rome
|
||||||
|
#PBS -l walltime=1:00:00
|
||||||
|
|
||||||
|
export PYTHON_FILE= # Path to the Python file you want to run
|
||||||
|
export CURRENT_WORKSPACE= # Path to the workspace where you have pulled this repo and the dask-env.tar.gz file
|
||||||
|
|
||||||
|
export ALL_NODES=$(cat $PBS_NODEFILE)
|
||||||
|
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
|
||||||
|
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
|
||||||
|
|
||||||
|
export DASK_SCHEDULER_PORT=8786
|
||||||
|
export DASK_UI_PORT=8787
|
||||||
|
|
||||||
|
export DASK_ENV="$HOME/dask"
|
||||||
|
mkdir -p $DASK_ENV
|
||||||
|
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
|
||||||
|
chmod -R 700 $DASK_ENV
|
||||||
|
|
||||||
|
source $DASK_ENV/bin/activate
|
||||||
|
conda-unpack
|
||||||
|
|
||||||
|
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
|
||||||
|
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
|
||||||
|
|
||||||
|
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
|
||||||
|
for ((i=1;i<$NUM_NODES;i++)); do
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
|
||||||
|
pbsdsh -n $i -o -- bash -l -c "source /deplyment_scripts/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
|
||||||
|
done
|
||||||
|
|
||||||
|
python3 $PYTHON_FILE
|
|
@ -1,50 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
#PBS -N ray-job
|
|
||||||
#PBS -l select=2:node_type=rome
|
|
||||||
#PBS -l walltime=1:00:00
|
|
||||||
|
|
||||||
export WS_DIR=<workspace_dir>
|
|
||||||
export PROJECT_DIR=$WS_DIR/<project_name>
|
|
||||||
export JOB_SCRIPT=monte-carlo-pi.py
|
|
||||||
|
|
||||||
export ENV_ARCHIVE=ray_env.tar.gz
|
|
||||||
|
|
||||||
export OBJECT_STORE_MEMORY=128000000000
|
|
||||||
|
|
||||||
# Environment variables after this line should not change
|
|
||||||
|
|
||||||
export SRC_DIR=$PROJECT_DIR/src
|
|
||||||
export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT
|
|
||||||
export DEPLOYMENT_SCRIPTS=$PROJECT_DIR/deployment_scripts
|
|
||||||
export ENV_PATH=/run/user/$PBS_JOBID/ray_env # We use the ram disk to extract the environment packages since a large number of files decreases the performance of the parallel file system.
|
|
||||||
|
|
||||||
mkdir -p $ENV_PATH
|
|
||||||
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH # This line extracts the packages to ram disk.
|
|
||||||
source $ENV_PATH/bin/activate
|
|
||||||
|
|
||||||
export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'`
|
|
||||||
|
|
||||||
export RAY_ADDRESS=$IP_ADDRESS:6379
|
|
||||||
export REDIS_PASSWORD=$(openssl rand -base64 32)
|
|
||||||
|
|
||||||
export NCCL_DEBUG=INFO
|
|
||||||
|
|
||||||
ray start --disable-usage-stats \
|
|
||||||
--head \
|
|
||||||
--node-ip-address=$IP_ADDRESS \
|
|
||||||
--port=6379 \
|
|
||||||
--dashboard-host=127.0.0.1 \
|
|
||||||
--redis-password=$REDIS_PASSWORD \
|
|
||||||
--object-store-memory=$OBJECT_STORE_MEMORY
|
|
||||||
|
|
||||||
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
|
|
||||||
|
|
||||||
for ((i=1;i<$NUM_NODES;i++)); do
|
|
||||||
pbsdsh -n $i -- bash -l -c "'$DEPLOYMENT_SCRIPTS/start-ray-worker.sh' '$WS_DIR' '$ENV_ARCHIVE' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" &
|
|
||||||
done
|
|
||||||
|
|
||||||
python3 $PYTHON_FILE
|
|
||||||
|
|
||||||
ray stop --grace-period 30
|
|
||||||
|
|
||||||
rm -rf $ENV_PATH # It's nice to clean up before you terminate the job.
|
|
|
@ -1,54 +0,0 @@
|
||||||
{
|
|
||||||
"cells": [
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"import ray"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"ray.init()"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"cluster_resources = ray.available_resources()\n",
|
|
||||||
"available_cpu_cores = cluster_resources.get('CPU', 0)\n",
|
|
||||||
"print(cluster_resources)"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"metadata": {
|
|
||||||
"kernelspec": {
|
|
||||||
"display_name": "ray",
|
|
||||||
"language": "python",
|
|
||||||
"name": "python3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"codemirror_mode": {
|
|
||||||
"name": "ipython",
|
|
||||||
"version": 3
|
|
||||||
},
|
|
||||||
"file_extension": ".py",
|
|
||||||
"mimetype": "text/x-python",
|
|
||||||
"name": "python",
|
|
||||||
"nbconvert_exporter": "python",
|
|
||||||
"pygments_lexer": "ipython3",
|
|
||||||
"version": "3.10.13"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 2
|
|
||||||
}
|
|
|
@ -1,38 +0,0 @@
|
||||||
Create the container on the login node:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
export WS_DIR=$(ws_find workspace_dir) # adjust this
|
|
||||||
cd $WS_DIR
|
|
||||||
wget https://fex.hlrs.de/fop/FYaJqyzw/ray.tar # download the container archive
|
|
||||||
export CONTAINER_NAME=ray
|
|
||||||
export CONTAINER_TAG=latest
|
|
||||||
export UDOCKER_DIR="$WS_DIR/.udocker/" # to store the image layers
|
|
||||||
udocker images -l # this will create a repo the first time you use it
|
|
||||||
udocker rmi $CONTAINER_NAME:$CONTAINER_TAG # results in error since the image does not exist
|
|
||||||
udocker load -i $WS_DIR/$CONTAINER_NAME.tar $CONTAINER_NAME
|
|
||||||
rm /$WS_DIR/$CONTAINER_NAME.tar # you no longer need the tar archive
|
|
||||||
```
|
|
||||||
|
|
||||||
Allocate a CPU node, and then:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
module load bigdata/udocker/1.3.4
|
|
||||||
export WS_DIR=$(ws_find workspace_dir) # adjust this
|
|
||||||
export UDOCKER_DIR="$WS_DIR/.udocker/"
|
|
||||||
export UDOCKER_CONTAINERS="/run/user/$PBS_JOBID/udocker/containers"
|
|
||||||
mkdir -p $UDOCKER_CONTAINERS
|
|
||||||
mkdir -p /run/user/$PBS_JOBID/tmp
|
|
||||||
export CONTAINER_NAME=ray
|
|
||||||
export CONTAINER_TAG=latest
|
|
||||||
udocker create --name=$CONTAINER_NAME:$CONTAINER_TAG
|
|
||||||
udocker ps
|
|
||||||
udocker run --volume $WS_DIR:/workspace --volume /run/user/$PBS_JOBID/tmp:/tmp $CONTAINER_NAME
|
|
||||||
```
|
|
||||||
|
|
||||||
You should see a Python shell.
|
|
||||||
|
|
||||||
```python
|
|
||||||
import ray
|
|
||||||
# ray.init(num_cpus=4) # Works with a small number of CPUs
|
|
||||||
ray.init() # But, it can't use all the available CPUs
|
|
||||||
```
|
|
16
src/dask-example-pi.py
Normal file
16
src/dask-example-pi.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
import dask.bag as db
|
||||||
|
import random
|
||||||
|
from dask.distributed import Client
|
||||||
|
|
||||||
|
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
|
||||||
|
NUM_SAMPLES=100
|
||||||
|
|
||||||
|
def inside(p):
|
||||||
|
x, y = random.random(), random.random()
|
||||||
|
return x*x + y*y < 1
|
||||||
|
|
||||||
|
def calc_pi():
|
||||||
|
count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
|
||||||
|
return 4.0 * count / NUM_SAMPLES
|
||||||
|
|
||||||
|
print(calc_pi())
|
|
@ -1,89 +0,0 @@
|
||||||
# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html
|
|
||||||
|
|
||||||
import ray
|
|
||||||
import math
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Change this to match your cluster scale.
|
|
||||||
NUM_SAMPLING_TASKS = 100
|
|
||||||
NUM_SAMPLES_PER_TASK = 10_000_000
|
|
||||||
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
|
|
||||||
|
|
||||||
@ray.remote
|
|
||||||
class ProgressActor:
|
|
||||||
def __init__(self, total_num_samples: int):
|
|
||||||
self.total_num_samples = total_num_samples
|
|
||||||
self.num_samples_completed_per_task = {}
|
|
||||||
|
|
||||||
def report_progress(self, task_id: int, num_samples_completed: int) -> None:
|
|
||||||
self.num_samples_completed_per_task[task_id] = num_samples_completed
|
|
||||||
|
|
||||||
def get_progress(self) -> float:
|
|
||||||
return (
|
|
||||||
sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
|
|
||||||
)
|
|
||||||
|
|
||||||
@ray.remote
|
|
||||||
def sampling_task(num_samples: int, task_id: int,
|
|
||||||
progress_actor: ray.actor.ActorHandle) -> int:
|
|
||||||
num_inside = 0
|
|
||||||
for i in range(num_samples):
|
|
||||||
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
|
|
||||||
if math.hypot(x, y) <= 1:
|
|
||||||
num_inside += 1
|
|
||||||
|
|
||||||
# Report progress every 1 million samples.
|
|
||||||
if (i + 1) % 1_000_000 == 0:
|
|
||||||
# This is async.
|
|
||||||
progress_actor.report_progress.remote(task_id, i + 1)
|
|
||||||
|
|
||||||
# Report the final progress.
|
|
||||||
progress_actor.report_progress.remote(task_id, num_samples)
|
|
||||||
return num_inside
|
|
||||||
|
|
||||||
def wait_for_nodes(expected_num_nodes: int):
|
|
||||||
while True:
|
|
||||||
num_nodes = len(ray.nodes())
|
|
||||||
if num_nodes >= expected_num_nodes:
|
|
||||||
break
|
|
||||||
print(f'Currently {num_nodes} nodes connected. Waiting for more...')
|
|
||||||
time.sleep(5) # wait for 5 seconds before checking again
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
|
|
||||||
num_nodes = int(os.environ["NUM_NODES"])
|
|
||||||
assert num_nodes > 1, "If the environment variable NUM_NODES is set, it should be greater than 1."
|
|
||||||
|
|
||||||
redis_password = os.environ["REDIS_PASSWORD"]
|
|
||||||
ray.init(address="auto", _redis_password=redis_password)
|
|
||||||
|
|
||||||
wait_for_nodes(num_nodes)
|
|
||||||
|
|
||||||
cluster_resources = ray.available_resources()
|
|
||||||
print(cluster_resources)
|
|
||||||
|
|
||||||
# Create the progress actor.
|
|
||||||
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
|
|
||||||
|
|
||||||
# Create and execute all sampling tasks in parallel.
|
|
||||||
results = [
|
|
||||||
sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
|
|
||||||
for i in range(NUM_SAMPLING_TASKS)
|
|
||||||
]
|
|
||||||
|
|
||||||
# Query progress periodically.
|
|
||||||
while True:
|
|
||||||
progress = ray.get(progress_actor.get_progress.remote())
|
|
||||||
print(f"Progress: {int(progress * 100)}%")
|
|
||||||
|
|
||||||
if progress == 1:
|
|
||||||
break
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Get all the sampling tasks results.
|
|
||||||
total_num_inside = sum(ray.get(results))
|
|
||||||
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
|
|
||||||
print(f"Estimated value of π is: {pi}")
|
|
Loading…
Reference in a new issue