forked from SiVeGCS/dask_template

Compare commits: `ray-tune-t` ... `main` (7 commits)

Commits (SHA1):
- 26cb3ed722
- aa3d6b6799
- 1eb6024fd0
- 1e56496f56
- 3ceddcaed1
- 48328cc85f
- 251f190e11

14 changed files with 130 additions and 769 deletions
.gitignore (vendored, new file, 5 lines)

@@ -0,0 +1,5 @@
__pycache__
notebooks/
/deployment_scripts/create-env.sh
/deployment_scripts/deploy-env.sh
/deployment_scripts/environment.yaml
README.md (115 lines changed)

@@ -1,77 +1,56 @@
# Dask: How to execute python workloads using a Dask cluster on Vulcan

This repository looks at a deployment of a Dask cluster on Vulcan, and executing your programs using this cluster.

## Table of Contents
- [Getting Started](#getting-started)
- [Usage](#usage)
- [Notes](#notes)

## Getting Started
### 1. Build and transfer the Conda environment to Vulcan:

Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Vulcan.

Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder), which includes a YAML file for building a test environment.

### 2. Allocate workspace on Vulcan:

Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.

```bash
ws_allocate dask_workspace 10
ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step
```
### 3. Clone the repository on Vulcan to use the deployment scripts and project structure:

```bash
cd <workspace_directory>
git clone <repository_url>
```
### 4. Send all the code to the appropriate directory on Vulcan using `scp`:

```bash
scp <your_script>.py <destination_host>:<destination_directory>
```
### 5. SSH into Vulcan and start a job interactively using:

```bash
qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
```

Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. Follow section [Multiple Nodes](#multiple-nodes) for more information.
### 6. Go into the directory with all code:

```bash
cd <destination_directory>
```
### 7. Initialize the Dask cluster:

```bash
source deploy-dask.sh "$(pwd)"
```
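Once `deploy-dask.sh` has started the scheduler, the Dask dashboard is served on the UI port configured in the deployment scripts (`DASK_UI_PORT=8787`). One possible way to reach it from your local machine is an SSH tunnel; this is a sketch, and `<username>`, `<vulcan_frontend>`, and `<scheduler_node>` are placeholders, not names defined by this repository:

```bash
# Sketch only: forward the Dask dashboard port to your local machine.
# <vulcan_frontend> is your login host and <scheduler_node> the first node of
# your job; 8787 matches DASK_UI_PORT set in the deployment scripts.
ssh -L 8787:<scheduler_node>:8787 <username>@<vulcan_frontend>
# then open http://localhost:8787 in a local browser
```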
@@ -81,7 +60,22 @@ Before running the application, make sure you have the following prerequisites installed in a conda environment:
## Usage

### Single Node

To run the application interactively on a single node, follow steps 4, 5, 6, and 7 from [Getting Started](#getting-started), and execute the following command after the job has started:

```bash
# Load the Conda module
module load bigdata/conda
source activate # activates the base environment

# List available Conda environments for verification purposes
conda env list

# Activate a specific Conda environment.
conda activate dask_environment # you need to execute `source activate` first, or use `source [ENV_PATH]/bin/activate`
```
After the environment is activated, you can run the python interpreter:

```bash
python
```

@@ -92,11 +86,36 @@ Or to run a full script:

```bash
python <your-script>.py
```
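As a minimal sketch of such a script, assuming the cluster was brought up with `deploy-dask.sh` so that the scheduler listens on `<hostname>-ib:8786` (the address pattern and port used by this repository's example scripts); adjust the address for your setup:

```python
# Minimal sketch, not part of the repository: connect to the running Dask
# cluster and run a trivial computation. Assumes the scheduler is reachable at
# "<hostname>-ib:8786", matching DASK_SCHEDULER_PORT=8786 in the scripts.
import os

from dask.distributed import Client
import dask.array as da

# Connect to the scheduler started by deploy-dask.sh
client = Client(str(os.getenv("HOSTNAME")) + "-ib:8786")
print(client)  # shows how many workers have joined

# Small test computation distributed across the workers
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())
```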
### Multiple Nodes

To run the application on multiple nodes, you need to write a `.pbs` script and submit it using `qsub`. Follow steps 1-4 from the [Getting Started](#getting-started) section, then write a `submit-dask-job.pbs` script:

```bash
#!/bin/bash
#PBS -N dask-job
#PBS -l select=3:node_type=rome
#PBS -l walltime=1:00:00

# Go to the directory where the code is
cd <destination_directory>

# Deploy the Dask cluster
source deploy-dask.sh "$(pwd)"

# Run the python script
python <your-script>.py
```

A more thorough example is available in the `deployment_scripts` directory under `submit-dask-job.pbs`.
And then execute the following commands to submit the job:

```bash
qsub submit-dask-job.pbs
qstat -anw # Q: Queued, R: Running, E: Ending
ls -l # list files after the job finishes
cat dask-job.o... # inspect the output file
cat dask-job.e... # inspect the error file
```

## Notes
Binary file not shown.
@@ -1,25 +0,0 @@
#!/bin/bash

# Display usage
if [ "$#" -ne 1 ]; then
    echo "Usage: $0 <conda_environment_name>"
    exit 1
fi

# Name of the Conda environment
CONDA_ENV_NAME=$1

# Check if the Conda environment already exists
if conda env list | grep -q "$CONDA_ENV_NAME"; then
    echo "Environment '$CONDA_ENV_NAME' already exists."
else
    echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
    # Create Conda environment
    conda env create --name $CONDA_ENV_NAME -f environment.yaml
    echo "Conda environment '$CONDA_ENV_NAME' created."
fi
@@ -6,13 +6,13 @@ export DASK_SCHEDULER_HOST=$2

 # Path to localscratch
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
-export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
+export DASK_ENV="$HOME/dask"
 mkdir -p $DASK_ENV

 # Extract Dask environment in localscratch
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Extracting Dask environment to $DASK_ENV"
-tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
-chmod -R 700 $DASK_ENV
+#tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
+#chmod -R 700 $DASK_ENV

 # Start the dask environment
 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Worker $HOSTNAME] INFO: Setting up Dask environment"
@@ -24,7 +24,7 @@ export DASK_UI_PORT=8787

 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask cluster with $NUM_NODES nodes."
 # Path to localscratch
-export DASK_ENV="/localscratch/${PBS_JOBID}/dask"
+export DASK_ENV="$HOME/dask"
 mkdir -p $DASK_ENV

 echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Extracting Dask environment to $DASK_ENV"
@@ -1,41 +0,0 @@
#!/bin/bash

# Check if a destination and environment name are provided
if [ "$#" -ne 2 ]; then
    echo "Usage: $0 <environment_name> <destination_directory>"
    exit 1
fi

# Name of the Conda environment
CONDA_ENV_NAME="$1"
TAR_FILE="$CONDA_ENV_NAME.tar.gz"

# Check if the tar.gz file already exists
if [ -e "$TAR_FILE" ]; then
    echo "Using existing $TAR_FILE"
else
    # Pack the Conda environment if the file doesn't exist
    conda pack -n "$CONDA_ENV_NAME" -o "$TAR_FILE"
fi

# Parse the destination host and directory
DESTINATION=$2
IFS=':' read -ra DEST <<< "$DESTINATION"
DEST_HOST="${DEST[0]}"
DEST_DIR="${DEST[1]}"

# Copy the environment tarball to the remote server
scp "$TAR_FILE" "$DEST_HOST":"$DEST_DIR"
scp deploy-dask.sh "$DEST_HOST":"$DEST_DIR"
scp dask-worker.sh "$DEST_HOST":"$DEST_DIR"

echo "Conda environment '$CONDA_ENV_NAME' packed and deployed to '$DEST_HOST:$DEST_DIR' as '$TAR_FILE'."

# Ask the user if they want to delete the tar.gz file
read -p "Do you want to delete the local tar.gz file? (y/n): " answer
if [ "$answer" == "y" ]; then
    rm "$TAR_FILE"
    echo "Local tar.gz file deleted."
else
    echo "Local tar.gz file not deleted."
fi
@@ -1,19 +1,5 @@
# Reference Guide: Dask Cluster Deployment Scripts

## Overview

This repository contains a set of bash scripts designed to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment. These scripts facilitate the creation of Conda environments, deployment of the environment to a remote server, and initiation of Dask clusters on distributed systems. Below is a comprehensive guide on how to use and understand each script:
@@ -36,40 +22,7 @@ Before using these scripts, ensure that the following prerequisites are met:

3. **SSH Setup**: Ensure that SSH is set up and configured on your system for remote server communication.

## 1. deploy-dask.sh

### Overview
@@ -86,7 +39,7 @@ Note: Define your Conda environment in `environment.yaml` before running this script.

- This script is designed for an HPC environment with PBS job scheduling.
- Modifications may be necessary for different job schedulers.

## 2. dask-worker.sh

### Overview
@@ -96,9 +49,3 @@ Note: Define your Conda environment in `environment.yaml` before running this script.

- Execute this script on each allocated node to connect them to the Dask scheduler.
- Designed for use with PBS job scheduling.
@@ -1,9 +0,0 @@
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.8.18
  - dask
  - numpy
  - scikit-learn
  - conda-pack
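With `create-env.sh`, `deploy-env.sh`, and `environment.yaml` removed, the packed environment is now produced outside this repository (for example with the Conda environment builder referenced in the README). As a rough sketch of the equivalent manual steps, assuming a local Conda environment named `dask-env` (a placeholder) and the workspace path obtained from `ws_find`:

```bash
# Sketch only: package a local Conda environment and stage it in the Vulcan
# workspace as dask-env.tar.gz, which submit-dask-job.pbs expects under
# $CURRENT_WORKSPACE. "dask-env" and the destination are placeholder names.
conda pack -n dask-env -o dask-env.tar.gz
scp dask-env.tar.gz <destination_host>:<workspace_directory>/dask-env.tar.gz
```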
deployment_scripts/submit-dask-job.pbs (new file, 33 lines)

@@ -0,0 +1,33 @@
#!/bin/bash
#PBS -N dask-job
#PBS -l select=2:node_type=rome
#PBS -l walltime=1:00:00

export PYTHON_FILE= # Path to the Python file you want to run
export CURRENT_WORKSPACE= # Path to the workspace where you have pulled this repo and the dask-env.tar.gz file

export ALL_NODES=$(cat $PBS_NODEFILE)
export SCHEDULER_NODE="$(head -n1 $PBS_NODEFILE)-ib"
export WORKER_NODES=$(tail -n+2 $PBS_NODEFILE)

export DASK_SCHEDULER_PORT=8786
export DASK_UI_PORT=8787

export DASK_ENV="$HOME/dask"
mkdir -p $DASK_ENV
tar -xzf $CURRENT_WORKSPACE/dask-env.tar.gz -C $DASK_ENV
chmod -R 700 $DASK_ENV

source $DASK_ENV/bin/activate
conda-unpack

dask scheduler --host $SCHEDULER_NODE --port $DASK_SCHEDULER_PORT &
export NUM_NODES=$(sort $PBS_NODEFILE | uniq | wc -l)

# Assuming you have a Dask worker script named 'dask-worker-script.py', modify this accordingly
for ((i=1;i<$NUM_NODES;i++)); do
    echo "[$(date '+%Y-%m-%d %H:%M:%S') - Master] INFO: Starting Dask Worker at $i"
    pbsdsh -n $i -o -- bash -l -c "source /deployment_scripts/dask-worker.sh $CURRENT_WORKSPACE $SCHEDULER_NODE"
done

python3 $PYTHON_FILE
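The script pointed to by `PYTHON_FILE` can then talk to the scheduler started above. A minimal sketch, assuming it runs on the first allocated node so that the scheduler is reachable at `$HOSTNAME-ib:8786` (the pattern used by this repository's example scripts); adjust the address if your setup differs:

```python
# Minimal sketch of a PYTHON_FILE for submit-dask-job.pbs, not part of the
# repository. Assumes the scheduler listens on "<hostname>-ib:8786",
# matching DASK_SCHEDULER_PORT above.
import os

import dask.bag as db
from dask.distributed import Client

client = Client(str(os.getenv("HOSTNAME")) + "-ib:8786")
print(client)  # confirm the workers started by pbsdsh have joined

# Trivial distributed computation to verify the cluster works
total = db.from_sequence(range(100_000), npartitions=16).sum().compute()
print("sum:", total)
```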
@@ -1,413 +0,0 @@
(Deleted Jupyter notebook; kernel "dask-env", Python 3.8.18. Its cells are listed below in order, with the raw JSON, ANSI escape sequences, and full tracebacks omitted.)

Cell 1 (code):
import os
import dask
import random
import torch
from torch.utils.data import Dataset, DataLoader

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression

from daskdataset import DaskDataset, ShallowNet
import torch.nn as nn
import torch.nn.functional as F

Cell 2 (code):
client = Client()
sample_dataset="/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet"
df = dd.read_parquet(sample_dataset, engine="fastparquet") #.repartition(npartitions=10) #using pyarrow throws error with numpy

Cell 3 (markdown): #### Run this only on the cluster

Cell 4 (code):
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet", ... , "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet"]
df = dd.read_parquet(sample_datasets, engine="fastparquet")

Cell 5 (markdown): #### Convert old Parquet to new Parquet

Cell 6 (code; its output shows repeated distributed.worker.memory warnings about high unmanaged memory and paused workers, followed by a KilledWorker error for the read-parquet task after three workers died running it):
# Compute the Dask dataframe to get a Pandas dataframe
df_pandas = df.compute()
# Create a new Pandas dataframe with the expanded 'features' columns
features_df = pd.DataFrame(df_pandas['features'].to_list(), columns=[f'feature_{i}' for i in range(len(df_pandas['features'][0]))])
labels_df = pd.DataFrame(df_pandas['labels'].to_list(), columns=[f'label_{i}' for i in range(len(df_pandas['labels'][0]))])
# Concatenate the original dataframe with the expanded features and labels dataframes
df_pandas = pd.concat([features_df, labels_df], axis=1)
#save df_pandas a parquet
df_pandas.to_parquet("/home/hpcrsaxe/Desktop/Code/Dataset/sample_train_data/dataset1.parquet", engine="fastparquet")

Cell 7 (code):
#separate the features and labels
df_labels = df.loc[:, df.columns.str.contains('label')]
# Create a StandardScaler object
scaler_features = MinMaxScaler()
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])
#Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)

Cell 8 (code; output shows worker memory warnings):
X = torch.tensor(X_train.compute().values)
y = torch.tensor(y_train.compute().values)

Cell 9 (code; output shows worker memory warnings and prints 4860):
from skorch import NeuralNetRegressor
import torch.optim as optim

niceties = {
    "callbacks": False,
    "warm_start": False,
    "train_split": None,
    "max_epochs": 5,
}

model = NeuralNetRegressor(
    module=ShallowNet,
    module__n_features=X.size(dim=1),
    criterion=nn.MSELoss,
    optimizer=optim.SGD,
    optimizer__lr=0.1,
    optimizer__momentum=0.9,
    batch_size=64,
    **niceties,
)

Cell 10 (code):
model.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.share_memory()

Cell 11 (code; output shows module/criterion/optimizer re-initialization, one training epoch with train_loss 0.3994, and the initialized NeuralNetRegressor wrapping ShallowNet with layer1 = Linear(in_features=4860, out_features=8)):
model.fit(X, y)

Cell 12 (code; output shows worker memory warnings):
test_X = torch.tensor(X_test.compute().values)
test_y = torch.tensor(y_test.compute().values)

Cell 13 (code; output prints -3.410176609207851):
print(model.score(test_X, test_y))

Cell 14 (code):
# Define the loss function and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# Move the model to the GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# Distribute the model across workers
model = model.share_memory()

Cell 15 (code):
# Train the model
for epoch in range(10):
    for batch in dataloader:
        # Split the batch across workers
        batch = [b.to(device) for b in batch]
        futures = client.map(lambda data: model(data[0]), batch)
        # Compute the loss
        losses = client.map(criterion, futures, batch[1])
        loss = client.submit(torch.mean, client.gather(losses))
        # Compute the gradients and update the model parameters
        optimizer.zero_grad()
        gradients = client.map(lambda loss, future: torch.autograd.grad(loss, future)[0], loss, futures)
        gradients = client.submit(torch.mean, client.gather(gradients))

Cell 16 (code):
client.map(lambda parameter, gradient: parameter.grad.copy_(gradient), model.parameters(), gradients)
optimizer.step()

Cell 17 (code):
from dask_ml.linear_model import LinearRegression
from sklearn.linear_model import LinearRegression

# Initialize the Linear Regression model
model = LinearRegression()
model.fit(X_train, y_train)

Cell 18 (code):
new_dask_df = df['features'].apply(pd.Series, meta=meta)
new_dask_df.compute()

Cell 19 (code):
# Use this code for PyArrow tables
#import pyarrow as pa
# Define schema for features and labels columns
#schema = pa.schema({
#    'features': pa.list_(pa.float32()),
#    'labels': pa.list_(pa.float32())
#})
#import pyarrow.parquet as pq
#df = dd.from_pandas(pq.read_table(sample_dataset, schema=schema).to_pandas(), npartitions=10)
@@ -1,57 +1,16 @@
import os  # needed for os.getenv below
import dask.bag as db
import random

from dask.distributed import Client

client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")

NUM_SAMPLES = 100

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

def calc_pi():
    count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
    return 4.0 * count / NUM_SAMPLES

print(calc_pi())
@@ -1,82 +0,0 @@
import os
import dask
import random
import torch
from torch.utils.data import Dataset, DataLoader
import time

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from joblib import parallel_backend

from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression

from daskdataset import DaskDataset, ShallowNet

start_time = time.time()

client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")

sample_datasets = ["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
                   "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
df = dd.read_parquet(sample_datasets, engine="fastparquet") #.repartition(partition_size="100MB")
#df_future = client.scatter(df)

#separate the features and labels
df_labels = df.loc[:, df.columns.str.contains('label')]

# Create a StandardScaler object
scaler_features = MinMaxScaler()
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])

#Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)

X = torch.tensor(X_train.compute().values)
y = torch.tensor(y_train.compute().values)

from skorch import NeuralNetRegressor
import torch.optim as optim

niceties = {
    "callbacks": False,
    "warm_start": False,
    "train_split": None,
    "max_epochs": 5,
}

model = NeuralNetRegressor(
    module=ShallowNet,
    module__n_features=X.size(dim=1),
    criterion=nn.MSELoss,
    optimizer=optim.SGD,
    optimizer__lr=0.1,
    optimizer__momentum=0.9,
    batch_size=64,
    **niceties,
)

# Initialize the Linear Regression model
model = LinearRegression()
model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True))

end_time = time.time()

print("Time to load data: ", end_time - start_time)

dask_dataset = DaskDataset(X_train, y_train)
dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True)

for feature, label in dataloader:
    print(feature)
    print(label)
    break
@@ -1,32 +0,0 @@
import torch
from torch.utils.data import Dataset, DataLoader
import dask.dataframe as dd

import torch.nn as nn
import torch.nn.functional as F

# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns

class DaskDataset(Dataset):
    def __init__(self, df_features, df_labels):
        self.features = df_features.to_dask_array(lengths=True)
        self.labels = df_labels.to_dask_array(lengths=True)

    def __len__(self):
        return self.features.size

    def __getitem__(self, idx):
        return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values)

class ShallowNet(nn.Module):
    def __init__(self, n_features):
        super().__init__()
        self.layer1 = nn.Linear(n_features, 128)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(128, 8)

    def forward(self, x):
        x = self.layer1(x)
        x = self.relu(x)
        x = self.layer2(x)
        return x