diff --git a/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java new file mode 100644 index 00000000000..ada2e8b0cad --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/BroadcastJobTarget.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Set; +import org.apache.ignite.network.ClusterNode; + +public interface BroadcastJobTarget { + /** + * Creates a broadcast job target for a specific nodes. The jobs will be executed on all specified nodes. + * + * @param nodes Nodes. + * @return Broadcast job target. + */ + static BroadcastJobTarget nodes(ClusterNode... nodes) { + return new NodesBroadcastJobTarget(Set.of(nodes)); + } + + /** + * Creates a broadcast job target for a specific nodes. The jobs will be executed on all specified nodes. + * + * @param nodes Nodes. + * @return Broadcast job target. + */ + static BroadcastJobTarget nodes(Set nodes) { + return new NodesBroadcastJobTarget(nodes); + } + + /** + * Creates a broadcast job target for a specific table. The jobs will be executed on all nodes holding the table partitions, + * one job per node. + * + * @param tableName Table name. + * @return Broadcast job target. + */ + static BroadcastJobTarget table(String tableName) { + return new TableBroadcastJobTarget(tableName); + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index cdb5da87889..f88f7492f26 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -136,13 +136,13 @@ R execute( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job execution object. */ Map> submitBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ); @@ -152,17 +152,17 @@ Map> submitBroadcast( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job result. */ default CompletableFuture> executeBroadcastAsync( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ) { - return executeBroadcastAsync(nodes, descriptor, null, arg); + return executeBroadcastAsync(target, descriptor, null, arg); } /** @@ -170,18 +170,21 @@ default CompletableFuture> executeBroadcastAsync( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param cancellationToken Cancellation token or {@code null}. * @param arg Argument of the job. * @return Map from node to job result. */ default CompletableFuture> executeBroadcastAsync( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable CancellationToken cancellationToken, @Nullable T arg ) { + // TODO: Support other types. + var nodes = ((NodesBroadcastJobTarget)target).nodes(); + Map> futures = nodes.stream() .collect(toMap(identity(), node -> executeAsync(JobTarget.node(node), descriptor, cancellationToken, arg))); @@ -203,18 +206,18 @@ default CompletableFuture> executeBroadcastAsync( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param arg Argument of the job. * @return Map from node to job result. * @throws ComputeException If there is any problem executing the job. */ default Map executeBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable T arg ) { - return executeBroadcast(nodes, descriptor, null, arg); + return executeBroadcast(target, descriptor, null, arg); } /** @@ -222,7 +225,7 @@ default Map executeBroadcast( * * @param Job argument (T)ype. * @param Job (R)esult type. - * @param nodes Nodes to execute the job on. + * @param target Broadcast job target. * @param descriptor Job descriptor. * @param cancellationToken Cancellation token or {@code null}. * @param arg Argument of the job. @@ -230,13 +233,16 @@ default Map executeBroadcast( * @throws ComputeException If there is any problem executing the job. */ default Map executeBroadcast( - Set nodes, + BroadcastJobTarget target, JobDescriptor descriptor, @Nullable CancellationToken cancellationToken, @Nullable T arg ) { Map map = new HashMap<>(); + // TODO: Support other types. + var nodes = ((NodesBroadcastJobTarget)target).nodes(); + for (ClusterNode node : nodes) { map.put(node, execute(JobTarget.node(node), descriptor, cancellationToken, arg)); } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java new file mode 100644 index 00000000000..0fa24e20750 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/NodesBroadcastJobTarget.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Set; +import org.apache.ignite.network.ClusterNode; + +public class NodesBroadcastJobTarget implements BroadcastJobTarget { + private final Set nodes; + + public NodesBroadcastJobTarget(Set nodes) { + this.nodes = nodes; + } + + public Set nodes() { + return nodes; + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java b/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java new file mode 100644 index 00000000000..5281247f3e2 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/compute/TableBroadcastJobTarget.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +public class TableBroadcastJobTarget implements BroadcastJobTarget { + private final String tableName; + + public TableBroadcastJobTarget(String tableName) { + this.tableName = tableName; + } + + public String tableName() { + return tableName; + } +} diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index 2a88dbe753f..b3833e503fc 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -55,6 +55,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.Ignite; +import org.apache.ignite.compute.BroadcastJobTarget; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; @@ -289,7 +290,7 @@ void broadcastsJobWithArgumentsAsync() { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(concatJobClass()).units(units()).build(), new Object[] {"a", 42}); @@ -308,7 +309,7 @@ void broadcastExecutesJobOnRespectiveNodes() { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(), null); assertThat(results, is(aMapWithSize(3))); @@ -326,7 +327,7 @@ void broadcastsFailingJob() throws Exception { Ignite entryNode = node(0); Map> results = compute().submitBroadcast( - Set.of(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), + BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(1)), clusterNode(node(2))), JobDescriptor.builder(failingJobClassName()).units(units()).build(), null); assertThat(results, is(aMapWithSize(3))); @@ -429,8 +430,10 @@ void cancelComputeExecuteWithCancelHandle(boolean local) { @ValueSource(booleans = {true, false}) void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) { Ignite entryNode = node(0); - Set executeNodes = - local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : Set.of(clusterNode(node(1)), clusterNode(node(2))); + BroadcastJobTarget executeNodes = + local + ? BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(2))) + : BroadcastJobTarget.nodes(clusterNode(node(1)), clusterNode(node(2))); CancelHandle cancelHandle = CancelHandle.create(); @@ -448,8 +451,10 @@ void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) { @ValueSource(booleans = {true, false}) void cancelComputeExecuteBroadcastWithCancelHandle(boolean local) { Ignite entryNode = node(0); - Set executeNodes = - local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : Set.of(clusterNode(node(1)), clusterNode(node(2))); + BroadcastJobTarget executeNodes = + local + ? BroadcastJobTarget.nodes(clusterNode(entryNode), clusterNode(node(2))) + : BroadcastJobTarget.nodes(clusterNode(node(1)), clusterNode(node(2))); CancelHandle cancelHandle = CancelHandle.create();