From 5826e876a8ceff474398023e763eca109923494f Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Tue, 24 Dec 2024 17:23:54 +0200 Subject: [PATCH 1/5] BroadcastJobTarget --- .../ignite/compute/BroadcastJobTarget.java | 31 +++++++++++++++++++ .../compute/TableBroadcastJobTarget.java | 30 ++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java create mode 100644 modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java diff --git a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java new file mode 100644 index 00000000000..3bfd5ad6fb2 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +public interface BroadcastJobTarget { + /** + * Creates a broadcast job target for a specific table. The jobs will be executed on all nodes holding the table partitions, + * one job per node. + * + * @param tableName Table name. + * @return Broadcast job target. + */ + static BroadcastJobTarget table(String tableName) { + return new TableBroadcastJobTarget(tableName); + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java new file mode 100644 index 00000000000..5281247f3e2 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +public class TableBroadcastJobTarget implements BroadcastJobTarget { + private final String tableName; + + public TableBroadcastJobTarget(String tableName) { + this.tableName = tableName; + } + + public String tableName() { + return tableName; + } +} From dbeeb7e5e3b6c090b2f6fafa484133dfa5c1b919 Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Tue, 24 Dec 2024 17:28:17 +0200 Subject: [PATCH 2/5] Add NodesTarget --- .../ignite/compute/BroadcastJobTarget.java | 7 ++++ .../apache/ignite/compute/IgniteCompute.java | 30 ++++++++++------- .../compute/NodesBroadcastJobTarget.java | 33 +++++++++++++++++++ 3 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java diff --git a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java index 3bfd5ad6fb2..3a7e6dbb2b7 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java @@ -17,7 +17,14 @@ package org.apache.ignite.compute; +import java.util.Set; +import org.apache.ignite.network.ClusterNode; + public interface BroadcastJobTarget { + static BroadcastJobTarget nodes(ClusterNode... nodes) { + return new NodesBroadcastJobTarget(Set.of(nodes)); + } + /** * Creates a broadcast job target for a specific table. The jobs will be executed on all nodes holding the table partitions, * one job per node. diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index cdb5da87889..f88f7492f26 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -136,13 +136,13 @@ R execute( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job execution object. */ Map> submitBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ); @@ -152,17 +152,17 @@ Map> submitBroadcast( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job result. */ default CompletableFuture> executeBroadcastAsync( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ) { - return executeBroadcastAsync(nodes, descriptor, null, arg); + return executeBroadcastAsync(target, descriptor, null, arg); } /** @@ -170,18 +170,21 @@ default CompletableFuture> executeBroadcastAsync( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param cancellationToken Cancellation token or {@code null}. * @param arg Argument of the job. * @return Map from node to job result. */ default CompletableFuture> executeBroadcastAsync( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable CancellationToken cancellationToken, @Nullable T arg ) { + // TODO: Support other types. + var nodes = ((NodesBroadcastJobTarget)target).nodes(); + Map> futures = nodes.stream() .collect(toMap(identity(), node -> executeAsync(JobTarget.node(node), descriptor, cancellationToken, arg))); @@ -203,18 +206,18 @@ default CompletableFuture> executeBroadcastAsync( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job result. * @throws ComputeException If there is any problem executing the job. */ default Map executeBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ) { - return executeBroadcast(nodes, descriptor, null, arg); + return executeBroadcast(target, descriptor, null, arg); } /** @@ -222,7 +225,7 @@ default Map executeBroadcast( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param cancellationToken Cancellation token or {@code null}. * @param arg Argument of the job. @@ -230,13 +233,16 @@ default Map executeBroadcast( * @throws ComputeException If there is any problem executing the job. */ default Map executeBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable CancellationToken cancellationToken, @Nullable T arg ) { Map map = new HashMap<>(); + // TODO: Support other types. + var nodes = ((NodesBroadcastJobTarget)target).nodes(); + for (ClusterNode node : nodes) { map.put(node, execute(JobTarget.node(node), descriptor, cancellationToken, arg)); } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java new file mode 100644 index 00000000000..0fa24e20750 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Set; +import org.apache.ignite.network.ClusterNode; + +public class NodesBroadcastJobTarget implements BroadcastJobTarget { + private final Set nodes; + + public NodesBroadcastJobTarget(Set nodes) { + this.nodes = nodes; + } + + public Set nodes() { + return nodes; + } +} From 6e1739715b94a0da29408cd74c451e04a3dca1fb Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Tue, 24 Dec 2024 17:28:47 +0200 Subject: [PATCH 3/5] wip --- .../java/org/apache/ignite/compute/BroadcastJobTarget.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java index 3a7e6dbb2b7..93794d86650 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java @@ -21,6 +21,12 @@ import org.apache.ignite.network.ClusterNode; public interface BroadcastJobTarget { + /** + * Creates a broadcast job target for a specific nodes. The jobs will be executed on all specified nodes. + * + * @param nodes Nodes. + * @return Broadcast job target. + */ static BroadcastJobTarget nodes(ClusterNode... nodes) { return new NodesBroadcastJobTarget(Set.of(nodes)); } From 6f5eb0eebacce0d2d042ecac4966a0c623d100a5 Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Tue, 24 Dec 2024 17:30:00 +0200 Subject: [PATCH 4/5] wip --- .../org/apache/ignite/compute/BroadcastJobTarget.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java index 93794d86650..ada2e8b0cad 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java @@ -31,6 +31,16 @@ static BroadcastJobTarget nodes(ClusterNode... nodes) { return new NodesBroadcastJobTarget(Set.of(nodes)); } + /** + * Creates a broadcast job target for a specific nodes. The jobs will be executed on all specified nodes. + * + * @param nodes Nodes. + * @return Broadcast job target. + */ + static BroadcastJobTarget nodes(Set nodes) { + return new NodesBroadcastJobTarget(nodes); + } + /** * Creates a broadcast job target for a specific table. The jobs will be executed on all nodes holding the table partitions, * one job per node. From 69296737512b27c640b3b20272846c9cfcb67215 Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Tue, 24 Dec 2024 17:33:03 +0200 Subject: [PATCH 5/5] Fix ItComputeBaseTest --- .../internal/compute/ItComputeBaseTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index 2a88dbe753f..b3833e503fc 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -55,6 +55,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.Ignite; +import org.apache.ignite.compute.BroadcastJobTarget; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; @@ -289,7 +290,7 @@ void broadcastsJobWithArgumentsAsync() { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(concatJobClass()).units(units()).build(), new Object[] {"a", 42}); @@ -308,7 +309,7 @@ void broadcastExecutesJobOnRespectiveNodes() { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(), null); assertThat(results, is(aMapWithSize(3))); @@ -326,7 +327,7 @@ void broadcastsFailingJob() throws Exception { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(failingJobClassName()).units(units()).build(), null); assertThat(results, is(aMapWithSize(3))); @@ -429,8 +430,10 @@ void cancelComputeExecuteWithCancelHandle(boolean local) { @ValueSource(booleans = {true, false}) void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) { Ignite entryNode = node(0); - Set executeNodes = - local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : Set.of(clusterNode(node(1)), clusterNode(node(2))); + BroadcastJobTarget executeNodes = + local + ? BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(2))) + : BroadcastJobTarget.nodes(clusterNode(node(1)), clusterNode(node(2))); CancelHandle cancelHandle = CancelHandle.create(); @@ -448,8 +451,10 @@ void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) { @ValueSource(booleans = {true, false}) void cancelComputeExecuteBroadcastWithCancelHandle(boolean local) { Ignite entryNode = node(0); - Set executeNodes = - local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : Set.of(clusterNode(node(1)), clusterNode(node(2))); + BroadcastJobTarget executeNodes = + local + ? BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(2))) + : BroadcastJobTarget.nodes(clusterNode(node(1)), clusterNode(node(2))); CancelHandle cancelHandle = CancelHandle.create();