forked from SiVeGCS/dask_template
Changes according to issue #1
This commit is contained in:
parent
f3b8da05d9
commit
251f190e11
7 changed files with 47 additions and 196 deletions
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
__pycache__
|
||||
notebooks/
|
58
README.md
58
README.md
|
@ -1,7 +1,5 @@
|
|||
# Dask: How to execute python workloads using a Dask cluster on Vulcan
|
||||
|
||||
Wiki link:
|
||||
|
||||
Motivation: This document aims to show users how to launch a Dask cluster in our compute platforms and perform a simple workload using it.
|
||||
|
||||
Structure:
|
||||
|
@ -28,54 +26,58 @@ This repository looks at a deployment of a Dask cluster on Vulcan, and executing
|
|||
## Prerequisites
|
||||
|
||||
Before running the application, make sure you have the following prerequisites installed in a conda environment:
|
||||
- [Python 3.8.18](https://www.python.org/downloads/release/python-3818/): This specific python version is used for all uses, you can select it using while creating the conda environment. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
|
||||
- [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters).
|
||||
- [Dask](https://dask.org/): Install Dask using conda.
|
||||
- [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan.
|
||||
|
||||
## Getting Started
|
||||
|
||||
1. Clone [this repository](https://code.hlrs.de/hpcrsaxe/spark_template) to your local machine:
|
||||
1. Build and transfer the Conda environment to Hawk:
|
||||
|
||||
```bash
|
||||
git clone <repository_url>
|
||||
```
|
||||
Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Hawk.
|
||||
|
||||
2. Go into the direcotry and create an environment using Conda and enirvonment.yaml. Note: Be sure to add the necessary packages in environemnt.yaml:
|
||||
Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder).
|
||||
|
||||
```bash
|
||||
./deployment_scripts/create-env.sh <your-env>
|
||||
```
|
||||
2. Allocate workspace on Hawk:
|
||||
|
||||
3. Send all files using `deploy-env.sh`:
|
||||
Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide.
|
||||
|
||||
```bash
|
||||
./deployment_scripts/deploy-env.sh <your-env> <destination_host>:<destination_directory>
|
||||
```
|
||||
```bash
|
||||
ws_allocate dask_workspace 10
|
||||
ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step
|
||||
```
|
||||
|
||||
3. Clone the repository on Hawk to use the deployment scripts and project structure:
|
||||
|
||||
```bash
|
||||
cd <workspace_directory>
|
||||
git clone <repository_url>
|
||||
```
|
||||
|
||||
4. Send all the code to the appropriate directory on Vulcan using `scp`:
|
||||
|
||||
```bash
|
||||
scp <your_script>.py <destination_host>:<destination_directory>
|
||||
```
|
||||
```bash
|
||||
scp <your_script>.py <destination_host>:<destination_directory>
|
||||
```
|
||||
|
||||
5. SSH into Vulcan and start a job interatively using:
|
||||
5. SSH into Vulcan and start a job interactively using:
|
||||
|
||||
```bash
|
||||
qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00
|
||||
```
|
||||
```bash
|
||||
qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00
|
||||
```
|
||||
Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`.
|
||||
|
||||
6. Go into the directory with all code:
|
||||
|
||||
```bash
|
||||
cd <destination_directory>
|
||||
```
|
||||
```bash
|
||||
cd <destination_directory>
|
||||
```
|
||||
|
||||
7. Initialize the Dask cluster:
|
||||
|
||||
```bash
|
||||
source deploy-dask.sh "$(pwd)"
|
||||
```
|
||||
```bash
|
||||
source deploy-dask.sh "$(pwd)"
|
||||
```
|
||||
Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs.
|
||||
Note: Make sure all permissions are set using `chmod +x` for all scripts.
|
||||
|
||||
|
|
|
@ -19,7 +19,11 @@ else
|
|||
echo "Environment '$CONDA_ENV_NAME' does not exist, creating it."
|
||||
|
||||
# Create Conda environment
|
||||
conda env create --name $CONDA_ENV_NAME -f environment.yaml
|
||||
CONDA_SUBDIR=linux-64 conda env create --name $CONDA_ENV_NAME -f environment.yaml
|
||||
|
||||
echo "Conda environment '$CONDA_ENV_NAME' created."
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Conda environment '$CONDA_ENV_NAME' created successfully."
|
||||
else
|
||||
echo "Failed to create Conda environment '$CONDA_ENV_NAME'."
|
||||
fi
|
||||
fi
|
|
@ -1,7 +1,5 @@
|
|||
# Reference Guide: Dask Cluster Deployment Scripts
|
||||
|
||||
Wiki link:
|
||||
|
||||
Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment.
|
||||
|
||||
Structure:
|
|
@ -1,57 +1,16 @@
|
|||
import os
|
||||
import dask
|
||||
import dask.bag as db
|
||||
import random
|
||||
|
||||
from dask.distributed import Client
|
||||
import dask.dataframe as dd
|
||||
import pandas as pd
|
||||
|
||||
from dask_ml.preprocessing import MinMaxScaler
|
||||
from dask_ml.model_selection import train_test_split
|
||||
from dask_ml.linear_model import LinearRegression
|
||||
|
||||
import torch
|
||||
from torch.utils.data import Dataset, DataLoader
|
||||
|
||||
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
|
||||
NUM_SAMPLES=100
|
||||
|
||||
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
|
||||
def inside(p):
|
||||
x, y = random.random(), random.random()
|
||||
return x*x + y*y < 1
|
||||
|
||||
#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet"
|
||||
def calc_pi():
|
||||
count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute()
|
||||
return 4.0 * count / NUM_SAMPLES
|
||||
|
||||
df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300)
|
||||
|
||||
df_features = df.loc[:, df.columns.str.contains('feature')]
|
||||
df_labels = df.loc[:, df.columns.str.contains('label')]
|
||||
|
||||
# Create a StandardScaler object
|
||||
scaler_features = MinMaxScaler()
|
||||
df_features_scaled = scaler_features.fit_transform(df_features)
|
||||
|
||||
#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')]
|
||||
|
||||
#Split the data into training and test sets
|
||||
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0)
|
||||
|
||||
dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True)
|
||||
|
||||
model = torch.nn.Sequential(
|
||||
torch.nn.Linear(784, 128),
|
||||
torch.nn.ReLU(),
|
||||
torch.nn.Linear(128, 10)
|
||||
)
|
||||
|
||||
#X_train = X_train.loc[:, X_train.columns.str.contains('feature')]
|
||||
#y_train = y_train.loc[:, y_train.columns.str.contains('label')]
|
||||
|
||||
# Initialize the Linear Regression model
|
||||
model = LinearRegression().fit(X_train, y_train)
|
||||
|
||||
score = model.score(X_test, y_test)
|
||||
print(calc_pi())
|
|
@ -1,82 +0,0 @@
|
|||
import os
|
||||
import dask
|
||||
import random
|
||||
import torch
|
||||
from torch.utils.data import Dataset, DataLoader
|
||||
import time
|
||||
|
||||
from dask.distributed import Client
|
||||
import dask.dataframe as dd
|
||||
import pandas as pd
|
||||
from joblib import parallel_backend
|
||||
|
||||
from dask_ml.preprocessing import MinMaxScaler
|
||||
from dask_ml.model_selection import train_test_split
|
||||
from dask_ml.linear_model import LinearRegression
|
||||
|
||||
from daskdataset import DaskDataset, ShallowNet
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786")
|
||||
|
||||
sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet",
|
||||
"/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",]
|
||||
df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB")
|
||||
#df_future = client.scatter(df)
|
||||
|
||||
#separate the features and labels
|
||||
df_labels = df.loc[:, df.columns.str.contains('label')]
|
||||
|
||||
# Create a StandardScaler object
|
||||
scaler_features = MinMaxScaler()
|
||||
df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')])
|
||||
|
||||
#Split the data into training and test sets
|
||||
X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True)
|
||||
|
||||
X = torch.tensor(X_train.compute().values)
|
||||
y = torch.tensor(y_train.compute().values)
|
||||
|
||||
from skorch import NeuralNetRegressor
|
||||
import torch.optim as optim
|
||||
|
||||
niceties = {
|
||||
"callbacks": False,
|
||||
"warm_start": False,
|
||||
"train_split": None,
|
||||
"max_epochs": 5,
|
||||
}
|
||||
|
||||
model = NeuralNetRegressor(
|
||||
module=ShallowNet,
|
||||
module__n_features=X.size(dim=1),
|
||||
criterion=nn.MSELoss,
|
||||
optimizer=optim.SGD,
|
||||
optimizer__lr=0.1,
|
||||
optimizer__momentum=0.9,
|
||||
batch_size=64,
|
||||
**niceties,
|
||||
)
|
||||
|
||||
# Initialize the Linear Regression model
|
||||
model = LinearRegression()
|
||||
model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True))
|
||||
|
||||
end_time = time.time()
|
||||
|
||||
print("Time to load data: ", end_time - start_time)
|
||||
|
||||
dask_dataset = DaskDataset(X_train, y_train)
|
||||
dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True)
|
||||
|
||||
for feature, label in dataloader:
|
||||
print(feature)
|
||||
print(label)
|
||||
break
|
|
@ -1,32 +0,0 @@
|
|||
import torch
|
||||
from torch.utils.data import Dataset, DataLoader
|
||||
import dask.dataframe as dd
|
||||
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
|
||||
# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns
|
||||
|
||||
class DaskDataset(Dataset):
|
||||
def __init__(self, df_features, df_labels):
|
||||
self.features = df_features.to_dask_array(lengths=True)
|
||||
self.labels = df_labels.to_dask_array(lengths=True)
|
||||
|
||||
def __len__(self):
|
||||
return self.features.size
|
||||
|
||||
def __getitem__(self, idx):
|
||||
return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values)
|
||||
|
||||
class ShallowNet(nn.Module):
|
||||
def __init__(self, n_features):
|
||||
super().__init__()
|
||||
self.layer1 = nn.Linear(n_features, 128)
|
||||
self.relu = nn.ReLU()
|
||||
self.layer2 = nn.Linear(128, 8)
|
||||
|
||||
def forward(self, x):
|
||||
x = self.layer1(x)
|
||||
x = self.relu(x)
|
||||
x = self.layer2(x)
|
||||
return x
|
Loading…
Reference in a new issue