Compare commits

..

7 commits

16 changed files with 217 additions and 551 deletions

15
.gitignore vendored
View file

@ -1,12 +1,5 @@
# Compiled source
__pycache__
# Packages
*.gz
*.rar
*.tar
*.zip
# OS generated files
.DS_Store
notebooks/
/deployment_scripts/create-env.sh
/deployment_scripts/deploy-env.sh
/deployment_scripts/environment.yaml

187
README.md
View file

@ -1,162 +1,127 @@
# Ray: How to launch a Ray Cluster on Hawk?
# Dask: How to execute python workloads using a Dask cluster on Vulcan
This guide shows you how to launch a Ray cluster on HLRS' Hawk system.
This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.
## Table of Contents
- [Ray: How to launch a Ray Cluster on Hawk?](#ray-how-to-launch-a-ray-cluster-on-hawk)
- [Table of Contents](#table-of-contents)
- [Prerequisites](#prerequisites)
- [Getting Started](#getting-started)
- [Launch a local Ray Cluster in Interactive Mode](#launch-a-local-ray-cluster-in-interactive-mode)
- [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode)
## Prerequisites
Before building the environment, make sure you have the following prerequisites:
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system.
- [Conda-Pack](https://conda.github.io/conda-pack/) installed in the base environment: Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to the target system.
- `linux-64` platform for installing the Conda packages because Conda/pip downloads and installs precompiled binaries suitable to the architecture and OS of the local environment.
For more information, look at the documentation for [Conda on HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters)
- [Getting Started](#getting-started)
- [Usage](#usage)
- [Notes](#notes)
## Getting Started
Only the main and r channels are available using the conda module on the clusters. To use custom packages, we need to move the local conda environment to Hawk.
### 1. Build and transfer the Conda environment to Vulcan:
**Step 1.** Clone this repository to your local machine:
Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Vulcan.
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment.
### 2. Allocate workspace on Vulcan:
Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
```bash
git clone <repository_url>
ws_allocate dask_workspace 10
ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step
```
**Step 2.** Go into the directory and create an environment using Conda and environment.yaml.
Note: Be sure to add the necessary packages in `deployment_scripts/environment.yaml`:
```bash
cd deployment_scripts
./create-env.sh <your-env>
```
**Step 3.** Package the environment and transfer the archive to the target system:
```bash
(base) $ conda pack -n <your-env> -o ray_env.tar.gz # conda-pack must be installed in the base environment
```
A workspace is suitable to store the compressed Conda environment archive on Hawk. Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
```bash
ws_allocate hpda_project 10
ws_find hpda_project # find the path to workspace, which is the destination directory in the next step
```
You can send your data to an existing workspace using:
```bash
scp ray_env.tar.gz <username>@hawk.hww.hlrs.de:<workspace_directory>
rm ray_env.tar.gz # We don't need the archive locally anymore.
```
**Step 4.** Clone the repository on Hawk to use the deployment scripts and project structure:
### 3. Clone the repository on Vulcan to use the deployment scripts and project structure:
```bash
cd <workspace_directory>
git clone <repository_url>
```
## Launch a local Ray Cluster in Interactive Mode
Using a single node interactively provides opportunities for faster code debugging.
**Step 1.** On the Hawk login node, start an interactive job using:
### 4. Send all the code to the appropriate directory on Vulcan using `scp`:
```bash
qsub -I -l select=1:node_type=rome -l walltime=01:00:00
scp <your_script>.py <destination_host>:<destination_directory>
```
**Step 2.** Go into the project directory:
### 5. SSH into Vulcan and start a job interactively using:
```bash
cd <project_directory>/deployment_scripts
qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
```
Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. Follow section [Multiple Nodes](#multiple-nodes) for more information.
**Step 3.** Deploy the conda environment to the ram disk:
Change the following line by editing `deploy-env.sh`:
### 6. Go into the directory with all code:
```bash
export WS_DIR=<workspace_dir>
cd <destination_directory>
```
Then, use the following command to deploy and activate the environment:
### 7. Initialize the Dask cluster:
```bash
source deploy-env.sh
source deploy-dask.sh "$(pwd)"
```
Note: Make sure all permissions are set using `chmod +x`.
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
Note: Make sure all permissions are set using `chmod +x` for all scripts.
**Step 4.** Initialize the Ray cluster.
## Usage
You can use a Python interpreter to start a local Ray cluster:
```python
import ray
ray.init()
```
**Step 5.** Connect to the dashboard.
Warning: Do not change the default dashboard host `127.0.0.1` to keep Ray cluster reachable by only you.
Note: We recommend using a dedicated Firefox profile for accessing web-based services on HLRS Compute Platforms. If you haven't created a profile, check out our [guide](https://kb.hlrs.de/platforms/index.php/How_to_use_Web_Based_Services_on_HLRS_Compute_Platforms).
You need the job id and the hostname for your current job. You can obtain this information on the login node using:
### Single Node
To run the application interactively on a single node, follow points 4, 5, 6 and, 7 from [Getting Started](#getting-started), and execute the following command after all the job has started:
```bash
qstat -anw # get the job id and the hostname
# Load the Conda module
module load bigdata/conda
source activate # activates the base environment
# List available Conda environments for verification purposes
conda env list
# Activate a specific Conda environment.
conda activate dask_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
```
Then, on your local computer,
After the environment is activated, you can run the python interpretor:
```bash
export PBS_JOBID=<job-id> # e.g., 2316419.hawk-pbs5
ssh <compute-host> # e.g., r38c3t8n3
python
```
Check your SSH config in the first step if this doesn't work.
Then, launch Firefox web browser using the configured profile. Open `localhost:8265` to access the Ray dashboard.
## Launch a Ray Cluster in Batch Mode
Let us [estimate the value of π](https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html) as an example application.
**Step 1.** Add execution permissions to `start-ray-worker.sh`
Or to run a full script:
```bash
cd deployment_scripts
chmod +x start-ray-worker.sh
python <your-script>.py
```
**Step 2.** Submit a job to launch the head and worker nodes.
You must modify the following lines in `submit-ray-job.sh`:
- Line 3 changes the cluster size. The default configuration launches a 3 node cluster.
- `export WS_DIR=<workspace_dir>` - set the correct workspace directory.
- `export PROJECT_DIR=$WS_DIR/<project_name>` - set the correct project directory.
Note: The job script `src/monte-carlo-pi.py` waits for all nodes in the Ray cluster to become available. Preserve this pattern in your Python code while using a multiple node Ray cluster.
Launch the job and monitor the progress. As the job starts, its status (S) shifts from Q (Queued) to R (Running). Upon completion, the job will no longer appear in the `qstat -a` display.
### Multiple Nodes
To run the application on multiple nodes, you need to write a `.pbs` script and submit it using `qsub`. Follow lines 1-4 from the [Getting Started](#getting-started) section. Write a `submit-dask-job.pbs` script:
```bash
qsub submit-ray-job.pbs
#!/bin/bash
#PBS -N dask-job
#PBS -l select=3:node_type=rome
#PBS -l walltime=1:00:00
#Go to the directory where the code is
cd <destination_directory>
#Deploy the Dask cluster
source deploy-dask.sh "$(pwd)"
#Run the python script
python <your-script>.py
```
A more thorough example is available in the `deployment_scripts` directory under `submit-dask-job.pbs`.
And then execute the following commands to submit the job:
```bash
qsub submit-dask-job.pbs
qstat -anw # Q: Queued, R: Running, E: Ending
ls -l # list files after the job finishes
cat ray-job.o... # inspect the output file
cat ray-job.e... # inspect the error file
cat dask-job.o... # inspect the output file
cat dask-job.e... # inspect the error file
```
If you need to delete the job, use `qdel <job-id>`. If this doesn't work, use the `-W force` option: `qdel -W force <job-id>`
## Notes
Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster:
```python
client = Client(..., silence_logs='error')
```
Note: Replace all filenames within `<>` with the actual values applicable to your project.

View file

@ -1,23 +0,0 @@
#!/bin/bash
# Display usage
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <conda_environment_name>"
exit 1
fi
# Name of the Conda environment
CONDA_ENV_NAME=$1
# Check if the Conda environment already exists
if conda env list | grep -q "$CONDA_ENV_NAME"; then
echo "Environment '$CONDA_ENV_NAME' already exists."
else
echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
# Create Conda environment
CONDA_SUBDIR=linux-64 conda env create --name $CONDA_ENV_NAME -f environment.yaml
fi

View file

@ -0,0 +1,31 @@
#!/bin/bash
#Get the current workspace directory and the master node
export CURRENT_WORKSPACE=$1
export DASK_SCHEDULER_HOST=$2
# Path to localscratch
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
# Extract Dask environment in localscratch
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
#tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
#chmod -R 700 $DASK_ENV
# Start the dask environment
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
source $DASK_ENV/bin/activate
conda-unpack
# Start Dask worker
export DASK_SCHEDULER_PORT="8786" # Replace with the port on which the Dask scheduler is running
# Additional Dask worker options can be added here if needed
# Change local directory if memory is an issue
# Change directory to localscratch and start Dask worker
cd $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Starting Dask worker at $DASK_SCHEDULER_HOST on port $DASK_SCHEDULER_PORT"
dask worker $DASK_SCHEDULER_HOST:$DASK_SCHEDULER_PORT

View file

@ -0,0 +1,54 @@
#!/bin/bash
export CURRENT_WORKSPACE=$1
# Check if running in a PBS Job environment
if [ -z ${PBS_NODEFILE+x} ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] ERROR: This script is meant to run as a part of PBS Job. Don't start it at login nodes."
exit 1
fi
export NUM_NODES=$(wc -l < $PBS_NODEFILE)
if [ $NUM_NODES -lt 2 ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] WARNING: You have a single node job running. Dask cluster requires at least 2 nodes."
exit 1
fi
export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
# Path to localscratch
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"
# Extract Dask environment in localscratch
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Setting up Dask environment"
# Start the dask environment
source $DASK_ENV/bin/activate
conda-unpack
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Scheduler at $SCHEDULER_NODE on port $DASK_SCHEDULER_PORT"
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
for ((i=1;i<$NUM_NODES;i++)); do
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
pbsdsh -n $i -o -- bash -l -c "source $CURRENT_WORKSPACE/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Dask cluster ready, wait for workers to connect to the scheduler."
# Optionally, you can provide a script for the workers to execute using ssh, similar to Spark.
# Example: ssh $node "source activate your_conda_env && python your_dask_worker_script.py" &

View file

@ -1,43 +0,0 @@
#!/bin/bash
export WS_DIR=<workspace_dir>
# Get the first character of the hostname
first_char=$(hostname | cut -c1)
# Check if the first character is not "r"
if [[ $first_char != "r" ]]; then
# it's not a cpu node.
echo "Hostname does not start with 'r'."
# Get the first seven characters of the hostname
first_seven_chars=$(hostname | cut -c1,2,3,4,5,6,7)
# Check if it is an ai node
if [[ $first_seven_chars != "hawk-ai" ]]; then
echo "Hostname does not start with 'hawk-ai' too. Exiting."
return 1
else
echo "GPU node detected."
export OBJ_STR_MEMORY=350000000000
export TEMP_CHECKPOINT_DIR=/localscratch/$PBS_JOBID/model_checkpoints/
mkdir -p $TEMP_CHECKPOINT_DIR
fi
else
echo "CPU node detected."
fi
module load bigdata/conda
export RAY_DEDUP_LOGS=0
export ENV_ARCHIVE=ray_env.tar.gz
export CONDA_ENVS=/run/user/$PBS_JOBID/envs
export ENV_NAME=ray_env
export ENV_PATH=$CONDA_ENVS/$ENV_NAME
mkdir -p $ENV_PATH
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
source $ENV_PATH/bin/activate
export CONDA_ENVS_PATH=CONDA_ENVS

View file

@ -1,18 +1,4 @@
# Reference: Cluster Deployment Scripts
Wiki link:
Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment.
Structure:
- [ ] [Tutorial](https://diataxis.fr/tutorials/)
- [ ] [How-to guide](https://diataxis.fr/how-to-guides/)
- [x] [Reference](https://diataxis.fr/reference/)
- [ ] [Explanation](https://diataxis.fr/explanation/)
To do:
---
# Reference Guide: Dask Cluster Deployment Scripts
## Overview
@ -36,40 +22,7 @@ Before using these scripts, ensure that the following prerequisites are met:
3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
## 1. create-env.sh
### Overview
`create-env.sh` is designed to create a Conda environment. It checks for the existence of the specified environment and either creates it or notifies the user if it already exists.
Note: Define your Conda environment in `environment.yaml` before running this script.
### Usage
```bash
./create-env.sh <conda_environment_name>
```
### Note
- This script is intended to run on a local system where Conda is installed.
## 2. deploy-env.sh
### Overview
`deploy-env.sh` is responsible for deploying the Conda environment to a remote server. If the tar.gz file already exists, it is copied; otherwise, it is created before being transferred.
### Usage
```bash
./deploy-env.sh <environment_name> <destination_directory>
```
### Note
- This script is intended to run on a local system.
## 3. deploy-dask.sh
## 1. deploy-dask.sh
### Overview
@ -86,7 +39,7 @@ Note: Define your Conda environment in `environment.yaml` before running this sc
- This script is designed for an HPC environment with PBS job scheduling.
- Modifications may be necessary for different job schedulers.
## 4. dask-worker.sh
## 2. dask-worker.sh
### Overview
@ -96,9 +49,3 @@ Note: Define your Conda environment in `environment.yaml` before running this sc
- Execute this script on each allocated node to connect them to the Dask scheduler.
- Designed for use with PBS job scheduling.
## Workflow
1. **Create Conda Environment**: Execute `create-env.sh` to create a Conda environment locally.
2. **Deploy Conda Environment**: Execute `deploy-env.sh` to deploy the Conda environment to a remote server.
3. **Deploy Dask Cluster**: Execute `deploy-dask.sh` to start the Dask cluster on an HPC environment.

View file

@ -1,23 +0,0 @@
name: ray
channels:
- defaults
dependencies:
- python=3.10
- pip
- pip:
- ray==2.8.0
- "ray[default]==2.8.0"
- dask==2022.10.1
- torch
- pydantic<2
- six
- torch
- tqdm
- pandas<2
- scikit-learn
- matplotlib
- optuna
- seaborn
- tabulate
- jupyterlab
- autopep8

View file

@ -1,27 +0,0 @@
FROM python:3.9
# -------------------------------------------------------------------
# Install Ray and essential packages.
# For more information on Ray installation, see:
# https://docs.ray.io/en/latest/ray-overview/installation.html
# Install the latest Dask versions that are compatible with
# Ray nightly. For more information, see:
# https://docs.ray.io/en/latest/data/dask-on-ray.html
# -------------------------------------------------------------------
RUN pip install --no-cache-dir \
"ray==2.8.0" \
"ray[default]==2.8.0" \
"dask==2022.10.1" \
torch \
"pydantic<2" \
six \
"tqdm<2" \
"pandas<2" \
scikit-learn \
matplotlib \
optuna \
seaborn \
tabulate \
jupyterlab \
autopep8

View file

@ -1,26 +0,0 @@
#!/bin/bash
if [ $# -ne 5 ]; then
echo "Usage: $0 <ws_dir> <env_archive> <ray_address> <redis_password> <obj_store_memory>"
exit 1
fi
export WS_DIR=$1
export ENV_ARCHIVE=$2
export RAY_ADDRESS=$3
export REDIS_PASSWORD=$4
export OBJECT_STORE_MEMORY=$5
export ENV_PATH=/run/user/$PBS_JOBID/ray_env # We use the ram disk to extract the environment packages since a large number of files decreases the performance of the parallel file system.
mkdir -p $ENV_PATH
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
source $ENV_PATH/bin/activate
conda-unpack
ray start --address=$RAY_ADDRESS \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY \
--block
rm -rf $ENV_PATH # It's nice to clean up before you terminate the job

View file

@ -0,0 +1,33 @@
#!/bin/bash
#PBS -N dask-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00
export PYTHON_FILE= # Path to the Python file you want to run
export CURRENT_WORKSPACE= # Path to the workspace where you have pulled this repo and the dask-env.tar.gz file
export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV
source $DASK_ENV/bin/activate
conda-unpack
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
for ((i=1;i<$NUM_NODES;i++)); do
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
pbsdsh -n $i -o -- bash -l -c "source /deplyment_scripts/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done
python3 $PYTHON_FILE

View file

@ -1,50 +0,0 @@
#!/bin/bash
#PBS -N ray-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00
export WS_DIR=<workspace_dir>
export PROJECT_DIR=$WS_DIR/<project_name>
export JOB_SCRIPT=monte-carlo-pi.py
export ENV_ARCHIVE=ray_env.tar.gz
export OBJECT_STORE_MEMORY=128000000000
# Environment variables after this line should not change
export SRC_DIR=$PROJECT_DIR/src
export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT
export DEPLOYMENT_SCRIPTS=$PROJECT_DIR/deployment_scripts
export ENV_PATH=/run/user/$PBS_JOBID/ray_env # We use the ram disk to extract the environment packages since a large number of files decreases the performance of the parallel file system.
mkdir -p $ENV_PATH
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH # This line extracts the packages to ram disk.
source $ENV_PATH/bin/activate
export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'`
export RAY_ADDRESS=$IP_ADDRESS:6379
export REDIS_PASSWORD=$(openssl rand -base64 32)
export NCCL_DEBUG=INFO
ray start --disable-usage-stats \
--head \
--node-ip-address=$IP_ADDRESS \
--port=6379 \
--dashboard-host=127.0.0.1 \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
for ((i=1;i<$NUM_NODES;i++)); do
pbsdsh -n $i -- bash -l -c "'$DEPLOYMENT_SCRIPTS/start-ray-worker.sh' '$WS_DIR' '$ENV_ARCHIVE' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" &
done
python3 $PYTHON_FILE
ray stop --grace-period 30
rm -rf $ENV_PATH # It's nice to clean up before you terminate the job.

View file

@ -1,54 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ray"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ray.init()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster_resources = ray.available_resources()\n",
"available_cpu_cores = cluster_resources.get('CPU', 0)\n",
"print(cluster_resources)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ray",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@ -1,38 +0,0 @@
Create the container on the login node:
```bash
export WS_DIR=$(ws_find workspace_dir) # adjust this
cd $WS_DIR
wget https://fex.hlrs.de/fop/FYaJqyzw/ray.tar # download the container archive
export CONTAINER_NAME=ray
export CONTAINER_TAG=latest
export UDOCKER_DIR="$WS_DIR/.udocker/" # to store the image layers
udocker images -l # this will create a repo the first time you use it
udocker rmi $CONTAINER_NAME:$CONTAINER_TAG # results in error since the image does not exist
udocker load -i $WS_DIR/$CONTAINER_NAME.tar $CONTAINER_NAME
rm /$WS_DIR/$CONTAINER_NAME.tar # you no longer need the tar archive
```
Allocate a CPU node, and then:
```bash
module load bigdata/udocker/1.3.4
export WS_DIR=$(ws_find workspace_dir) # adjust this
export UDOCKER_DIR="$WS_DIR/.udocker/"
export UDOCKER_CONTAINERS="/run/user/$PBS_JOBID/udocker/containers"
mkdir -p $UDOCKER_CONTAINERS
mkdir -p /run/user/$PBS_JOBID/tmp
export CONTAINER_NAME=ray
export CONTAINER_TAG=latest
udocker create --name=$CONTAINER_NAME:$CONTAINER_TAG
udocker ps
udocker run --volume $WS_DIR:/workspace --volume /run/user/$PBS_JOBID/tmp:/tmp $CONTAINER_NAME
```
You should see a Python shell.
```python
import ray
# ray.init(num_cpus=4) # Works with a small number of CPUs
ray.init() # But, it can't use all the available CPUs
```

16
src/dask-example-pi.py Normal file
View file

@ -0,0 +1,16 @@
import dask.bag as db
import random
from dask.distributed import Client
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
NUM_SAMPLES=100
def inside(p):
x, y = random.random(), random.random()
return x*x + y*y < 1
def calc_pi():
count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
return 4.0 * count / NUM_SAMPLES
print(calc_pi())

View file

@ -1,89 +0,0 @@
# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html
import ray
import math
import time
import random
import os
# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 100
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
@ray.remote
class ProgressActor:
def __init__(self, total_num_samples: int):
self.total_num_samples = total_num_samples
self.num_samples_completed_per_task = {}
def report_progress(self, task_id: int, num_samples_completed: int) -> None:
self.num_samples_completed_per_task[task_id] = num_samples_completed
def get_progress(self) -> float:
return (
sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
)
@ray.remote
def sampling_task(num_samples: int, task_id: int,
progress_actor: ray.actor.ActorHandle) -> int:
num_inside = 0
for i in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
# Report progress every 1 million samples.
if (i + 1) % 1_000_000 == 0:
# This is async.
progress_actor.report_progress.remote(task_id, i + 1)
# Report the final progress.
progress_actor.report_progress.remote(task_id, num_samples)
return num_inside
def wait_for_nodes(expected_num_nodes: int):
while True:
num_nodes = len(ray.nodes())
if num_nodes >= expected_num_nodes:
break
print(f'Currently {num_nodes} nodes connected. Waiting for more...')
time.sleep(5) # wait for 5 seconds before checking again
if __name__ == "__main__":
num_nodes = int(os.environ["NUM_NODES"])
assert num_nodes > 1, "If the environment variable NUM_NODES is set, it should be greater than 1."
redis_password = os.environ["REDIS_PASSWORD"]
ray.init(address="auto", _redis_password=redis_password)
wait_for_nodes(num_nodes)
cluster_resources = ray.available_resources()
print(cluster_resources)
# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
# Create and execute all sampling tasks in parallel.
results = [
sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
for i in range(NUM_SAMPLING_TASKS)
]
# Query progress periodically.
while True:
progress = ray.get(progress_actor.get_progress.remote())
print(f"Progress: {int(progress * 100)}%")
if progress == 1:
break
time.sleep(1)
# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")