From 3f1a9119be830be3416aed41146e2e504805d4fd Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Fri, 27 Dec 2024 14:59:20 +0200 Subject: [PATCH 1/2] IGNITE-24123 Add DistributionAlgorithm interface --- .../ignite/internal/util/IgniteUtils.java | 16 +++++ .../DistributionAlgorithm.java | 59 +++++++++++++++++++ .../PartitionDistributionUtils.java | 37 +++++++----- .../RendezvousDistributionFunction.java | 36 +++++++---- .../placementdriver/LeaseUpdater.java | 5 ++ .../tx/test/ItTransactionTestUtils.java | 14 ++++- 6 files changed, 139 insertions(+), 28 deletions(-) create mode 100644 modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 22bac9e011d..b259f7b588f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; @@ -1011,6 +1012,21 @@ public static Optional findAny(Collection collection, @Nullable Predic return Optional.empty(); } + /** + * Iterates over the given collection and applies the given closure to each element using the collection element and its index. + * + * @param collection Collection. + * @param closure Closure to apply. + * @param Type of collection element. + */ + public static void forEachIndexed(Collection collection, BiConsumer closure) { + int i = 0; + + for (T t : collection) { + closure.accept(t, i++); + } + } + /** * Retries operation until it succeeds or fails with exception that is different than the given. * diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java new file mode 100644 index 00000000000..b041b83b1b4 --- /dev/null +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partitiondistribution; + +import java.util.Collection; +import java.util.List; + +/** + * Partition distribution algorithm. + */ +public interface DistributionAlgorithm { + + /** + * Generates an assignment by the given parameters. + * + * @param nodes List of topology nodes. + * @param currentDistribution Previous assignments or empty list. + * @param partitions Number of table partitions. + * @param replicaFactor Number partition replicas. + * @return List of nodes by partition. + */ + List> assignPartitions( + Collection nodes, + List> currentDistribution, + int partitions, + int replicaFactor + ); + + /** + * Generates an assignment by the given parameters for the given partition. + * + * @param nodes List of topology nodes. + * @param currentDistribution Previous assignments or empty list. + * @param partitionId Id of the partition. + * @param replicaFactor Number partition replicas. + * @return List of nodes for partition. + */ + List assignPartition( + Collection nodes, + List currentDistribution, + int partitionId, + int replicaFactor + ); +} diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java index 694730c9823..34123863bd8 100644 --- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java @@ -17,12 +17,11 @@ package org.apache.ignite.internal.partitiondistribution; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,6 +29,9 @@ * Stateless distribution utils that produces helper methods for an assignments distribution calculation. */ public class PartitionDistributionUtils { + + private static final DistributionAlgorithm DISTRIBUTION_ALGORITHM = new RendezvousDistributionFunction(); + /** * Calculates assignments distribution. * @@ -38,14 +40,16 @@ public class PartitionDistributionUtils { * @param replicas Replicas count. * @return List assignments by partition. */ - public static List> calculateAssignments(Collection dataNodes, int partitions, int replicas) { - List> nodes = RendezvousDistributionFunction.assignPartitions( + public static List> calculateAssignments( + Collection dataNodes, + int partitions, + int replicas + ) { + List> nodes = DISTRIBUTION_ALGORITHM.assignPartitions( dataNodes, + emptyList(), partitions, - replicas, - false, - null, - HashSet::new + replicas ); return nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList()); @@ -59,15 +63,16 @@ public static List> calculateAssignments(Collection data * @param replicas Replicas count. * @return Set of assignments. */ - public static Set calculateAssignmentForPartition(Collection dataNodes, int partitionId, int replicas) { - Set nodes = RendezvousDistributionFunction.assignPartition( + public static Set calculateAssignmentForPartition( + Collection dataNodes, + int partitionId, + int replicas + ) { + List nodes = DISTRIBUTION_ALGORITHM.assignPartition( + dataNodes, + emptyList(), partitionId, - new ArrayList<>(dataNodes), - replicas, - null, - false, - null, - HashSet::new + replicas ); return dataNodesToAssignments(nodes); diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java index 0f5e2c99c47..5469a2d7725 100644 --- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.partitiondistribution; +import static org.apache.ignite.internal.util.IgniteUtils.forEachIndexed; + import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -33,6 +35,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.jetbrains.annotations.Nullable; /** * Partition distribution function for partitioned table based on Highest Random Weight algorithm. This function supports the following @@ -54,7 +57,7 @@ * * */ -public class RendezvousDistributionFunction { +public class RendezvousDistributionFunction implements DistributionAlgorithm { /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(RendezvousDistributionFunction.class); @@ -81,11 +84,11 @@ public class RendezvousDistributionFunction { */ public static > T assignPartition( int part, - List nodes, + Collection nodes, int replicas, Map> neighborhoodCache, boolean exclNeighbors, - BiPredicate nodeFilter, + @Nullable BiPredicate nodeFilter, IntFunction aggregator ) { if (nodes.size() <= 1) { @@ -99,13 +102,11 @@ public static > T assignPartition( IgniteBiTuple[] hashArr = (IgniteBiTuple[]) new IgniteBiTuple[nodes.size()]; - for (int i = 0; i < nodes.size(); i++) { - String node = nodes.get(i); - + forEachIndexed(nodes, (node, i) -> { long hash = hash(node.hashCode(), part); hashArr[i] = new IgniteBiTuple<>(hash, node); - } + }); final int effectiveReplicas = replicas == Integer.MAX_VALUE ? nodes.size() : Math.min(replicas, nodes.size()); @@ -182,7 +183,7 @@ public static > T assignPartition( * @param aggregator Function that creates a collection for the partition assignments. * @return Assignment. */ - private static > T replicatedAssign(List nodes, + private static > T replicatedAssign(Collection nodes, Iterable sortedNodes, IntFunction aggregator) { String first = sortedNodes.iterator().next(); @@ -239,7 +240,7 @@ public static List> assignPartitions( int partitions, int replicas, boolean exclNeighbors, - BiPredicate> nodeFilter + @Nullable BiPredicate> nodeFilter ) { return assignPartitions(currentTopologySnapshot, partitions, replicas, exclNeighbors, nodeFilter, ArrayList::new); } @@ -260,7 +261,7 @@ public static > List assignPartitions( int partitions, int replicas, boolean exclNeighbors, - BiPredicate nodeFilter, + @Nullable BiPredicate nodeFilter, IntFunction aggregator ) { assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT; @@ -316,6 +317,21 @@ public static Map> neighbors(Collection topSn return neighbors; } + @Override + public List> assignPartitions( + Collection nodes, + List> currentDistribution, + int partitions, + int replicaFactor + ) { + return assignPartitions(nodes, partitions, replicaFactor, false, null); + } + + @Override + public List assignPartition(Collection nodes, List currentDistribution, int partitionId, int replicaFactor) { + return assignPartition(partitionId, nodes, replicaFactor, null, false, null, ArrayList::new); + } + /** * Hash comparator. */ diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index ac2c92e9cb9..880875470e8 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -543,6 +543,11 @@ private void updateLeaseBatchInternal() { ); } + // This condition allows to skip the meta storage invoke when there are no leases to update (renewedLeases.isEmpty()). + // However there is the case when we need to save empty leases collection: when the assignments are empty and + // leasesCurrent (those that reflect the meta storage state) is not empty. The negation of this condition gives us + // the condition to skip the update and the result is: + // !(emptyAssignments && !leasesCurrent.isEmpty()) == (!emptyAssignments || leasesCurrent.isEmpty()) boolean emptyAssignments = aggregatedStableAndPendingAssignmentsByGroups.isEmpty(); if (renewedLeases.isEmpty() && (!emptyAssignments || leasesCurrent.leaseByGroupId().isEmpty())) { LOG.debug("No leases to update found."); diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java index c05dbb03a8a..a57e9ce9b0f 100644 --- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java +++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -124,10 +125,14 @@ public static Tuple findTupleToBeHostedOnNode( Tuple t = initialTuple; int tableId = tableId(node, tableName); - int maxAttempts = 100; + Set partitionIds = new HashSet<>(); + Set nodes = new HashSet<>(); + + int maxAttempts = 1000; while (maxAttempts >= 0) { int partId = partitionIdForTuple(node, tableName, t, tx); + partitionIds.add(partId); TablePartitionId grpId = new TablePartitionId(tableId, partId); @@ -137,12 +142,16 @@ public static Tuple findTupleToBeHostedOnNode( if (node.id().equals(replicaMeta.getLeaseholderId())) { return t; } + + nodes.add(replicaMeta.getLeaseholder()); } else { Set assignments = partitionAssignment(node, grpId); if (assignments.contains(node.name())) { return t; } + + nodes.addAll(assignments); } t = nextTuple.apply(t); @@ -150,7 +159,8 @@ public static Tuple findTupleToBeHostedOnNode( maxAttempts--; } - throw new AssertionError("Failed to find a suitable tuple."); + throw new AssertionError("Failed to find a suitable tuple, tried " + maxAttempts + " times with [partitionIds=" + + partitionIds + ", nodes=" + nodes + "]."); } /** From 329083be143f061c5ede8bc3b301819fffff35b3 Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Fri, 27 Dec 2024 15:57:47 +0200 Subject: [PATCH 2/2] fixed checkstyle --- .../RendezvousDistributionFunction.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java index 5469a2d7725..a1ac234e99f 100644 --- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java @@ -175,6 +175,11 @@ public static > T assignPartition( return res; } + @Override + public List assignPartition(Collection nodes, List currentDistribution, int partitionId, int replicaFactor) { + return assignPartition(partitionId, nodes, replicaFactor, null, false, null, ArrayList::new); + } + /** * Creates assignment for REPLICATED table. * @@ -283,6 +288,16 @@ public static > List assignPartitions( return assignments; } + @Override + public List> assignPartitions( + Collection nodes, + List> currentDistribution, + int partitions, + int replicaFactor + ) { + return assignPartitions(nodes, partitions, replicaFactor, false, null); + } + /** * Builds neighborhood map for all nodes in snapshot. * @@ -317,21 +332,6 @@ public static Map> neighbors(Collection topSn return neighbors; } - @Override - public List> assignPartitions( - Collection nodes, - List> currentDistribution, - int partitions, - int replicaFactor - ) { - return assignPartitions(nodes, partitions, replicaFactor, false, null); - } - - @Override - public List assignPartition(Collection nodes, List currentDistribution, int partitionId, int replicaFactor) { - return assignPartition(partitionId, nodes, replicaFactor, null, false, null, ArrayList::new); - } - /** * Hash comparator. */