Compare commits

...

7 commits

14 changed files with 130 additions and 769 deletions

.gitignore (vendored, new file, 5 additions)

@@ -0,0 +1,5 @@
__pycache__
notebooks/
/deployment_scripts/create-env.sh
/deployment_scripts/deploy-env.sh
/deployment_scripts/environment.yaml

README.md (115 lines changed)

@@ -1,77 +1,56 @@
 # Dask: How to execute python workloads using a Dask cluster on Vulcan
-Wiki link:
-Motivation: This document aims to show users how to launch a Dask cluster in our compute platforms and perform a simple workload using it.
-Structure:
-- [ ] [Tutorial](https://diataxis.fr/tutorials/)
-- [x] [How-to guide](https://diataxis.fr/how-to-guides/)
-- [ ] [Reference](https://diataxis.fr/reference/)
-- [ ] [Explanation](https://diataxis.fr/explanation/)
-To do:
-- [x] Made scripts for environment creation and deployment in the folder `local_scripts`
-- [x] Changed scripts to `deployment_scripts`
-- [x] Added step about sending python file
----
 This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.
 ## Table of Contents
-- [Prerequisites](#prerequisites)
 - [Getting Started](#getting-started)
 - [Usage](#usage)
 - [Notes](#notes)
-## Prerequisites
-Before running the application, make sure you have the following prerequisites installed in a conda environment:
-- [Python 3.8.18](https://www.python.org/downloads/release/python-3818/): This specific python version is used throughout; you can select it when creating the conda environment. For more information, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
-- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
-- [Dask](https://dask.org/): Install Dask using conda.
-- [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan.
 ## Getting Started
-1. Clone [this repository](https://code.hlrs.de/hpcrsaxe/spark_template) to your local machine:
+### 1. Build and transfer the Conda environment to Vulcan:
+Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Vulcan.
+Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment.
+### 2. Allocate workspace on Vulcan:
+Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
 ```bash
+ws_allocate dask_workspace 10
+ws_find dask_workspace # find the path to the workspace, which is the destination directory in the next step
+```
+### 3. Clone the repository on Vulcan to use the deployment scripts and project structure:
+```bash
+cd <workspace_directory>
 git clone <repository_url>
 ```
-2. Go into the directory and create an environment using Conda and environment.yaml. Note: Be sure to add the necessary packages in environment.yaml:
-```bash
-./deployment_scripts/create-env.sh <your-env>
-```
-3. Send all files using `deploy-env.sh`:
-```bash
-./deployment_scripts/deploy-env.sh <your-env> <destination_host>:<destination_directory>
-```
-4. Send all the code to the appropriate directory on Vulcan using `scp`:
+### 4. Send all the code to the appropriate directory on Vulcan using `scp`:
 ```bash
 scp <your_script>.py <destination_host>:<destination_directory>
 ```
-5. SSH into Vulcan and start a job interactively using:
+### 5. SSH into Vulcan and start a job interactively using:
 ```bash
-qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00
+qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
 ```
+Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. See the [Multiple Nodes](#multiple-nodes) section for more information.
-6. Go into the directory with all code:
+### 6. Go into the directory with all code:
 ```bash
 cd <destination_directory>
 ```
-7. Initialize the Dask cluster:
+### 7. Initialize the Dask cluster:
 ```bash
 source deploy-dask.sh "$(pwd)"
@@ -81,7 +60,22 @@ Before running the application, make sure you have the following prerequisites installed in a conda environment:
 ## Usage
-To run the application interactively, execute the following command after all the cluster's nodes are up and running:
+### Single Node
+To run the application interactively on a single node, follow points 4, 5, 6, and 7 from [Getting Started](#getting-started), and execute the following commands after the job has started:
+```bash
+# Load the Conda module
+module load bigdata/conda
+source activate # activates the base environment
+# List available Conda environments for verification purposes
+conda env list
+# Activate a specific Conda environment
+conda activate dask_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
+```
+After the environment is activated, you can run the python interpreter:
 ```bash
 python
@@ -92,11 +86,36 @@ Or to run a full script:
 python <your-script>.py
 ```
-Note: If you don't see your environment in the python interpreter, then manually activate it using:
+### Multiple Nodes
+To run the application on multiple nodes, you need to write a `.pbs` script and submit it using `qsub`. Follow steps 1-4 from the [Getting Started](#getting-started) section. Write a `submit-dask-job.pbs` script:
 ```bash
-conda activate <your-env>
+#!/bin/bash
+#PBS -N dask-job
+#PBS -l select=3:node_type=rome
+#PBS -l walltime=1:00:00
+# Go to the directory where the code is
+cd <destination_directory>
+# Deploy the Dask cluster
+source deploy-dask.sh "$(pwd)"
+# Run the python script
+python <your-script>.py
+```
+A more thorough example is available in the `deployment_scripts` directory under `submit-dask-job.pbs`.
+Then execute the following commands to submit the job:
+```bash
+qsub submit-dask-job.pbs
+qstat -anw # Q: Queued, R: Running, E: Ending
+ls -l # list files after the job finishes
+cat dask-job.o... # inspect the output file
+cat dask-job.e... # inspect the error file
 ```
-Do this before using the python interpreter.
 ## Notes
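
As a quick sanity check from the interpreter, you can confirm that the session reaches the cluster. The following is a minimal sketch, assuming the scheduler was started by `deploy-dask.sh` on the current node's InfiniBand interface at Dask's default port 8786 (the convention the scripts in this repository use):

```python
import os
from dask.distributed import Client

# deploy-dask.sh starts the scheduler on "<hostname>-ib" at port 8786
client = Client(str(os.getenv("HOSTNAME")) + "-ib:8786")

print(client.scheduler_info()["workers"])           # one entry per connected worker
print(client.submit(lambda x: x + 1, 41).result())  # 42
```

If the worker list is empty or the connection times out, the cluster deployment in step 7 did not complete.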

deployment_scripts/create-env.sh (deleted)

@@ -1,25 +0,0 @@
#!/bin/bash
# Display usage
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <conda_environment_name>"
exit 1
fi
# Name of the Conda environment
CONDA_ENV_NAME=$1
# Check if the Conda environment already exists
if conda env list | grep -q "$CONDA_ENV_NAME"; then
echo "Environment '$CONDA_ENV_NAME' already exists."
else
echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
# Create Conda environment
conda env create --name $CONDA_ENV_NAME -f environment.yaml
echo "Conda environment '$CONDA_ENV_NAME' created."
fi

deployment_scripts/dask-worker.sh (changed)

@@ -6,13 +6,13 @@ export DASK_SCHEDULER_HOST=$2
 # Path to localscratch
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
-export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
+export DASK_ENV="$HOME/dask"
 mkdir -p $DASK_ENV
 # Extract Dask environment in localscratch
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
-tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
-chmod -R 700 $DASK_ENV
+#tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
+#chmod -R 700 $DASK_ENV
 # Start the dask environment
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"

deployment_scripts/deploy-dask.sh (changed)

@@ -24,7 +24,7 @@ export DASK_UI_PORT=8787
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
 # Path to localscratch
-export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
+export DASK_ENV="$HOME/dask"
 mkdir -p $DASK_ENV
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"

deployment_scripts/deploy-env.sh (deleted)

@@ -1,41 +0,0 @@
#!/bin/bash
# Check if a destination and environment name are provided
if [ "$#" -ne 2 ]; then
echo "Usage: $0 <environment_name> <destination_directory>"
exit 1
fi
# Name of the Conda environment
CONDA_ENV_NAME="$1"
TAR_FILE="$CONDA_ENV_NAME.tar.gz"
# Check if the tar.gz file already exists
if [ -e "$TAR_FILE" ]; then
    echo "Using existing $TAR_FILE"
else
    # Pack the Conda environment if the file doesn't exist
    conda pack -n "$CONDA_ENV_NAME" -o "$TAR_FILE"
fi
# Parse the destination host and directory
DESTINATION=$2
IFS=':' read -ra DEST <<< "$DESTINATION"
DEST_HOST="${DEST[0]}"
DEST_DIR="${DEST[1]}"
# Copy the environment tarball to the remote server
scp "$TAR_FILE" "$DEST_HOST":"$DEST_DIR"
scp deploy-dask.sh "$DEST_HOST":"$DEST_DIR"
scp dask-worker.sh "$DEST_HOST":"$DEST_DIR"
echo "Conda environment '$CONDA_ENV_NAME' packed and deployed to '$DEST_HOST:$DEST_DIR' as '$TAR_FILE'."
# Ask the user if they want to delete the tar.gz file
read -p "Do you want to delete the local tar.gz file? (y/n): " answer
if [ "$answer" == "y" ]; then
rm "$TAR_FILE"
echo "Local tar.gz file deleted."
else
echo "Local tar.gz file not deleted."
fi

@@ -1,19 +1,5 @@
 # Reference Guide: Dask Cluster Deployment Scripts
-Wiki link:
-Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment.
-Structure:
-- [ ] [Tutorial](https://diataxis.fr/tutorials/)
-- [ ] [How-to guide](https://diataxis.fr/how-to-guides/)
-- [x] [Reference](https://diataxis.fr/reference/)
-- [ ] [Explanation](https://diataxis.fr/explanation/)
-To do:
----
 ## Overview
 This repository contains a set of bash scripts designed to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment. These scripts facilitate the creation of Conda environments, deployment of the environment to a remote server, and initiation of Dask clusters on distributed systems. Below is a comprehensive guide on how to use and understand each script:
@@ -36,40 +22,7 @@ Before using these scripts, ensure that the following prerequisites are met:
 3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.
-## 1. create-env.sh
-### Overview
-`create-env.sh` is designed to create a Conda environment. It checks for the existence of the specified environment and either creates it or notifies the user if it already exists.
-Note: Define your Conda environment in `environment.yaml` before running this script.
-### Usage
-```bash
-./create-env.sh <conda_environment_name>
-```
-### Note
-- This script is intended to run on a local system where Conda is installed.
-## 2. deploy-env.sh
-### Overview
-`deploy-env.sh` is responsible for deploying the Conda environment to a remote server. If the tar.gz file already exists, it is copied; otherwise, it is created before being transferred.
-### Usage
-```bash
-./deploy-env.sh <environment_name> <destination_directory>
-```
-### Note
-- This script is intended to run on a local system.
-## 3. deploy-dask.sh
+## 1. deploy-dask.sh
 ### Overview
@@ -86,7 +39,7 @@ Note: Define your Conda environment in `environment.yaml` before running this script
 - This script is designed for an HPC environment with PBS job scheduling.
 - Modifications may be necessary for different job schedulers.
-## 4. dask-worker.sh
+## 2. dask-worker.sh
 ### Overview
@@ -96,9 +49,3 @@ Note: Define your Conda environment in `environment.yaml` before running this script
 - Execute this script on each allocated node to connect them to the Dask scheduler.
 - Designed for use with PBS job scheduling.
-## Workflow
-1. **Create Conda Environment**: Execute `create-env.sh` to create a Conda environment locally.
-2. **Deploy Conda Environment**: Execute `deploy-env.sh` to deploy the Conda environment to a remote server.
-3. **Deploy Dask Cluster**: Execute `deploy-dask.sh` to start the Dask cluster on an HPC environment.

deployment_scripts/environment.yaml (deleted)

@@ -1,9 +0,0 @@
channels:
- defaults
- conda-forge
dependencies:
- python=3.8.18
- dask
- numpy
- scikit-learn
- conda-pack

deployment_scripts/submit-dask-job.pbs (new file)

@@ -0,0 +1,33 @@
#!/bin/bash
#PBS -N dask-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00
export PYTHON_FILE= # Path to the Python file you want to run
export CURRENT_WORKSPACE= # Path to the workspace where you have pulled this repo and the dask-env.tar.gz file
export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)
export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787
export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV
source $DASK_ENV/bin/activate
conda-unpack
dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE | uniq | wc -l)

# Launch a Dask worker on each remaining node; dask-worker.sh connects it to the scheduler
for ((i=1;i<$NUM_NODES;i++)); do
    echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
    pbsdsh -n $i -o -- bash -l -c "source $CURRENT_WORKSPACE/deployment_scripts/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done
python3 $PYTHON_FILE
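
The job script ends by running `$PYTHON_FILE` on the first node. For reference, a minimal sketch of what that file could contain, assuming the variables exported above (`SCHEDULER_NODE`, `DASK_SCHEDULER_PORT`) are inherited by the `python3` process:

```python
import os
from dask.distributed import Client

# The scheduler address is built from variables exported by submit-dask-job.pbs
client = Client(os.environ["SCHEDULER_NODE"] + ":" + os.environ["DASK_SCHEDULER_PORT"])

# Trivial sanity check: double 0..99 on the workers and sum the results
futures = client.map(lambda x: x * 2, range(100))
print(sum(client.gather(futures)))  # 9900
```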

(deleted Jupyter notebook)

@@ -1,413 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import dask\n",
"import random\n",
"import torch\n",
"from torch.utils.data import Dataset, DataLoader\n",
"\n",
"from dask.distributed import Client\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"\n",
"from dask_ml.preprocessing import MinMaxScaler\n",
"from dask_ml.model_selection import train_test_split\n",
"from dask_ml.linear_model import LinearRegression\n",
"\n",
"from daskdataset import DaskDataset, ShallowNet\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"client = Client()\n",
"\n",
"sample_dataset=\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\"\n",
"\n",
"df = dd.read_parquet(sample_dataset, engine=\"fastparquet\")#.repartition(npartitions=10) #using pyarrow throws error with numpy"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run this only on the cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = Client(str(os.getenv('HOSTNAME')) + \"-ib:8786\")\n",
"\n",
"sample_datasets=[\"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet\",\n",
" \"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet\",]\n",
"\n",
"df = dd.read_parquet(sample_datasets, engine=\"fastparquet\") "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Convert old Parquet to new Parquet"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:43:12,666 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:14,316 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:15,668 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:33977 (pid=48276) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:15,946 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:21,786 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.74 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:23,443 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.09 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:24,862 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46711 (pid=48247) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:25,144 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:31,078 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.76 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:32,615 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 3.11 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:34,068 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:39735 (pid=48375) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:34,366 - distributed.nanny - WARNING - Restarting worker\n",
"2023-11-28 16:43:40,997 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 2.72 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:42,760 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:43:44,718 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43435 (pid=48187) exceeded 95% memory budget. Restarting...\n",
"2023-11-28 16:43:45,089 - distributed.nanny - WARNING - Restarting worker\n"
]
},
{
"ename": "KilledWorker",
"evalue": "Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mKilledWorker\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Compute the Dask dataframe to get a Pandas dataframe\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m df_pandas \u001b[38;5;241m=\u001b[39m \u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;66;03m# Create a new Pandas dataframe with the expanded 'features' columns\u001b[39;00m\n\u001b[1;32m 5\u001b[0m features_df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m]\u001b[38;5;241m.\u001b[39mto_list(), columns\u001b[38;5;241m=\u001b[39m[\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeature_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mi\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m'\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m i \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mrange\u001b[39m(\u001b[38;5;28mlen\u001b[39m(df_pandas[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mfeatures\u001b[39m\u001b[38;5;124m'\u001b[39m][\u001b[38;5;241m0\u001b[39m]))])\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:314\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 291\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 292\u001b[0m \n\u001b[1;32m 293\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 312\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/dask/base.py:599\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 596\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 597\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 599\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 600\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:3224\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3222\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3223\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3224\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3225\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3226\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2359\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2357\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2358\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2359\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2360\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2361\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2362\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2363\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2364\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2365\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2366\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:351\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 349\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 350\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 351\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 352\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 353\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:418\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 416\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 417\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 418\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 419\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 420\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/utils.py:391\u001b[0m, in \u001b[0;36msync.<locals>.f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 389\u001b[0m future \u001b[38;5;241m=\u001b[39m wait_for(future, callback_timeout)\n\u001b[1;32m 390\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 391\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 392\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 393\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/tornado/gen.py:767\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 765\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 766\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 767\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 768\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 769\u001b[0m \u001b[38;5;66;03m# Save the exception for later. It's important that\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;66;03m# gen.throw() not be called inside this try/except block\u001b[39;00m\n\u001b[1;32m 771\u001b[0m \u001b[38;5;66;03m# because that makes sys.exc_info behave unexpectedly.\u001b[39;00m\n\u001b[1;32m 772\u001b[0m exc: Optional[\u001b[38;5;167;01mException\u001b[39;00m] \u001b[38;5;241m=\u001b[39m e\n",
"File \u001b[0;32m~/miniconda3/envs/dask-env/lib/python3.8/site-packages/distributed/client.py:2222\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2220\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2221\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2222\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2223\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2224\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n",
"\u001b[0;31mKilledWorker\u001b[0m: Attempted to run task ('read-parquet-dfab612cdfb5b1c27377f316ddefebac', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43435. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html."
]
}
],
"source": [
"# Compute the Dask dataframe to get a Pandas dataframe\n",
"df_pandas = df.compute()\n",
"\n",
"# Create a new Pandas dataframe with the expanded 'features' columns\n",
"features_df = pd.DataFrame(df_pandas['features'].to_list(), columns=[f'feature_{i}' for i in range(len(df_pandas['features'][0]))])\n",
"labels_df = pd.DataFrame(df_pandas['labels'].to_list(), columns=[f'label_{i}' for i in range(len(df_pandas['labels'][0]))])\n",
"\n",
"# Concatenate the original dataframe with the expanded features and labels dataframes\n",
"df_pandas = pd.concat([features_df, labels_df], axis=1)\n",
"\n",
"#save df_pandas a parquet\n",
"df_pandas.to_parquet(\"/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet\", engine=\"fastparquet\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"#separate the features and labels\n",
"df_labels = df.loc[:, df.columns.str.contains('label')]\n",
"\n",
"# Create a StandardScaler object\n",
"scaler_features = MinMaxScaler()\n",
"df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])\n",
"\n",
"#Split the data into training and test sets\n",
"X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:58:28,387 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.16 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:58:29,636 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.26 GiB -- Worker memory limit: 3.86 GiB\n"
]
}
],
"source": [
"X = torch.tensor(X_train.compute().values)\n",
"y = torch.tensor(y_train.compute().values)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 16:52:11,130 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.15 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 16:52:12,561 - distributed.worker.memory - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 2.25 GiB -- Worker memory limit: 3.86 GiB\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"4860\n"
]
}
],
"source": [
"from skorch import NeuralNetRegressor\n",
"import torch.optim as optim\n",
"\n",
"niceties = {\n",
" \"callbacks\": False,\n",
" \"warm_start\": False,\n",
" \"train_split\": None,\n",
" \"max_epochs\": 5,\n",
"}\n",
"\n",
"model = NeuralNetRegressor(\n",
" module=ShallowNet,\n",
" module__n_features=X.size(dim=1),\n",
" criterion=nn.MSELoss,\n",
" optimizer=optim.SGD,\n",
" optimizer__lr=0.1,\n",
" optimizer__momentum=0.9,\n",
" batch_size=64,\n",
" **niceties,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"model = model.share_memory()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Re-initializing module because the following parameters were re-set: n_features.\n",
"Re-initializing criterion.\n",
"Re-initializing optimizer.\n",
" epoch train_loss dur\n",
"------- ------------ ------\n",
" 1 \u001b[36m0.3994\u001b[0m 4.5545\n"
]
},
{
"data": {
"text/plain": [
"<class 'skorch.regressor.NeuralNetRegressor'>[initialized](\n",
" module_=ShallowNet(\n",
" (layer1): Linear(in_features=4860, out_features=8, bias=True)\n",
" ),\n",
")"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"model.fit(X, y)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-11-28 17:00:11,618 - distributed.worker.memory - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 3.13 GiB -- Worker memory limit: 3.86 GiB\n",
"2023-11-28 17:00:13,143 - distributed.worker.memory - WARNING - Worker is at 57% memory usage. Resuming worker. Process memory: 2.22 GiB -- Worker memory limit: 3.86 GiB\n"
]
}
],
"source": [
"test_X = torch.tensor(X_test.compute().values)\n",
"test_y = torch.tensor(y_test.compute().values)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-3.410176609207851\n"
]
}
],
"source": [
"print(model.score(test_X, test_y))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the loss function and optimizer\n",
"criterion = torch.nn.CrossEntropyLoss()\n",
"optimizer = torch.optim.SGD(model.parameters(), lr=0.01)\n",
"# Move the model to the GPU\n",
"device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"model = model.to(device)\n",
"# Distribute the model across workers\n",
"model = model.share_memory()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Train the model\n",
"for epoch in range(10):\n",
" for batch in dataloader:\n",
" # Split the batch across workers\n",
" batch = [b.to(device) for b in batch]\n",
" futures = client.map(lambda data: model(data[0]), batch)\n",
" # Compute the loss\n",
" losses = client.map(criterion, futures, batch[1])\n",
" loss = client.submit(torch.mean, client.gather(losses))\n",
" # Compute the gradients and update the model parameters\n",
" optimizer.zero_grad()\n",
" gradients = client.map(lambda loss, future: torch.autograd.grad(loss, future)[0], loss, futures)\n",
" gradients =client.submit(torch.mean, client.gather(gradients))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.map(lambda parameter, gradient: parameter.grad.copy_(gradient), model.parameters(), gradients)\n",
"optimizer.step()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_ml.linear_model import LinearRegression\n",
"from sklearn.linear_model import LinearRegression\n",
"\n",
"# Initialize the Linear Regression model\n",
"model = LinearRegression()\n",
"model.fit(X_train, y_train)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_dask_df = df['features'].apply(pd.Series, meta=meta)\n",
"new_dask_df.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use this code for PyArrow tables\n",
"\n",
"#import pyarrow as pa\n",
"\n",
"# Define schema for features and labels columns\n",
"#schema = pa.schema({\n",
" #'features': pa.list_(pa.float32()),\n",
" #'labels': pa.list_(pa.float32())\n",
"#})\n",
"\n",
"#import pyarrow.parquet as pq\n",
"#df = dd.from_pandas(pq.read_table(sample_dataset, schema=schema).to_pandas(), npartitions=10)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dask-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.18"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
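
The `KilledWorker` traceback stored in the notebook comes from materializing the whole Dask dataframe with `df.compute()` and expanding it in pandas all at once, on workers with a ~3.86 GiB memory limit. A more memory-friendly sketch expands the list-valued columns partition by partition; the path and the width constants here are assumptions taken from the notebook output (`in_features=4860`, 8 labels), not part of the diff:

```python
import dask.dataframe as dd
import pandas as pd

N_FEATURES = 4860  # from the notebook's model summary (in_features=4860)
N_LABELS = 8       # from the notebook's label width

def expand(pdf):
    # Expand the list-valued columns within a single partition only
    feats = pd.DataFrame(pdf["features"].to_list(),
                         columns=[f"feature_{i}" for i in range(N_FEATURES)],
                         index=pdf.index)
    labels = pd.DataFrame(pdf["labels"].to_list(),
                          columns=[f"label_{i}" for i in range(N_LABELS)],
                          index=pdf.index)
    return pd.concat([feats, labels], axis=1)

df = dd.read_parquet("dataset1.parquet", engine="fastparquet")  # hypothetical path
meta = {**{f"feature_{i}": "f8" for i in range(N_FEATURES)},
        **{f"label_{i}": "f8" for i in range(N_LABELS)}}
expanded = df.map_partitions(expand, meta=meta)
expanded.to_parquet("dataset1_expanded.parquet", engine="fastparquet")
```

This keeps peak memory per worker at roughly the size of one partition instead of the whole dataset.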

(Python test script, changed)

@@ -1,57 +1,16 @@
-import os
-import dask
+import dask.bag as db
+import os  # still needed: os.getenv() builds the scheduler address below
 import random
 from dask.distributed import Client
-import dask.dataframe as dd
-import pandas as pd
-from dask_ml.preprocessing import MinMaxScaler
-from dask_ml.model_selection import train_test_split
-from dask_ml.linear_model import LinearRegression
-import torch
-from torch.utils.data import Dataset, DataLoader
 client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
+NUM_SAMPLES=100
-sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
-                 "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
-#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet"
+def inside(p):
+    x, y = random.random(), random.random()
+    return x*x + y*y < 1
+def calc_pi():
+    count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
+    return 4.0 * count / NUM_SAMPLES
-df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300)
+print(calc_pi())
-df_features = df.loc[:, df.columns.str.contains('feature')]
-df_labels = df.loc[:, df.columns.str.contains('label')]
-# Create a StandardScaler object
-scaler_features = MinMaxScaler()
-df_features_scaled = scaler_features.fit_transform(df_features)
-#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')]
-#Split the data into training and test sets
-X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0)
-dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True)
-model = torch.nn.Sequential(
-    torch.nn.Linear(784, 128),
-    torch.nn.ReLU(),
-    torch.nn.Linear(128, 10)
-)
-#X_train = X_train.loc[:, X_train.columns.str.contains('feature')]
-#y_train = y_train.loc[:, y_train.columns.str.contains('label')]
-# Initialize the Linear Regression model
-model = LinearRegression().fit(X_train, y_train)
-score = model.score(X_test, y_test)
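
With `NUM_SAMPLES=100` the resulting estimate is very coarse and the whole bag fits in a single partition. A hedged variant, assuming the same `Client` connection as the script above, raises the sample count and sets an explicit partition count so the sampling actually spreads across the workers:

```python
import dask.bag as db
import random

NUM_SAMPLES = 10_000_000  # hypothetical larger sample count

def inside(_):
    x, y = random.random(), random.random()
    return x * x + y * y < 1

# An explicit partition count spreads the filter work across all workers
count = (db.from_sequence(range(NUM_SAMPLES), npartitions=200)
           .filter(inside)
           .count()
           .compute())
print(4.0 * count / NUM_SAMPLES)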

(deleted Python training script)

@@ -1,82 +0,0 @@
import os
import dask
import random
import torch
from torch.utils.data import Dataset, DataLoader
import time
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from joblib import parallel_backend
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
from daskdataset import DaskDataset, ShallowNet
start_time = time.time()
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB")
#df_future = client.scatter(df)
#separate the features and labels
df_labels = df.loc[:, df.columns.str.contains('label')]
# Create a StandardScaler object
scaler_features = MinMaxScaler()
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])
#Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)
X = torch.tensor(X_train.compute().values)
y = torch.tensor(y_train.compute().values)
from skorch import NeuralNetRegressor
import torch.optim as optim
niceties = {
    "callbacks": False,
    "warm_start": False,
    "train_split": None,
    "max_epochs": 5,
}
model = NeuralNetRegressor(
    module=ShallowNet,
    module__n_features=X.size(dim=1),
    criterion=nn.MSELoss,
    optimizer=optim.SGD,
    optimizer__lr=0.1,
    optimizer__momentum=0.9,
    batch_size=64,
    **niceties,
)
# Initialize the Linear Regression model
model = LinearRegression()
model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True))
end_time = time.time()
print("Time to load data: ", end_time - start_time)
dask_dataset = DaskDataset(X_train, y_train)
dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True)
for feature, label in dataloader:
    print(feature)
    print(label)
    break

daskdataset.py (deleted)

@@ -1,32 +0,0 @@
import torch
from torch.utils.data import Dataset, DataLoader
import dask.dataframe as dd
import torch.nn as nn
import torch.nn.functional as F
# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns
class DaskDataset(Dataset):
    def __init__(self, df_features, df_labels):
        self.features = df_features.to_dask_array(lengths=True)
        self.labels = df_labels.to_dask_array(lengths=True)

    def __len__(self):
        return self.features.size

    def __getitem__(self, idx):
        return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values)

class ShallowNet(nn.Module):
    def __init__(self, n_features):
        super().__init__()
        self.layer1 = nn.Linear(n_features, 128)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(128, 8)

    def forward(self, x):
        x = self.layer1(x)
        x = self.relu(x)
        x = self.layer2(x)
        return x
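
Note that the deleted `DaskDataset` has two bugs: `__len__` returns the total element count of the Dask array rather than the number of rows, and `__getitem__` ignores `idx` and computes the entire arrays (a computed Dask array is a NumPy array, which also has no `.values`). A corrected sketch, hypothetical and not part of this diff:

```python
import torch
from torch.utils.data import Dataset

class DaskDataset(Dataset):
    """Wrap Dask arrays so DataLoader fetches one row at a time."""

    def __init__(self, df_features, df_labels):
        self.features = df_features.to_dask_array(lengths=True)
        self.labels = df_labels.to_dask_array(lengths=True)

    def __len__(self):
        return self.features.shape[0]  # number of rows, not total elements

    def __getitem__(self, idx):
        # Compute only the requested row instead of the whole array
        return (
            torch.tensor(self.features[idx].compute()),
            torch.tensor(self.labels[idx].compute()),
        )
```

Computing one row per `__getitem__` is still slow for training; batching at the partition level is usually preferable, but the sketch shows the minimal fix to the interface.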