Compare commits
14 commits
main
...
ray-tune-t
Author | SHA1 | Date | |
---|---|---|---|
1fd520d225 | |||
5a7702ec67 | |||
ac99d53c62 | |||
4485c24dd7 | |||
6c4b028131 | |||
5a8bf27936 | |||
ef8058ea34 | |||
ad953fd96a | |||
5e25f899fa | |||
c2230177d2 | |||
79f28f8e02 | |||
8474b4328d | |||
bd2a9e7f33 | |||
212441c516 |
18 changed files with 560 additions and 932 deletions
12
.gitignore
vendored
Normal file
12
.gitignore
vendored
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
# Compiled source
|
||||||
|
__pycache__
|
||||||
|
|
||||||
|
|
||||||
|
# Packages
|
||||||
|
*.gz
|
||||||
|
*.rar
|
||||||
|
*.tar
|
||||||
|
*.zip
|
||||||
|
|
||||||
|
# OS generated files
|
||||||
|
.DS_Store
|
181
README.md
181
README.md
|
@ -1,108 +1,123 @@
|
||||||
# Dask: How to execute python workloads using a Dask cluster on Vulcan
|
# Ray: How to launch a Ray Cluster on Hawk?
|
||||||
|
|
||||||
Wiki link:
|
This guide shows you how to launch a Ray cluster on HLRS' Hawk system.
|
||||||
|
|
||||||
Motivation: This document aims to show users how to launch a Dask cluster in our compute platforms and perform a simple workload using it.
|
|
||||||
|
|
||||||
Structure:
|
|
||||||
- [ ] [Tutorial](https://diataxis.fr/tutorials/)
|
|
||||||
- [x] [How-to guide](https://diataxis.fr/how-to-guides/)
|
|
||||||
- [ ] [Reference](https://diataxis.fr/reference/)
|
|
||||||
- [ ] [Explanation](https://diataxis.fr/explanation/)
|
|
||||||
|
|
||||||
To do:
|
|
||||||
- [x] Made scripts for environment creation and deployment in the folder `local_scripts`
|
|
||||||
- [x] Changed scripts to `deployment_scripts`
|
|
||||||
- [x] Added step about sending python file
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.
|
|
||||||
|
|
||||||
## Table of Contents
|
## Table of Contents
|
||||||
- [Prerequisites](#prerequisites)
|
- [Ray: How to launch a Ray Cluster on Hawk?](#ray-how-to-launch-a-ray-cluster-on-hawk)
|
||||||
|
- [Table of Contents](#table-of-contents)
|
||||||
- [Getting Started](#getting-started)
|
- [Getting Started](#getting-started)
|
||||||
- [Usage](#usage)
|
- [Launch a local Ray Cluster in Interactive Mode](#launch-a-local-ray-cluster-in-interactive-mode)
|
||||||
- [Notes](#notes)
|
- [Launch a Ray Cluster in Batch Mode](#launch-a-ray-cluster-in-batch-mode)
|
||||||
|
|
||||||
## Prerequisites
|
|
||||||
|
|
||||||
Before running the application, make sure you have the following prerequisites installed in a conda environment:
|
|
||||||
- [Python 3.8.18](https://www.python.org/downloads/release/python-3818/): This specific python version is used for all uses, you can select it using while creating the conda environment. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
|
|
||||||
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
|
|
||||||
- [Dask](https://dask.org/): Install Dask using conda.
|
|
||||||
- [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan.
|
|
||||||
|
|
||||||
## Getting Started
|
## Getting Started
|
||||||
|
|
||||||
1. Clone [this repository](https://code.hlrs.de/hpcrsaxe/spark_template) to your local machine:
|
**Step 1.** Build and transfer the Conda environment to Hawk:
|
||||||
|
|
||||||
|
Only the main and r channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Hawk.
|
||||||
|
|
||||||
|
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment to run Ray workflows.
|
||||||
|
|
||||||
|
**Step 2.** Allocate workspace on Hawk:
|
||||||
|
|
||||||
|
Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
ws_allocate hpda_project 10
|
||||||
|
ws_find hpda_project # find the path to workspace, which is the destination directory in the next step
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2.** Clone the repository on Hawk to use the deployment scripts and project structure:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd <workspace_directory>
|
||||||
git clone <repository_url>
|
git clone <repository_url>
|
||||||
```
|
```
|
||||||
|
|
||||||
2. Go into the direcotry and create an environment using Conda and enirvonment.yaml. Note: Be sure to add the necessary packages in environemnt.yaml:
|
## Launch a local Ray Cluster in Interactive Mode
|
||||||
|
|
||||||
|
Using a single node interactively provides opportunities for faster code debugging.
|
||||||
|
|
||||||
|
**Step 1.** On the Hawk login node, start an interactive job using:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./deployment_scripts/create-env.sh <your-env>
|
qsub -I -l select=1:node_type=rome -l walltime=01:00:00
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Send all files using `deploy-env.sh`:
|
**Step 2.** Activate the Conda environment:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./deployment_scripts/deploy-env.sh <your-env> <destination_host>:<destination_directory>
|
# Load the Conda module
|
||||||
|
module load bigdata/conda
|
||||||
|
source activate # activates the base environment
|
||||||
|
|
||||||
|
# List available Conda environments for verification purposes
|
||||||
|
conda env list
|
||||||
|
|
||||||
|
# Activate a specific Conda environment.
|
||||||
|
conda activate ray_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
|
||||||
```
|
```
|
||||||
|
|
||||||
4. Send all the code to the appropriate directory on Vulcan using `scp`:
|
**Step 3.** Initialize the Ray cluster.
|
||||||
|
|
||||||
```bash
|
You can use a Python interpreter to start a local Ray cluster:
|
||||||
scp <your_script>.py <destination_host>:<destination_directory>
|
|
||||||
```
|
|
||||||
|
|
||||||
5. SSH into Vulcan and start a job interatively using:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00
|
|
||||||
```
|
|
||||||
|
|
||||||
6. Go into the directory with all code:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd <destination_directory>
|
|
||||||
```
|
|
||||||
|
|
||||||
7. Initialize the Dask cluster:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
source deploy-dask.sh "$(pwd)"
|
|
||||||
```
|
|
||||||
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
|
|
||||||
Note: Make sure all permissions are set using `chmod +x` for all scripts.
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
To run the application interactively, execute the following command after all the cluster's nodes are up and running:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
python
|
|
||||||
```
|
|
||||||
|
|
||||||
Or to run a full script:
|
|
||||||
```bash
|
|
||||||
python <your-script>.py
|
|
||||||
```
|
|
||||||
|
|
||||||
Note: If you don't see your environment in the python interpretor, then manually activate it using:
|
|
||||||
```bash
|
|
||||||
conda activate <your-env>
|
|
||||||
```
|
|
||||||
Do this before using the python interpretor.
|
|
||||||
|
|
||||||
## Notes
|
|
||||||
|
|
||||||
Note: Dask Cluster is set to verbose, add the following to your code while connecting to the Dask cluster:
|
|
||||||
```python
|
```python
|
||||||
client = Client(..., silence_logs='error')
|
import ray
|
||||||
|
|
||||||
|
ray.init()
|
||||||
```
|
```
|
||||||
|
|
||||||
Note: Replace all filenames within `<>` with the actual values applicable to your project.
|
**Step 4.** Connect to the dashboard.
|
||||||
|
|
||||||
|
Warning: Do not change the default dashboard host `127.0.0.1` to keep Ray cluster reachable by only you.
|
||||||
|
|
||||||
|
Note: We recommend using a dedicated Firefox profile for accessing web-based services on HLRS Compute Platforms. If you haven't created a profile, check out our [guide](https://kb.hlrs.de/platforms/index.php/How_to_use_Web_Based_Services_on_HLRS_Compute_Platforms).
|
||||||
|
|
||||||
|
You need the job id and the hostname for your current job. You can obtain this information on the login node using:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
qstat -anw # get the job id and the hostname
|
||||||
|
```
|
||||||
|
|
||||||
|
Then, on your local computer,
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export PBS_JOBID=<job-id> # e.g., 2316419.hawk-pbs5
|
||||||
|
ssh <compute-host> # e.g., r38c3t8n3
|
||||||
|
```
|
||||||
|
|
||||||
|
Check your SSH config in the first step if this doesn't work.
|
||||||
|
|
||||||
|
Then, launch Firefox web browser using the configured profile. Open `localhost:8265` to access the Ray dashboard.
|
||||||
|
|
||||||
|
## Launch a Ray Cluster in Batch Mode
|
||||||
|
|
||||||
|
Let us [estimate the value of π](https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html) as an example application.
|
||||||
|
|
||||||
|
**Step 1.** Add execution permissions to `start-ray-worker.sh`
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd deployment_scripts
|
||||||
|
chmod +x start-ray-worker.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2.** Submit a job to launch the head and worker nodes.
|
||||||
|
|
||||||
|
You must modify the following lines in `submit-ray-job.sh`:
|
||||||
|
- Line 3 changes the cluster size. The default configuration launches a 3 node cluster.
|
||||||
|
- `export WS_DIR=<workspace_dir>` - set the correct workspace directory.
|
||||||
|
- `export PROJECT_DIR=$WS_DIR/<project_name>` - set the correct project directory.
|
||||||
|
|
||||||
|
Note: The job script `src/monte-carlo-pi.py` waits for all nodes in the Ray cluster to become available. Preserve this pattern in your Python code while using a multiple node Ray cluster.
|
||||||
|
|
||||||
|
Launch the job and monitor the progress. As the job starts, its status (S) shifts from Q (Queued) to R (Running). Upon completion, the job will no longer appear in the `qstat -a` display.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
qsub submit-ray-job.pbs
|
||||||
|
qstat -anw # Q: Queued, R: Running, E: Ending
|
||||||
|
ls -l # list files after the job finishes
|
||||||
|
cat ray-job.o... # inspect the output file
|
||||||
|
cat ray-job.e... # inspect the error file
|
||||||
|
```
|
||||||
|
|
||||||
|
If you need to delete the job, use `qdel <job-id>`. If this doesn't work, use the `-W force` option: `qdel -W force <job-id>`
|
Binary file not shown.
|
@ -1,104 +0,0 @@
|
||||||
# Reference Guide: Dask Cluster Deployment Scripts
|
|
||||||
|
|
||||||
Wiki link:
|
|
||||||
|
|
||||||
Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment.
|
|
||||||
|
|
||||||
Structure:
|
|
||||||
- [ ] [Tutorial](https://diataxis.fr/tutorials/)
|
|
||||||
- [ ] [How-to guide](https://diataxis.fr/how-to-guides/)
|
|
||||||
- [x] [Reference](https://diataxis.fr/reference/)
|
|
||||||
- [ ] [Explanation](https://diataxis.fr/explanation/)
|
|
||||||
|
|
||||||
To do:
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
This repository contains a set of bash scripts designed to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment. These scripts facilitate the creation of Conda environments, deployment of the environment to a remote server, and initiation of Dask clusters on distributed systems. Below is a comprehensive guide on how to use and understand each script:
|
|
||||||
|
|
||||||
### Note: Permissions
|
|
||||||
|
|
||||||
Ensure that execution permissions (`chmod +x`) are granted to these scripts before attempting to run them. This can be done using the following command:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
chmod +x script_name.sh
|
|
||||||
```
|
|
||||||
|
|
||||||
## Prerequisites
|
|
||||||
|
|
||||||
Before using these scripts, ensure that the following prerequisites are met:
|
|
||||||
|
|
||||||
1. **Conda Installation**: Ensure that Conda is installed on your local system. Follow the [official Conda installation guide](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html) if not already installed.
|
|
||||||
|
|
||||||
2. **PBS Job Scheduler**: The deployment scripts (`deploy-dask.sh` and `dask-worker.sh`) are designed for use with the PBS job scheduler. Modify accordingly if using a different job scheduler.
|
|
||||||
|
|
||||||
3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
|
|
||||||
|
|
||||||
## 1. create-env.sh
|
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`create-env.sh` is designed to create a Conda environment. It checks for the existence of the specified environment and either creates it or notifies the user if it already exists.
|
|
||||||
Note: Define your Conda environment in `environment.yaml` before running this script.
|
|
||||||
|
|
||||||
### Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
./create-env.sh <conda_environment_name>
|
|
||||||
```
|
|
||||||
|
|
||||||
### Note
|
|
||||||
|
|
||||||
- This script is intended to run on a local system where Conda is installed.
|
|
||||||
|
|
||||||
## 2. deploy-env.sh
|
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`deploy-env.sh` is responsible for deploying the Conda environment to a remote server. If the tar.gz file already exists, it is copied; otherwise, it is created before being transferred.
|
|
||||||
|
|
||||||
### Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
./deploy-env.sh <environment_name> <destination_directory>
|
|
||||||
```
|
|
||||||
|
|
||||||
### Note
|
|
||||||
|
|
||||||
- This script is intended to run on a local system.
|
|
||||||
|
|
||||||
## 3. deploy-dask.sh
|
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`deploy-dask.sh` initiates the Dask cluster on an HPC environment using the PBS job scheduler. It extracts the Conda environment, activates it, and starts the Dask scheduler and workers on allocated nodes.
|
|
||||||
|
|
||||||
### Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
./deploy-dask.sh <current_workspace_directory>
|
|
||||||
```
|
|
||||||
|
|
||||||
### Notes
|
|
||||||
|
|
||||||
- This script is designed for an HPC environment with PBS job scheduling.
|
|
||||||
- Modifications may be necessary for different job schedulers.
|
|
||||||
|
|
||||||
## 4. dask-worker.sh
|
|
||||||
|
|
||||||
### Overview
|
|
||||||
|
|
||||||
`dask-worker.sh` is a worker script designed to be executed on each allocated node. It sets up the Dask environment, extracts the Conda environment, activates it, and starts the Dask worker to connect to the scheduler. This script is not directly executed by the user.
|
|
||||||
|
|
||||||
### Notes
|
|
||||||
|
|
||||||
- Execute this script on each allocated node to connect them to the Dask scheduler.
|
|
||||||
- Designed for use with PBS job scheduling.
|
|
||||||
|
|
||||||
## Workflow
|
|
||||||
|
|
||||||
1. **Create Conda Environment**: Execute `create-env.sh` to create a Conda environment locally.
|
|
||||||
2. **Deploy Conda Environment**: Execute `deploy-env.sh` to deploy the Conda environment to a remote server.
|
|
||||||
3. **Deploy Dask Cluster**: Execute `deploy-dask.sh` to start the Dask cluster on an HPC environment.
|
|
|
@ -1,25 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
# Display usage
|
|
||||||
if [ "$#" -ne 1 ]; then
|
|
||||||
echo "Usage: $0 <conda_environment_name>"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Name of the Conda environment
|
|
||||||
CONDA_ENV_NAME=$1
|
|
||||||
|
|
||||||
# Check if the Conda environment already exists
|
|
||||||
if conda env list | grep -q "$CONDA_ENV_NAME"; then
|
|
||||||
|
|
||||||
echo "Environment '$CONDA_ENV_NAME' already exists."
|
|
||||||
|
|
||||||
else
|
|
||||||
|
|
||||||
echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
|
|
||||||
|
|
||||||
# Create Conda environment
|
|
||||||
conda env create --name $CONDA_ENV_NAME -f environment.yaml
|
|
||||||
|
|
||||||
echo "Conda environment '$CONDA_ENV_NAME' created."
|
|
||||||
fi
|
|
|
@ -1,31 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
#Get the current workspace directory and the master node
|
|
||||||
export CURRENT_WORKSPACE=$1
|
|
||||||
export DASK_SCHEDULER_HOST=$2
|
|
||||||
|
|
||||||
# Path to localscratch
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
|
|
||||||
export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
|
|
||||||
mkdir -p $DASK_ENV
|
|
||||||
|
|
||||||
# Extract Dask environment in localscratch
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
|
|
||||||
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
|
|
||||||
chmod -R 700 $DASK_ENV
|
|
||||||
|
|
||||||
# Start the dask environment
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
|
|
||||||
source $DASK_ENV/bin/activate
|
|
||||||
conda-unpack
|
|
||||||
|
|
||||||
# Start Dask worker
|
|
||||||
export DASK_SCHEDULER_PORT="8786" # Replace with the port on which the Dask scheduler is running
|
|
||||||
|
|
||||||
# Additional Dask worker options can be added here if needed
|
|
||||||
# Change local directory if memory is an issue
|
|
||||||
|
|
||||||
# Change directory to localscratch and start Dask worker
|
|
||||||
cd $DASK_ENV
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Starting Dask worker at $DASK_SCHEDULER_HOST on port $DASK_SCHEDULER_PORT"
|
|
||||||
dask worker $DASK_SCHEDULER_HOST:$DASK_SCHEDULER_PORT
|
|
|
@ -1,54 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
export CURRENT_WORKSPACE=$1
|
|
||||||
|
|
||||||
# Check if running in a PBS Job environment
|
|
||||||
if [ -z ${PBS_NODEFILE+x} ]; then
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] ERROR: This script is meant to run as a part of PBS Job. Don't start it at login nodes."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
export NUM_NODES=$(wc -l < $PBS_NODEFILE)
|
|
||||||
|
|
||||||
if [ $NUM_NODES -lt 2 ]; then
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] WARNING: You have a single node job running. Dask cluster requires at least 2 nodes."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
export ALL_NODES=$(cat $PBS_NODEFILE)
|
|
||||||
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
|
|
||||||
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
|
|
||||||
|
|
||||||
export DASK_SCHEDULER_PORT=8786
|
|
||||||
export DASK_UI_PORT=8787
|
|
||||||
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
|
|
||||||
# Path to localscratch
|
|
||||||
export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
|
|
||||||
mkdir -p $DASK_ENV
|
|
||||||
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"
|
|
||||||
# Extract Dask environment in localscratch
|
|
||||||
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
|
|
||||||
chmod -R 700 $DASK_ENV
|
|
||||||
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Setting up Dask environment"
|
|
||||||
# Start the dask environment
|
|
||||||
source $DASK_ENV/bin/activate
|
|
||||||
conda-unpack
|
|
||||||
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Scheduler at $SCHEDULER_NODE on port $DASK_SCHEDULER_PORT"
|
|
||||||
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
|
|
||||||
|
|
||||||
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
|
|
||||||
|
|
||||||
# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
|
|
||||||
for ((i=1;i<$NUM_NODES;i++)); do
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
|
|
||||||
pbsdsh -n $i -o -- bash -l -c "source $CURRENT_WORKSPACE/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
|
|
||||||
done
|
|
||||||
|
|
||||||
echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Dask cluster ready, wait for workers to connect to the scheduler."
|
|
||||||
|
|
||||||
# Optionally, you can provide a script for the workers to execute using ssh, similar to Spark.
|
|
||||||
# Example: ssh $node "source activate your_conda_env && python your_dask_worker_script.py" &
|
|
|
@ -1,41 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
# Check if a destination and environment name are provided
|
|
||||||
if [ "$#" -ne 2 ]; then
|
|
||||||
echo "Usage: $0 <environment_name> <destination_directory>"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Name of the Conda environment
|
|
||||||
CONDA_ENV_NAME="$1"
|
|
||||||
TAR_FILE="$CONDA_ENV_NAME.tar.gz"
|
|
||||||
|
|
||||||
# Check if the tar.gz file already exists
|
|
||||||
if [ -e "$TAR_FILE" ]; then
|
|
||||||
echo "Using existing $TAR_FILE"
|
|
||||||
else
|
|
||||||
# Pack the Conda environment if the file doesn't exist
|
|
||||||
conda pack -n "$CONDA_ENV_NAME" -o "$TAR_FILE"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Parse the destination host and directory
|
|
||||||
DESTINATION=$2
|
|
||||||
IFS=':' read -ra DEST <<< "$DESTINATION"
|
|
||||||
DEST_HOST="${DEST[0]}"
|
|
||||||
DEST_DIR="${DEST[1]}"
|
|
||||||
|
|
||||||
# Copy the environment tarball to the remote server
|
|
||||||
scp "$TAR_FILE" "$DEST_HOST":"$DEST_DIR"
|
|
||||||
scp deploy-dask.sh "$DEST_HOST":"$DEST_DIR"
|
|
||||||
scp dask-worker.sh "$DEST_HOST":"$DEST_DIR"
|
|
||||||
|
|
||||||
echo "Conda environment '$CONDA_ENV_NAME' packed and deployed to '$DEST_HOST:$DEST_DIR' as '$TAR_FILE'."
|
|
||||||
|
|
||||||
# Ask the user if they want to delete the tar.gz file
|
|
||||||
read -p "Do you want to delete the local tar.gz file? (y/n): " answer
|
|
||||||
if [ "$answer" == "y" ]; then
|
|
||||||
rm "$TAR_FILE"
|
|
||||||
echo "Local tar.gz file deleted."
|
|
||||||
else
|
|
||||||
echo "Local tar.gz file not deleted."
|
|
||||||
fi
|
|
|
@ -1,9 +0,0 @@
|
||||||
channels:
|
|
||||||
- defaults
|
|
||||||
- conda-forge
|
|
||||||
dependencies:
|
|
||||||
- python=3.8.18
|
|
||||||
- dask
|
|
||||||
- numpy
|
|
||||||
- scikit-learn
|
|
||||||
- conda-pack
|
|
19
deployment_scripts/start-ray-worker.sh
Normal file
19
deployment_scripts/start-ray-worker.sh
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
if [ $# -ne 5 ]; then
|
||||||
|
echo "Usage: $0 <ws_dir> <env_path> <ray_address> <redis_password> <obj_store_memory>"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
export WS_DIR=$1
|
||||||
|
export ENV_PATH=$2
|
||||||
|
export RAY_ADDRESS=$3
|
||||||
|
export REDIS_PASSWORD=$4
|
||||||
|
export OBJECT_STORE_MEMORY=$5
|
||||||
|
|
||||||
|
source $ENV_PATH/bin/activate
|
||||||
|
|
||||||
|
ray start --address=$RAY_ADDRESS \
|
||||||
|
--redis-password=$REDIS_PASSWORD \
|
||||||
|
--object-store-memory=$OBJECT_STORE_MEMORY \
|
||||||
|
--block
|
44
deployment_scripts/submit-ray-job.pbs
Normal file
44
deployment_scripts/submit-ray-job.pbs
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#PBS -N ray-job
|
||||||
|
#PBS -l select=2:node_type=rome
|
||||||
|
#PBS -l walltime=1:00:00
|
||||||
|
|
||||||
|
export WS_DIR=<workspace_dir>
|
||||||
|
export PROJECT_DIR=$WS_DIR/<project_name>
|
||||||
|
export ENV_PATH=<env_path>
|
||||||
|
export JOB_SCRIPT=monte-carlo-pi.py
|
||||||
|
|
||||||
|
export OBJECT_STORE_MEMORY=128000000000
|
||||||
|
|
||||||
|
# Environment variables after this line should not change
|
||||||
|
|
||||||
|
export SRC_DIR=$PROJECT_DIR/src
|
||||||
|
export PYTHON_FILE=$SRC_DIR/$JOB_SCRIPT
|
||||||
|
export DEPLOYMENT_SCRIPTS=$PROJECT_DIR/deployment_scripts
|
||||||
|
|
||||||
|
source $ENV_PATH/bin/activate
|
||||||
|
|
||||||
|
export IP_ADDRESS=`ip addr show ib0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}' | awk '{print $1}'`
|
||||||
|
|
||||||
|
export RAY_ADDRESS=$IP_ADDRESS:6379
|
||||||
|
export REDIS_PASSWORD=$(openssl rand -base64 32)
|
||||||
|
|
||||||
|
export NCCL_DEBUG=INFO
|
||||||
|
|
||||||
|
ray start --disable-usage-stats \
|
||||||
|
--head \
|
||||||
|
--node-ip-address=$IP_ADDRESS \
|
||||||
|
--port=6379 \
|
||||||
|
--dashboard-host=127.0.0.1 \
|
||||||
|
--redis-password=$REDIS_PASSWORD \
|
||||||
|
--object-store-memory=$OBJECT_STORE_MEMORY
|
||||||
|
|
||||||
|
export NUM_NODES=$(sort $PBS_NODEFILE |uniq | wc -l)
|
||||||
|
|
||||||
|
for ((i=1;i<$NUM_NODES;i++)); do
|
||||||
|
pbsdsh -n $i -- bash -l -c "'$DEPLOYMENT_SCRIPTS/start-ray-worker.sh' '$WS_DIR' '$ENV_PATH' '$RAY_ADDRESS' '$REDIS_PASSWORD' '$OBJECT_STORE_MEMORY'" &
|
||||||
|
done
|
||||||
|
|
||||||
|
python3 $PYTHON_FILE
|
||||||
|
|
||||||
|
ray stop --grace-period 30
|
|
@ -1,413 +0,0 @@
|
||||||
{
|
|
||||||
"cells": [
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 1,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"import os\n",
|
|
||||||
"import dask\n",
|
|
||||||
"import random\n",
|
|
||||||
"import torch\n",
|
|
||||||
"from torch.utils.data import Dataset, DataLoader\n",
|
|
||||||
"\n",
|
|
||||||
"from dask.distributed import Client\n",
|
|
||||||
"import dask.dataframe as dd\n",
|
|
||||||
"import pandas as pd\n",
|
|
||||||
"\n",
|
|
||||||
"from dask_ml.preprocessing import MinMaxScaler\n",
|
|
||||||
"from dask_ml.model_selection import train_test_split\n",
|
|
||||||
"from dask_ml.linear_model import LinearRegression\n",
|
|
||||||
"\n",
|
|
||||||
"from daskdataset import DaskDataset, ShallowNet\n",
|
|
||||||
"import torch.nn as nn\n",
|
|
||||||
"import torch.nn.functional as F"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 9,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"client = Client()\n",
|
|
||||||
"\n",
|
|
||||||
"sample_dataset=\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\"\n",
|
|
||||||
"\n",
|
|
||||||
"df = dd.read_parquet(sample_dataset, engine=\"fastparquet\")#.repartition(npartitions=10) #using pyarrow throws error with numpy"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Run this only on the cluster"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"client = Client(str(os.getenv('HOSTNAME')) + \"-ib:8786\")\n",
|
|
||||||
"\n",
|
|
||||||
"sample_datasets=[\"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet\",\n",
|
|
||||||
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet\",]\n",
|
|
||||||
"\n",
|
|
||||||
"df = dd.read_parquet(sample_datasets, engine=\"fastparquet\") "
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Convert old Parquet to new Parquet"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 8,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stderr",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"2023-11-28 16:43:12,666 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:14,316 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:15,668 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:33977 (pid=48276) exceeded 95% memory budget. Restarting...\n",
|
|
||||||
"2023-11-28 16:43:15,946 - distributed.nanny - WARNING - Restarting worker\n",
|
|
||||||
"2023-11-28 16:43:21,786 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.74 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:23,443 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.09 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:24,862 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46711 (pid=48247) exceeded 95% memory budget. Restarting...\n",
|
|
||||||
"2023-11-28 16:43:25,144 - distributed.nanny - WARNING - Restarting worker\n",
|
|
||||||
"2023-11-28 16:43:31,078 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.76 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:32,615 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.11 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:34,068 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:39735 (pid=48375) exceeded 95% memory budget. Restarting...\n",
|
|
||||||
"2023-11-28 16:43:34,366 - distributed.nanny - WARNING - Restarting worker\n",
|
|
||||||
"2023-11-28 16:43:40,997 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:42,760 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:43:44,718 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43435 (pid=48187) exceeded 95% memory budget. Restarting...\n",
|
|
||||||
"2023-11-28 16:43:45,089 - distributed.nanny - WARNING - Restarting worker\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"ename": "KilledWorker",
|
|
||||||
"evalue": "Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.",
|
|
||||||
"output_type": "error",
|
|
||||||
"traceback": [
|
|
||||||
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
|
|
||||||
"\u001b[0;31mKilledWorker\u001b[0m Traceback (most recent call last)",
|
|
||||||
"Cell \u001b[0;32mIn[8], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Compute the Dask dataframe to get a Pandas dataframe\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m df_pandas \u001b[38;5;241m=\u001b[39m \u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;66;03m# Create a new Pandas dataframe with the expanded 'features' columns\u001b[39;00m\n\u001b[1;32m 5\u001b[0m features_df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m]\u001b[38;5;241m.\u001b[39mto_list(), columns\u001b[38;5;241m=\u001b[39m[\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeature_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mi\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m'\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m i \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mrange\u001b[39m(\u001b[38;5;28mlen\u001b[39m(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m][\u001b[38;5;241m0\u001b[39m]))])\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:314\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 291\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 292\u001b[0m \n\u001b[1;32m 293\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 312\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:599\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 596\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 597\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 599\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 600\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:3224\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3222\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3223\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3224\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3225\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3226\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2359\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2357\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2358\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2359\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2360\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2361\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2362\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2363\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2364\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2365\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2366\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:351\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 349\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 350\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 351\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 352\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 353\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:418\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 416\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 417\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 418\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 419\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 420\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:391\u001b[0m, in \u001b[0;36msync.<locals>.f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 389\u001b[0m future \u001b[38;5;241m=\u001b[39m wait_for(future, callback_timeout)\n\u001b[1;32m 390\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 391\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 392\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 393\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/tornado/gen.py:767\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 765\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 766\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 767\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 768\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 769\u001b[0m \u001b[38;5;66;03m# Save the exception for later. It's important that\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;66;03m# gen.throw() not be called inside this try/except block\u001b[39;00m\n\u001b[1;32m 771\u001b[0m \u001b[38;5;66;03m# because that makes sys.exc_info behave unexpectedly.\u001b[39;00m\n\u001b[1;32m 772\u001b[0m exc: Optional[\u001b[38;5;167;01mException\u001b[39;00m] \u001b[38;5;241m=\u001b[39m e\n",
|
|
||||||
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2222\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2220\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2221\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2222\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2223\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2224\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n",
|
|
||||||
"\u001b[0;31mKilledWorker\u001b[0m: Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html."
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"# Compute the Dask dataframe to get a Pandas dataframe\n",
|
|
||||||
"df_pandas = df.compute()\n",
|
|
||||||
"\n",
|
|
||||||
"# Create a new Pandas dataframe with the expanded 'features' columns\n",
|
|
||||||
"features_df = pd.DataFrame(df_pandas['features'].to_list(), columns=[f'feature_{i}' for i in range(len(df_pandas['features'][0]))])\n",
|
|
||||||
"labels_df = pd.DataFrame(df_pandas['labels'].to_list(), columns=[f'label_{i}' for i in range(len(df_pandas['labels'][0]))])\n",
|
|
||||||
"\n",
|
|
||||||
"# Concatenate the original dataframe with the expanded features and labels dataframes\n",
|
|
||||||
"df_pandas = pd.concat([features_df, labels_df], axis=1)\n",
|
|
||||||
"\n",
|
|
||||||
"#save df_pandas a parquet\n",
|
|
||||||
"df_pandas.to_parquet(\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\", engine=\"fastparquet\")"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 10,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#separate the features and labels\n",
|
|
||||||
"df_labels = df.loc[:, df.columns.str.contains('label')]\n",
|
|
||||||
"\n",
|
|
||||||
"# Create a StandardScaler object\n",
|
|
||||||
"scaler_features = MinMaxScaler()\n",
|
|
||||||
"df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])\n",
|
|
||||||
"\n",
|
|
||||||
"#Split the data into training and test sets\n",
|
|
||||||
"X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 19,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stderr",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"2023-11-28 16:58:28,387 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.16 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:58:29,636 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.26 GiB -- Worker memory limit: 3.86 GiB\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"X = torch.tensor(X_train.compute().values)\n",
|
|
||||||
"y = torch.tensor(y_train.compute().values)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 14,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stderr",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"2023-11-28 16:52:11,130 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.15 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 16:52:12,561 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.25 GiB -- Worker memory limit: 3.86 GiB\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"4860\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"from skorch import NeuralNetRegressor\n",
|
|
||||||
"import torch.optim as optim\n",
|
|
||||||
"\n",
|
|
||||||
"niceties = {\n",
|
|
||||||
" \"callbacks\": False,\n",
|
|
||||||
" \"warm_start\": False,\n",
|
|
||||||
" \"train_split\": None,\n",
|
|
||||||
" \"max_epochs\": 5,\n",
|
|
||||||
"}\n",
|
|
||||||
"\n",
|
|
||||||
"model = NeuralNetRegressor(\n",
|
|
||||||
" module=ShallowNet,\n",
|
|
||||||
" module__n_features=X.size(dim=1),\n",
|
|
||||||
" criterion=nn.MSELoss,\n",
|
|
||||||
" optimizer=optim.SGD,\n",
|
|
||||||
" optimizer__lr=0.1,\n",
|
|
||||||
" optimizer__momentum=0.9,\n",
|
|
||||||
" batch_size=64,\n",
|
|
||||||
" **niceties,\n",
|
|
||||||
")"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"model.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
|
|
||||||
"model = model.share_memory()"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 20,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"Re-initializing module because the following parameters were re-set: n_features.\n",
|
|
||||||
"Re-initializing criterion.\n",
|
|
||||||
"Re-initializing optimizer.\n",
|
|
||||||
" epoch train_loss dur\n",
|
|
||||||
"------- ------------ ------\n",
|
|
||||||
" 1 \u001b[36m0.3994\u001b[0m 4.5545\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"data": {
|
|
||||||
"text/plain": [
|
|
||||||
"<class 'skorch.regressor.NeuralNetRegressor'>[initialized](\n",
|
|
||||||
" module_=ShallowNet(\n",
|
|
||||||
" (layer1): Linear(in_features=4860, out_features=8, bias=True)\n",
|
|
||||||
" ),\n",
|
|
||||||
")"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"execution_count": 20,
|
|
||||||
"metadata": {},
|
|
||||||
"output_type": "execute_result"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"model.fit(X, y)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 21,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stderr",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"2023-11-28 17:00:11,618 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
|
|
||||||
"2023-11-28 17:00:13,143 - distributed.worker.memory - WARNING - Worker is at 57% memory usage. Resuming worker. Process memory: 2.22 GiB -- Worker memory limit: 3.86 GiB\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"test_X = torch.tensor(X_test.compute().values)\n",
|
|
||||||
"test_y = torch.tensor(y_test.compute().values)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 22,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"-3.410176609207851\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"print(model.score(test_X, test_y))"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# Define the loss function and optimizer\n",
|
|
||||||
"criterion = torch.nn.CrossEntropyLoss()\n",
|
|
||||||
"optimizer = torch.optim.SGD(model.parameters(), lr=0.01)\n",
|
|
||||||
"# Move the model to the GPU\n",
|
|
||||||
"device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
|
|
||||||
"model = model.to(device)\n",
|
|
||||||
"# Distribute the model across workers\n",
|
|
||||||
"model = model.share_memory()"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# Train the model\n",
|
|
||||||
"for epoch in range(10):\n",
|
|
||||||
" for batch in dataloader:\n",
|
|
||||||
" # Split the batch across workers\n",
|
|
||||||
" batch = [b.to(device) for b in batch]\n",
|
|
||||||
" futures = client.map(lambda data: model(data[0]), batch)\n",
|
|
||||||
" # Compute the loss\n",
|
|
||||||
" losses = client.map(criterion, futures, batch[1])\n",
|
|
||||||
" loss = client.submit(torch.mean, client.gather(losses))\n",
|
|
||||||
" # Compute the gradients and update the model parameters\n",
|
|
||||||
" optimizer.zero_grad()\n",
|
|
||||||
" gradients = client.map(lambda loss, future: torch.autograd.grad(loss, future)[0], loss, futures)\n",
|
|
||||||
" gradients =client.submit(torch.mean, client.gather(gradients))"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"client.map(lambda parameter, gradient: parameter.grad.copy_(gradient), model.parameters(), gradients)\n",
|
|
||||||
"optimizer.step()"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"from dask_ml.linear_model import LinearRegression\n",
|
|
||||||
"from sklearn.linear_model import LinearRegression\n",
|
|
||||||
"\n",
|
|
||||||
"# Initialize the Linear Regression model\n",
|
|
||||||
"model = LinearRegression()\n",
|
|
||||||
"model.fit(X_train, y_train)\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"new_dask_df = df['features'].apply(pd.Series, meta=meta)\n",
|
|
||||||
"new_dask_df.compute()"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# Use this code for PyArrow tables\n",
|
|
||||||
"\n",
|
|
||||||
"#import pyarrow as pa\n",
|
|
||||||
"\n",
|
|
||||||
"# Define schema for features and labels columns\n",
|
|
||||||
"#schema = pa.schema({\n",
|
|
||||||
" #'features': pa.list_(pa.float32()),\n",
|
|
||||||
" #'labels': pa.list_(pa.float32())\n",
|
|
||||||
"#})\n",
|
|
||||||
"\n",
|
|
||||||
"#import pyarrow.parquet as pq\n",
|
|
||||||
"#df = dd.from_pandas(pq.read_table(sample_dataset, schema=schema).to_pandas(), npartitions=10)"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"metadata": {
|
|
||||||
"kernelspec": {
|
|
||||||
"display_name": "dask-env",
|
|
||||||
"language": "python",
|
|
||||||
"name": "python3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"codemirror_mode": {
|
|
||||||
"name": "ipython",
|
|
||||||
"version": 3
|
|
||||||
},
|
|
||||||
"file_extension": ".py",
|
|
||||||
"mimetype": "text/x-python",
|
|
||||||
"name": "python",
|
|
||||||
"nbconvert_exporter": "python",
|
|
||||||
"pygments_lexer": "ipython3",
|
|
||||||
"version": "3.8.18"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 2
|
|
||||||
}
|
|
54
notebooks/local_ray_cluster.ipynb
Normal file
54
notebooks/local_ray_cluster.ipynb
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"import ray"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"ray.init()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"cluster_resources = ray.available_resources()\n",
|
||||||
|
"available_cpu_cores = cluster_resources.get('CPU', 0)\n",
|
||||||
|
"print(cluster_resources)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": "ray",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.10.13"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
|
@ -1,57 +0,0 @@
|
||||||
import os
|
|
||||||
import dask
|
|
||||||
import random
|
|
||||||
|
|
||||||
from dask.distributed import Client
|
|
||||||
import dask.dataframe as dd
|
|
||||||
import pandas as pd
|
|
||||||
|
|
||||||
from dask_ml.preprocessing import MinMaxScaler
|
|
||||||
from dask_ml.model_selection import train_test_split
|
|
||||||
from dask_ml.linear_model import LinearRegression
|
|
||||||
|
|
||||||
import torch
|
|
||||||
from torch.utils.data import Dataset, DataLoader
|
|
||||||
|
|
||||||
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
|
|
||||||
|
|
||||||
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
|
|
||||||
|
|
||||||
#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet"
|
|
||||||
|
|
||||||
df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300)
|
|
||||||
|
|
||||||
df_features = df.loc[:, df.columns.str.contains('feature')]
|
|
||||||
df_labels = df.loc[:, df.columns.str.contains('label')]
|
|
||||||
|
|
||||||
# Create a StandardScaler object
|
|
||||||
scaler_features = MinMaxScaler()
|
|
||||||
df_features_scaled = scaler_features.fit_transform(df_features)
|
|
||||||
|
|
||||||
#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')]
|
|
||||||
|
|
||||||
#Split the data into training and test sets
|
|
||||||
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0)
|
|
||||||
|
|
||||||
dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True)
|
|
||||||
|
|
||||||
model = torch.nn.Sequential(
|
|
||||||
torch.nn.Linear(784, 128),
|
|
||||||
torch.nn.ReLU(),
|
|
||||||
torch.nn.Linear(128, 10)
|
|
||||||
)
|
|
||||||
|
|
||||||
#X_train = X_train.loc[:, X_train.columns.str.contains('feature')]
|
|
||||||
#y_train = y_train.loc[:, y_train.columns.str.contains('label')]
|
|
||||||
|
|
||||||
# Initialize the Linear Regression model
|
|
||||||
model = LinearRegression().fit(X_train, y_train)
|
|
||||||
|
|
||||||
score = model.score(X_test, y_test)
|
|
|
@ -1,82 +0,0 @@
|
||||||
import os
|
|
||||||
import dask
|
|
||||||
import random
|
|
||||||
import torch
|
|
||||||
from torch.utils.data import Dataset, DataLoader
|
|
||||||
import time
|
|
||||||
|
|
||||||
from dask.distributed import Client
|
|
||||||
import dask.dataframe as dd
|
|
||||||
import pandas as pd
|
|
||||||
from joblib import parallel_backend
|
|
||||||
|
|
||||||
from dask_ml.preprocessing import MinMaxScaler
|
|
||||||
from dask_ml.model_selection import train_test_split
|
|
||||||
from dask_ml.linear_model import LinearRegression
|
|
||||||
|
|
||||||
from daskdataset import DaskDataset, ShallowNet
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
|
|
||||||
|
|
||||||
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
|
|
||||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
|
|
||||||
df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB")
|
|
||||||
#df_future = client.scatter(df)
|
|
||||||
|
|
||||||
#separate the features and labels
|
|
||||||
df_labels = df.loc[:, df.columns.str.contains('label')]
|
|
||||||
|
|
||||||
# Create a StandardScaler object
|
|
||||||
scaler_features = MinMaxScaler()
|
|
||||||
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])
|
|
||||||
|
|
||||||
#Split the data into training and test sets
|
|
||||||
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)
|
|
||||||
|
|
||||||
X = torch.tensor(X_train.compute().values)
|
|
||||||
y = torch.tensor(y_train.compute().values)
|
|
||||||
|
|
||||||
from skorch import NeuralNetRegressor
|
|
||||||
import torch.optim as optim
|
|
||||||
|
|
||||||
niceties = {
|
|
||||||
"callbacks": False,
|
|
||||||
"warm_start": False,
|
|
||||||
"train_split": None,
|
|
||||||
"max_epochs": 5,
|
|
||||||
}
|
|
||||||
|
|
||||||
model = NeuralNetRegressor(
|
|
||||||
module=ShallowNet,
|
|
||||||
module__n_features=X.size(dim=1),
|
|
||||||
criterion=nn.MSELoss,
|
|
||||||
optimizer=optim.SGD,
|
|
||||||
optimizer__lr=0.1,
|
|
||||||
optimizer__momentum=0.9,
|
|
||||||
batch_size=64,
|
|
||||||
**niceties,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Initialize the Linear Regression model
|
|
||||||
model = LinearRegression()
|
|
||||||
model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True))
|
|
||||||
|
|
||||||
end_time = time.time()
|
|
||||||
|
|
||||||
print("Time to load data: ", end_time - start_time)
|
|
||||||
|
|
||||||
dask_dataset = DaskDataset(X_train, y_train)
|
|
||||||
dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True)
|
|
||||||
|
|
||||||
for feature, label in dataloader:
|
|
||||||
print(feature)
|
|
||||||
print(label)
|
|
||||||
break
|
|
|
@ -1,32 +0,0 @@
|
||||||
import torch
|
|
||||||
from torch.utils.data import Dataset, DataLoader
|
|
||||||
import dask.dataframe as dd
|
|
||||||
|
|
||||||
import torch.nn as nn
|
|
||||||
import torch.nn.functional as F
|
|
||||||
|
|
||||||
# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns
|
|
||||||
|
|
||||||
class DaskDataset(Dataset):
|
|
||||||
def __init__(self, df_features, df_labels):
|
|
||||||
self.features = df_features.to_dask_array(lengths=True)
|
|
||||||
self.labels = df_labels.to_dask_array(lengths=True)
|
|
||||||
|
|
||||||
def __len__(self):
|
|
||||||
return self.features.size
|
|
||||||
|
|
||||||
def __getitem__(self, idx):
|
|
||||||
return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values)
|
|
||||||
|
|
||||||
class ShallowNet(nn.Module):
|
|
||||||
def __init__(self, n_features):
|
|
||||||
super().__init__()
|
|
||||||
self.layer1 = nn.Linear(n_features, 128)
|
|
||||||
self.relu = nn.ReLU()
|
|
||||||
self.layer2 = nn.Linear(128, 8)
|
|
||||||
|
|
||||||
def forward(self, x):
|
|
||||||
x = self.layer1(x)
|
|
||||||
x = self.relu(x)
|
|
||||||
x = self.layer2(x)
|
|
||||||
return x
|
|
89
src/monte-carlo-pi.py
Normal file
89
src/monte-carlo-pi.py
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
# Adopted from: https://docs.ray.io/en/releases-2.8.0/ray-core/examples/monte_carlo_pi.html
|
||||||
|
|
||||||
|
import ray
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Change this to match your cluster scale.
|
||||||
|
NUM_SAMPLING_TASKS = 100
|
||||||
|
NUM_SAMPLES_PER_TASK = 10_000_000
|
||||||
|
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
|
||||||
|
|
||||||
|
@ray.remote
|
||||||
|
class ProgressActor:
|
||||||
|
def __init__(self, total_num_samples: int):
|
||||||
|
self.total_num_samples = total_num_samples
|
||||||
|
self.num_samples_completed_per_task = {}
|
||||||
|
|
||||||
|
def report_progress(self, task_id: int, num_samples_completed: int) -> None:
|
||||||
|
self.num_samples_completed_per_task[task_id] = num_samples_completed
|
||||||
|
|
||||||
|
def get_progress(self) -> float:
|
||||||
|
return (
|
||||||
|
sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
|
||||||
|
)
|
||||||
|
|
||||||
|
@ray.remote
|
||||||
|
def sampling_task(num_samples: int, task_id: int,
|
||||||
|
progress_actor: ray.actor.ActorHandle) -> int:
|
||||||
|
num_inside = 0
|
||||||
|
for i in range(num_samples):
|
||||||
|
x, y = random.uniform(-1, 1), random.uniform(-1, 1)
|
||||||
|
if math.hypot(x, y) <= 1:
|
||||||
|
num_inside += 1
|
||||||
|
|
||||||
|
# Report progress every 1 million samples.
|
||||||
|
if (i + 1) % 1_000_000 == 0:
|
||||||
|
# This is async.
|
||||||
|
progress_actor.report_progress.remote(task_id, i + 1)
|
||||||
|
|
||||||
|
# Report the final progress.
|
||||||
|
progress_actor.report_progress.remote(task_id, num_samples)
|
||||||
|
return num_inside
|
||||||
|
|
||||||
|
def wait_for_nodes(expected_num_nodes: int):
|
||||||
|
while True:
|
||||||
|
num_nodes = len(ray.nodes())
|
||||||
|
if num_nodes >= expected_num_nodes:
|
||||||
|
break
|
||||||
|
print(f'Currently {num_nodes} nodes connected. Waiting for more...')
|
||||||
|
time.sleep(5) # wait for 5 seconds before checking again
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
num_nodes = int(os.environ["NUM_NODES"])
|
||||||
|
assert num_nodes > 1, "If the environment variable NUM_NODES is set, it should be greater than 1."
|
||||||
|
|
||||||
|
redis_password = os.environ["REDIS_PASSWORD"]
|
||||||
|
ray.init(address="auto", _redis_password=redis_password)
|
||||||
|
|
||||||
|
wait_for_nodes(num_nodes)
|
||||||
|
|
||||||
|
cluster_resources = ray.available_resources()
|
||||||
|
print(cluster_resources)
|
||||||
|
|
||||||
|
# Create the progress actor.
|
||||||
|
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
|
||||||
|
|
||||||
|
# Create and execute all sampling tasks in parallel.
|
||||||
|
results = [
|
||||||
|
sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
|
||||||
|
for i in range(NUM_SAMPLING_TASKS)
|
||||||
|
]
|
||||||
|
|
||||||
|
# Query progress periodically.
|
||||||
|
while True:
|
||||||
|
progress = ray.get(progress_actor.get_progress.remote())
|
||||||
|
print(f"Progress: {int(progress * 100)}%")
|
||||||
|
|
||||||
|
if progress == 1:
|
||||||
|
break
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Get all the sampling tasks results.
|
||||||
|
total_num_inside = sum(ray.get(results))
|
||||||
|
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
|
||||||
|
print(f"Estimated value of π is: {pi}")
|
243
src/ray-tune-keras-cifar10.py
Normal file
243
src/ray-tune-keras-cifar10.py
Normal file
|
@ -0,0 +1,243 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""Train keras CNN on the CIFAR10 small images dataset.
|
||||||
|
|
||||||
|
The model comes from: https://zhuanlan.zhihu.com/p/29214791,
|
||||||
|
and it gets to about 87% validation accuracy in 100 epochs.
|
||||||
|
|
||||||
|
Note that the script requires a machine with 4 GPUs. You
|
||||||
|
can set {"gpu": 0} to use CPUs for training, although
|
||||||
|
it is less efficient.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import tensorflow as tf
|
||||||
|
from tensorflow.keras.datasets import cifar10
|
||||||
|
from tensorflow.keras.layers import (
|
||||||
|
Convolution2D,
|
||||||
|
Dense,
|
||||||
|
Dropout,
|
||||||
|
Flatten,
|
||||||
|
Input,
|
||||||
|
MaxPooling2D,
|
||||||
|
)
|
||||||
|
from tensorflow.keras.models import Model, load_model
|
||||||
|
from tensorflow.keras.preprocessing.image import ImageDataGenerator
|
||||||
|
|
||||||
|
from ray import train, tune
|
||||||
|
from ray.tune import Trainable
|
||||||
|
from ray.tune.schedulers import PopulationBasedTraining
|
||||||
|
|
||||||
|
num_classes = 10
|
||||||
|
NUM_SAMPLES = 128
|
||||||
|
|
||||||
|
|
||||||
|
class Cifar10Model(Trainable):
|
||||||
|
def _read_data(self):
|
||||||
|
# The data, split between train and test sets:
|
||||||
|
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
|
||||||
|
|
||||||
|
# Convert class vectors to binary class matrices.
|
||||||
|
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
|
||||||
|
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
|
||||||
|
|
||||||
|
x_train = x_train.astype("float32")
|
||||||
|
x_train /= 255
|
||||||
|
x_test = x_test.astype("float32")
|
||||||
|
x_test /= 255
|
||||||
|
|
||||||
|
return (x_train, y_train), (x_test, y_test)
|
||||||
|
|
||||||
|
def _build_model(self, input_shape):
|
||||||
|
x = Input(shape=(32, 32, 3))
|
||||||
|
y = x
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=64,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=64,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
|
||||||
|
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=128,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=128,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
|
||||||
|
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=256,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = Convolution2D(
|
||||||
|
filters=256,
|
||||||
|
kernel_size=3,
|
||||||
|
strides=1,
|
||||||
|
padding="same",
|
||||||
|
activation="relu",
|
||||||
|
kernel_initializer="he_normal",
|
||||||
|
)(y)
|
||||||
|
y = MaxPooling2D(pool_size=2, strides=2, padding="same")(y)
|
||||||
|
|
||||||
|
y = Flatten()(y)
|
||||||
|
y = Dropout(self.config.get("dropout", 0.5))(y)
|
||||||
|
y = Dense(units=10, activation="softmax", kernel_initializer="he_normal")(y)
|
||||||
|
|
||||||
|
model = Model(inputs=x, outputs=y, name="model1")
|
||||||
|
return model
|
||||||
|
|
||||||
|
def setup(self, config):
|
||||||
|
self.train_data, self.test_data = self._read_data()
|
||||||
|
x_train = self.train_data[0]
|
||||||
|
model = self._build_model(x_train.shape[1:])
|
||||||
|
|
||||||
|
opt = tf.keras.optimizers.Adadelta(
|
||||||
|
lr=self.config.get("lr", 1e-4), weight_decay=self.config.get("decay", 1e-4)
|
||||||
|
)
|
||||||
|
model.compile(
|
||||||
|
loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"]
|
||||||
|
)
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
def step(self):
|
||||||
|
x_train, y_train = self.train_data
|
||||||
|
x_train, y_train = x_train[:NUM_SAMPLES], y_train[:NUM_SAMPLES]
|
||||||
|
x_test, y_test = self.test_data
|
||||||
|
x_test, y_test = x_test[:NUM_SAMPLES], y_test[:NUM_SAMPLES]
|
||||||
|
|
||||||
|
aug_gen = ImageDataGenerator(
|
||||||
|
# set input mean to 0 over the dataset
|
||||||
|
featurewise_center=False,
|
||||||
|
# set each sample mean to 0
|
||||||
|
samplewise_center=False,
|
||||||
|
# divide inputs by dataset std
|
||||||
|
featurewise_std_normalization=False,
|
||||||
|
# divide each input by its std
|
||||||
|
samplewise_std_normalization=False,
|
||||||
|
# apply ZCA whitening
|
||||||
|
zca_whitening=False,
|
||||||
|
# randomly rotate images in the range (degrees, 0 to 180)
|
||||||
|
rotation_range=0,
|
||||||
|
# randomly shift images horizontally (fraction of total width)
|
||||||
|
width_shift_range=0.1,
|
||||||
|
# randomly shift images vertically (fraction of total height)
|
||||||
|
height_shift_range=0.1,
|
||||||
|
# randomly flip images
|
||||||
|
horizontal_flip=True,
|
||||||
|
# randomly flip images
|
||||||
|
vertical_flip=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
aug_gen.fit(x_train)
|
||||||
|
batch_size = self.config.get("batch_size", 64)
|
||||||
|
gen = aug_gen.flow(x_train, y_train, batch_size=batch_size)
|
||||||
|
self.model.fit_generator(
|
||||||
|
generator=gen, epochs=self.config.get("epochs", 1), validation_data=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# loss, accuracy
|
||||||
|
_, accuracy = self.model.evaluate(x_test, y_test, verbose=0)
|
||||||
|
return {"mean_accuracy": accuracy}
|
||||||
|
|
||||||
|
def save_checkpoint(self, checkpoint_dir):
|
||||||
|
file_path = checkpoint_dir + "/model"
|
||||||
|
self.model.save(file_path)
|
||||||
|
|
||||||
|
def load_checkpoint(self, checkpoint_dir):
|
||||||
|
# See https://stackoverflow.com/a/42763323
|
||||||
|
del self.model
|
||||||
|
file_path = checkpoint_dir + "/model"
|
||||||
|
self.model = load_model(file_path)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
# If need, save your model when exit.
|
||||||
|
# saved_path = self.model.save(self.logdir)
|
||||||
|
# print("save model at: ", saved_path)
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
"--smoke-test", action="store_true", help="Finish quickly for testing"
|
||||||
|
)
|
||||||
|
args, _ = parser.parse_known_args()
|
||||||
|
|
||||||
|
space = {
|
||||||
|
"epochs": 1,
|
||||||
|
"batch_size": 64,
|
||||||
|
"lr": tune.grid_search([10**-4, 10**-5]),
|
||||||
|
"decay": tune.sample_from(lambda spec: spec.config.lr / 100.0),
|
||||||
|
"dropout": tune.grid_search([0.25, 0.5]),
|
||||||
|
}
|
||||||
|
if args.smoke_test:
|
||||||
|
space["lr"] = 10**-4
|
||||||
|
space["dropout"] = 0.5
|
||||||
|
|
||||||
|
perturbation_interval = 10
|
||||||
|
pbt = PopulationBasedTraining(
|
||||||
|
time_attr="training_iteration",
|
||||||
|
perturbation_interval=perturbation_interval,
|
||||||
|
hyperparam_mutations={
|
||||||
|
"dropout": lambda _: np.random.uniform(0, 1),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
tuner = tune.Tuner(
|
||||||
|
tune.with_resources(
|
||||||
|
Cifar10Model,
|
||||||
|
resources={"cpu": 1, "gpu": 1},
|
||||||
|
),
|
||||||
|
run_config=train.RunConfig(
|
||||||
|
name="pbt_cifar10",
|
||||||
|
stop={
|
||||||
|
"mean_accuracy": 0.80,
|
||||||
|
"training_iteration": 30,
|
||||||
|
},
|
||||||
|
checkpoint_config=train.CheckpointConfig(
|
||||||
|
checkpoint_frequency=perturbation_interval,
|
||||||
|
checkpoint_score_attribute="mean_accuracy",
|
||||||
|
num_to_keep=2,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
tune_config=tune.TuneConfig(
|
||||||
|
scheduler=pbt,
|
||||||
|
num_samples=4,
|
||||||
|
metric="mean_accuracy",
|
||||||
|
mode="max",
|
||||||
|
reuse_actors=True,
|
||||||
|
),
|
||||||
|
param_space=space,
|
||||||
|
)
|
||||||
|
results = tuner.fit()
|
||||||
|
print("Best hyperparameters found were: ", results.get_best_result().config)
|
Loading…
Reference in a new issue