Skip to content

[core] parallel loading of shards #12028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 13, 2025
Merged

[core] parallel loading of shards #12028

merged 23 commits into from
Aug 13, 2025

Conversation

sayakpaul
Copy link
Member

@sayakpaul sayakpaul commented Jul 31, 2025

What does this PR do?

Similar to huggingface/transformers#36835.

`main`: time: 8.162s
this branch: time: 5.663s
code
import time
t_ini = time.time()

import torch
import os
from diffusers import DiffusionPipeline, AutoModel
print(f"import time: {time.time() - t_ini:.3f}s")

os.environ["HF_ENABLE_PARALLEL_LOADING"] = "YES"
os.environ["HF_PARALLEL_LOADING_WORKERS"] = "12"
model_id = "Wan-AI/Wan2.2-I2V-A14B-Diffusers"

t0 = time.time()
torch.cuda.synchronize()
print(f"CUDA sync time: {time.time() - t0:.3f}s")

print("starting model load")
t1 = time.time()
transformer = AutoModel.from_pretrained(
    model_id, subfolder="transformer", torch_dtype=torch.bfloat16, device_map="cuda"
)
torch.cuda.synchronize()
t2 = time.time()

diff = t2 - t1
print(f"time: {diff:.3f}s")

@@ -310,6 +311,130 @@ def load_model_dict_into_meta(
return offload_index, state_dict_index


def check_support_param_buffer_assignment(model_to_load, state_dict, start_prefix=""):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it here from modeling_utils.py.

return offload_index, state_dict_index, mismatched_keys, error_msgs


def _find_mismatched_keys(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Moved it out of modeling_utils.py.

Comment on lines -1569 to -1579
if len(resolved_model_file) > 1:
resolved_model_file = logging.tqdm(resolved_model_file, desc="Loading checkpoint shards")

mismatched_keys = []
assign_to_params_buffers = None
error_msgs = []

for shard_file in resolved_model_file:
state_dict = load_state_dict(shard_file, dduf_entries=dduf_entries)
mismatched_keys += _find_mismatched_keys(
state_dict, model_state_dict, loaded_keys, ignore_mismatched_sizes
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been moved to load_shard_file().

@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.



def load_shard_files_with_threadpool(args_list):
num_workers = int(os.environ.get("HF_PARALLEL_LOADING_WORKERS", "8"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would add HF_PARALLEL_LOADING_WORKERS as a constant at the top of the file for consistency.

Comment on lines 1551 to 1571
args_list = [
(
model,
model_state_dict,
shard_file,
device_map,
dtype,
hf_quantizer,
keep_in_fp32_modules,
dduf_entries,
loaded_keys,
unexpected_keys,
offload_index,
offload_folder,
state_dict_index,
state_dict_folder,
ignore_mismatched_sizes,
low_cpu_mem_usage,
)
for shard_file in resolved_model_file
]
Copy link
Collaborator

@DN6 DN6 Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same arguments are used across the two loading functions, it's a good candidate for functools.partial

        load_fn = partial(
            load_shard_files_with_threadpool if is_parallel_loading_enabled else load_shard_file,
            model=model,
            model_state_dict=model_state_dict,
            device_map=device_map,
            dtype=dtype,
            hf_quantizer=hf_quantizer,
            keep_in_fp32_modules=keep_in_fp32_modules,
            dduf_entries=dduf_entries,
            loaded_keys=loaded_keys,
            unexpected_keys=unexpected_keys,
            offload_index=offload_index,
            offload_folder=offload_folder,
            state_dict_index=state_dict_index,
            state_dict_folder=state_dict_folder,
            ignore_mismatched_sizes=ignore_mismatched_sizes,
            low_cpu_mem_usage=low_cpu_mem_usage,
        )

        if is_parallel_loading_enabled:
            offload_index, state_dict_index, _mismatched_keys, _error_msgs = load_fn(
                resolved_model_file,
            )
            error_msgs += _error_msgs
            mismatched_keys += _mismatched_keys
        else:
            shard_files = resolved_model_file
            if len(resolved_model_file) > 1:
                shard_files = logging.tqdm(resolved_model_file, desc="Loading checkpoint shards")

            for shard_file in resolved_model_file:
                offload_index, state_dict_index, _mismatched_keys, _error_msgs = load_fn(shard_file)
                error_msgs += _error_msgs
                mismatched_keys += _mismatched_keys

@sayakpaul sayakpaul marked this pull request as ready for review August 12, 2025 15:36
@sayakpaul
Copy link
Member Author

@stevhliu, could you help add docs for this PR (separate PR is fine)? I think we could have some guidance on how to load a DiffusionPipeline faster when the hardware allows for it (like directly loading all model components on the accelerator).

#11904 could also be mentioned in the document.

Then we're working on #12122

@sayakpaul sayakpaul changed the title [wip][core] parallel loading of shards [core] parallel loading of shards Aug 12, 2025
@sayakpaul
Copy link
Member Author

@DN6 thanks a lot for your thoughtful suggestions. I have reflected them and I have added a test case, as well.

LMK what you think.

Copy link
Collaborator

@DN6 DN6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@sayakpaul sayakpaul merged commit baa9b58 into main Aug 13, 2025
34 of 35 checks passed
@sayakpaul sayakpaul deleted the parallel-shards-loading branch August 13, 2025 05:03
@@ -43,6 +43,8 @@
DIFFUSERS_REQUEST_TIMEOUT = 60
DIFFUSERS_ATTN_BACKEND = os.getenv("DIFFUSERS_ATTN_BACKEND", "native")
DIFFUSERS_ATTN_CHECKS = os.getenv("DIFFUSERS_ATTN_CHECKS", "0") in ENV_VARS_TRUE_VALUES
DEFAULT_HF_PARALLEL_LOADING_WORKERS = 8
HF_PARALLEL_LOADING_FLAG = "HF_ENABLE_PARALLEL_LOADING"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to run the env check here

HF_ENABLE_PARALLEL_LOADING  = os.environ.get("HF_ENABLE_PARALLEL_LOADING", "").upper() in ENV_VARS_TRUE_VALUES

Then import the constant into modeling_utils.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR: #12137

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants