ready to test the workflow on Hawk

This commit is contained in:
Kerem Kayabay 2024-01-05 13:22:52 +01:00
parent 8474b4328d
commit 79f28f8e02
10 changed files with 307 additions and 638 deletions

13
.gitignore vendored
View File

@ -1 +1,12 @@
__pycache__
# Compiled source
__pycache__
# Packages
*.gz
*.rar
*.tar
*.zip
# OS generated files
.DS_Store

131
README.md
View File

@ -7,88 +7,113 @@ This guide shows you how to launch a Ray cluster on HLRS' Hawk system.
- [Table of Contents](#table-of-contents)
- [Prerequisites](#prerequisites)
- [Getting Started](#getting-started)
- [Usage](#usage)
- [Notes](#notes)
- [Launch a Ray Cluster in Interactive Mode](#launch-a-ray-cluster-in-interactive-mode)
- [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode)
## Prerequisites
Before running the application, make sure you have the following prerequisites installed in a conda environment:
- [Python 3.9](https://www.python.org/downloads/release/python-3818/): This specific python version is used for all uses, you can select it using while creating the conda environment. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
- [Ray](https://dask.org/): You can install Ray inside
- [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan.
Before building the environment, make sure you have the following prerequisites:
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system.
- [Conda-Pack](https://conda.github.io/conda-pack/) installed in the base environment: Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to the target system.
- `linux-64` platform for installing the Conda packages because Conda/pip downloads and installs precompiled binaries suitable to the architecture and OS of the local environment.
For more information, look at the documentation for [Conda on HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters)
## Getting Started
Only the main and r channels are available using the conda module on the clusters. To use custom packages, we need to move the local conda environment to Hawk.
1. Clone this repository to your local machine:
```bash
git clone <repository_url>
```
2. Go into the directory and create an environment using Conda and environment.yaml. Note: Be sure to add the necessary packages in environment.yaml:
2. Go into the directory and create an environment using Conda and environment.yaml.
Note: Be sure to add the necessary packages in `deployment_scripts/environment.yaml`:
```bash
cd deployment_scripts
./create-env.sh <your-env>
```
3. Send all files using `deploy-env.sh`:
```bash
./deployment_scripts/deploy-env.sh <your-env> <destination_host>:<destination_directory>
```
4. Send all the code to the appropriate directory on Vulcan using `scp`:
```bash
scp <your_script>.py <destination_host>:<destination_directory>
```
5. SSH into Vulcan and start a job interatively using:
```bash
qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00
```
6. Go into the directory with all code:
```bash
cd <destination_directory>
```
7. Initialize the Dask cluster:
```bash
source deploy-dask.sh "$(pwd)"
```
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
Note: Make sure all permissions are set using `chmod +x` for all scripts.
## Usage
To run the application interactively, execute the following command after all the cluster's nodes are up and running:
3. Package the environment and transfer the archive to the target system:
```bash
python
(my_env) $ conda deactivate
(base) $ conda pack -n my_env -o my_env.tar.gz # conda-pack must be installed in the base environment
```
Or to run a full script:
A workspace is suitable to store the compressed Conda environment archive on Hawk. Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
```bash
python <your-script>.py
ws_allocate hpda_project 10
ws_find hpda_project # find the path to workspace, which is the destination directory in the next step
```
Note: If you don't see your environment in the python interpretor, then manually activate it using:
You can send your data to an existing workspace using:
```bash
conda activate <your-env>
scp my_env.tar.gz <username>@hawk.hww.hlrs.de:<workspace_directory>
rm my_env.tar.gz # We don't need the archive locally anymore.
```
Do this before using the python interpretor.
## Notes
4. Clone the repository on Hawk to use the deployment scripts and project structure:
```bash
cd <workspace_directory>
git clone <repository_url>
```
## Launch a Ray Cluster in Interactive Mode
Using a single node interactively provides opportunities for faster code debugging.
1. On the Hawk login node, start an interactive job using:
```bash
qsub -I -l select=1:node_type=rome -l walltime=01:00:00
```
2. Go into the directory with all code:
```bash
cd <source_directory>/deployment_scripts
```
3. Deploy the conda environment to the ram disk:
```bash
source deploy-env.sh
```
Note: Make sure all permissions are set using `chmod +x`.
4. Initialize the Ray cluster.
You can use a Python interpreter to start a Ray cluster:
Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster:
```python
client = Client(..., silence_logs='error')
import ray
ray.init(dashboard_host='127.0.0.1')
```
Note: Replace all filenames within `<>` with the actual values applicable to your project.
1. Connect to the dashboard.
Warning: Always use `127.0.0.1` as the dashboard host to make the Ray cluster reachable by only you.
## Launch a Ray Cluster in Batch Mode
1. Add execution permissions to `start-ray-worker.sh`
```bash
cd deployment_scripts
chmod +x ray-start-worker.sh
```
2. Submit a job to launch the head and worker nodes.
You must modify the following variables in `submit-ray-job.sh`:
- Line 3 changes the cluster size. The default configuration launches a 3 node cluster.
- `$PROJECT_DIR`

View File

@ -0,0 +1,30 @@
#!/bin/bash
if [ $# -ne 5 ]; then
echo "Usage: $0 <ws_dir> <env_archive> <ray_address> <redis_password> <obj_store_memory>"
exit 1
fi
export WS_DIR=$1
export ENV_ARCHIVE=$2
export RAY_ADDRESS=$3
export REDIS_PASSWORD=$4
export OBJECT_STORE_MEMORY=$5
# printenv | grep 'RAY_ADDRESS\|REDIS_PASSWORD'
# module load system/nvidia/ALL.ALL.525.125.06
export ENV_PATH=/run/user/$PBS_JOBID/ray_env
mkdir -p $ENV_PATH
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
source $ENV_PATH/bin/activate
conda-unpack
ray start --address=$RAY_ADDRESS \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY \
--block
rm -rf $ENV_PATH

View File

@ -0,0 +1,59 @@
#!/bin/bash
#PBS -N output-ray-job
#PBS -l select=2:node_type=rome-ai
#PBS -l walltime=1:00:00
export JOB_SCRIPT=modeling_evaluation.py
export WS_DIR=/lustre/hpe/ws10/ws10.3/ws/hpckkaya-ifu
export ENV_ARCHIVE=ray-environment-v0.3.tar.gz
export SRC_DIR=$WS_DIR/ifu/src/ray-workflow
export DATA_DIR=/lustre/hpe/ws10/ws10.3/ws/hpckkaya-ifu-data/hpclzhon-ifu_data-1668830707
export RESULTS_DIR=$WS_DIR/ray_results
export NCCL_DEBUG=INFO
# Environment variables after this line should not change
export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT
export ENV_PATH=/run/user/$PBS_JOBID/ray_env
mkdir -p $ENV_PATH
tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH
source $ENV_PATH/bin/activate
conda-unpack
export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'`
export RAY_ADDRESS=$IP_ADDRESS:6379
export REDIS_PASSWORD=$(uuidgen)
# export RAY_scheduler_spread_threshold=0.0
export OBJECT_STORE_MEMORY=128000000000
ray start --disable-usage-stats \
--head \
--node-ip-address=$IP_ADDRESS \
--port=6379 \
--dashboard-host=127.0.0.1 \
--redis-password=$REDIS_PASSWORD \
--object-store-memory=$OBJECT_STORE_MEMORY
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
for ((i=1;i<$NUM_NODES;i++)); do
pbsdsh -n $i -- bash -l -c "'$SRC_DIR/ray-start-worker.sh' '$WS_DIR' '$ENV_ARCHIVE' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" &
done
# uncomment if you don't already control inside the code
# if [[ $NUM_NODES -gt 1 ]]
# then
# sleep 90
#fi
python3 $PYTHON_FILE
ray stop
rm -rf $ENV_PATH

View File

@ -1,413 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import dask\n",
"import random\n",
"import torch\n",
"from torch.utils.data import Dataset, DataLoader\n",
"\n",
"from dask.distributed import Client\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"\n",
"from dask_ml.preprocessing import MinMaxScaler\n",
"from dask_ml.model_selection import train_test_split\n",
"from dask_ml.linear_model import LinearRegression\n",
"\n",
"from daskdataset import DaskDataset, ShallowNet\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"client = Client()\n",
"\n",
"sample_dataset=\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\"\n",
"\n",
"df = dd.read_parquet(sample_dataset, engine=\"fastparquet\")#.repartition(npartitions=10) #using pyarrow throws error with numpy"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run this only on the cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = Client(str(os.getenv('HOSTNAME')) + \"-ib:8786\")\n",
"\n",
"sample_datasets=[\"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet\",]\n",
"\n",
"df = dd.read_parquet(sample_datasets, engine=\"fastparquet\") "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Convert old Parquet to new Parquet"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:43:12,666 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:14,316 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:15,668 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:33977 (pid=48276) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:15,946 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:21,786 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.74 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:23,443 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.09 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:24,862 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46711 (pid=48247) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:25,144 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:31,078 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.76 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:32,615 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.11 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:34,068 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:39735 (pid=48375) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:34,366 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:40,997 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:42,760 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:44,718 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43435 (pid=48187) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:45,089 - distributed.nanny - WARNING - Restarting worker\n"
]
},
{
"ename": "KilledWorker",
"evalue": "Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mKilledWorker\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Compute the Dask dataframe to get a Pandas dataframe\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m df_pandas \u001b[38;5;241m=\u001b[39m \u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;66;03m# Create a new Pandas dataframe with the expanded 'features' columns\u001b[39;00m\n\u001b[1;32m 5\u001b[0m features_df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m]\u001b[38;5;241m.\u001b[39mto_list(), columns\u001b[38;5;241m=\u001b[39m[\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeature_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mi\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m'\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m i \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mrange\u001b[39m(\u001b[38;5;28mlen\u001b[39m(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m][\u001b[38;5;241m0\u001b[39m]))])\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:314\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 291\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 292\u001b[0m \n\u001b[1;32m 293\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 312\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:599\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 596\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 597\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 599\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 600\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:3224\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3222\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3223\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3224\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3225\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3226\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2359\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2357\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2358\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2359\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2360\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2361\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2362\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2363\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2364\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2365\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2366\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:351\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 349\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 350\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 351\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 352\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 353\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:418\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 416\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 417\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 418\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 419\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 420\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:391\u001b[0m, in \u001b[0;36msync.<locals>.f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 389\u001b[0m future \u001b[38;5;241m=\u001b[39m wait_for(future, callback_timeout)\n\u001b[1;32m 390\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 391\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 392\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 393\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/tornado/gen.py:767\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 765\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 766\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 767\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 768\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 769\u001b[0m \u001b[38;5;66;03m# Save the exception for later. It's important that\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;66;03m# gen.throw() not be called inside this try/except block\u001b[39;00m\n\u001b[1;32m 771\u001b[0m \u001b[38;5;66;03m# because that makes sys.exc_info behave unexpectedly.\u001b[39;00m\n\u001b[1;32m 772\u001b[0m exc: Optional[\u001b[38;5;167;01mException\u001b[39;00m] \u001b[38;5;241m=\u001b[39m e\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2222\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2220\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2221\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2222\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2223\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2224\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n",
"\u001b[0;31mKilledWorker\u001b[0m: Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html."
]
}
],
"source": [
"# Compute the Dask dataframe to get a Pandas dataframe\n",
"df_pandas = df.compute()\n",
"\n",
"# Create a new Pandas dataframe with the expanded 'features' columns\n",
"features_df = pd.DataFrame(df_pandas['features'].to_list(), columns=[f'feature_{i}' for i in range(len(df_pandas['features'][0]))])\n",
"labels_df = pd.DataFrame(df_pandas['labels'].to_list(), columns=[f'label_{i}' for i in range(len(df_pandas['labels'][0]))])\n",
"\n",
"# Concatenate the original dataframe with the expanded features and labels dataframes\n",
"df_pandas = pd.concat([features_df, labels_df], axis=1)\n",
"\n",
"#save df_pandas a parquet\n",
"df_pandas.to_parquet(\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\", engine=\"fastparquet\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"#separate the features and labels\n",
"df_labels = df.loc[:, df.columns.str.contains('label')]\n",
"\n",
"# Create a StandardScaler object\n",
"scaler_features = MinMaxScaler()\n",
"df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])\n",
"\n",
"#Split the data into training and test sets\n",
"X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:58:28,387 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.16 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:58:29,636 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.26 GiB -- Worker memory limit: 3.86 GiB\n"
]
}
],
"source": [
"X = torch.tensor(X_train.compute().values)\n",
"y = torch.tensor(y_train.compute().values)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:52:11,130 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.15 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:52:12,561 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.25 GiB -- Worker memory limit: 3.86 GiB\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"4860\n"
]
}
],
"source": [
"from skorch import NeuralNetRegressor\n",
"import torch.optim as optim\n",
"\n",
"niceties = {\n",
" \"callbacks\": False,\n",
" \"warm_start\": False,\n",
" \"train_split\": None,\n",
" \"max_epochs\": 5,\n",
"}\n",
"\n",
"model = NeuralNetRegressor(\n",
" module=ShallowNet,\n",
" module__n_features=X.size(dim=1),\n",
" criterion=nn.MSELoss,\n",
" optimizer=optim.SGD,\n",
" optimizer__lr=0.1,\n",
" optimizer__momentum=0.9,\n",
" batch_size=64,\n",
" **niceties,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"model = model.share_memory()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Re-initializing module because the following parameters were re-set: n_features.\n",
"Re-initializing criterion.\n",
"Re-initializing optimizer.\n",
" epoch train_loss dur\n",
"------- ------------ ------\n",
" 1 \u001b[36m0.3994\u001b[0m 4.5545\n"
]
},
{
"data": {
"text/plain": [
"<class 'skorch.regressor.NeuralNetRegressor'>[initialized](\n",
" module_=ShallowNet(\n",
" (layer1): Linear(in_features=4860, out_features=8, bias=True)\n",
" ),\n",
")"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"model.fit(X, y)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 17:00:11,618 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 17:00:13,143 - distributed.worker.memory - WARNING - Worker is at 57% memory usage. Resuming worker. Process memory: 2.22 GiB -- Worker memory limit: 3.86 GiB\n"
]
}
],
"source": [
"test_X = torch.tensor(X_test.compute().values)\n",
"test_y = torch.tensor(y_test.compute().values)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-3.410176609207851\n"
]
}
],
"source": [
"print(model.score(test_X, test_y))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the loss function and optimizer\n",
"criterion = torch.nn.CrossEntropyLoss()\n",
"optimizer = torch.optim.SGD(model.parameters(), lr=0.01)\n",
"# Move the model to the GPU\n",
"device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"model = model.to(device)\n",
"# Distribute the model across workers\n",
"model = model.share_memory()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Train the model\n",
"for epoch in range(10):\n",
" for batch in dataloader:\n",
" # Split the batch across workers\n",
" batch = [b.to(device) for b in batch]\n",
" futures = client.map(lambda data: model(data[0]), batch)\n",
" # Compute the loss\n",
" losses = client.map(criterion, futures, batch[1])\n",
" loss = client.submit(torch.mean, client.gather(losses))\n",
" # Compute the gradients and update the model parameters\n",
" optimizer.zero_grad()\n",
" gradients = client.map(lambda loss, future: torch.autograd.grad(loss, future)[0], loss, futures)\n",
" gradients =client.submit(torch.mean, client.gather(gradients))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.map(lambda parameter, gradient: parameter.grad.copy_(gradient), model.parameters(), gradients)\n",
"optimizer.step()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_ml.linear_model import LinearRegression\n",
"from sklearn.linear_model import LinearRegression\n",
"\n",
"# Initialize the Linear Regression model\n",
"model = LinearRegression()\n",
"model.fit(X_train, y_train)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_dask_df = df['features'].apply(pd.Series, meta=meta)\n",
"new_dask_df.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use this code for PyArrow tables\n",
"\n",
"#import pyarrow as pa\n",
"\n",
"# Define schema for features and labels columns\n",
"#schema = pa.schema({\n",
" #'features': pa.list_(pa.float32()),\n",
" #'labels': pa.list_(pa.float32())\n",
"#})\n",
"\n",
"#import pyarrow.parquet as pq\n",
"#df = dd.from_pandas(pq.read_table(sample_dataset, schema=schema).to_pandas(), npartitions=10)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dask-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.18"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,54 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ray"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ray.init()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster_resources = ray.available_resources()\n",
"available_cpu_cores = cluster_resources.get('CPU', 0)\n",
"print(cluster_resources)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ray",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -1,57 +0,0 @@
import os
import dask
import random
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
import torch
from torch.utils.data import Dataset, DataLoader
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet"
df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300)
df_features = df.loc[:, df.columns.str.contains('feature')]
df_labels = df.loc[:, df.columns.str.contains('label')]
# Create a StandardScaler object
scaler_features = MinMaxScaler()
df_features_scaled = scaler_features.fit_transform(df_features)
#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')]
#Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0)
dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True)
model = torch.nn.Sequential(
torch.nn.Linear(784, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 10)
)
#X_train = X_train.loc[:, X_train.columns.str.contains('feature')]
#y_train = y_train.loc[:, y_train.columns.str.contains('label')]
# Initialize the Linear Regression model
model = LinearRegression().fit(X_train, y_train)
score = model.score(X_test, y_test)

View File

@ -1,82 +0,0 @@
import os
import dask
import random
import torch
from torch.utils.data import Dataset, DataLoader
import time
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from joblib import parallel_backend
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
from daskdataset import DaskDataset, ShallowNet
start_time = time.time()
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB")
#df_future = client.scatter(df)
#separate the features and labels
df_labels = df.loc[:, df.columns.str.contains('label')]
# Create a StandardScaler object
scaler_features = MinMaxScaler()
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])
#Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)
X = torch.tensor(X_train.compute().values)
y = torch.tensor(y_train.compute().values)
from skorch import NeuralNetRegressor
import torch.optim as optim
niceties = {
"callbacks": False,
"warm_start": False,
"train_split": None,
"max_epochs": 5,
}
model = NeuralNetRegressor(
module=ShallowNet,
module__n_features=X.size(dim=1),
criterion=nn.MSELoss,
optimizer=optim.SGD,
optimizer__lr=0.1,
optimizer__momentum=0.9,
batch_size=64,
**niceties,
)
# Initialize the Linear Regression model
model = LinearRegression()
model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True))
end_time = time.time()
print("Time to load data: ", end_time - start_time)
dask_dataset = DaskDataset(X_train, y_train)
dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True)
for feature, label in dataloader:
print(feature)
print(label)
break

