Adding support for handling backpressure in KplMessageHandler for KPL #249

Open
siddharthjain210 opened this issue Dec 8, 2024 · 1 comment
siddharthjain210 commented Dec 8, 2024

Expected Behavior
To keep the KPL native process from being overwhelmed under high load, users of the KPL must guard against backpressure building up on the native Kinesis producer process. If backpressure is not handled, the KPL native process can hang and swallow exceptions.
References: https://github.com/awslabs/amazon-kinesis-producer?tab=readme-ov-file#back-pressure

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
    // Back off once if the KPL already holds more records than the configured limit (-1 disables the check).
    if (this.maxRecordsInFlight != -1 && this.kinesisProducer.getOutstandingRecordsCount() > this.maxRecordsInFlight) {
        try {
            Thread.sleep(this.maxRecordInFlightsSleepDurationinMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
    return listenableFutureToCompletableFuture(recordResult)
            .thenApply(UserRecordResponse::new);
}
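
A single sleep does not guarantee that the KPL has drained by the time the record is added, so a bounded exponential backoff may be a better fit. Below is a minimal sketch of that idea, not the final implementation. `BackpressureGate`, its constructor parameters, and `maxAttempts` are hypothetical names used only for illustration; the `IntSupplier` stands in for `kinesisProducer::getOutstandingRecordsCount` so the snippet runs without the KPL on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical helper for illustration; not part of spring-integration-aws or the KPL.
final class BackpressureGate {

    private final IntSupplier outstandingRecords; // stand-in for kinesisProducer::getOutstandingRecordsCount
    private final int maxRecordsInFlight;         // -1 disables the check
    private final Duration initialBackoff;        // first sleep; doubled after every failed check
    private final int maxAttempts;                // assumed upper bound on the number of sleeps

    BackpressureGate(IntSupplier outstandingRecords, int maxRecordsInFlight,
                     Duration initialBackoff, int maxAttempts) {
        this.outstandingRecords = outstandingRecords;
        this.maxRecordsInFlight = maxRecordsInFlight;
        this.initialBackoff = initialBackoff;
        this.maxAttempts = maxAttempts;
    }

    // Blocks until the in-flight count is at or below the limit, or fails after maxAttempts sleeps.
    void awaitCapacity() {
        if (this.maxRecordsInFlight == -1) {
            return;
        }
        long sleepMillis = this.initialBackoff.toMillis();
        int attempts = 0;
        while (this.outstandingRecords.getAsInt() > this.maxRecordsInFlight) {
            if (attempts >= this.maxAttempts) {
                throw new IllegalStateException(
                        "KPL still over capacity after " + attempts + " backoff attempts");
            }
            attempts++;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating the failure.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for KPL capacity", e);
            }
            sleepMillis *= 2; // exponential growth of the wait
        }
    }

    public static void main(String[] args) {
        // Simulated producer whose in-flight count drops by one on every poll.
        AtomicInteger inFlight = new AtomicInteger(5);
        BackpressureGate gate =
                new BackpressureGate(inFlight::getAndDecrement, 2, Duration.ofMillis(10), 5);
        gate.awaitCapacity();
        System.out.println("Capacity available, safe to call addUserRecord");
    }
}
```

In the handler, `awaitCapacity()` would be called right before `addUserRecord(userRecord)`. Throwing once the attempts are exhausted lets the caller decide whether to retry or drop the message, instead of pushing more load onto the native process.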

The same capability is also required by the spring cloud stream aws kinesis binder project:
spring-cloud/spring-cloud-stream-binder-aws-kinesis#230

Current Behavior
The current implementation of Kinesis KPL message handler in the library doesn't support handling backpressure.

Context
References: https://github.com/awslabs/amazon-kinesis-producer?tab=readme-ov-file#back-pressure

The same capability is also required by the spring cloud stream aws kinesis binder project:
spring-cloud/spring-cloud-stream-binder-aws-kinesis#230

How has this issue affected you?
In one of our applications running in KCL/KPL mode, the native process has started swallowing exceptions.

What are you trying to accomplish?
Ensure that

What other alternatives have you considered?

  1. To handle backpressure, users must make sure to use PutRecords instead of PutRecord. PutRecords enables aggregation in the KPL.
  2. By default, the Kinesis Producer Configuration uses the PER_REQUEST threading model instead of POOLED.
  3. The value of maxRecordBufferedTime can also help, since it sets how long records are buffered for aggregation.
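
For reference, the tuning options above could be collected roughly as follows. This is only a sketch with assumptions: the key names are assumed to mirror the setters on the KPL's `KinesisProducerConfiguration` (I believe the buffer setting is spelled `RecordMaxBufferedTime` there), the values are placeholders rather than recommendations, and `KplTuningSketch` is a hypothetical class name. It only builds a plain `java.util.Properties`, so it runs without the KPL on the classpath.

```java
import java.util.Properties;

// Hypothetical holder for the tuning options listed above; values are illustrative only.
final class KplTuningSketch {

    static Properties tuningProperties() {
        Properties props = new Properties();
        // Key names are assumptions based on the KinesisProducerConfiguration setters; verify before use.
        props.setProperty("AggregationEnabled", "true");   // pack several user records into one Kinesis record
        props.setProperty("ThreadingModel", "POOLED");     // instead of the default PER_REQUEST
        props.setProperty("ThreadPoolSize", "16");         // placeholder pool size for the POOLED model
        props.setProperty("RecordMaxBufferedTime", "200"); // placeholder buffer time in milliseconds
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; in a real application they would be handed to the KPL configuration.
        tuningProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

If I remember correctly, the KPL can load such settings through `KinesisProducerConfiguration.fromProperties(...)`, but please treat that as an assumption to check against the KPL sources.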

Even with the above recommendations in place, the KPL can still end up in a bad state under very high load. Hence adding this capability would certainly help.

Are you aware of any workarounds?
Not that I can think of. Taking the Spring Cloud Stream Kinesis binder as an example, it is impossible for a user to configure all of the above. Hence the capability is needed in this library so that the binder can use it.

siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 8, 2024
siddharthjain210 commented

Opened up a PR for the fix:
#250

@artembilan artembilan added this to the 3.0.9 milestone Dec 11, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 14, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 14, 2024
…max capacity. Implemented exponential retry pattern.
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 21, 2024
…vadocs. Added links and references to the properties. Removed commented code from checkstyle.
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 21, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 22, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 22, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 22, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 22, 2024
…o maxInFlightRecordsInitialBackoffDuration. Corrected capacity loop entry condition to use greater than zero.
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 24, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 29, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 31, 2024
siddharthjain210 added a commit to siddharthjain210/spring-integration-aws that referenced this issue Dec 31, 2024