-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18026: KIP-1112, clean up StatefulProcessorNode #18195
base: trunk
Are you sure you want to change the base?
KAFKA-18026: KIP-1112, clean up StatefulProcessorNode #18195
Conversation
c46f9a4
to
af0931d
Compare
@ableegoldman #17881 adds a "triage" label to PRs from non-committers. Turns out this also affect committers if their membership visibility in the ASF GitHub org is not public. I added instructions for setting your membership visibility to public https://github.com/apache/kafka/blob/trunk/.github/workflows/README.md#pr-triage |
af0931d
to
8c061b7
Compare
This comment was marked as outdated.
This comment was marked as outdated.
35c459a
to
a3238e3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Made a pass. Please feel free to merge after incorporating the unit test comment.
aggFunctionName, | ||
new ProcessorParameters<>(aggregateSupplier, aggFunctionName), | ||
new String[] {storeFactory.storeName()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for line 57 above and independent from the PR: just a thought, could we pass in storeFactory.storeName()
in the future?
@@ -1225,7 +1225,8 @@ public <KOut, VOut> KStream<KOut, VOut> process( | |||
} | |||
|
|||
final String name = new NamedInternal(named).name(); | |||
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>( | |||
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new | |||
ProcessorToStateConnectorNode<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is newline necessary?
); | ||
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); | ||
|
||
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier(); | ||
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode = | ||
new StatefulProcessorNode<>( | ||
final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally I thought I do not need to use the newly introduced ProcessorToStateConnectorNode
for this case but only needed for those process/transform
with a store list, but thinking that again we probably do not have another way around at the moment..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same thought actually. I did take a quick look at it but ultimately decided that whether or not it was possible, it would be too much for this one PR. So I'm going to revisit this in a followup PR if it does indeed make sense to do
@@ -0,0 +1,74 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a unit test class for this new node?
Final cleanup of StatefulProcessorNode after converting all stateful operators to adding state stores via implementing the
#stores
method. This PR can't be merged until we merge these two open PRs: