diff --git a/.gitignore b/.gitignore index ed8ebf5..df625d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,12 @@ -__pycache__ \ No newline at end of file +# Compiled source +__pycache__ + + +# Packages +*.gz +*.rar +*.tar +*.zip + +# OS generated files +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index 4490444..1c569af 100644 --- a/README.md +++ b/README.md @@ -7,88 +7,113 @@ This guide shows you how to launch a Ray cluster on HLRS' Hawk system. - [Table of Contents](#table-of-contents) - [Prerequisites](#prerequisites) - [Getting Started](#getting-started) - - [Usage](#usage) - - [Notes](#notes) + - [Launch a Ray Cluster in Interactive Mode](#launch-a-ray-cluster-in-interactive-mode) + - [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode) ## Prerequisites -Before running the application, make sure you have the following prerequisites installed in a conda environment: -- [Python 3.9](https://www.python.org/downloads/release/python-3818/): This specific python version is used for all uses, you can select it using while creating the conda environment. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters). -- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters). -- [Ray](https://dask.org/): You can install Ray inside -- [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan. +Before building the environment, make sure you have the following prerequisites: +- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. +- [Conda-Pack](https://conda.github.io/conda-pack/) installed in the base environment: Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to the target system. +- `linux-64` platform for installing the Conda packages because Conda/pip downloads and installs precompiled binaries suitable to the architecture and OS of the local environment. + +For more information, look at the documentation for [Conda on HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters) ## Getting Started +Only the main and r channels are available using the conda module on the clusters. To use custom packages, we need to move the local conda environment to Hawk. + 1. Clone this repository to your local machine: ```bash git clone ``` -2. Go into the directory and create an environment using Conda and environment.yaml. Note: Be sure to add the necessary packages in environment.yaml: +2. Go into the directory and create an environment using Conda and environment.yaml. + +Note: Be sure to add the necessary packages in `deployment_scripts/environment.yaml`: ```bash cd deployment_scripts ./create-env.sh ``` -3. Send all files using `deploy-env.sh`: - - ```bash - ./deployment_scripts/deploy-env.sh : - ``` - -4. Send all the code to the appropriate directory on Vulcan using `scp`: - - ```bash - scp .py : - ``` - -5. SSH into Vulcan and start a job interatively using: - - ```bash - qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00 - ``` - -6. Go into the directory with all code: - - ```bash - cd - ``` - -7. Initialize the Dask cluster: - - ```bash - source deploy-dask.sh "$(pwd)" - ``` - Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs. - Note: Make sure all permissions are set using `chmod +x` for all scripts. - -## Usage - -To run the application interactively, execute the following command after all the cluster's nodes are up and running: +3. Package the environment and transfer the archive to the target system: ```bash -python +(my_env) $ conda deactivate +(base) $ conda pack -n my_env -o my_env.tar.gz # conda-pack must be installed in the base environment ``` -Or to run a full script: +A workspace is suitable to store the compressed Conda environment archive on Hawk. Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide. + ```bash -python .py +ws_allocate hpda_project 10 +ws_find hpda_project # find the path to workspace, which is the destination directory in the next step ``` -Note: If you don't see your environment in the python interpretor, then manually activate it using: +You can send your data to an existing workspace using: + ```bash -conda activate +scp my_env.tar.gz @hawk.hww.hlrs.de: +rm my_env.tar.gz # We don't need the archive locally anymore. ``` -Do this before using the python interpretor. -## Notes +4. Clone the repository on Hawk to use the deployment scripts and project structure: + +```bash +cd +git clone +``` + +## Launch a Ray Cluster in Interactive Mode + +Using a single node interactively provides opportunities for faster code debugging. + +1. On the Hawk login node, start an interactive job using: + +```bash +qsub -I -l select=1:node_type=rome -l walltime=01:00:00 +``` + +2. Go into the directory with all code: + +```bash +cd /deployment_scripts +``` + +3. Deploy the conda environment to the ram disk: + +```bash +source deploy-env.sh +``` +Note: Make sure all permissions are set using `chmod +x`. + +4. Initialize the Ray cluster. + +You can use a Python interpreter to start a Ray cluster: -Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster: ```python -client = Client(..., silence_logs='error') +import ray + +ray.init(dashboard_host='127.0.0.1') ``` -Note: Replace all filenames within `<>` with the actual values applicable to your project. \ No newline at end of file +1. Connect to the dashboard. + +Warning: Always use `127.0.0.1` as the dashboard host to make the Ray cluster reachable by only you. + +## Launch a Ray Cluster in Batch Mode + +1. Add execution permissions to `start-ray-worker.sh` + +```bash +cd deployment_scripts +chmod +x ray-start-worker.sh +``` + +2. Submit a job to launch the head and worker nodes. + +You must modify the following variables in `submit-ray-job.sh`: +- Line 3 changes the cluster size. The default configuration launches a 3 node cluster. +- `$PROJECT_DIR` diff --git a/deployment_scripts/start-ray-worker.sh b/deployment_scripts/start-ray-worker.sh new file mode 100644 index 0000000..04242fd --- /dev/null +++ b/deployment_scripts/start-ray-worker.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +if [ $# -ne 5 ]; then + echo "Usage: $0 " + exit 1 +fi + +export WS_DIR=$1 +export ENV_ARCHIVE=$2 +export RAY_ADDRESS=$3 +export REDIS_PASSWORD=$4 +export OBJECT_STORE_MEMORY=$5 + +# printenv | grep 'RAY_ADDRESS\|REDIS_PASSWORD' + +# module load system/nvidia/ALL.ALL.525.125.06 + +export ENV_PATH=/run/user/$PBS_JOBID/ray_env + +mkdir -p $ENV_PATH +tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH +source $ENV_PATH/bin/activate +conda-unpack + +ray start --address=$RAY_ADDRESS \ + --redis-password=$REDIS_PASSWORD \ + --object-store-memory=$OBJECT_STORE_MEMORY \ + --block + +rm -rf $ENV_PATH \ No newline at end of file diff --git a/deployment_scripts/submit-ray-job.sh b/deployment_scripts/submit-ray-job.sh new file mode 100644 index 0000000..97d0487 --- /dev/null +++ b/deployment_scripts/submit-ray-job.sh @@ -0,0 +1,59 @@ +#!/bin/bash +#PBS -N output-ray-job +#PBS -l select=2:node_type=rome-ai +#PBS -l walltime=1:00:00 + +export JOB_SCRIPT=modeling_evaluation.py + +export WS_DIR=/lustre/hpe/ws10/ws10.3/ws/hpckkaya-ifu +export ENV_ARCHIVE=ray-environment-v0.3.tar.gz + +export SRC_DIR=$WS_DIR/ifu/src/ray-workflow +export DATA_DIR=/lustre/hpe/ws10/ws10.3/ws/hpckkaya-ifu-data/hpclzhon-ifu_data-1668830707 +export RESULTS_DIR=$WS_DIR/ray_results + +export NCCL_DEBUG=INFO + +# Environment variables after this line should not change + +export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT +export ENV_PATH=/run/user/$PBS_JOBID/ray_env + +mkdir -p $ENV_PATH +tar -xzf $WS_DIR/$ENV_ARCHIVE -C $ENV_PATH +source $ENV_PATH/bin/activate +conda-unpack + +export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'` + +export RAY_ADDRESS=$IP_ADDRESS:6379 +export REDIS_PASSWORD=$(uuidgen) + +# export RAY_scheduler_spread_threshold=0.0 +export OBJECT_STORE_MEMORY=128000000000 + +ray start --disable-usage-stats \ + --head \ + --node-ip-address=$IP_ADDRESS \ + --port=6379 \ + --dashboard-host=127.0.0.1 \ + --redis-password=$REDIS_PASSWORD \ + --object-store-memory=$OBJECT_STORE_MEMORY + +export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l) + +for ((i=1;i<$NUM_NODES;i++)); do + pbsdsh -n $i -- bash -l -c "'$SRC_DIR/ray-start-worker.sh' '$WS_DIR' '$ENV_ARCHIVE' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" & +done + +# uncomment if you don't already control inside the code +# if [[ $NUM_NODES -gt 1 ]] +# then + # sleep 90 +#fi + +python3 $PYTHON_FILE + +ray stop + +rm -rf $ENV_PATH \ No newline at end of file diff --git a/notebooks/dask_test.ipynb b/notebooks/dask_test.ipynb deleted file mode 100644 index e17e343..0000000 --- a/notebooks/dask_test.ipynb +++ /dev/null @@ -1,413 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import dask\n", - "import random\n", - "import torch\n", - "from torch.utils.data import Dataset, DataLoader\n", - "\n", - "from dask.distributed import Client\n", - "import dask.dataframe as dd\n", - "import pandas as pd\n", - "\n", - "from dask_ml.preprocessing import MinMaxScaler\n", - "from dask_ml.model_selection import train_test_split\n", - "from dask_ml.linear_model import LinearRegression\n", - "\n", - "from daskdataset import DaskDataset, ShallowNet\n", - "import torch.nn as nn\n", - "import torch.nn.functional as F" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [], - "source": [ - "client = Client()\n", - "\n", - "sample_dataset=\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\"\n", - "\n", - "df = dd.read_parquet(sample_dataset, engine=\"fastparquet\")#.repartition(npartitions=10) #using pyarrow throws error with numpy" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Run this only on the cluster" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client = Client(str(os.getenv('HOSTNAME')) + \"-ib:8786\")\n", - "\n", - "sample_datasets=[\"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet\",\n", - " \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet\",]\n", - "\n", - "df = dd.read_parquet(sample_datasets, engine=\"fastparquet\") " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Convert old Parquet to new Parquet" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2023-11-28 16:43:12,666 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:14,316 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:15,668 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:33977 (pid=48276) exceeded 95% memory budget. Restarting...\n", - "2023-11-28 16:43:15,946 - distributed.nanny - WARNING - Restarting worker\n", - "2023-11-28 16:43:21,786 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.74 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:23,443 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.09 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:24,862 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46711 (pid=48247) exceeded 95% memory budget. Restarting...\n", - "2023-11-28 16:43:25,144 - distributed.nanny - WARNING - Restarting worker\n", - "2023-11-28 16:43:31,078 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.76 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:32,615 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.11 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:34,068 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:39735 (pid=48375) exceeded 95% memory budget. Restarting...\n", - "2023-11-28 16:43:34,366 - distributed.nanny - WARNING - Restarting worker\n", - "2023-11-28 16:43:40,997 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:42,760 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:43:44,718 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43435 (pid=48187) exceeded 95% memory budget. Restarting...\n", - "2023-11-28 16:43:45,089 - distributed.nanny - WARNING - Restarting worker\n" - ] - }, - { - "ename": "KilledWorker", - "evalue": "Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mKilledWorker\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[8], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Compute the Dask dataframe to get a Pandas dataframe\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m df_pandas \u001b[38;5;241m=\u001b[39m \u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;66;03m# Create a new Pandas dataframe with the expanded 'features' columns\u001b[39;00m\n\u001b[1;32m 5\u001b[0m features_df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m]\u001b[38;5;241m.\u001b[39mto_list(), columns\u001b[38;5;241m=\u001b[39m[\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeature_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mi\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m'\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m i \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mrange\u001b[39m(\u001b[38;5;28mlen\u001b[39m(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m][\u001b[38;5;241m0\u001b[39m]))])\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:314\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 291\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 292\u001b[0m \n\u001b[1;32m 293\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 312\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:599\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 596\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 597\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 599\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 600\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:3224\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3222\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3223\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3224\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3225\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3226\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2359\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2357\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2358\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2359\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2360\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2361\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2362\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2363\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2364\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2365\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2366\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:351\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 349\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 350\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 351\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 352\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 353\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:418\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 416\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 417\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 418\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 419\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 420\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:391\u001b[0m, in \u001b[0;36msync..f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 389\u001b[0m future \u001b[38;5;241m=\u001b[39m wait_for(future, callback_timeout)\n\u001b[1;32m 390\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 391\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 392\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 393\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/tornado/gen.py:767\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 765\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 766\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 767\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 768\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 769\u001b[0m \u001b[38;5;66;03m# Save the exception for later. It's important that\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;66;03m# gen.throw() not be called inside this try/except block\u001b[39;00m\n\u001b[1;32m 771\u001b[0m \u001b[38;5;66;03m# because that makes sys.exc_info behave unexpectedly.\u001b[39;00m\n\u001b[1;32m 772\u001b[0m exc: Optional[\u001b[38;5;167;01mException\u001b[39;00m] \u001b[38;5;241m=\u001b[39m e\n", - "File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2222\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2220\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2221\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2222\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2223\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2224\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n", - "\u001b[0;31mKilledWorker\u001b[0m: Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html." - ] - } - ], - "source": [ - "# Compute the Dask dataframe to get a Pandas dataframe\n", - "df_pandas = df.compute()\n", - "\n", - "# Create a new Pandas dataframe with the expanded 'features' columns\n", - "features_df = pd.DataFrame(df_pandas['features'].to_list(), columns=[f'feature_{i}' for i in range(len(df_pandas['features'][0]))])\n", - "labels_df = pd.DataFrame(df_pandas['labels'].to_list(), columns=[f'label_{i}' for i in range(len(df_pandas['labels'][0]))])\n", - "\n", - "# Concatenate the original dataframe with the expanded features and labels dataframes\n", - "df_pandas = pd.concat([features_df, labels_df], axis=1)\n", - "\n", - "#save df_pandas a parquet\n", - "df_pandas.to_parquet(\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\", engine=\"fastparquet\")" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [], - "source": [ - "#separate the features and labels\n", - "df_labels = df.loc[:, df.columns.str.contains('label')]\n", - "\n", - "# Create a StandardScaler object\n", - "scaler_features = MinMaxScaler()\n", - "df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])\n", - "\n", - "#Split the data into training and test sets\n", - "X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2023-11-28 16:58:28,387 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.16 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:58:29,636 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.26 GiB -- Worker memory limit: 3.86 GiB\n" - ] - } - ], - "source": [ - "X = torch.tensor(X_train.compute().values)\n", - "y = torch.tensor(y_train.compute().values)" - ] - }, - { - "cell_type": "code", - "execution_count": 14, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2023-11-28 16:52:11,130 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.15 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 16:52:12,561 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.25 GiB -- Worker memory limit: 3.86 GiB\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "4860\n" - ] - } - ], - "source": [ - "from skorch import NeuralNetRegressor\n", - "import torch.optim as optim\n", - "\n", - "niceties = {\n", - " \"callbacks\": False,\n", - " \"warm_start\": False,\n", - " \"train_split\": None,\n", - " \"max_epochs\": 5,\n", - "}\n", - "\n", - "model = NeuralNetRegressor(\n", - " module=ShallowNet,\n", - " module__n_features=X.size(dim=1),\n", - " criterion=nn.MSELoss,\n", - " optimizer=optim.SGD,\n", - " optimizer__lr=0.1,\n", - " optimizer__momentum=0.9,\n", - " batch_size=64,\n", - " **niceties,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "model.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n", - "model = model.share_memory()" - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Re-initializing module because the following parameters were re-set: n_features.\n", - "Re-initializing criterion.\n", - "Re-initializing optimizer.\n", - " epoch train_loss dur\n", - "------- ------------ ------\n", - " 1 \u001b[36m0.3994\u001b[0m 4.5545\n" - ] - }, - { - "data": { - "text/plain": [ - "[initialized](\n", - " module_=ShallowNet(\n", - " (layer1): Linear(in_features=4860, out_features=8, bias=True)\n", - " ),\n", - ")" - ] - }, - "execution_count": 20, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "model.fit(X, y)" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2023-11-28 17:00:11,618 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n", - "2023-11-28 17:00:13,143 - distributed.worker.memory - WARNING - Worker is at 57% memory usage. Resuming worker. Process memory: 2.22 GiB -- Worker memory limit: 3.86 GiB\n" - ] - } - ], - "source": [ - "test_X = torch.tensor(X_test.compute().values)\n", - "test_y = torch.tensor(y_test.compute().values)" - ] - }, - { - "cell_type": "code", - "execution_count": 22, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "-3.410176609207851\n" - ] - } - ], - "source": [ - "print(model.score(test_X, test_y))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Define the loss function and optimizer\n", - "criterion = torch.nn.CrossEntropyLoss()\n", - "optimizer = torch.optim.SGD(model.parameters(), lr=0.01)\n", - "# Move the model to the GPU\n", - "device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n", - "model = model.to(device)\n", - "# Distribute the model across workers\n", - "model = model.share_memory()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Train the model\n", - "for epoch in range(10):\n", - " for batch in dataloader:\n", - " # Split the batch across workers\n", - " batch = [b.to(device) for b in batch]\n", - " futures = client.map(lambda data: model(data[0]), batch)\n", - " # Compute the loss\n", - " losses = client.map(criterion, futures, batch[1])\n", - " loss = client.submit(torch.mean, client.gather(losses))\n", - " # Compute the gradients and update the model parameters\n", - " optimizer.zero_grad()\n", - " gradients = client.map(lambda loss, future: torch.autograd.grad(loss, future)[0], loss, futures)\n", - " gradients =client.submit(torch.mean, client.gather(gradients))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client.map(lambda parameter, gradient: parameter.grad.copy_(gradient), model.parameters(), gradients)\n", - "optimizer.step()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from dask_ml.linear_model import LinearRegression\n", - "from sklearn.linear_model import LinearRegression\n", - "\n", - "# Initialize the Linear Regression model\n", - "model = LinearRegression()\n", - "model.fit(X_train, y_train)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "new_dask_df = df['features'].apply(pd.Series, meta=meta)\n", - "new_dask_df.compute()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Use this code for PyArrow tables\n", - "\n", - "#import pyarrow as pa\n", - "\n", - "# Define schema for features and labels columns\n", - "#schema = pa.schema({\n", - " #'features': pa.list_(pa.float32()),\n", - " #'labels': pa.list_(pa.float32())\n", - "#})\n", - "\n", - "#import pyarrow.parquet as pq\n", - "#df = dd.from_pandas(pq.read_table(sample_dataset, schema=schema).to_pandas(), npartitions=10)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "dask-env", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.18" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/notebooks/local_ray_cluster.ipynb b/notebooks/local_ray_cluster.ipynb new file mode 100644 index 0000000..6a75e54 --- /dev/null +++ b/notebooks/local_ray_cluster.ipynb @@ -0,0 +1,54 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import ray" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ray.init()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cluster_resources = ray.available_resources()\n", + "available_cpu_cores = cluster_resources.get('CPU', 0)\n", + "print(cluster_resources)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ray", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/dask-example-pi.py b/src/dask-example-pi.py deleted file mode 100644 index 354a25c..0000000 --- a/src/dask-example-pi.py +++ /dev/null @@ -1,57 +0,0 @@ -import os -import dask -import random - -from dask.distributed import Client -import dask.dataframe as dd -import pandas as pd - -from dask_ml.preprocessing import MinMaxScaler -from dask_ml.model_selection import train_test_split -from dask_ml.linear_model import LinearRegression - -import torch -from torch.utils.data import Dataset, DataLoader - -client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786") - -sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",] - -#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet" - -df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300) - -df_features = df.loc[:, df.columns.str.contains('feature')] -df_labels = df.loc[:, df.columns.str.contains('label')] - -# Create a StandardScaler object -scaler_features = MinMaxScaler() -df_features_scaled = scaler_features.fit_transform(df_features) - -#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')] - -#Split the data into training and test sets -X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0) - -dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True) - -model = torch.nn.Sequential( - torch.nn.Linear(784, 128), - torch.nn.ReLU(), - torch.nn.Linear(128, 10) -) - -#X_train = X_train.loc[:, X_train.columns.str.contains('feature')] -#y_train = y_train.loc[:, y_train.columns.str.contains('label')] - -# Initialize the Linear Regression model -model = LinearRegression().fit(X_train, y_train) - -score = model.score(X_test, y_test) \ No newline at end of file diff --git a/src/dask-perf-check.py b/src/dask-perf-check.py deleted file mode 100644 index 9a46036..0000000 --- a/src/dask-perf-check.py +++ /dev/null @@ -1,82 +0,0 @@ -import os -import dask -import random -import torch -from torch.utils.data import Dataset, DataLoader -import time - -from dask.distributed import Client -import dask.dataframe as dd -import pandas as pd -from joblib import parallel_backend - -from dask_ml.preprocessing import MinMaxScaler -from dask_ml.model_selection import train_test_split -from dask_ml.linear_model import LinearRegression - -from daskdataset import DaskDataset, ShallowNet - -start_time = time.time() - -client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786") - -sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",] -df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB") -#df_future = client.scatter(df) - -#separate the features and labels -df_labels = df.loc[:, df.columns.str.contains('label')] - -# Create a StandardScaler object -scaler_features = MinMaxScaler() -df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')]) - -#Split the data into training and test sets -X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True) - -X = torch.tensor(X_train.compute().values) -y = torch.tensor(y_train.compute().values) - -from skorch import NeuralNetRegressor -import torch.optim as optim - -niceties = { - "callbacks": False, - "warm_start": False, - "train_split": None, - "max_epochs": 5, -} - -model = NeuralNetRegressor( - module=ShallowNet, - module__n_features=X.size(dim=1), - criterion=nn.MSELoss, - optimizer=optim.SGD, - optimizer__lr=0.1, - optimizer__momentum=0.9, - batch_size=64, - **niceties, -) - -# Initialize the Linear Regression model -model = LinearRegression() -model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True)) - -end_time = time.time() - -print("Time to load data: ", end_time - start_time) - -dask_dataset = DaskDataset(X_train, y_train) -dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True) - -for feature, label in dataloader: - print(feature) - print(label) - break \ No newline at end of file diff --git a/src/daskdataset.py b/src/daskdataset.py deleted file mode 100644 index 34c64b3..0000000 --- a/src/daskdataset.py +++ /dev/null @@ -1,32 +0,0 @@ -import torch -from torch.utils.data import Dataset, DataLoader -import dask.dataframe as dd - -import torch.nn as nn -import torch.nn.functional as F - -# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns - -class DaskDataset(Dataset): - def __init__(self, df_features, df_labels): - self.features = df_features.to_dask_array(lengths=True) - self.labels = df_labels.to_dask_array(lengths=True) - - def __len__(self): - return self.features.size - - def __getitem__(self, idx): - return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values) - -class ShallowNet(nn.Module): - def __init__(self, n_features): - super().__init__() - self.layer1 = nn.Linear(n_features, 128) - self.relu = nn.ReLU() - self.layer2 = nn.Linear(128, 8) - - def forward(self, x): - x = self.layer1(x) - x = self.relu(x) - x = self.layer2(x) - return x \ No newline at end of file diff --git a/src/monte-carlo-pi.py b/src/monte-carlo-pi.py new file mode 100644 index 0000000..79e1f19 --- /dev/null +++ b/src/monte-carlo-pi.py @@ -0,0 +1,74 @@ +# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html + +import ray +import math +import time +import random +import os + +ray.init(address="auto", _node_ip_address=os.environ["IP_ADDRESS"], _redis_password=os.environ["REDIS_PASSWORD"]) + +cluster_resources = ray.available_resources() +available_cpu_cores = cluster_resources.get('CPU', 0) +print(cluster_resources) + +@ray.remote +class ProgressActor: + def __init__(self, total_num_samples: int): + self.total_num_samples = total_num_samples + self.num_samples_completed_per_task = {} + + def report_progress(self, task_id: int, num_samples_completed: int) -> None: + self.num_samples_completed_per_task[task_id] = num_samples_completed + + def get_progress(self) -> float: + return ( + sum(self.num_samples_completed_per_task.values()) / self.total_num_samples + ) + +@ray.remote +def sampling_task(num_samples: int, task_id: int, + progress_actor: ray.actor.ActorHandle) -> int: + num_inside = 0 + for i in range(num_samples): + x, y = random.uniform(-1, 1), random.uniform(-1, 1) + if math.hypot(x, y) <= 1: + num_inside += 1 + + # Report progress every 1 million samples. + if (i + 1) % 1_000_000 == 0: + # This is async. + progress_actor.report_progress.remote(task_id, i + 1) + + # Report the final progress. + progress_actor.report_progress.remote(task_id, num_samples) + return num_inside + +# Change this to match your cluster scale. +NUM_SAMPLING_TASKS = 100 +NUM_SAMPLES_PER_TASK = 10_000_000 +TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK + +# Create the progress actor. +progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES) + +# Create and execute all sampling tasks in parallel. +results = [ + sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor) + for i in range(NUM_SAMPLING_TASKS) +] + +# Query progress periodically. +while True: + progress = ray.get(progress_actor.get_progress.remote()) + print(f"Progress: {int(progress * 100)}%") + + if progress == 1: + break + + time.sleep(1) + +# Get all the sampling tasks results. +total_num_inside = sum(ray.get(results)) +pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES +print(f"Estimated value of π is: {pi}")