Skip to content

Using CuPy to Accelerate Matrix Operations with GPU

Last Updated on 2024-08-07 by Clay

Introduction

CuPy is an open-source GPU-accelerated numerical computation library designed for deep learning and scientific computing. It shares many of the same methods and functions as the popular NumPy package in Python but extends its capabilities to perform computations on the GPU. In short, tasks that can benefit from parallel computation on the GPU, such as matrix operations, can achieve significant acceleration with CuPy.

I have known about CuPy for a long time but never had the need to explore it further. Most of my tasks requiring GPU acceleration involve deep learning, so I simply used PyTorch.

The reason I wanted to delve into CuPy now is that a friend of mine recently mentioned that his tasks, such as flow field analysis, could benefit from GPU acceleration and asked if I had any experience in this area.

With some free time over the holiday, I decided to explore how to use CuPy. For my practical implementation, I tested it using a neural network, which I am familiar with, rather than immediately jumping into my friend's flow field analysis tasks.


Installation and Usage

You can install CuPy according to your environment, just make sure to choose the appropriate version for your GPU.

# For CUDA 10.2
pip install cupy-cuda102

# For CUDA 11.0
pip install cupy-cuda110

# For CUDA 11.1
pip install cupy-cuda111

# For CUDA 11.2 ~ 11.x
pip install cupy-cuda11x

# For CUDA 12.x
pip install cupy-cuda12x

# For AMD ROCm 4.3
pip install cupy-rocm-4-3

# For AMD ROCm 5.0
pip install cupy-rocm-5-0


Once installed, you can easily use it with

import cupy as cp
a = cp.array([1, 2, 3])


to move data to the GPU. In practice, using CuPy is almost identical to using NumPy.


Initial Tests

Earlier, we moved data to the GPU using CuPy. How do we verify that it indeed uses GPU acceleration for computations?

I implemented a simple three-layer model (Input Layer / Hidden Layer / Output Layer) to train the well-known MNIST handwritten digit recognition dataset. We can set the backend to either cp (CuPy) or np (NumPy) to determine whether to use GPU or CPU for computations.

Here's the structure:

mnist_cupy/
├── dataloader.py
├── loss_function.py
├── models.py
├── test.py
├── train.py
└── utils.py


DataLoader

Here I defined how to fetch MNIST data. Having worked with PyTorch for a long time, my format leans towards PyTorch's structure.

from typing import Tuple
import numpy as np
import cupy as cp
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder


def get_mnist_dataloader(backend=cp) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    mnist = fetch_openml("mnist_784")

    x = mnist["data"].to_numpy()
    y = mnist["target"].astype(int).to_numpy()

    mu = np.mean(x)
    sigma = np.std(x)
    x = (x - mu) / sigma

    train_x, test_x = train_test_split(x, test_size=0.2, random_state=2999)
    train_y, test_y = train_test_split(y, test_size=0.2, random_state=2999)

    # OneHotEncoder
    encoder = OneHotEncoder(sparse=False)
    train_y = encoder.fit_transform(train_y.reshape(-1, 1))
    test_y = encoder.fit_transform(test_y.reshape(-1, 1))

    train_x = backend.asarray(train_x)
    test_x = backend.asarray(test_x)
    train_y = backend.asarray(train_y)
    test_y = backend.asarray(test_y)

    return train_x, test_x, train_y, test_y


Model

Here is a basic model with random weight initialization. forward() handles the forward propagation path, and backward() handles backpropagation with a simple gradient descent implementation. This kind of manual implementation makes one truly appreciate the computation graph support provided by PyTorch.

At the bottom are my custom save and load methods.

from typing import Union
import numpy as np
import cupy as cp
from utils import ReLU, softmax


# Settings
SEED = 2999
INPUT_DIM = 28*28
HIDDEN_DIM = 28*28
OUTPUT_DIM = 10

DataArray = Union[np.ndarray, cp.ndarray]


class CustomModel:
    def __init__(self, lr: float = 2e-3, backend=np):
        self.backend = backend
        self.w1 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(INPUT_DIM, HIDDEN_DIM),
        )
        self.w2 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(HIDDEN_DIM, OUTPUT_DIM),
        )
        self.b1 = backend.zeros((1, HIDDEN_DIM))
        self.b2 = backend.zeros((1, OUTPUT_DIM))
        self.lr = lr

    def forward(self, x: DataArray) -> DataArray:
        self.x = x
        self.out_layer_1 = x.dot(self.w1) + self.b1
        self.out_activate_1 = ReLU(self.out_layer_1)
        self.out_layer_2 = self.out_activate_1.dot(self.w2) + self.b2
        self.out_activate_2 = softmax(self.out_layer_2)
        
        return self.out_activate_2

    def backward(self, y_true: DataArray) -> None:
        # Compute cross-entropy gradient
        init_gradient = self.out_activate_2 - y_true

        # Compute the second layer gradient
        dL_dw2 = self.out_activate_1.T.dot(init_gradient)
        dL_db2 = self.backend.sum(init_gradient, axis=0)

        # Compute the first layer gradient
        gradient_2_to_1 = init_gradient.dot(self.w2.T) * (self.out_layer_1 > 0)
        dL_dw1 = self.x.T.dot(gradient_2_to_1)
        dL_db1 = self.backend.sum(gradient_2_to_1, axis=0)

        # Update weights and biases
        self.w1 -= self.lr * dL_dw1
        self.b1 -= self.lr * dL_db1
        self.w2 -= self.lr * dL_dw2
        self.b2 -= self.lr * dL_db2        

    def save_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        self.backend.savez(
            path,
            w1=self.w1,
            w2=self.w2,
            b1=self.b1,
            b2=self.b2,
        )

    def load_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        with self.backend.load(path) as data:
            self.w1 = self.backend.asarray(data["w1"])
            self.w2 = self.backend.asarray(data["w2"])
            self.b1 = self.backend.asarray(data["b1"])
            self.b2 = self.backend.asarray(data["b2"])


Loss Function

You might wonder why this section is here since the loss function (partial derivative version) was already hardcoded into the backpropagation in the model.

That's right! I included this section solely to print out the loss during training. In practice, if we don't need to monitor the loss, we wouldn't need to write this.

from typing import Union
import numpy as np
import cupy as cp
from utils import get_backend


DataArray = Union[np.ndarray, cp.ndarray]


def cross_entropy_loss(y_true: DataArray, y_pred: DataArray, backend=np) -> DataArray:
    backend = get_backend(y_true)
    # Note: y_true must be in one-hot encoding format
    # y_true's shape is (batch_size, classes_num)
    # y_pred's shape is (batch_size, classes_num), it's a logits
    batch_size = y_true.shape[0]

    smoothing = 1e-15
    loss = -1 / batch_size * backend.sum(y_true * backend.log(y_pred + smoothing))

    return loss


Utils

Here, I defined get_backend() to determine the backend being used, as well as the softmax() and ReLU() activation functions. The AverageMeter class at the bottom is a simple class to log the loss values.

from typing import Union
import cupy as cp
import numpy as np


DataArray = Union[np.ndarray, cp.ndarray]


def get_backend(x: DataArray):
    return np if isinstance(x, np.ndarray) else cp


def softmax(x: DataArray) -> DataArray:
    backend = get_backend(x)
    exps = backend.exp(x - backend.max(x, axis=-1, keepdims=True))
    return exps / backend.sum(exps, axis=-1, keepdims=True)


def ReLU(x: DataArray) -> DataArray:
    backend = get_backend(x)
    return backend.maximum(0, x)


class AverageMeter:
    """Computes and stores the average and current value of losses"""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Reset all attributes"""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val: float, count_num: int = 1) -> None:
        """Update the loss value"""
        self.val = val
        self.sum += val * count_num
        self.count += count_num
        self.avg = self.sum / self.count



Training

Finally, we get to the main event. Now we can start training the model. We just need to switch the backend to choose between using the GPU or CPU.

import cupy as cp
import numpy as np
from tqdm import tqdm

from dataloader import get_mnist_dataloader
from loss_function import cross_entropy_loss
from models import CustomModel
from utils import AverageMeter


def main() -> None:
    backend = np
    model = CustomModel(lr=0.02, backend=backend)

    # Get dataloader
    train_x, test_x, train_y, test_y = get_mnist_dataloader(backend=backend)

    batch_size = 16
    epochs = 30

    loss_logger = AverageMeter()

    for epoch in range(1, epochs + 1):
        steps = len(train_x) // batch_size
        train_pbar = tqdm(total=steps, desc=f"[Epoch {epoch}/{epochs}]")

        for times in range(steps):
            inputs = train_x[times * batch_size : (times + 1) * batch_size]
            labels = train_y[times * batch_size : (times + 1) * batch_size]

            outputs = model.forward(inputs)

            loss = cross_entropy_loss(labels, outputs)
            loss_logger.update(loss)
            model.backward(labels)

            train_pbar.set_description(f"[Epoch {epoch}/{epochs}], Loss: {loss_logger.avg:.4f}")
            train_pbar.update(1)

        train_pbar.close()

        # Save checkpoint
        model.save_checkpoint()


if __name__ == "__main__":
    main()


In my tests, using the CPU took a full 10 minutes, whereas using the GPU took only 2 minutes!

Isn't it fascinating? It really highlights the power of GPU parallel computation.

By the way, here are the final results on the test data:

import cupy as cp
import numpy as np
import pickle
from tqdm import tqdm
from sklearn.metrics import classification_report

from dataloader import get_mnist_dataloader
from models import CustomModel


def main() -> None:
    backend = cp
    model = CustomModel(backend=backend)
    model.load_checkpoint("./checkpoint.npz")

    # Get dataloader
    train_x, test_x, train_y, test_y = get_mnist_dataloader()

    batch_size = 4

    all_preds = []
    all_labels = []

    steps = len(train_x) // batch_size

    for times in tqdm(range(steps)):
        inputs = test_x[times * batch_size : (times + 1) * batch_size]
        labels = test_y[times * batch_size : (times + 1) * batch_size]

        inputs = backend.asarray(inputs)
        labels = backend.asarray(labels)

        outputs = model.forward(inputs)

        # Get predictions
        preds = np.argmax(outputs, axis=1).tolist()
        labels = np.argmax(labels, axis=1).tolist()

        all_preds.extend(preds)
        all_labels.extend(labels)

    print(classification_report(all_labels, all_preds))


if __name__ == "__main__":
    main()


Output:


References


Read More

Leave a ReplyCancel reply

Exit mobile version