View File

@ -1,32 +0,0 @@
import torch
from torch.utils.data import Dataset, DataLoader
import dask.dataframe as dd
import torch.nn as nn
import torch.nn.functional as F
# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns
class DaskDataset(Dataset):
def __init__(self, df_features, df_labels):
self.features = df_features.to_dask_array(lengths=True)
self.labels = df_labels.to_dask_array(lengths=True)
def __len__(self):
return self.features.size
def __getitem__(self, idx):
return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values)
class ShallowNet(nn.Module):
def __init__(self, n_features):
super().__init__()
self.layer1 = nn.Linear(n_features, 128)
self.relu = nn.ReLU()
self.layer2 = nn.Linear(128, 8)
def forward(self, x):
x = self.layer1(x)
x = self.relu(x)
x = self.layer2(x)
return x

74
src/monte-carlo-pi.py Normal file
View File

@ -0,0 +1,74 @@
# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html
import ray
import math
import time
import random
import os
ray.init(address="auto", _node_ip_address=os.environ["IP_ADDRESS"], _redis_password=os.environ["REDIS_PASSWORD"])
cluster_resources = ray.available_resources()
available_cpu_cores = cluster_resources.get('CPU', 0)
print(cluster_resources)
@ray.remote
class ProgressActor:
def __init__(self, total_num_samples: int):
self.total_num_samples = total_num_samples
self.num_samples_completed_per_task = {}
def report_progress(self, task_id: int, num_samples_completed: int) -> None:
self.num_samples_completed_per_task[task_id] = num_samples_completed
def get_progress(self) -> float:
return (
sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
)
@ray.remote
def sampling_task(num_samples: int, task_id: int,
progress_actor: ray.actor.ActorHandle) -> int:
num_inside = 0
for i in range(num_samples):
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
if math.hypot(x, y) <= 1:
num_inside += 1
# Report progress every 1 million samples.
if (i + 1) % 1_000_000 == 0:
# This is async.
progress_actor.report_progress.remote(task_id, i + 1)
# Report the final progress.
progress_actor.report_progress.remote(task_id, num_samples)
return num_inside
# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 100
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
# Create and execute all sampling tasks in parallel.
results = [
sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
for i in range(NUM_SAMPLING_TASKS)
]
# Query progress periodically.
while True:
progress = ray.get(progress_actor.get_progress.remote())
print(f"Progress: {int(progress * 100)}%")
if progress == 1:
break
time.sleep(1)
# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")