Ray Data checkpoint #49438

Open
Jay-ju opened this issue Dec 25, 2024 · 4 comments
Labels
data (Ray Data-related issues), enhancement (request for new feature and/or capability), triage (needs triage)

Comments

@Jay-ju
Contributor

Jay-ju commented Dec 25, 2024

Description

Background
This issue proposes a checkpoint design for Ray Data; we would like to hear how the community views it.
Design outline

  • Ray Data jobs should not have to reprocess the full dataset when an unrecoverable error forces a restart
    • Support OneToOneOperator first, i.e. the various map-style operators: read, write, map, map_batches, limit, filter, etc.
    • AllToAllOperator (e.g. repartition/shuffle) will be considered later; it involves splitting data across batches and has more complex processing logic
  • Two levels of checkpoint are currently designed.
    • File-level checkpoint. A single file is processed in blocks, so some segments of a file may succeed while others fail. During recovery, Ray's Block data is re-read segment by segment, which requires the filesystem to support seeking into the file.
    • Row-level checkpoint. Record the row numbers that have already been processed; on restore, only the unprocessed rows are reprocessed. This gives finer granularity.
  • Personally, I prefer to implement the first type: the file-level checkpoint.
    • Row-level state is relatively large to store, which hurts write performance
    • At present, the minimum concurrency granularity of FileBasedDatasource readers such as read_parquet/read_csv is a single file; at block granularity it is mainly a segment (a few rows) of a single file
  • Constraints
    • Does not support repartition/shuffle

File Level Checkpoint

  • File level
[image: file-level checkpoint diagram]
  • In the diagram, from_where records which source file a block's rows come from, and offset records the row offset of those rows within that source file
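
To make the recovery path concrete, here is a minimal sketch of what such a per-block checkpoint record could look like. The field names from_where and offset follow the description above; num_rows, done, and the helper function are illustrative assumptions, not part of the proposal:

```python
from dataclasses import dataclass

@dataclass
class BlockCheckpoint:
    from_where: str  # source file the block's rows were read from
    offset: int      # row offset of the block within that source file
    num_rows: int    # rows in the block (assumed field, bounds the re-read)
    done: bool       # whether the block was fully processed and committed

def blocks_to_recover(records: list[BlockCheckpoint]) -> list[BlockCheckpoint]:
    # On restart, only uncommitted blocks are re-read, by seeking to
    # `offset` within `from_where` and reading `num_rows` rows.
    return [r for r in records if not r.done]
```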

Row Level Checkpoint

  • Row level: record <file_name, row_index> pairs
    • Starting from the source operator, the row numbers of processed data are recorded in the block metadata, flow with the data between upstream and downstream operators, and are written to external storage after the sink (a sketch follows the diagram)
[image: row-level checkpoint diagram]
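
A minimal sketch of the row-level store, assuming the <file_name, row_index> pairs are batched in memory and flushed to Parquet on external storage after the sink; the function name and storage format are illustrative only:

```python
import pyarrow as pa
import pyarrow.parquet as pq

def flush_row_checkpoint(pairs: list[tuple[str, int]], path: str) -> None:
    # One <file_name, row_index> entry per processed row. With millions of
    # rows this table itself becomes large, which is the storage concern
    # noted above for row-level checkpoints.
    table = pa.table({
        "file_name": pa.array([p[0] for p in pairs], type=pa.string()),
        "row_index": pa.array([p[1] for p in pairs], type=pa.int64()),
    })
    pq.write_table(table, path)
```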

Use case

No response

@Jay-ju added the enhancement and triage labels on Dec 25, 2024
@raulchen
Contributor

Thanks for posting this. I agree that checkpointing would be a useful feature for Ray Data users.

Regarding the file-based approach, I think the main problem is that not all data sources are file-based and support seeking. Also, we'll need to enable ExecutionOptions.preserve_order, which could significantly degrade performance.

And regarding the row-based approach, the overhead can be significant if the dataset has millions of rows.

Also consider that both approaches are not hard to implement at the app level, i.e., by manually adding a filter op after the read. I would suggest doing that before we come up with a general solution.
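
For concreteness, a sketch of that app-level workaround. It assumes the failed run persisted its finished <path, row_index> pairs somewhere loadable (load_processed_keys and the row_index column are hypothetical); include_paths=True is the documented way to get a "path" column from Ray's file-based readers:

```python
import ray

def load_processed_keys(checkpoint_path: str) -> set:
    # Hypothetical helper: return the set of (path, row_index) pairs that
    # the failed run already wrote out.
    import pyarrow.parquet as pq
    t = pq.read_table(checkpoint_path)
    return set(zip(t["path"].to_pylist(), t["row_index"].to_pylist()))

processed = load_processed_keys("/tmp/ckpt/processed.parquet")

# include_paths=True stores each row's source file in a "path" column.
ds = ray.data.read_parquet("s3://bucket/input/", include_paths=True)

# App-level "checkpoint": a filter op right after the read that drops rows
# the previous run already finished. Assumes each row carries a row_index
# column (or that one is derived upstream).
ds = ds.filter(lambda row: (row["path"], row["row_index"]) not in processed)
```

Since filter is one of the map-style OneToOneOperators listed above, this composes with the rest of the pipeline without any special ordering requirements.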

@Jay-ju
Contributor Author

Jay-ju commented Dec 27, 2024

Also we'll need to enable ExecutionOptions.preserve_order, which could significantly degrade performance.

ExecutionOptions.preserve_order seems unnecessary, right? The checkpoint just records whether each record has been processed or not. File-level checkpoints seem sufficient for most scenarios.

@richardliaw added the data label on Dec 27, 2024
@raulchen
Contributor

ExecutionOptions.preserve_order seems unnecessary, right?

What exactly do you want to checkpoint? From "requiring the filesystem to seek and read the file", I assume you want to checkpoint something like "for this file, all rows before row X have been finished". If so, you'll need to preserve the execution order. Alternatively, you can also checkpoint individual row numbers.

That said, it's not the most critical problem. I think the main problem is how to support non-file-based data sources.
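
For reference, order preservation is a one-flag setting on the documented DataContext API; a minimal sketch:

```python
import ray

# Preserve deterministic block ordering so "all rows before row X are
# finished" is well-defined; this constrains pipelining and is the
# performance cost mentioned above.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.preserve_order = True
```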

@Jay-ju
Contributor Author

Jay-ju commented Dec 30, 2024

Yes, supporting SQL data sources, for example, is a problem. But isn't unstructured data mainly file-based?
