diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..de50d39 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +notebooks/ diff --git a/README.md b/README.md index a24b0b1..5356f3b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ # Dask: How to execute python workloads using a Dask cluster on Vulcan -Wiki link: - Motivation: This document aims to show users how to launch a Dask cluster in our compute platforms and perform a simple workload using it. Structure: @@ -28,54 +26,58 @@ This repository looks at a deployment of a Dask cluster on Vulcan, and executing ## Prerequisites Before running the application, make sure you have the following prerequisites installed in a conda environment: -- [Python 3.8.18](https://www.python.org/downloads/release/python-3818/): This specific python version is used for all uses, you can select it using while creating the conda environment. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters). - [Conda Installation](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html): Ensure that Conda is installed on your local system. For more information on, look at the documentation for Conda on [HLRS HPC systems](https://kb.hlrs.de/platforms/index.php/How_to_move_local_conda_environments_to_the_clusters). - [Dask](https://dask.org/): Install Dask using conda. - [Conda Pack](https://conda.github.io/conda-pack/): Conda pack is used to package the Conda environment into a single tarball. This is used to transfer the environment to Vulcan. ## Getting Started -1. Clone [this repository](https://code.hlrs.de/hpcrsaxe/spark_template) to your local machine: +1. Build and transfer the Conda environment to Hawk: - ```bash - git clone - ``` +Only the `main` and `r` channels are available using the Conda module on the clusters. To use custom packages, we need to move the local Conda environment to Hawk. -2. Go into the direcotry and create an environment using Conda and enirvonment.yaml. Note: Be sure to add the necessary packages in environemnt.yaml: +Follow the instructions in [the Conda environment builder repository](https://code.hlrs.de/SiVeGCS/conda-env-builder). - ```bash - ./deployment_scripts/create-env.sh - ``` +2. Allocate workspace on Hawk: -3. Send all files using `deploy-env.sh`: +Proceed to the next step if you have already configured your workspace. Use the following command to create a workspace on the high-performance filesystem, which will expire in 10 days. For more information, such as how to enable reminder emails, refer to the [workspace mechanism](https://kb.hlrs.de/platforms/index.php/Workspace_mechanism) guide. - ```bash - ./deployment_scripts/deploy-env.sh : - ``` +```bash +ws_allocate dask_workspace 10 +ws_find dask_workspace # find the path to workspace, which is the destination directory in the next step +``` + +3. Clone the repository on Hawk to use the deployment scripts and project structure: + +```bash +cd +git clone +``` 4. Send all the code to the appropriate directory on Vulcan using `scp`: - ```bash - scp .py : - ``` +```bash +scp .py : +``` -5. SSH into Vulcan and start a job interatively using: +5. SSH into Vulcan and start a job interactively using: - ```bash - qsub -I -N DaskJob -l select=4:node_type=clx-21 -l walltime=02:00:00 - ``` +```bash +qsub -I -N DaskJob -l select=1:node_type=clx-21 -l walltime=02:00:00 +``` +Note: For multiple nodes, it is recommended to write a `.pbs` script and submit it using `qsub`. 6. Go into the directory with all code: - ```bash - cd - ``` +```bash +cd +``` 7. Initialize the Dask cluster: - ```bash - source deploy-dask.sh "$(pwd)" - ``` +```bash +source deploy-dask.sh "$(pwd)" +``` Note: At the moment, the deployment is verbose, and there is no implementation to silence the logs. Note: Make sure all permissions are set using `chmod +x` for all scripts. diff --git a/deployment_scripts/create-env.sh b/deployment_scripts/create-env.sh index 5cfe9fc..b8cb9cf 100755 --- a/deployment_scripts/create-env.sh +++ b/deployment_scripts/create-env.sh @@ -19,7 +19,11 @@ else echo "Environment '$CONDA_ENV_NAME' does not exist, creating it." # Create Conda environment - conda env create --name $CONDA_ENV_NAME -f environment.yaml + CONDA_SUBDIR=linux-64 conda env create --name $CONDA_ENV_NAME -f environment.yaml - echo "Conda environment '$CONDA_ENV_NAME' created." + if [ $? -eq 0 ]; then + echo "Conda environment '$CONDA_ENV_NAME' created successfully." + else + echo "Failed to create Conda environment '$CONDA_ENV_NAME'." + fi fi \ No newline at end of file diff --git a/deployment_scripts/README.md b/deployment_scripts/deployment_scripts_reference.md similarity index 99% rename from deployment_scripts/README.md rename to deployment_scripts/deployment_scripts_reference.md index 9dc3dc0..ac0afc2 100644 --- a/deployment_scripts/README.md +++ b/deployment_scripts/deployment_scripts_reference.md @@ -1,7 +1,5 @@ # Reference Guide: Dask Cluster Deployment Scripts -Wiki link: - Motivation: This document aims to show users how to use additional Dask deployment scripts to streamline the deployment and management of a Dask cluster on a high-performance computing (HPC) environment. Structure: diff --git a/src/dask-example-pi.py b/src/dask-example-pi.py index 354a25c..8ff801c 100644 --- a/src/dask-example-pi.py +++ b/src/dask-example-pi.py @@ -1,57 +1,16 @@ -import os -import dask +import dask.bag as db import random - from dask.distributed import Client -import dask.dataframe as dd -import pandas as pd - -from dask_ml.preprocessing import MinMaxScaler -from dask_ml.model_selection import train_test_split -from dask_ml.linear_model import LinearRegression - -import torch -from torch.utils.data import Dataset, DataLoader client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786") +NUM_SAMPLES=100 -sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",] +def inside(p): + x, y = random.random(), random.random() + return x*x + y*y < 1 -#sample_dataset="/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet" +def calc_pi(): + count = db.from_sequence(range(0, NUM_SAMPLES)).filter(inside).count().compute() + return 4.0 * count / NUM_SAMPLES -df = dd.read_parquet(sample_datasets, engine="fastparquet").repartition(npartitions=300) - -df_features = df.loc[:, df.columns.str.contains('feature')] -df_labels = df.loc[:, df.columns.str.contains('label')] - -# Create a StandardScaler object -scaler_features = MinMaxScaler() -df_features_scaled = scaler_features.fit_transform(df_features) - -#df_features_scaled = df_features_scaled.loc[:, df_features_scaled.columns.str.contains('feature')] - -#Split the data into training and test sets -X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, random_state=0) - -dataloader = DataLoader(df_features_scaled, batch_size=64, shuffle=True) - -model = torch.nn.Sequential( - torch.nn.Linear(784, 128), - torch.nn.ReLU(), - torch.nn.Linear(128, 10) -) - -#X_train = X_train.loc[:, X_train.columns.str.contains('feature')] -#y_train = y_train.loc[:, y_train.columns.str.contains('label')] - -# Initialize the Linear Regression model -model = LinearRegression().fit(X_train, y_train) - -score = model.score(X_test, y_test) \ No newline at end of file +print(calc_pi()) \ No newline at end of file diff --git a/src/dask-perf-check.py b/src/dask-perf-check.py deleted file mode 100644 index 9a46036..0000000 --- a/src/dask-perf-check.py +++ /dev/null @@ -1,82 +0,0 @@ -import os -import dask -import random -import torch -from torch.utils.data import Dataset, DataLoader -import time - -from dask.distributed import Client -import dask.dataframe as dd -import pandas as pd -from joblib import parallel_backend - -from dask_ml.preprocessing import MinMaxScaler -from dask_ml.model_selection import train_test_split -from dask_ml.linear_model import LinearRegression - -from daskdataset import DaskDataset, ShallowNet - -start_time = time.time() - -client = Client(str(os.getenv('HOSTNAME')) + "-ib:8786") - -sample_datasets=["/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset1.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset2.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset3.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset4.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset5.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset6.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset7.parquet", - "/lustre/nec/ws3/ws/hpcrsaxe-hpcrsaxe/datasets/dataset8.parquet",] -df = dd.read_parquet(sample_datasets, engine="fastparquet")#.repartition(partition_size="100MB") -#df_future = client.scatter(df) - -#separate the features and labels -df_labels = df.loc[:, df.columns.str.contains('label')] - -# Create a StandardScaler object -scaler_features = MinMaxScaler() -df_features_scaled = scaler_features.fit_transform(df.loc[:, df.columns.str.contains('feature')]) - -#Split the data into training and test sets -X_train, X_test, y_train, y_test = train_test_split(df_features_scaled, df_labels, shuffle=True) - -X = torch.tensor(X_train.compute().values) -y = torch.tensor(y_train.compute().values) - -from skorch import NeuralNetRegressor -import torch.optim as optim - -niceties = { - "callbacks": False, - "warm_start": False, - "train_split": None, - "max_epochs": 5, -} - -model = NeuralNetRegressor( - module=ShallowNet, - module__n_features=X.size(dim=1), - criterion=nn.MSELoss, - optimizer=optim.SGD, - optimizer__lr=0.1, - optimizer__momentum=0.9, - batch_size=64, - **niceties, -) - -# Initialize the Linear Regression model -model = LinearRegression() -model.fit(X_train.to_dask_array(lengths=True), y_train.to_dask_array(lengths=True)) - -end_time = time.time() - -print("Time to load data: ", end_time - start_time) - -dask_dataset = DaskDataset(X_train, y_train) -dataloader = DataLoader(dask_dataset, batch_size=64, shuffle=True) - -for feature, label in dataloader: - print(feature) - print(label) - break \ No newline at end of file diff --git a/src/daskdataset.py b/src/daskdataset.py deleted file mode 100644 index 34c64b3..0000000 --- a/src/daskdataset.py +++ /dev/null @@ -1,32 +0,0 @@ -import torch -from torch.utils.data import Dataset, DataLoader -import dask.dataframe as dd - -import torch.nn as nn -import torch.nn.functional as F - -# Assuming you have a Dask DataFrame df with 'features' and 'labels' columns - -class DaskDataset(Dataset): - def __init__(self, df_features, df_labels): - self.features = df_features.to_dask_array(lengths=True) - self.labels = df_labels.to_dask_array(lengths=True) - - def __len__(self): - return self.features.size - - def __getitem__(self, idx): - return torch.tensor(self.features.compute().values), torch.tensor(self.labels.compute().values) - -class ShallowNet(nn.Module): - def __init__(self, n_features): - super().__init__() - self.layer1 = nn.Linear(n_features, 128) - self.relu = nn.ReLU() - self.layer2 = nn.Linear(128, 8) - - def forward(self, x): - x = self.layer1(x) - x = self.relu(x) - x = self.layer2(x) - return x \ No newline at end of file