Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] : Ray Thread Actor may cause cuda memory leakage. #49360

Open
PanAndy opened this issue Dec 19, 2024 · 2 comments
Open

[Core] : Ray Thread Actor may cause cuda memory leakage. #49360

PanAndy opened this issue Dec 19, 2024 · 2 comments
Assignees
Labels
bug Something that is supposed to be working; but isn't core Issues that should be addressed in Ray Core P1 Issue that should be fixed within a few weeks

Comments

@PanAndy
Copy link

PanAndy commented Dec 19, 2024

What happened + What you expected to happen

When we were using ray.Actor to perform calculations related to PyTorch tensors, we noticed that the CUDA memory usage continuously increased with each computation step.
After investigation, we found that this was caused by setting max_concurrency > 1.
In the test provided below, the memory leak reached 3 GB after executing 100 times.

Versions / Dependencies

Version: 2.40.0
python

Reproduction script

import json

import ray
import torch

def log_gpu_memory_usage(head: str):
    memory_allocated = torch.cuda.memory_allocated() / 1024 ** 3
    memory_reserved = torch.cuda.memory_reserved() / 1024 ** 3
    message = f'{head}, memory allocated (GB): {memory_allocated}, memory reserved (GB): {memory_reserved}'
    print(message)
    return memory_allocated, memory_reserved


@ray.remote(num_gpus=1)
class ComputeTensorActor:
    def __init__(self, name):
        self.name = name

    def compute_tensor(self, num=100):
        torch.manual_seed(0)

        tensor_size = (1024, 1024)
        tensor = torch.randn(tensor_size, device='cuda')
        tensor_list = [torch.randn(tensor_size, device='cuda') for _ in range(num * 100)]

        for _ in range(num):
            other_tensor = torch.randn(tensor_size, device='cuda')
            tensor = torch.mm(tensor, other_tensor)
            tensor = torch.relu(tensor)
            tensor += 0.1 * torch.randn(tensor_size, device='cuda')

        metrics = {}
        memory_allocated, memory_reserved = log_gpu_memory_usage(head=f"{self.name} before empty cache")
        metrics["onload/memory_allocated"] = memory_allocated
        metrics["onload/memory_reserved"] = memory_reserved

        del tensor_list, tensor
        torch.cuda.empty_cache()

        memory_allocated, memory_reserved = log_gpu_memory_usage(head=f"{self.name} after empty cache")
        metrics["offload/memory_allocated"] = memory_allocated
        metrics["offload/memory_reserved"] = memory_reserved

        return metrics


def test_thread_actor():
    ray.init(num_gpus=1, ignore_reinit_error=True)
    cp_actor = ComputeTensorActor.options(num_gpus=1, max_concurrency=1000).remote("thread actor")
    num = 100
    metric_list = [ray.get(cp_actor.compute_tensor.remote(num=num)) for _ in range(num)]

    print(metric_list)

    with open("thread_actor_metrics.json", "w") as f:
        json.dump(metric_list, f)


def test_common_actor():
    ray.init(num_gpus=1, ignore_reinit_error=True)
    cp_actor = ComputeTensorActor.options(num_gpus=1).remote("thread actor")
    num = 100
    metric_list = [ray.get(cp_actor.compute_tensor.remote(num=num)) for _ in range(num)]

    print(metric_list)
    with open("common_actor_metrics.json", "w") as f:
        json.dump(metric_list, f)


if __name__ == '__main__':
    test_thread_actor()
    test_common_actor()

last result:

thread_actor_metrics[-1] =     {
        "onload/memory_allocated": 42.1953125,
        "onload/memory_reserved": 42.20703125,
        "offload/memory_allocated": 3.12890625,
        "offload/memory_reserved": 3.14453125
    }

common_actor_metrics[-1] =     {
        "onload/memory_allocated": 39.1015625,
        "onload/memory_reserved": 39.11328125,
        "offload/memory_allocated": 0.03515625,
        "offload/memory_reserved": 0.05078125
    }

Issue Severity

Medium: It is a significant difficulty but I can work around it.

@PanAndy PanAndy added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 19, 2024
@jcotant1 jcotant1 added the core Issues that should be addressed in Ray Core label Dec 19, 2024
@ruisearch42 ruisearch42 added P0 Issues that should be fixed in short order and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 23, 2024
@ruisearch42 ruisearch42 self-assigned this Dec 23, 2024
@ruisearch42 ruisearch42 added P1 Issue that should be fixed within a few weeks and removed P0 Issues that should be fixed in short order labels Dec 23, 2024
@kf-zhang
Copy link

kf-zhang commented Dec 29, 2024

import torch
import ray


def log_gpu_memory_usage(head: str):
    memory_allocated = torch.cuda.memory_allocated() / 1024 ** 3
    memory_reserved = torch.cuda.memory_reserved() / 1024 ** 3
    message = f'{head}, memory allocated (GB): {memory_allocated}, memory reserved (GB): {memory_reserved}'
    return memory_allocated, memory_reserved

MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT: int = 100000

def start_record_memory_history() -> None:
   print("Starting snapshot record_memory_history")
   torch.cuda.memory._record_memory_history(
       max_entries=MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT
   )

def stop_record_memory_history() -> None:
   print("Stopping snapshot record_memory_history")
   torch.cuda.memory._record_memory_history(enabled=None)

def export_memory_snapshot(file_prefix: str) -> None:
   # Prefix for file names.

   try:
       print(f"Saving snapshot to local file: {file_prefix}.pickle")
       torch.cuda.memory._dump_snapshot(f"{file_prefix}.pickle")
   except Exception as e:
       print(f"Failed to capture memory snapshot {e}")
       return

@ray.remote(num_gpus=1)
class Actor:
    def __init__(self, name: str):
        self.name = name
    
    def compute(self):
        start_record_memory_history()
        
        tensor_size = (1024, 1024)
        mat_a = torch.rand(tensor_size, device='cuda')
        mat_b = torch.rand(tensor_size, device='cuda')
        mat_c = torch.mm(mat_a, mat_b) #torch.mm will use cublas to do the matrix multiplication, which will allocate memory(workspace) on the GPU
        
        metrics = {}
        memory_allocated, memory_reserved = log_gpu_memory_usage(head=f"{self.name} before empty cache")
        metrics["onload/memory_allocated"] = memory_allocated
        metrics["onload/memory_reserved"] = memory_reserved

        del mat_a, mat_b, mat_c
        torch.cuda.empty_cache()
        # While tensors are deleted, the workspace allocated by cublas is not released.
        
        memory_allocated, memory_reserved = log_gpu_memory_usage(head=f"{self.name} after empty cache")
        metrics["offload/memory_allocated"] = memory_allocated
        metrics["offload/memory_reserved"] = memory_reserved
        
        export_memory_snapshot(self.name)
        stop_record_memory_history()
        
        return metrics


def test(num_threads: int):
    ray.init()
    actor_handler = Actor.options(max_concurrency=num_threads).remote(f"num_threads_{num_threads}") #the size of the thread pool is num_threads
    futures = [actor_handler.compute.remote() for i in range(num_threads)] #fill the thread pool with num_threads tasks
    metrics = ray.get(futures)
    print(f"num_thread:{num_threads} metrics: {metrics[-1]}")
    ray.shutdown()

if __name__ == '__main__':
    num_threads = [2**i for i in range(10)]
    for x in num_threads:
        test(x)
    

I believe the leaked memory comes from the cuBLAS workspace. When calling torch.mm, PyTorch uses cuBLAS to perform the matrix multiplication, and cuBLAS allocates a workspace that occupies GPU memory, as described in torch getWorkSpace. Additionally, PyTorch allocates a cuBLAS handler for each thread (source code), which causes the size of the workspace to be proportional to the number of threads, specifically the max_concurrency in Ray. To validate this hypothesis, you can run the code above and paste the generated files into https://pytorch.org/memory_viz to check the final memory usage. You can refer to this link for more information.

@PanAndy
Copy link
Author

PanAndy commented Dec 30, 2024

Okay, I understand. Thank you for your response.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something that is supposed to be working; but isn't core Issues that should be addressed in Ray Core P1 Issue that should be fixed within a few weeks
Projects
None yet
Development

No branches or pull requests

4 participants