Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24123 Add DistributionAlgorithm interface #4984

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
Expand Down Expand Up @@ -1011,6 +1012,21 @@ public static <T> Optional<T> findAny(Collection<T> collection, @Nullable Predic
return Optional.empty();
}

/**
 * Iterates over the given collection in its iteration order and passes every element, together with the zero-based position
 * of that element in the iteration, to the given closure.
 *
 * <p>The closure is invoked once per element; nothing is invoked for an empty collection. Bounded wildcards are used so that
 * a closure written against a supertype of the element (or of {@link Integer}) can be reused without casts.
 *
 * @param collection Collection to iterate over.
 * @param closure Closure to apply; receives the element and its zero-based index.
 * @param <T> Type of collection element.
 */
public static <T> void forEachIndexed(Collection<? extends T> collection, BiConsumer<? super T, ? super Integer> closure) {
    int i = 0;

    for (T t : collection) {
        // Post-increment: the current element gets index i, the next one i + 1.
        closure.accept(t, i++);
    }
}

/**
* Retries operation until it succeeds or fails with exception that is different than the given.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partitiondistribution;

import java.util.Collection;
import java.util.List;

/**
* Partition distribution algorithm: a strategy that decides which topology nodes are assigned to table partitions.
*/
public interface DistributionAlgorithm {

/**
* Generates assignments for all partitions by the given parameters.
*
* @param nodes Collection of topology nodes.
* @param currentDistribution Previous assignments (list of nodes by partition) or an empty list if there are none.
* @param partitions Number of table partitions.
* @param replicaFactor Number of partition replicas.
* @return List of assigned nodes by partition.
*/
List<List<String>> assignPartitions(
Collection<String> nodes,
List<List<String>> currentDistribution,
int partitions,
int replicaFactor
);

/**
* Generates an assignment by the given parameters for the given single partition.
*
* @param nodes Collection of topology nodes.
* @param currentDistribution Previous assignment of this partition or an empty list if there is none.
* @param partitionId Id of the partition.
* @param replicaFactor Number of partition replicas.
* @return List of nodes assigned to the partition.
*/
List<String> assignPartition(
Collection<String> nodes,
List<String> currentDistribution,
int partitionId,
int replicaFactor
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@

package org.apache.ignite.internal.partitiondistribution;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Stateless utility class that provides helper methods for calculating the distribution of assignments.
*/
public class PartitionDistributionUtils {

private static final DistributionAlgorithm DISTRIBUTION_ALGORITHM = new RendezvousDistributionFunction();

/**
* Calculates assignments distribution.
*
Expand All @@ -38,14 +40,16 @@ public class PartitionDistributionUtils {
* @param replicas Replicas count.
* @return List assignments by partition.
*/
public static List<Set<Assignment>> calculateAssignments(Collection<String> dataNodes, int partitions, int replicas) {
List<Set<String>> nodes = RendezvousDistributionFunction.assignPartitions(
public static List<Set<Assignment>> calculateAssignments(
Collection<String> dataNodes,
int partitions,
int replicas
) {
List<List<String>> nodes = DISTRIBUTION_ALGORITHM.assignPartitions(
dataNodes,
emptyList(),
partitions,
replicas,
false,
null,
HashSet::new
replicas
);

return nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList());
Expand All @@ -59,15 +63,16 @@ public static List<Set<Assignment>> calculateAssignments(Collection<String> data
* @param replicas Replicas count.
* @return Set of assignments.
*/
public static Set<Assignment> calculateAssignmentForPartition(Collection<String> dataNodes, int partitionId, int replicas) {
Set<String> nodes = RendezvousDistributionFunction.assignPartition(
public static Set<Assignment> calculateAssignmentForPartition(
Collection<String> dataNodes,
int partitionId,
int replicas
) {
List<String> nodes = DISTRIBUTION_ALGORITHM.assignPartition(
dataNodes,
emptyList(),
partitionId,
new ArrayList<>(dataNodes),
replicas,
null,
false,
null,
HashSet::new
replicas
);

return dataNodesToAssignments(nodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.partitiondistribution;

import static org.apache.ignite.internal.util.IgniteUtils.forEachIndexed;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -33,6 +35,7 @@
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.jetbrains.annotations.Nullable;

/**
* Partition distribution function for partitioned table based on Highest Random Weight algorithm. This function supports the following
Expand All @@ -54,7 +57,7 @@
* </li>
* </ul>
*/
public class RendezvousDistributionFunction {
public class RendezvousDistributionFunction implements DistributionAlgorithm {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(RendezvousDistributionFunction.class);

Expand All @@ -81,11 +84,11 @@ public class RendezvousDistributionFunction {
*/
public static <T extends Collection<String>> T assignPartition(
int part,
List<String> nodes,
Collection<String> nodes,
int replicas,
Map<String, Collection<String>> neighborhoodCache,
boolean exclNeighbors,
BiPredicate<String, T> nodeFilter,
@Nullable BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
if (nodes.size() <= 1) {
Expand All @@ -99,13 +102,11 @@ public static <T extends Collection<String>> T assignPartition(
IgniteBiTuple<Long, String>[] hashArr =
(IgniteBiTuple<Long, String>[]) new IgniteBiTuple[nodes.size()];

for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);

forEachIndexed(nodes, (node, i) -> {
long hash = hash(node.hashCode(), part);

hashArr[i] = new IgniteBiTuple<>(hash, node);
}
});

final int effectiveReplicas = replicas == Integer.MAX_VALUE ? nodes.size() : Math.min(replicas, nodes.size());

Expand Down Expand Up @@ -174,6 +175,11 @@ public static <T extends Collection<String>> T assignPartition(
return res;
}

/**
 * {@inheritDoc}
 *
 * <p>Delegates to the static rendezvous routine with no neighborhood cache, neighbor exclusion switched off and no node
 * filter; the resulting nodes are collected into an {@link ArrayList}. The {@code currentDistribution} argument is not
 * passed to the delegate.
 */
@Override
public List<String> assignPartition(Collection<String> nodes, List<String> currentDistribution, int partitionId, int replicaFactor) {
    // Factory for the result container; the delegate supplies the int argument.
    IntFunction<List<String>> listFactory = ArrayList::new;

    return assignPartition(partitionId, nodes, replicaFactor, null, false, null, listFactory);
}

/**
* Creates assignment for REPLICATED table.
*
Expand All @@ -182,7 +188,7 @@ public static <T extends Collection<String>> T assignPartition(
* @param aggregator Function that creates a collection for the partition assignments.
* @return Assignment.
*/
private static <T extends Collection<String>> T replicatedAssign(List<String> nodes,
private static <T extends Collection<String>> T replicatedAssign(Collection<String> nodes,
Iterable<String> sortedNodes, IntFunction<T> aggregator) {
String first = sortedNodes.iterator().next();

Expand Down Expand Up @@ -239,7 +245,7 @@ public static List<List<String>> assignPartitions(
int partitions,
int replicas,
boolean exclNeighbors,
BiPredicate<String, List<String>> nodeFilter
@Nullable BiPredicate<String, List<String>> nodeFilter
) {
return assignPartitions(currentTopologySnapshot, partitions, replicas, exclNeighbors, nodeFilter, ArrayList::new);
}
Expand All @@ -260,7 +266,7 @@ public static <T extends Collection<String>> List<T> assignPartitions(
int partitions,
int replicas,
boolean exclNeighbors,
BiPredicate<String, T> nodeFilter,
@Nullable BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT;
Expand All @@ -282,6 +288,16 @@ public static <T extends Collection<String>> List<T> assignPartitions(
return assignments;
}

/**
 * {@inheritDoc}
 *
 * <p>Delegates to the static list-based overload with neighbor exclusion switched off and no node filter. The
 * {@code currentDistribution} argument is not passed to the delegate.
 */
@Override
public List<List<String>> assignPartitions(
        Collection<String> nodes,
        List<List<String>> currentDistribution,
        int partitions,
        int replicaFactor
) {
    // No additional filtering of candidate nodes is requested.
    BiPredicate<String, List<String>> noFilter = null;

    return assignPartitions(nodes, partitions, replicaFactor, false, noFilter);
}

/**
* Builds neighborhood map for all nodes in snapshot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ private void updateLeaseBatchInternal() {
);
}

// This condition allows skipping the meta storage invoke when there are no leases to update (renewedLeases.isEmpty()).
// However, there is a case when we need to save an empty leases collection: when the assignments are empty and
// leasesCurrent (the leases that reflect the meta storage state) is not empty. Negating that case gives us
// the condition to skip the update:
// !(emptyAssignments && !leasesCurrent.isEmpty()) == (!emptyAssignments || leasesCurrent.isEmpty())
boolean emptyAssignments = aggregatedStableAndPendingAssignmentsByGroups.isEmpty();
if (renewedLeases.isEmpty() && (!emptyAssignments || leasesCurrent.leaseByGroupId().isEmpty())) {
LOG.debug("No leases to update found.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -124,10 +125,14 @@ public static Tuple findTupleToBeHostedOnNode(
Tuple t = initialTuple;
int tableId = tableId(node, tableName);

int maxAttempts = 100;
Set<Integer> partitionIds = new HashSet<>();
Set<String> nodes = new HashSet<>();

int maxAttempts = 1000;

while (maxAttempts >= 0) {
int partId = partitionIdForTuple(node, tableName, t, tx);
partitionIds.add(partId);

TablePartitionId grpId = new TablePartitionId(tableId, partId);

Expand All @@ -137,20 +142,25 @@ public static Tuple findTupleToBeHostedOnNode(
if (node.id().equals(replicaMeta.getLeaseholderId())) {
return t;
}

nodes.add(replicaMeta.getLeaseholder());
} else {
Set<String> assignments = partitionAssignment(node, grpId);

if (assignments.contains(node.name())) {
return t;
}

nodes.addAll(assignments);
}

t = nextTuple.apply(t);

maxAttempts--;
}

throw new AssertionError("Failed to find a suitable tuple.");
throw new AssertionError("Failed to find a suitable tuple, tried " + maxAttempts + " times with [partitionIds="
+ partitionIds + ", nodes=" + nodes + "].");
}

/**
Expand Down