Last Updated on 2024-08-07 by Clay
Introduction
CuPy is an open-source GPU-accelerated numerical computation library designed for deep learning and scientific computing. It shares many of the same methods and functions as the popular NumPy package in Python but extends its capabilities to perform computations on the GPU. In short, tasks that can benefit from parallel computation on the GPU, such as matrix operations, can achieve significant acceleration with CuPy.
I have known about CuPy for a long time but never had the need to explore it further. Most of my tasks requiring GPU acceleration involve deep learning, so I simply used PyTorch.
The reason I wanted to delve into CuPy now is that a friend of mine recently mentioned that his tasks, such as flow field analysis, could benefit from GPU acceleration and asked if I had any experience in this area.
With some free time over the holiday, I decided to explore how to use CuPy. For my practical implementation, I tested it using a neural network, which I am familiar with, rather than immediately jumping into my friend's flow field analysis tasks.
Installation and Usage
You can install CuPy according to your environment; just make sure to choose the build that matches your CUDA (or ROCm) version.
# For CUDA 10.2
pip install cupy-cuda102
# For CUDA 11.0
pip install cupy-cuda110
# For CUDA 11.1
pip install cupy-cuda111
# For CUDA 11.2 ~ 11.x
pip install cupy-cuda11x
# For CUDA 12.x
pip install cupy-cuda12x
# For AMD ROCm 4.3
pip install cupy-rocm-4-3
# For AMD ROCm 5.0
pip install cupy-rocm-5-0
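After installation, you can check that CuPy actually sees your GPU (the exact output depends on your machine):

import cupy as cp

cp.show_config()  # prints CUDA/ROCm versions and the detected devices
print(cp.cuda.runtime.getDeviceCount())  # number of visible GPUs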
Once installed, you can easily use it with
import cupy as cp
a = cp.array([1, 2, 3])
to move data to the GPU. In practice, using CuPy is almost identical to using NumPy.
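Since a is now a CuPy array, it lives in GPU memory. You can check which device holds it and copy results back to the host when needed:

print(a.device)    # e.g. <CUDA Device 0>

b = cp.asnumpy(a)  # copy back to host memory as a NumPy array; a.get() does the same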
Initial Tests
Earlier, we moved data to the GPU using CuPy. How do we verify that it indeed uses GPU acceleration for computations?
I implemented a simple three-layer model (Input Layer / Hidden Layer / Output Layer) and trained it on the well-known MNIST handwritten digit recognition dataset. We can set the backend to either cp (CuPy) or np (NumPy) to determine whether computations run on the GPU or the CPU.
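Before the full experiment, a quick micro-benchmark already shows the gap; below is a minimal sketch (absolute numbers depend entirely on your hardware):

import time

import cupy as cp
import numpy as np

size = 4096
x_cpu = np.random.rand(size, size).astype(np.float32)
x_gpu = cp.asarray(x_cpu)

# CPU matrix multiplication
start = time.perf_counter()
x_cpu @ x_cpu
print(f"NumPy: {time.perf_counter() - start:.4f}s")

# Warm up once so CUDA kernel compilation is not counted
x_gpu @ x_gpu

# GPU kernels launch asynchronously, so synchronize before reading the clock
cp.cuda.Device().synchronize()
start = time.perf_counter()
x_gpu @ x_gpu
cp.cuda.Device().synchronize()
print(f"CuPy:  {time.perf_counter() - start:.4f}s")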
Here's the structure:
mnist_cupy/
├── dataloader.py
├── loss_function.py
├── models.py
├── test.py
├── train.py
└── utils.py
DataLoader
Here I defined how to fetch the MNIST data. Since I have worked with PyTorch for a long time, the format leans toward PyTorch's conventions.
from typing import Tuple
import numpy as np
import cupy as cp
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
def get_mnist_dataloader(backend=cp) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    mnist = fetch_openml("mnist_784")
    x = mnist["data"].to_numpy()
    y = mnist["target"].astype(int).to_numpy()

    # Standardize pixel values
    mu = np.mean(x)
    sigma = np.std(x)
    x = (x - mu) / sigma

    # Split features and labels together so they stay aligned
    train_x, test_x, train_y, test_y = train_test_split(
        x, y, test_size=0.2, random_state=2999
    )

    # One-hot encode the labels (sklearn >= 1.2 uses `sparse_output`)
    encoder = OneHotEncoder(sparse_output=False)
    train_y = encoder.fit_transform(train_y.reshape(-1, 1))
    test_y = encoder.transform(test_y.reshape(-1, 1))

    # Move everything to the chosen backend (GPU for cp, CPU for np)
    train_x = backend.asarray(train_x)
    test_x = backend.asarray(test_x)
    train_y = backend.asarray(train_y)
    test_y = backend.asarray(test_y)

    return train_x, test_x, train_y, test_y
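A quick sanity check of the loader (a sketch; the shapes follow from the 80/20 split of MNIST's 70,000 samples):

train_x, test_x, train_y, test_y = get_mnist_dataloader(backend=np)

print(train_x.shape, train_y.shape)  # (56000, 784) (56000, 10)
print(test_x.shape, test_y.shape)    # (14000, 784) (14000, 10)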
Model
Here is a basic model with random weight initialization. forward() handles the forward propagation path, and backward() handles backpropagation with a simple gradient descent implementation. This kind of manual implementation makes one truly appreciate the computation graph support provided by PyTorch.
At the bottom are my custom save and load methods.
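For reference, the init_gradient term in backward() uses a standard identity: when softmax outputs $\hat{y}$ are fed into cross-entropy with one-hot labels $y$, the gradient of the loss with respect to the logits $z$ collapses to

$$\frac{\partial L}{\partial z} = \hat{y} - y$$

which is why the code can start backpropagation directly from out_activate_2 - y_true.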
from typing import Union
import numpy as np
import cupy as cp
from utils import ReLU, softmax
# Settings
SEED = 2999
INPUT_DIM = 28*28
HIDDEN_DIM = 28*28
OUTPUT_DIM = 10
DataArray = Union[np.ndarray, cp.ndarray]
class CustomModel:
    def __init__(self, lr: float = 2e-3, backend=np):
        self.backend = backend
        backend.random.seed(SEED)  # make the random initialization reproducible

        self.w1 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(INPUT_DIM, HIDDEN_DIM),
        )
        self.w2 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(HIDDEN_DIM, OUTPUT_DIM),
        )
        self.b1 = backend.zeros((1, HIDDEN_DIM))
        self.b2 = backend.zeros((1, OUTPUT_DIM))
        self.lr = lr

    def forward(self, x: DataArray) -> DataArray:
        # Cache the intermediate results for use in backward()
        self.x = x
        self.out_layer_1 = x.dot(self.w1) + self.b1
        self.out_activate_1 = ReLU(self.out_layer_1)
        self.out_layer_2 = self.out_activate_1.dot(self.w2) + self.b2
        self.out_activate_2 = softmax(self.out_layer_2)
        return self.out_activate_2

    def backward(self, y_true: DataArray) -> None:
        # Combined softmax + cross-entropy gradient
        init_gradient = self.out_activate_2 - y_true

        # Gradients for the second (output) layer
        dL_dw2 = self.out_activate_1.T.dot(init_gradient)
        dL_db2 = self.backend.sum(init_gradient, axis=0)

        # Gradients for the first (hidden) layer; (out_layer_1 > 0) is the ReLU derivative
        gradient_2_to_1 = init_gradient.dot(self.w2.T) * (self.out_layer_1 > 0)
        dL_dw1 = self.x.T.dot(gradient_2_to_1)
        dL_db1 = self.backend.sum(gradient_2_to_1, axis=0)

        # Vanilla gradient descent update
        self.w1 -= self.lr * dL_dw1
        self.b1 -= self.lr * dL_db1
        self.w2 -= self.lr * dL_dw2
        self.b2 -= self.lr * dL_db2

    def save_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        self.backend.savez(
            path,
            w1=self.w1,
            w2=self.w2,
            b1=self.b1,
            b2=self.b2,
        )

    def load_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        with self.backend.load(path) as data:
            self.w1 = self.backend.asarray(data["w1"])
            self.w2 = self.backend.asarray(data["w2"])
            self.b1 = self.backend.asarray(data["b1"])
            self.b2 = self.backend.asarray(data["b2"])
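A minimal smoke test for the class, using random data in place of MNIST (a sketch):

model = CustomModel(lr=2e-3, backend=np)

x = np.random.randn(16, INPUT_DIM)                            # fake batch of 16 samples
y = np.eye(OUTPUT_DIM)[np.random.randint(0, OUTPUT_DIM, 16)]  # random one-hot labels

probs = model.forward(x)  # (16, 10) softmax probabilities
model.backward(y)         # one gradient descent step on w1, b1, w2, b2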
Loss Function
You might wonder why this section is here, since the loss function (in its partial-derivative form) was already hardcoded into the model's backpropagation.
That's right! I included this section solely to print out the loss during training. In practice, if we don't need to monitor the loss, we wouldn't need to write this.
from typing import Union
import numpy as np
import cupy as cp
from utils import get_backend
DataArray = Union[np.ndarray, cp.ndarray]
def cross_entropy_loss(y_true: DataArray, y_pred: DataArray) -> DataArray:
    # Pick np or cp based on where the inputs live
    backend = get_backend(y_true)

    # Note: y_true must be in one-hot format, shape (batch_size, num_classes)
    # y_pred holds softmax probabilities, shape (batch_size, num_classes)
    batch_size = y_true.shape[0]
    smoothing = 1e-15  # avoid log(0)
    loss = -1 / batch_size * backend.sum(y_true * backend.log(y_pred + smoothing))
    return loss
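A tiny hand-checkable example of what it returns (a sketch):

y_true = np.array([[0.0, 1.0], [1.0, 0.0]])
y_pred = np.array([[0.1, 0.9], [0.8, 0.2]])

# -(log(0.9) + log(0.8)) / 2 ≈ 0.1643
print(cross_entropy_loss(y_true, y_pred))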
Utils
Here, I defined get_backend() to determine the backend being used, as well as the softmax() and ReLU() activation functions. The AverageMeter class at the bottom is a simple helper for logging loss values.
from typing import Union
import cupy as cp
import numpy as np
DataArray = Union[np.ndarray, cp.ndarray]
def get_backend(x: DataArray):
    # NumPy arrays stay on the CPU; everything else is assumed to be CuPy
    return np if isinstance(x, np.ndarray) else cp


def softmax(x: DataArray) -> DataArray:
    backend = get_backend(x)
    # Subtract the row-wise max for numerical stability
    exps = backend.exp(x - backend.max(x, axis=-1, keepdims=True))
    return exps / backend.sum(exps, axis=-1, keepdims=True)


def ReLU(x: DataArray) -> DataArray:
    backend = get_backend(x)
    return backend.maximum(0, x)


class AverageMeter:
    """Computes and stores the average and current value of losses"""
    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Reset all attributes"""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val: float, count_num: int = 1) -> None:
        """Update the loss value"""
        self.val = val
        self.sum += val * count_num
        self.count += count_num
        self.avg = self.sum / self.count
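The isinstance() check in get_backend() is what lets every function serve both backends transparently; for example:

print(get_backend(np.zeros(3)) is np)  # True
print(get_backend(cp.zeros(3)) is cp)  # True; CuPy arrays are not np.ndarray subclasses

print(ReLU(np.array([-1.0, 2.0])))     # [0. 2.]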
Training
Finally, we get to the main event. Now we can start training the model. We just need to switch the backend to choose between using the GPU or the CPU.
import cupy as cp
import numpy as np
from tqdm import tqdm
from dataloader import get_mnist_dataloader
from loss_function import cross_entropy_loss
from models import CustomModel
from utils import AverageMeter
def main() -> None:
    backend = np  # switch to cp to train on the GPU

    model = CustomModel(lr=0.02, backend=backend)

    # Get data
    train_x, test_x, train_y, test_y = get_mnist_dataloader(backend=backend)

    batch_size = 16
    epochs = 30
    loss_logger = AverageMeter()

    for epoch in range(1, epochs + 1):
        steps = len(train_x) // batch_size
        train_pbar = tqdm(total=steps, desc=f"[Epoch {epoch}/{epochs}]")

        for times in range(steps):
            inputs = train_x[times * batch_size : (times + 1) * batch_size]
            labels = train_y[times * batch_size : (times + 1) * batch_size]

            outputs = model.forward(inputs)
            loss = cross_entropy_loss(labels, outputs)
            loss_logger.update(float(loss))  # float() also works on 0-d CuPy arrays

            model.backward(labels)

            train_pbar.set_description(f"[Epoch {epoch}/{epochs}], Loss: {loss_logger.avg:.4f}")
            train_pbar.update(1)

        train_pbar.close()

    # Save checkpoint
    model.save_checkpoint()


if __name__ == "__main__":
    main()
In my tests, using the CPU took a full 10 minutes, whereas using the GPU took only 2 minutes!
Isn't it fascinating? It really highlights the power of GPU parallel computation.
By the way, here are the final results on the test data:
import cupy as cp
import numpy as np
from tqdm import tqdm
from sklearn.metrics import classification_report

from dataloader import get_mnist_dataloader
from models import CustomModel


def main() -> None:
    backend = cp

    model = CustomModel(backend=backend)
    model.load_checkpoint("./checkpoint.npz")

    # Get data
    train_x, test_x, train_y, test_y = get_mnist_dataloader()

    batch_size = 4
    all_preds = []
    all_labels = []

    # Iterate over the test set, not the training set
    steps = len(test_x) // batch_size

    for times in tqdm(range(steps)):
        inputs = test_x[times * batch_size : (times + 1) * batch_size]
        labels = test_y[times * batch_size : (times + 1) * batch_size]
        inputs = backend.asarray(inputs)
        labels = backend.asarray(labels)

        outputs = model.forward(inputs)

        # Get predictions (tolist() copies back to the host if needed)
        preds = outputs.argmax(axis=1).tolist()
        labels = labels.argmax(axis=1).tolist()
        all_preds.extend(preds)
        all_labels.extend(labels)

    print(classification_report(all_labels, all_preds))


if __name__ == "__main__":
    main()
Output: