Skip to content

使用 CuPy 來利用 GPU 提昇矩陣運算速度

前言

CuPy 是一個開源的 GPU 加速數值計算函式庫,專為深度學習以及科學計算而設計。它和 Python 中著名的 NumPy 套件有許多相同的使用方法與函式,但更進一步能夠在 GPU 上執行運算。簡單來說,例如矩陣運算等能夠利用 GPU 平行化計算的用途,CuPy 能夠實現一定程度的加速。

我很久以前就知道 CuPy 了,但一直都沒有相關的需求去研究。因為我接觸大部分需要用到 GPU 加速的場景都是深度學習,所以 —— 我直接用 PyTorch 就好了

但之所以想要研究一下,是因為我朋友近日在跟我閒聊,說是他所作的流場分析啊等等的任務場景應該都可以用 GPU 加速,問我有沒有這方面相關的經驗。

所以閒來無事,也是趁著連假就研究了一下 CuPy 的使用方法。實作上我還是用了自己熟悉的神經網路來測試,並沒有一步跨到朋友正在做的流場分析。


安裝與使用

你可以根據你的環境自行安裝 CuPy,只需要注意對應的 GPU 版本。

# For CUDA 10.2
pip install cupy-cuda102

# For CUDA 11.0
pip install cupy-cuda110

# For CUDA 11.1
pip install cupy-cuda111

# For CUDA 11.2 ~ 11.x
pip install cupy-cuda11x

# For CUDA 12.x
pip install cupy-cuda12x

# For AMD ROCm 4.3
pip install cupy-rocm-4-3

# For AMD ROCm 5.0
pip install cupy-rocm-5-0

安裝完畢後,我們可以簡單地使用

import cupy as cp
a = cp.array([1, 2, 3])


來把資料放到 GPU。實際上,CuPy 的使用方式與 NumPy 幾乎一模一樣。


小試牛刀

剛剛,我們透過 CuPy 把資料放進 GPU 了,那麼我們該如何驗證它真的使用 GPU 加速計算呢?

現在,我簡單地實現了一個只有三層的模型(Input Layer / Hidden Layer / Output Layer)用來訓練知名的 MNIST 手寫數字辨識,並且我們可以設定 backendcp (CuPy) 或是 np (NumPy) 來決定要使用 GPU 或是 CPU 進行計算。

我的架構如下:

mnist_cupy/
├── dataloader.py
├── loss_function.py
├── models.py
├── test.py
├── train.py
└── utils.py


DataLoader

在這裡我定義了 MNIST 資料的取得,PyTorch 寫久了整個格式都偏向 PyTorch 的架構了。

from typing import Tuple
import numpy as np
import cupy as cp
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder


def get_mnist_dataloader(backend = cp) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    mnist = fetch_openml("mnist_784")

    x = mnist["data"].to_numpy()
    y = mnist["target"].astype(int).to_numpy()

    mu = np.mean(x)
    sigma = np.std(x)
    x = (x - mu) / sigma

    train_x, test_x = train_test_split(x, test_size=0.2, random_state=2999)
    train_y, test_y = train_test_split(y, test_size=0.2, random_state=2999)

    # OneHotEncoder
    encoder = OneHotEncoder(sparse=False)
    train_y = encoder.fit_transform(train_y.reshape(-1, 1))
    test_y = encoder.fit_transform(test_y.reshape(-1, 1))

    train_x = backend.asarray(train_x)
    test_x = backend.asarray(test_x)
    train_y = backend.asarray(train_y)
    test_y = backend.asarray(test_y)

    return train_x, test_x, train_y, test_y


Model

這裡只有一個最基本的模型,權重的初始化方式也是完全隨機,而非常態分佈或其他較好的初始化方式。

forward() 的部份是模型向前傳播的路徑,backward() 自然就是反向傳播,很暴力地寫了梯度下降。這種時候就真的會意識到平日 PyTorch 的計算圖究竟幫助了我們多少。

最下面是我自定義的儲存、讀取方式。

from typing import Union
import numpy as np
import cupy as cp
from utils import ReLU, softmax


# Settings
SEED = 2999
INPUT_DIM = 28*28
HIDDEN_DIM = 28*28
OUTPUT_DIM = 10

DataArray = Union[np.ndarray, cp.ndarray]


class CustomModel:
    def __init__(self, lr: float = 2e-3, backend = np):
        self.backend = backend
        self.w1 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(INPUT_DIM, HIDDEN_DIM),
        )
        self.w2 = backend.random.uniform(
            low=-1.0,
            high=1.0,
            size=(HIDDEN_DIM, OUTPUT_DIM),
        )
        self.b1 = backend.zeros((1, HIDDEN_DIM))
        self.b2 = backend.zeros((1, OUTPUT_DIM))
        self.lr = lr

    def forward(self, x: DataArray) -> DataArray:
        self.x = x
        self.out_layer_1 = x.dot(self.w1) + self.b1
        self.out_activate_1 = ReLU(self.out_layer_1)
        self.out_layer_2 = self.out_activate_1.dot(self.w2) + self.b2
        self.out_activate_2 = softmax(self.out_layer_2)
        
        return self.out_activate_2

    def backward(self, y_true: DataArray) -> None:
        # Compute cross-entropy gradient
        init_gradient = self.out_activate_2 - y_true

        # Compute the second layer gradient
        dL_dw2 = self.out_activate_1.T.dot(init_gradient)
        dL_db2 = self.backend.sum(init_gradient, axis=0)

        # Compute the first layer gradient
        gradient_2_to_1 = init_gradient.dot(self.w2.T) * (self.out_layer_1 > 0)
        dL_dw1 = self.x.T.dot(gradient_2_to_1)
        dL_db1 = self.backend.sum(gradient_2_to_1, axis=0)

        # Update weights and biases
        self.w1 -= self.lr * dL_dw1
        self.b1 -= self.lr * dL_db1
        self.w2 -= self.lr * dL_dw2
        self.b2 -= self.lr * dL_db2        

    def save_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        self.backend.savez(
            path,
            w1=self.w1,
            w2=self.w2,
            b1=self.b1,
            b2=self.b2,
        )

    def load_checkpoint(self, path: str = "./checkpoint.npz") -> None:
        with self.backend.load(path) as data:
            self.w1 = self.backend.asarray(data["w1"])
            self.w2 = self.backend.asarray(data["w2"])
            self.b1 = self.backend.asarray(data["b1"])
            self.b2 = self.backend.asarray(data["b2"])


Loss Function

大家一定很疑惑這段在做什麼吧? loss function (偏微分 ver.)剛剛在模型那邊不是已經寫死在反向傳播裡了嗎?

沒錯!這裡我就是為了一邊訓練一邊印出 loss 才特意寫的,實際上我們如果不用看 loss 下降,我們就不需要寫這個。

from typing import Union
import numpy as np
import cupy as cp
from utils import get_backend


DataArray = Union[np.ndarray, cp.ndarray]


def cross_entropy_loss(y_true: DataArray, y_pred: DataArray, backend=np) -> DataArray:
    backend = get_backend(y_true)
    # Note: y_true must be one-hot encoding format
    # y_true's shape is (batch_size, classes_num)
    # y_pred's shape is (batch_size, classes_num), it's a logits
    batch_size = y_true.shape[0]

    smoothing = 1e-15
    loss = -1 / batch_size * backend.sum(y_true * backend.log(y_pred + smoothing))

    return loss


Utils

這裡我定義了 get_backend() 來確認我使用的後端、以及我實際上會使用到的 softmax()ReLU() 激活函數。

最下面的 AverageMeter 則是單純用來紀錄 loss 的一個類別。

from typing import Union
import cupy as cp
import numpy as np


DataArray = Union[np.ndarray, cp.ndarray]


def get_backend(x: DataArray):
    return np if isinstance(x, np.ndarray) else cp


def softmax(x: DataArray) -> DataArray:
    backend = get_backend(x)
    exps = backend.exp(x - backend.max(x, axis=-1, keepdims=True))
    return exps / backend.sum(exps, axis=-1, keepdims=True)


def ReLU(x: DataArray) -> DataArray:
    backend = get_backend(x)
    return backend.maximum(0, x)


class AverageMeter:
    """Computes and stores the average and current value of losses"""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Reset all attributes"""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val: float, count_num: int = 1) -> None:
        """Update the loss value"""
        self.val = val
        self.sum += val * count_num
        self.count += count_num
        self.avg = self.sum / self.count



Training

終於來到重頭戲了,現在我們可以開始訓練模型了。我們只需要切換 backend,就能決定要使用 GPU 或是 CPU。

import cupy as cp
import numpy as np
from tqdm import tqdm

from dataloader import get_mnist_dataloader
from loss_function import cross_entropy_loss
from models import CustomModel
from utils import AverageMeter


def main() -> None:
    backend = np
    model = CustomModel(lr=0.02, backend=backend)

    # Get dataloader
    train_x, test_x, train_y, test_y = get_mnist_dataloader(backend=backend)

    batch_size = 16
    epochs = 30

    loss_logger = AverageMeter()

    for epoch in range(1, epochs+1):
        steps = len(train_x) // batch_size
        train_pbar = tqdm(total=steps, desc=f"[Epoch {epoch}/{epochs}]")

        for times in range(steps):
            inputs = train_x[times*batch_size:(times+1)*batch_size]
            labels = train_y[times*batch_size:(times+1)*batch_size]

            outputs = model.forward(inputs)
            
            loss = cross_entropy_loss(labels, outputs)
            loss_logger.update(loss)
            model.backward(labels)

            train_pbar.set_description(f"[Epoch {epoch}/{epochs}], Loss: {loss_logger.avg:.4f}")
            train_pbar.update(1)

        train_pbar.close()

        # Save checkpoint
        model.save_checkpoint()


if __name__ == "__main__":
    main()


在我個人測試下,使用 CPU 足足需要 10 分鐘、而使用 GPU 只需要 2 分鐘即可!

是不是很有趣呢?真的是切身體會到了 GPU 平行化計算的強大。

順帶一提這是最後在測試資料上的結果:

import cupy as cp
import numpy as np
import pickle
from tqdm import tqdm
from sklearn.metrics import classification_report

from dataloader import get_mnist_dataloader
from models import CustomModel


def main() -> None:
    backend = cp
    model = CustomModel(backend=backend)
    model.load_checkpoint("./checkpoint.npz")

    # Get dataloader
    train_x, test_x, train_y, test_y = get_mnist_dataloader()

    batch_size = 4

    all_preds = []
    all_labels = []

    steps = len(train_x) // batch_size

    for times in tqdm(range(steps)):
        inputs = test_x[times*batch_size:(times+1)*batch_size]
        labels = test_y[times*batch_size:(times+1)*batch_size]

        inputs = backend.asarray(inputs)
        labels = backend.asarray(labels)

        outputs = model.forward(inputs)

        # Get predictions
        preds = np.argmax(outputs, axis=1).tolist()
        labels = np.argmax(labels, axis=1).tolist()

        all_preds.extend(preds)
        all_labels.extend(labels)

    print(classification_report(all_labels, all_preds))


if __name__ == "__main__":
    main()


Output:


References


Read More

Leave a Reply