diff --git a/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java b/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java index 2b785ccd764..1404822e0e3 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java @@ -21,14 +21,14 @@ * Ignite transaction options. */ public class TransactionOptions { - /** Transaction timeout. */ + /** Transaction timeout. 0 means 'use default timeout'. */ private long timeoutMillis = 0; /** Read-only transaction. */ private boolean readOnly = false; /** - * Returns transaction timeout, in milliseconds. + * Returns transaction timeout, in milliseconds. 0 means 'use default timeout'. * * @return Transaction timeout, in milliseconds. */ @@ -39,10 +39,17 @@ public long timeoutMillis() { /** * Sets transaction timeout, in milliseconds. * - * @param timeoutMillis Transaction timeout, in milliseconds. + * @param timeoutMillis Transaction timeout, in milliseconds. Cannot be negative; 0 means 'use default timeout'. + * For RO transactions, the default timeout is data availability time configured via ignite.gc.lowWatermark.dataAvailabilityTime + * configuration setting. + * For RW transactions, timeouts are not supported yet. TODO: IGNITE-15936 * @return {@code this} for chaining. */ public TransactionOptions timeoutMillis(long timeoutMillis) { + if (timeoutMillis < 0) { + throw new IllegalArgumentException("Negative timeoutMillis: " + timeoutMillis); + } + this.timeoutMillis = timeoutMillis; return this; diff --git a/modules/api/src/test/java/org/apache/ignite/tx/TransactionOptionsTest.java b/modules/api/src/test/java/org/apache/ignite/tx/TransactionOptionsTest.java new file mode 100644 index 00000000000..3f305415329 --- /dev/null +++ b/modules/api/src/test/java/org/apache/ignite/tx/TransactionOptionsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tx; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class TransactionOptionsTest { + @Test + void readOnlyIsFalseByDefault() { + assertThat(new TransactionOptions().readOnly(), is(false)); + } + + @Test + void readOnlyStatusIsSet() { + var options = new TransactionOptions(); + + options.readOnly(true); + + assertThat(options.readOnly(), is(true)); + } + + @Test + void readOnlySetterReturnsSameObject() { + var options = new TransactionOptions(); + + TransactionOptions afterSetting = options.readOnly(true); + + assertSame(options, afterSetting); + } + + @Test + void timeoutIsZeroByDefault() { + assertThat(new TransactionOptions().timeoutMillis(), is(0L)); + } + + @Test + void timeoutIsSet() { + var options = new TransactionOptions(); + + options.timeoutMillis(3333); + + assertThat(options.timeoutMillis(), is(3333L)); + } + + @Test + void timeoutSetterReturnsSameObject() { + var options = new TransactionOptions(); + + TransactionOptions afterSetting = options.timeoutMillis(3333); + + assertSame(options, afterSetting); + } + + @Test + void positiveTimeoutIsAllowed() { + assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0)); + } + + @Test + void zeroTimeoutIsAllowed() { + assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0)); + } + + @Test + void negativeTimeoutIsRejected() { + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new TransactionOptions().timeoutMillis(-1)); + + assertThat(ex.getMessage(), is("Negative timeoutMillis: -1")); + } +} diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java index 2d95e71092b..8bbd7ce20d7 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java @@ -27,7 +27,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslContext; @@ -130,7 +129,7 @@ public class ClientHandlerModule implements IgniteComponent { @TestOnly @SuppressWarnings("unused") - private volatile ChannelHandler handler; + private volatile ClientInboundMessageHandler handler; /** * Constructor. @@ -395,4 +394,9 @@ private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorV partitionOperationsExecutor ); } + + @TestOnly + public ClientInboundMessageHandler handler() { + return handler; + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index a86724d96c2..c0d712942c4 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -140,6 +140,7 @@ import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException; import org.apache.ignite.sql.SqlBatchException; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Handles messages from thin clients. @@ -1091,4 +1092,9 @@ private static Set authenticationEventsToSubscribe() { AuthenticationEvent.USER_REMOVED ); } + + @TestOnly + public ClientResourceRegistry resources() { + return resources; + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java index 09eb755172a..94b1e881de7 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java @@ -49,13 +49,16 @@ public class ClientTransactionBeginRequest { IgniteTransactionsImpl transactions, ClientResourceRegistry resources, ClientHandlerMetricSource metrics) throws IgniteInternalCheckedException { - TransactionOptions options = null; + TransactionOptions options = new TransactionOptions(); HybridTimestamp observableTs = null; boolean readOnly = in.unpackBoolean(); - if (readOnly) { - options = new TransactionOptions().readOnly(true); + options.readOnly(readOnly); + + long timeoutMillis = in.unpackLong(); + options.timeoutMillis(timeoutMillis); + if (readOnly) { // Timestamp makes sense only for read-only transactions. observableTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong()); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java index 133ec00f7b2..57530d6a365 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java @@ -165,7 +165,10 @@ private synchronized CompletableFuture ensureStarted( return tx0; } - ClientTransaction startedTx() { + /** + * Returns actual {@link ClientTransaction} started by this transaction or throws an exception if no transaction was started yet. + */ + public ClientTransaction startedTx() { var tx0 = tx; assert tx0 != null : "Transaction is not started"; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java index 325ed680a7c..ece4c13b584 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java @@ -62,9 +62,9 @@ static CompletableFuture beginAsync( @Nullable String preferredNodeName, @Nullable TransactionOptions options, long observableTimestamp) { - if (options != null && options.timeoutMillis() != 0) { + if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) { // TODO: IGNITE-16193 - throw new UnsupportedOperationException("Timeouts are not supported yet"); + throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions"); } boolean readOnly = options != null && options.readOnly(); @@ -73,6 +73,7 @@ static CompletableFuture beginAsync( ClientOp.TX_BEGIN, w -> { w.out().packBoolean(readOnly); + w.out().packLong(options == null ? 0 : options.timeoutMillis()); w.out().packLong(observableTimestamp); }, r -> readTx(r, readOnly), diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 4d8c00d9cc7..84e1b6bccb2 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -31,9 +31,9 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.TxPriority; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.network.ClusterNode; @@ -63,17 +63,16 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { } @Override - public InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit) { - return begin(tracker, implicit, false); + public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) { + return begin(timestampTracker, true, readOnly, InternalTxOptions.defaults()); } @Override - public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit, boolean readOnly) { - return begin(timestampTracker, implicit, readOnly, TxPriority.NORMAL); + public InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker, boolean readOnly, InternalTxOptions txOptions) { + return begin(timestampTracker, false, readOnly, txOptions); } - @Override - public InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit, boolean readOnly, TxPriority priority) { + private InternalTransaction begin(HybridTimestampTracker tracker, boolean implicit, boolean readOnly, InternalTxOptions options) { return new InternalTransaction() { private final UUID id = UUID.randomUUID(); diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/CommonTestScheduler.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/CommonTestScheduler.java new file mode 100644 index 00000000000..a65067cc1f3 --- /dev/null +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/CommonTestScheduler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.testframework; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.thread.NamedThreadFactory; + +/** + * Provides access to a single-thread scheduler for light-weight non-blocking operations. It doesn't need to be stopped. + */ +public class CommonTestScheduler { + private static final IgniteLogger LOG = Loggers.forClass(CommonTestScheduler.class); + + private static final ScheduledExecutorService INSTANCE = Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("test-common-scheduler-", true, LOG) + ); + + /** + * Returns a single-thread scheduler for light-weight non-blocking operations. It doesn't need to be shut down. + */ + public static ScheduledExecutorService instance() { + return INSTANCE; + } +} diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index f4e561ccfdf..f49c325a1df 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -169,6 +169,7 @@ import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration; import org.apache.ignite.internal.schema.configuration.GcExtensionConfigurationSchema; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfigurationSchema; @@ -249,6 +250,9 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest { @InjectConfiguration private static TransactionConfiguration txConfiguration; + @InjectConfiguration + private static LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration private static RaftConfiguration raftConfiguration; @@ -1226,6 +1230,7 @@ public CompletableFuture invoke( txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, lockManager, @@ -1236,7 +1241,8 @@ public CompletableFuture invoke( new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + threadPoolsManager.commonScheduler() ); replicaManager = new ReplicaManager( diff --git a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h index cbaedbb0597..7c00c610a5c 100644 --- a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h +++ b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h @@ -57,6 +57,7 @@ class transactions_impl { IGNITE_API void begin_async(ignite_callback callback) { auto writer_func = [this](protocol::writer &writer) { writer.write_bool(false); // readOnly. + writer.write(std::int64_t(0)); // timeoutMillis. writer.write(m_connection->get_observable_timestamp()); }; diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp index d043b3b536a..ea707d48799 100644 --- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp +++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp @@ -433,6 +433,7 @@ void sql_connection::transaction_start() { network::data_buffer_owning response = sync_request(protocol::client_operation::TX_BEGIN, [&](protocol::writer &writer) { writer.write_bool(false); // read_only. + writer.write(std::int64_t(0)); // timeoutMillis. }); protocol::reader reader(response.get_bytes_view()); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs index c0bdd678ac2..550e178434a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs @@ -309,6 +309,7 @@ protected override void Handle(Socket handler, CancellationToken cancellationTok case ClientOp.TxBegin: reader.Skip(); // Read only. + reader.Skip(); // TimeoutMillis. LastClientObservableTimestamp = reader.ReadInt64(); Send(handler, requestId, new byte[] { 0 }.AsMemory()); diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs index c2976cf5fa5..933ad33b336 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs @@ -197,6 +197,7 @@ void Write() { var w = writer.MessageWriter; w.Write(_options.ReadOnly); + w.Write(_options.TimeoutMillis); w.Write(failoverSocket.ObservableTimestamp); } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Transactions/TransactionOptions.cs b/modules/platforms/dotnet/Apache.Ignite/Transactions/TransactionOptions.cs index acc005bdccc..8a94718c2a4 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Transactions/TransactionOptions.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Transactions/TransactionOptions.cs @@ -25,4 +25,10 @@ namespace Apache.Ignite.Transactions; /// Read-only transactions provide a snapshot view of data at a certain point in time. /// They are lock-free and perform better than normal transactions, but do not permit data modifications. /// -public readonly record struct TransactionOptions(bool ReadOnly); +/// +/// Transaction timeout. 0 means 'use default timeout'. +/// For RO transactions, the default timeout is data availability time configured via ignite.gc.lowWatermark.dataAvailabilityTime +/// configuration setting. +/// For RW transactions, timeouts are not supported yet. TODO: IGNITE-15936. +/// +public readonly record struct TransactionOptions(bool ReadOnly, long TimeoutMillis = 0); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index a89952b5eb8..6159786fb03 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -186,6 +186,7 @@ import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.sql.api.IgniteSqlImpl; import org.apache.ignite.internal.sql.configuration.distributed.SqlClusterExtensionConfiguration; @@ -292,6 +293,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { @InjectConfiguration private static TransactionConfiguration txConfiguration; + @InjectConfiguration + private static LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration private static StorageUpdateConfiguration storageUpdateConfiguration; @@ -616,6 +620,7 @@ public CompletableFuture invoke(Condition condition, List su var txManager = new TxManagerImpl( name, txConfiguration, + lowWatermarkConfiguration, messagingServiceReturningToStorageOperationsPool, clusterSvc.topologyService(), replicaService, @@ -628,7 +633,8 @@ public CompletableFuture invoke(Condition condition, List su threadPoolsManager.partitionOperationsExecutor(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + threadPoolsManager.commonScheduler() ); ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9f0dda60d6a..04134aada27 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -59,6 +59,7 @@ import org.apache.ignite.catalog.IgniteCatalog; import org.apache.ignite.client.handler.ClientHandlerMetricSource; import org.apache.ignite.client.handler.ClientHandlerModule; +import org.apache.ignite.client.handler.ClientInboundMessageHandler; import org.apache.ignite.client.handler.ClusterInfo; import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration; import org.apache.ignite.client.handler.configuration.ClientConnectorExtensionConfiguration; @@ -353,7 +354,7 @@ public class IgniteImpl implements Ignite { private final ReplicaManager replicaMgr; /** Transactions manager. */ - private final TxManager txManager; + private final TxManagerImpl txManager; /** Distributed table manager. */ private final TableManager distributedTblMgr; @@ -959,6 +960,7 @@ public class IgniteImpl implements Ignite { txManager = new TxManagerImpl( name, txConfig, + gcConfig.lowWatermark(), messagingServiceReturningToStorageOperationsPool, clusterSvc.topologyService(), replicaSvc, @@ -971,10 +973,11 @@ public class IgniteImpl implements Ignite { threadPoolsManager.partitionOperationsExecutor(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + threadPoolsManager.commonScheduler() ); - systemViewManager.register((TxManagerImpl) txManager); + systemViewManager.register(txManager); resourceVacuumManager = new ResourceVacuumManager( name, @@ -1849,17 +1852,17 @@ public LogStorageFactory partitionsLogStorageFactory() { return partitionsLogStorageFactory; } - @TestOnly - public LogStorageFactory volatileLogStorageFactory() { - return volatileLogStorageFactoryCreator.factory(raftMgr.volatileRaft().logStorageBudget().value()); - } - /** Returns the node's transaction manager. */ @TestOnly public TxManager txManager() { return txManager; } + @TestOnly + public ClientInboundMessageHandler clientInboundMessageHandler() { + return clientHandlerModule.handler(); + } + /** Returns the node's placement driver service. */ @TestOnly public PlacementDriver placementDriver() { diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java index 0aa757cb7bb..956990ccedd 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java @@ -348,7 +348,8 @@ private static void startNodes() { .clusterName("cluster") .clusterConfiguration("ignite {" + "metaStorage.idleSyncTimeInterval: " + METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS + ",\n" - + "gc.lowWatermark.dataAvailabilityTime: 1010,\n" + // TODO: Set dataAvailabilityTime to 1010 after IGNITE-24002 is fixed. + + "gc.lowWatermark.dataAvailabilityTime: 30010,\n" + "gc.lowWatermark.updateInterval: 3000,\n" + "metrics.exporters.logPush.exporterName: logPush,\n" + "metrics.exporters.logPush.period: 5000\n" diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ScriptContext.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ScriptContext.java index a8815808c68..641a83b4397 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ScriptContext.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ScriptContext.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.sql.sqllogic.SqlScriptRunner.RunnerRuntime; import org.apache.ignite.sql.IgniteSql; @@ -78,7 +79,9 @@ final class ScriptContext { List> executeQuery(String sql) { sql = replaceVars(sql); - log.info("Execute: " + sql); + log.info("Execute: {}", sql); + + long startNanos = System.nanoTime(); try (ResultSet rs = ignSql.execute(null, sql)) { if (rs.hasRowSet()) { @@ -100,6 +103,9 @@ List> executeQuery(String sql) { } else { return Collections.singletonList(Collections.singletonList(rs.wasApplied())); } + } finally { + long tookNanos = System.nanoTime() - startNanos; + log.info("Execution took {} ms", TimeUnit.NANOSECONDS.toMillis(tookNanos)); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java index 5cd76545313..b46b734f174 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.exec.TransactionTracker; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -63,7 +64,7 @@ public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) { if (tx == null) { // TODO: IGNITE-23604 SQL implicit transaction support. Coordinate the transaction implicit flag with the SQL one. - transaction = txManager.begin(observableTimeTracker, false, readOnly); + transaction = txManager.beginExplicit(observableTimeTracker, readOnly, InternalTxOptions.defaults()); result = new QueryTransactionWrapperImpl(transaction, true, txTracker); } else { transaction = tx.unwrap(); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java index 8daa449b9b9..aa11e3bac4c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java @@ -131,7 +131,7 @@ public void throwsExceptionForNestedScriptTransaction() { ); IgniteSqlStartTransaction txStartStmt = mock(IgniteSqlStartTransaction.class); - when(txManager.begin(any(), anyBoolean(), anyBoolean())).thenAnswer(inv -> { + when(txManager.beginExplicit(any(), anyBoolean(), any())).thenAnswer(inv -> { boolean implicit = inv.getArgument(1, Boolean.class); return NoOpTransaction.readWrite("test", implicit); @@ -222,12 +222,11 @@ public void testScriptTransactionWrapperTxInflightsInteraction() { } private void prepareTransactionsMocks() { - when(txManager.begin(any(), anyBoolean(), anyBoolean())).thenAnswer( + when(txManager.beginExplicit(any(), anyBoolean(), any())).thenAnswer( inv -> { - boolean implicit = inv.getArgument(1, Boolean.class); - boolean readOnly = inv.getArgument(2, Boolean.class); + boolean readOnly = inv.getArgument(1, Boolean.class); - return readOnly ? NoOpTransaction.readOnly("test-ro", implicit) : NoOpTransaction.readWrite("test-rw", implicit); + return readOnly ? NoOpTransaction.readOnly("test-ro", false) : NoOpTransaction.readWrite("test-rw", false); } ); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index 4af27e38f32..4d10bf01628 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.BinaryTuplePrefix; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionProvider; import org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken; @@ -75,6 +77,8 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -97,12 +101,19 @@ * Tests execution flow of TableScanNode. */ @ExtendWith(ConfigurationExtension.class) +@ExtendWith(ExecutorServiceExtension.class) public class TableScanNodeExecutionTest extends AbstractExecutionTest { private final LinkedList closeables = new LinkedList<>(); @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private LowWatermarkConfiguration lowWatermarkConfiguration; + + @InjectExecutorService + private ScheduledExecutorService commonExecutor; + // Ensures that all data from TableScanNode is being propagated correctly. @Test public void testScanNodeDataPropagation() throws InterruptedException { @@ -161,6 +172,7 @@ public void testScanNodeDataPropagation() throws InterruptedException { TxManagerImpl txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, new HeapLockManager(1024, 1024), @@ -171,7 +183,8 @@ public void testScanNodeDataPropagation() throws InterruptedException { new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - new TestLowWatermark() + new TestLowWatermark(), + commonExecutor ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java index 3e2d1498bb1..04bd750e454 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; @@ -48,7 +49,7 @@ protected Publisher scan(int part, @Nullable InternalTransaction tx) @Override protected InternalTransaction startTx() { - return internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER, false, true); + return internalTbl.txManager().beginExplicit(HYBRID_TIMESTAMP_TRACKER, true, InternalTxOptions.defaults()); } @Override diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java index 09838e77231..442d9a6b4b6 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -71,7 +72,7 @@ public void testInvalidPartitionParameterScan() { @Override protected InternalTransaction startTx() { - InternalTransaction tx = internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER, false); + InternalTransaction tx = internalTbl.txManager().beginExplicitRw(HYBRID_TIMESTAMP_TRACKER, InternalTxOptions.defaults()); TablePartitionId tblPartId = new TablePartitionId(internalTbl.tableId(), ((TablePartitionId) internalTbl.groupId()).partitionId()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index 09872484fea..119768b9af1 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; @@ -37,9 +38,12 @@ import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; @@ -65,6 +69,7 @@ * Test lock table. */ @ExtendWith(ConfigurationExtension.class) +@ExtendWith(ExecutorServiceExtension.class) public class ItLockTableTest extends IgniteAbstractTest { private static final IgniteLogger LOG = Loggers.forClass(ItLockTableTest.class); @@ -95,12 +100,18 @@ public class ItLockTableTest extends IgniteAbstractTest { @InjectConfiguration("mock: { deadlockPreventionPolicy: { waitTimeout: -1, txIdComparator: NONE } }") protected static TransactionConfiguration txConfiguration; + @InjectConfiguration + protected LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration protected static ReplicationConfiguration replicationConfiguration; @InjectConfiguration protected static StorageUpdateConfiguration storageUpdateConfiguration; + @InjectExecutorService + protected ScheduledExecutorService commonExecutor; + private ItTxTestCluster txTestCluster; private HybridTimestampTracker timestampTracker = new HybridTimestampTracker(); @@ -120,6 +131,7 @@ public void before() throws Exception { testInfo, raftConfiguration, txConfiguration, + lowWatermarkConfiguration, storageUpdateConfiguration, workDir, 1, @@ -142,6 +154,7 @@ protected TxManagerImpl newTxManager( ) { return new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, new HeapLockManager( @@ -154,7 +167,8 @@ protected TxManagerImpl newTxManager( new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + commonExecutor ); } }; diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index a1fa4e0e1dc..a981b630f50 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -54,6 +54,7 @@ public void before() throws Exception { testInfo, raftConfiguration, txConfiguration, + lowWatermarkConfiguration, storageUpdateConfiguration, workDir, nodes(), diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index 0266b7fde46..ed14bab72af 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -34,7 +34,6 @@ import java.util.concurrent.Executor; import java.util.function.Supplier; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lowwatermark.LowWatermark; @@ -59,7 +58,6 @@ import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; @@ -84,9 +82,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes /** A list of background cleanup futures. */ private final List> cleanupFutures = new CopyOnWriteArrayList<>(); - @InjectConfiguration - private TransactionConfiguration txConfiguration; - /** * The constructor. * @@ -103,6 +98,7 @@ public void before() throws Exception { testInfo, raftConfiguration, txConfiguration, + lowWatermarkConfiguration, storageUpdateConfiguration, workDir, nodes(), @@ -125,6 +121,7 @@ protected TxManagerImpl newTxManager( ) { return new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, new HeapLockManager(), @@ -135,7 +132,8 @@ protected TxManagerImpl newTxManager( new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + commonExecutor ) { @Override public CompletableFuture executeWriteIntentSwitchAsync(Runnable runnable) { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java index d2ee31fd16b..264ee02ef38 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -69,6 +70,9 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest { @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + protected LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration private StorageUpdateConfiguration storageUpdateConfiguration; @@ -102,6 +106,7 @@ public void before() throws Exception { testInfo, raftConfig, txConfiguration, + lowWatermarkConfiguration, storageUpdateConfiguration, workDir, NODES, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 8dc51db81db..676220f2d7b 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -191,6 +191,7 @@ import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration; import org.apache.ignite.internal.schema.configuration.GcExtensionConfigurationSchema; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfigurationSchema; @@ -286,6 +287,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration private RaftConfiguration raftConfiguration; @@ -318,7 +322,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { private Path workDir; @InjectExecutorService - private static ScheduledExecutorService commonScheduledExecutorService; + private ScheduledExecutorService commonScheduledExecutorService; private StaticNodeFinder finder; @@ -1361,6 +1365,7 @@ private class Node { txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, lockManager, @@ -1371,7 +1376,8 @@ private class Node { new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + commonScheduledExecutorService ); rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index faebcdf50f7..ce723f7f3ba 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -49,6 +49,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -90,6 +91,7 @@ import org.apache.ignite.internal.schema.NullBinaryRow; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.sql.engine.util.SqlTestUtils; @@ -99,6 +101,8 @@ import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -132,6 +136,7 @@ * Tests for data colocation. */ @ExtendWith(ConfigurationExtension.class) +@ExtendWith(ExecutorServiceExtension.class) public class ItColocationTest extends BaseIgniteAbstractTest { /** Partitions count. */ private static final int PARTS = 32; @@ -159,6 +164,12 @@ public class ItColocationTest extends BaseIgniteAbstractTest { @InjectConfiguration private static TransactionConfiguration txConfiguration; + @InjectConfiguration + private static LowWatermarkConfiguration lowWatermarkConfiguration; + + @InjectExecutorService + private static ScheduledExecutorService commonExecutor; + private SchemaDescriptor schema; private SchemaRegistry schemaRegistry; @@ -188,6 +199,7 @@ static void beforeAllTests() { txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaService, new HeapLockManager(), @@ -198,7 +210,8 @@ static void beforeAllTests() { new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - new TestLowWatermark() + new TestLowWatermark(), + commonExecutor ) { @Override public CompletableFuture finish( diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 60c68b3577e..8059a78cbf1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -500,11 +500,11 @@ private CompletableFuture enlistInTx( } private InternalTransaction startImplicitRwTxIfNeeded(@Nullable InternalTransaction tx) { - return tx == null ? txManager.begin(observableTimestampTracker, true) : tx; + return tx == null ? txManager.beginImplicitRw(observableTimestampTracker) : tx; } private InternalTransaction startImplicitRoTxIfNeeded(@Nullable InternalTransaction tx) { - return tx == null ? txManager.begin(observableTimestampTracker, true, true) : tx; + return tx == null ? txManager.beginImplicit(observableTimestampTracker, true) : tx; } /** @@ -1156,7 +1156,7 @@ private CompletableFuture updateAllWithRetry( int partition, @Nullable Long txStartTs ) { - InternalTransaction tx = txManager.begin(observableTimestampTracker, true); + InternalTransaction tx = txManager.beginImplicitRw(observableTimestampTracker); TablePartitionId partGroupId = new TablePartitionId(tableId, partition); assert rows.stream().allMatch(row -> partitionId(row) == partition) : "Invalid batch for partition " + partition; diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 2a0638675c2..55f4f5239d2 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -123,6 +123,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; @@ -196,6 +197,8 @@ public class ItTxTestCluster { private final TransactionConfiguration txConfiguration; + private final LowWatermarkConfiguration lowWatermarkConfiguration; + private final StorageUpdateConfiguration storageUpdateConfiguration; private final Path workDir; @@ -318,6 +321,7 @@ public ItTxTestCluster( TestInfo testInfo, RaftConfiguration raftConfig, TransactionConfiguration txConfiguration, + LowWatermarkConfiguration lowWatermarkConfiguration, StorageUpdateConfiguration storageUpdateConfiguration, Path workDir, int nodes, @@ -328,6 +332,7 @@ public ItTxTestCluster( ) { this.raftConfig = raftConfig; this.txConfiguration = txConfiguration; + this.lowWatermarkConfiguration = lowWatermarkConfiguration; this.storageUpdateConfiguration = storageUpdateConfiguration; this.workDir = workDir; this.nodes = nodes; @@ -570,6 +575,7 @@ protected TxManagerImpl newTxManager( return new TxManagerImpl( node.name(), txConfiguration, + lowWatermarkConfiguration, clusterService.messagingService(), clusterService.topologyService(), replicaSvc, @@ -582,7 +588,8 @@ protected TxManagerImpl newTxManager( partitionOperationsExecutor, resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + executor ); } @@ -1060,6 +1067,7 @@ private void initializeClientTxComponents() { clientTxManager = new TxManagerImpl( "client", txConfiguration, + lowWatermarkConfiguration, client.messagingService(), client.topologyService(), clientReplicaSvc, @@ -1072,7 +1080,8 @@ private void initializeClientTxComponents() { partitionOperationsExecutor, resourceRegistry, clientTransactionInflights, - lowWatermark + lowWatermark, + executor ); clientResourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 99a58b1f8a7..179f671ab9a 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.LockManager; @@ -1729,7 +1730,7 @@ public void run() { } while (!stop.get() && firstErr.get() == null) { - InternalTransaction tx = clientTxManager().begin(timestampTracker, false, false); + InternalTransaction tx = clientTxManager().beginExplicitRw(timestampTracker, InternalTxOptions.defaults()); var table = accounts.recordView(); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java index 5d7fafddb23..a0254abaf4d 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import org.apache.ignite.distributed.ItTxTestCluster; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; @@ -46,11 +47,14 @@ import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -72,7 +76,7 @@ /** * Setup infrastructure for tx related test scenarios. */ -@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class, ExecutorServiceExtension.class}) @MockitoSettings(strictness = Strictness.LENIENT) public abstract class TxInfrastructureTest extends IgniteAbstractTest { protected static final double BALANCE_1 = 500; @@ -114,12 +118,18 @@ public abstract class TxInfrastructureTest extends IgniteAbstractTest { @InjectConfiguration protected TransactionConfiguration txConfiguration; + @InjectConfiguration + protected LowWatermarkConfiguration lowWatermarkConfiguration; + @InjectConfiguration protected StorageUpdateConfiguration storageUpdateConfiguration; @InjectConfiguration protected ReplicationConfiguration replicationConfiguration; + @InjectExecutorService + protected ScheduledExecutorService commonExecutor; + protected final TestInfo testInfo; protected ItTxTestCluster txTestCluster; @@ -167,6 +177,7 @@ public void before() throws Exception { testInfo, raftConfiguration, txConfiguration, + lowWatermarkConfiguration, storageUpdateConfiguration, workDir, nodes(), diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 7da5aa1fbc3..f313759d6ce 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.internal.TestHybridClock; import org.apache.ignite.internal.catalog.CatalogService; @@ -84,6 +85,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; @@ -106,6 +108,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.internal.testframework.CommonTestScheduler; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; @@ -551,8 +554,14 @@ public static TxManagerImpl txManager( TransactionInflights transactionInflights = new TransactionInflights(placementDriver, CLOCK_SERVICE); + LowWatermarkConfiguration lowWatermarkConfiguration = mock(LowWatermarkConfiguration.class); + ConfigurationValue dataAvailabilityTime = mock(ConfigurationValue.class); + lenient().when(dataAvailabilityTime.value()).thenReturn(15L * 60 * 1000); + lenient().when(lowWatermarkConfiguration.dataAvailabilityTime()).thenReturn(dataAvailabilityTime); + var txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaSvc, new HeapLockManager(1024, 1024), @@ -563,7 +572,8 @@ public static TxManagerImpl txManager( new TestLocalRwTxCounter(), resourcesRegistry, transactionInflights, - new TestLowWatermark() + new TestLowWatermark(), + CommonTestScheduler.instance() ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/transactions/build.gradle b/modules/transactions/build.gradle index a3b38079c4d..cc9c6aafca2 100644 --- a/modules/transactions/build.gradle +++ b/modules/transactions/build.gradle @@ -56,10 +56,13 @@ dependencies { testImplementation libs.hamcrest.core integrationTestImplementation project(':ignite-api') + integrationTestImplementation project(':ignite-client') + integrationTestImplementation project(':ignite-client-handler') integrationTestImplementation(testFixtures(project(':ignite-core'))) integrationTestImplementation(testFixtures(project(':ignite-transactions'))) integrationTestImplementation(testFixtures(project(':ignite-sql-engine'))) integrationTestImplementation(testFixtures(project(':ignite-runner'))) + integrationTestImplementation libs.netty.transport testFixturesImplementation project(':ignite-configuration') testFixturesImplementation project(':ignite-core') diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java new file mode 100644 index 00000000000..b7ca1c75964 --- /dev/null +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItClientReadOnlyTxTimeoutOneNodeTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.readonly; + +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; + +import org.apache.ignite.Ignite; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.client.handler.ClientResourceRegistry; +import org.apache.ignite.internal.client.tx.ClientLazyTransaction; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +class ItClientReadOnlyTxTimeoutOneNodeTest extends ItReadOnlyTxTimeoutOneNodeTest { + private IgniteClient client; + + @BeforeEach + void startClient() { + client = IgniteClient.builder() + .addresses("localhost:" + unwrapIgniteImpl(cluster.aliveNode()).clientAddress().port()) + .build(); + } + + @AfterEach + void closeClient() { + if (client != null) { + client.close(); + } + } + + @Override + Ignite ignite() { + return client; + } + + @Override + ReadOnlyTransactionImpl transactionImpl(Transaction tx) { + long txId = ClientLazyTransaction.get(tx).startedTx().id(); + + ClientResourceRegistry resources = unwrapIgniteImpl(cluster.aliveNode()).clientInboundMessageHandler().resources(); + try { + return resources.get(txId).get(ReadOnlyTransactionImpl.class); + } catch (IgniteInternalCheckedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java new file mode 100644 index 00000000000..9d174defc88 --- /dev/null +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItEmbeddedReadOnlyTxTimeoutOneNodeTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.readonly; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; +import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.tx.Transaction; + +class ItEmbeddedReadOnlyTxTimeoutOneNodeTest extends ItReadOnlyTxTimeoutOneNodeTest { + @Override + Ignite ignite() { + return cluster.aliveNode(); + } + + @Override + ReadOnlyTransactionImpl transactionImpl(Transaction tx) { + return Wrappers.unwrap(tx, ReadOnlyTransactionImpl.class); + } +} diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java new file mode 100644 index 00000000000..8d23df30721 --- /dev/null +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.readonly; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl; +import org.apache.ignite.table.Table; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +abstract class ItReadOnlyTxTimeoutOneNodeTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "TEST"; + + @Override + protected int initialNodes() { + return 1; + } + + abstract Ignite ignite(); + + abstract ReadOnlyTransactionImpl transactionImpl(Transaction tx); + + @Test + void roTransactionTimesOut() throws Exception { + Ignite ignite = ignite(); + + ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR)"); + + Table table = ignite.tables().table(TABLE_NAME); + + Transaction roTx = ignite.transactions().begin(new TransactionOptions().readOnly(true).timeoutMillis(100)); + + // Make sure the RO tx actually begins on the server (as thin client transactions are lazy). + doGetOn(table, roTx); + + assertTrue( + waitForCondition(() -> transactionImpl(roTx).finishInitiated(), SECONDS.toMillis(10)), + "Transaction should have been finished due to timeout" + ); + + // TODO: Uncomment the following lines after https://issues.apache.org/jira/browse/IGNITE-23980 is fixed. + // assertThrows(TransactionException.class, () -> doGetOn(table, roTx)); + // assertThrows(TransactionException.class, roTx::commit); + } + + private static void doGetOn(Table table, Transaction tx) { + table.keyValueView(Integer.class, String.class).get(tx, 1); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java new file mode 100644 index 00000000000..36d1ba62068 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +/** + * Transaction options for internal use. + */ +public class InternalTxOptions { + private static final InternalTxOptions DEFAULT_OPTIONS = builder().build(); + + /** + * Transaction priority. The priority is used to resolve conflicts between transactions. The higher priority is + * the more likely the transaction will win the conflict. + */ + private final TxPriority priority; + + /** Transaction timeout. 0 means 'use default timeout'. */ + private final long timeoutMillis; + + private InternalTxOptions(TxPriority priority, long timeoutMillis) { + this.priority = priority; + this.timeoutMillis = timeoutMillis; + } + + public static Builder builder() { + return new Builder(); + } + + public static InternalTxOptions defaults() { + return DEFAULT_OPTIONS; + } + + public static InternalTxOptions defaultsWithPriority(TxPriority priority) { + return builder().priority(priority).build(); + } + + public TxPriority priority() { + return priority; + } + + public long timeoutMillis() { + return timeoutMillis; + } + + /** Builder for InternalTxOptions. */ + public static class Builder { + private TxPriority priority = TxPriority.NORMAL; + private long timeoutMillis = 0; + + public Builder priority(TxPriority priority) { + this.priority = priority; + return this; + } + + public Builder timeoutMillis(long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + return this; + } + + public InternalTxOptions build() { + return new InternalTxOptions(priority, timeoutMillis); + } + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index c9b0ead47ae..54250911162 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -24,10 +24,8 @@ import java.util.function.Function; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; -import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -37,45 +35,51 @@ */ public interface TxManager extends IgniteComponent { /** - * Starts a read-write transaction coordinated by a local node. + * Starts an implicit read-write transaction coordinated by a local node. * * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. - * @param implicit Whether the transaction is implicit or not. * @return The transaction. */ - InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit); + default InternalTransaction beginImplicitRw(HybridTimestampTracker timestampTracker) { + return beginImplicit(timestampTracker, false); + } /** - * Starts either read-write or read-only transaction, depending on {@code readOnly} parameter value. The transaction has - * {@link TxPriority#NORMAL} priority. + * Starts an implicit read-write transaction coordinated by a local node. * * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only - * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. Each client - * should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client. - * @param implicit Whether the transaction is implicit or not. + * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. * @param readOnly {@code true} in order to start a read-only transaction, {@code false} in order to start read-write one. * Calling begin with readOnly {@code false} is an equivalent of TxManager#begin(). - * @return The started transaction. - * @throws IgniteInternalException with {@link Transactions#TX_READ_ONLY_TOO_OLD_ERR} if transaction much older than the data - * available in the tables. + * @return The transaction. + */ + InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly); + + /** + * Starts an explicit read-write transaction coordinated by a local node. + * + * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only + * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. + * @param options Transaction options. + * @return The transaction. */ - InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit, boolean readOnly); + default InternalTransaction beginExplicitRw(HybridTimestampTracker timestampTracker, InternalTxOptions options) { + return beginExplicit(timestampTracker, false, options); + } /** - * Starts either read-write or read-only transaction, depending on {@code readOnly} parameter value. + * Starts either read-write or read-only explicit transaction, depending on {@code readOnly} parameter value. * * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. Each client * should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client. - * @param implicit Whether the transaction is implicit or not. * @param readOnly {@code true} in order to start a read-only transaction, {@code false} in order to start read-write one. * Calling begin with readOnly {@code false} is an equivalent of TxManager#begin(). - * @param priority Transaction priority. The priority is used to resolve conflicts between transactions. The higher priority is - * the more likely the transaction will win the conflict. + * @param txOptions Options. * @return The started transaction. */ - InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean readOnly, boolean implicit, TxPriority priority); + InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker, boolean readOnly, InternalTxOptions txOptions); /** * Returns a transaction state meta. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java index 9ebad20816b..c4c370b80d1 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxPriority; import org.apache.ignite.tx.IgniteTransactions; @@ -89,12 +90,18 @@ public InternalTransaction begin(@Nullable TransactionOptions options, @Nullable /** {@inheritDoc} */ @Override public Transaction begin(@Nullable TransactionOptions options) { - if (options != null && options.timeoutMillis() != 0) { + if (options != null && options.timeoutMillis() != 0 && !options.readOnly()) { // TODO: IGNITE-15936. - throw new UnsupportedOperationException("Timeouts are not supported yet"); + throw new UnsupportedOperationException("Timeouts are not supported yet for RW transactions."); } - return txManager.begin(observableTimestampTracker, false, options != null && options.readOnly()); + InternalTxOptions internalTxOptions = options == null + ? InternalTxOptions.defaults() + : InternalTxOptions.builder() + .timeoutMillis(options.timeoutMillis()) + .build(); + + return txManager.beginExplicit(observableTimestampTracker, options != null && options.readOnly(), internalTxOptions); } /** {@inheritDoc} */ @@ -110,11 +117,11 @@ public CompletableFuture beginAsync(@Nullable TransactionOptions op * @return The started transaction. */ public InternalTransaction beginImplicit(boolean readOnly) { - return txManager.begin(observableTimestampTracker, true, readOnly); + return txManager.beginImplicit(observableTimestampTracker, readOnly); } @TestOnly public Transaction beginWithPriority(boolean readOnly, TxPriority priority) { - return txManager.begin(observableTimestampTracker, false, readOnly, priority); + return txManager.beginExplicit(observableTimestampTracker, readOnly, InternalTxOptions.defaultsWithPriority(priority)); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index ed126aadd41..b48b157be19 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -29,11 +29,12 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.TestOnly; /** * The read-only implementation of an internal transaction. */ -class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { +public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { /** The read timestamp. */ private final HybridTimestamp readTimestamp; @@ -142,4 +143,9 @@ public CompletableFuture finish(boolean commit, HybridTimestamp executionT return txFuture; } + + @TestOnly + public boolean finishInitiated() { + return finishGuard.get(); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java new file mode 100644 index 00000000000..8e50f7d19de --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.tx.InternalTransaction; + +class TransactionExpirationRegistry { + private static final IgniteLogger LOG = Loggers.forClass(TransactionExpirationRegistry.class); + + private final NavigableMap> txsByExpirationTime = new ConcurrentSkipListMap<>(); + private final Map expirationTimeByTx = new ConcurrentHashMap<>(); + + private final ReadWriteLock watermarkLock = new ReentrantReadWriteLock(); + private volatile HybridTimestamp watermark = HybridTimestamp.MIN_VALUE; + + void register(InternalTransaction tx, HybridTimestamp txExpirationTime) { + if (isExpired(txExpirationTime)) { + abortTransaction(tx); + return; + } + + watermarkLock.readLock().lock(); + + try { + if (isExpired(txExpirationTime)) { + abortTransaction(tx); + return; + } + + Set txsExpiringAtTs = txsByExpirationTime.computeIfAbsent( + txExpirationTime, + k -> ConcurrentHashMap.newKeySet() + ); + txsExpiringAtTs.add(tx); + + expirationTimeByTx.put(tx, txExpirationTime); + } finally { + watermarkLock.readLock().unlock(); + } + } + + private boolean isExpired(HybridTimestamp expirationTime) { + return expirationTime.compareTo(watermark) <= 0; + } + + private static void abortTransaction(InternalTransaction tx) { + tx.rollbackAsync().whenComplete((res, ex) -> { + if (ex != null) { + LOG.error("Transaction abort due to timeout failed [txId={}]", ex, tx.id()); + } + }); + } + + void expireUpTo(HybridTimestamp expirationTime) { + List> transactionSetsToExpire; + + watermarkLock.writeLock().lock(); + + try { + NavigableMap> headMap = txsByExpirationTime.headMap(expirationTime, true); + transactionSetsToExpire = new ArrayList<>(headMap.values()); + headMap.clear(); + + watermark = expirationTime; + } finally { + watermarkLock.writeLock().unlock(); + } + + for (Set set : transactionSetsToExpire) { + for (InternalTransaction tx : set) { + expirationTimeByTx.remove(tx); + + abortTransaction(tx); + } + } + } + + void abortAllRegistered() { + expireUpTo(HybridTimestamp.MAX_VALUE); + } + + void unregister(InternalTransaction tx) { + HybridTimestamp expirationTime = expirationTimeByTx.remove(tx); + + if (expirationTime != null) { + txsByExpirationTime.compute(expirationTime, (k, set) -> { + if (set == null) { + return null; + } + + set.remove(tx); + + return set.isEmpty() ? null : set; + }); + } + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index ff8ac829921..45696e0b7cf 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.function.Function.identity; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; @@ -49,6 +50,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -83,18 +86,19 @@ import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse; import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; import org.apache.ignite.internal.replicator.message.ReplicaResponse; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; import org.apache.ignite.internal.systemview.api.SystemView; import org.apache.ignite.internal.systemview.api.SystemViewProvider; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.LocalRwTxCounter; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TransactionResult; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.TxPriority; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.TxStateMetaFinishing; @@ -122,6 +126,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi /** Transaction configuration. */ private final TransactionConfiguration txConfig; + private final LowWatermarkConfiguration lowWatermarkConfig; + /** Lock manager. */ private final LockManager lockManager; @@ -197,14 +203,21 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi private final ReplicaService replicaService; + private final ScheduledExecutorService commonScheduler; + private final TransactionsViewProvider txViewProvider = new TransactionsViewProvider(); private volatile PersistentTxStateVacuumizer persistentTxStateVacuumizer; + private final TransactionExpirationRegistry transactionExpirationRegistry = new TransactionExpirationRegistry(); + + private volatile @Nullable ScheduledFuture transactionExpirationJobFuture; + /** * Test-only constructor. * * @param txConfig Transaction configuration. + * @param lowWatermarkConfig Low watermark configuration. * @param clusterService Cluster service. * @param replicaService Replica service. * @param lockManager Lock manager. @@ -220,6 +233,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi @TestOnly public TxManagerImpl( TransactionConfiguration txConfig, + LowWatermarkConfiguration lowWatermarkConfig, ClusterService clusterService, ReplicaService replicaService, LockManager lockManager, @@ -230,11 +244,13 @@ public TxManagerImpl( LocalRwTxCounter localRwTxCounter, RemotelyTriggeredResourceRegistry resourcesRegistry, TransactionInflights transactionInflights, - LowWatermark lowWatermark + LowWatermark lowWatermark, + ScheduledExecutorService commonScheduler ) { this( clusterService.nodeName(), txConfig, + lowWatermarkConfig, clusterService.messagingService(), clusterService.topologyService(), replicaService, @@ -247,7 +263,8 @@ public TxManagerImpl( ForkJoinPool.commonPool(), resourcesRegistry, transactionInflights, - lowWatermark + lowWatermark, + commonScheduler ); } @@ -255,6 +272,7 @@ public TxManagerImpl( * The constructor. * * @param txConfig Transaction configuration. + * @param lowWatermarkConfig Low watermark configuration. * @param messagingService Messaging service. * @param topologyService Topology service. * @param replicaService Replica service. @@ -272,6 +290,7 @@ public TxManagerImpl( public TxManagerImpl( String nodeName, TransactionConfiguration txConfig, + LowWatermarkConfiguration lowWatermarkConfig, MessagingService messagingService, TopologyService topologyService, ReplicaService replicaService, @@ -284,9 +303,11 @@ public TxManagerImpl( Executor partitionOperationsExecutor, RemotelyTriggeredResourceRegistry resourcesRegistry, TransactionInflights transactionInflights, - LowWatermark lowWatermark + LowWatermark lowWatermark, + ScheduledExecutorService commonScheduler ) { this.txConfig = txConfig; + this.lowWatermarkConfig = lowWatermarkConfig; this.lockManager = lockManager; this.clockService = clockService; this.transactionIdGenerator = transactionIdGenerator; @@ -301,6 +322,7 @@ public TxManagerImpl( this.transactionInflights = transactionInflights; this.lowWatermark = lowWatermark; this.replicaService = replicaService; + this.commonScheduler = commonScheduler; placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService); @@ -310,7 +332,7 @@ public TxManagerImpl( cpus, cpus, 100, - TimeUnit.MILLISECONDS, + MILLISECONDS, new LinkedBlockingQueue<>(), IgniteThreadFactory.create(nodeName, "tx-async-write-intent", LOG, STORAGE_READ, STORAGE_WRITE) ); @@ -371,19 +393,32 @@ private CompletableFuture primaryReplicaExpiredListener(PrimaryReplicaE } @Override - public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit) { - return begin(timestampTracker, implicit, false); + public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) { + return begin(timestampTracker, true, readOnly, InternalTxOptions.defaults()); } @Override - public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit, boolean readOnly) { - return begin(timestampTracker, implicit, readOnly, TxPriority.NORMAL); + public InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker, boolean readOnly, InternalTxOptions txOptions) { + return begin(timestampTracker, false, readOnly, txOptions); } - @Override - public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean implicit, boolean readOnly, TxPriority priority) { + private InternalTransaction begin( + HybridTimestampTracker timestampTracker, + boolean implicit, + boolean readOnly, + InternalTxOptions options + ) { + return inBusyLock(busyLock, () -> beginBusy(timestampTracker, implicit, readOnly, options)); + } + + private InternalTransaction beginBusy( + HybridTimestampTracker timestampTracker, + boolean implicit, + boolean readOnly, + InternalTxOptions options + ) { HybridTimestamp beginTimestamp = readOnly ? clockService.now() : createBeginTimestampWithIncrementRwTxCounter(); - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, priority); + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); startedTxs.add(1); @@ -391,8 +426,18 @@ public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolea txStateVolatileStorage.initialize(txId, localNodeId); return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit); + } else { + return beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options); } + } + private ReadOnlyTransactionImpl beginReadOnlyTransaction( + HybridTimestampTracker timestampTracker, + HybridTimestamp beginTimestamp, + UUID txId, + boolean implicit, + InternalTxOptions options + ) { HybridTimestamp observableTimestamp = timestampTracker.get(); HybridTimestamp readTimestamp = observableTimestamp != null @@ -410,17 +455,40 @@ public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolea try { CompletableFuture txFuture = new CompletableFuture<>(); + + var transaction = new ReadOnlyTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, readTimestamp, txFuture); + + // Implicit transactions are finished as soon as their operation/query is finished, they cannot be abandoned, so there is + // no need to register them. + if (!implicit) { + transactionExpirationRegistry.register(transaction, roExpirationTimeFor(beginTimestamp, options)); + } + txFuture.whenComplete((unused, throwable) -> { lowWatermark.unlock(txId); + + // We only register explicit transactions, so we only unregister them as well. + if (!implicit) { + transactionExpirationRegistry.unregister(transaction); + } }); - return new ReadOnlyTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, readTimestamp, txFuture); + return transaction; } catch (Throwable t) { lowWatermark.unlock(txId); throw t; } } + private HybridTimestamp roExpirationTimeFor(HybridTimestamp beginTimestamp, InternalTxOptions options) { + long effectiveTimeoutMillis = options.timeoutMillis() == 0 ? defaultRoTransactionTimeout() : options.timeoutMillis(); + return beginTimestamp.addPhysicalTime(effectiveTimeoutMillis); + } + + private long defaultRoTransactionTimeout() { + return lowWatermarkConfig.dataAvailabilityTime().value(); + } + /** * Current read timestamp, for calculation of read timestamp of read-only transactions. * @@ -780,10 +848,23 @@ public CompletableFuture startAsync(ComponentContext componentContext) { placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaElectedListener); + transactionExpirationJobFuture = commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow, 1000, 1000, MILLISECONDS); + return nullCompletedFuture(); }); } + private void expireTransactionsUpToNow() { + HybridTimestamp expirationTime = null; + + try { + expirationTime = clockService.current(); + transactionExpirationRegistry.expireUpTo(expirationTime); + } catch (Throwable t) { + LOG.error("Could not expire transactions up to {}", t, expirationTime); + } + } + @Override public void beforeNodeStop() { orphanDetector.stop(); @@ -805,6 +886,13 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaElectedListener); + ScheduledFuture expirationJobFuture = transactionExpirationJobFuture; + if (expirationJobFuture != null) { + expirationJobFuture.cancel(false); + } + + transactionExpirationRegistry.abortAllRegistered(); + shutdownAndAwaitTermination(writeIntentSwitchPool, 10, TimeUnit.SECONDS); return nullCompletedFuture(); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index ce458093052..ed22c665254 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -55,6 +55,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import java.util.stream.Stream; @@ -77,7 +78,10 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; +import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException; @@ -109,6 +113,7 @@ * Basic tests for a transaction manager. */ @ExtendWith(ConfigurationExtension.class) +@ExtendWith(ExecutorServiceExtension.class) public class TxManagerTest extends IgniteAbstractTest { private static final ClusterNode LOCAL_NODE = new ClusterNodeImpl(randomUUID(), "local", new NetworkAddress("127.0.0.1", 2004), null); @@ -134,6 +139,12 @@ public class TxManagerTest extends IgniteAbstractTest { @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private LowWatermarkConfiguration lowWatermarkConfiguration; + + @InjectExecutorService + private ScheduledExecutorService commonScheduler; + private final LocalRwTxCounter localRwTxCounter = spy(new TestLocalRwTxCounter()); private final TestLowWatermark lowWatermark = spy(new TestLowWatermark()); @@ -156,6 +167,7 @@ public void setup() { txManager = new TxManagerImpl( txConfiguration, + lowWatermarkConfiguration, clusterService, replicaService, lockManager(), @@ -166,7 +178,8 @@ public void setup() { localRwTxCounter, resourceRegistry, transactionInflights, - lowWatermark + lowWatermark, + commonScheduler ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); @@ -191,10 +204,10 @@ public void tearDown() { @Test public void testBegin() { - InternalTransaction tx0 = txManager.begin(hybridTimestampTracker, false); - InternalTransaction tx1 = txManager.begin(hybridTimestampTracker, false); - InternalTransaction tx2 = txManager.begin(hybridTimestampTracker, false, true); - InternalTransaction tx3 = txManager.begin(hybridTimestampTracker, false, true, TxPriority.NORMAL); + InternalTransaction tx0 = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); + InternalTransaction tx1 = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); + InternalTransaction tx2 = txManager.beginImplicitRw(hybridTimestampTracker); + InternalTransaction tx3 = txManager.beginImplicit(hybridTimestampTracker, true); assertNotNull(tx0.id()); assertNotNull(tx1.id()); @@ -202,8 +215,8 @@ public void testBegin() { assertNotNull(tx3.id()); assertFalse(tx0.isReadOnly()); - assertFalse(tx1.isReadOnly()); - assertTrue(tx2.isReadOnly()); + assertTrue(tx1.isReadOnly()); + assertFalse(tx2.isReadOnly()); assertTrue(tx3.isReadOnly()); } @@ -213,7 +226,7 @@ public void testEnlist() { assertEquals(LOCAL_NODE.address(), addr); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId = new TablePartitionId(1, 0); @@ -243,8 +256,10 @@ void testCreateNewRoTxAfterUpdateLowerWatermark() { assertThat(lowWatermark.updateAndNotify(new HybridTimestamp(10_000, 11)), willSucceedFast()); - IgniteInternalException exception = - assertThrows(IgniteInternalException.class, () -> txManager.begin(hybridTimestampTracker, false, true)); + IgniteInternalException exception = assertThrows( + IgniteInternalException.class, + () -> txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()) + ); assertEquals(Transactions.TX_READ_ONLY_TOO_OLD_ERR, exception.code()); } @@ -254,12 +269,12 @@ void testUpdateLowerWatermark() { // Let's check the absence of transactions. assertThat(lowWatermark.updateAndNotify(clockService.now()), willSucceedFast()); - InternalTransaction rwTx0 = txManager.begin(hybridTimestampTracker, false); + InternalTransaction rwTx0 = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); hybridTimestampTracker.update(clockService.now()); - InternalTransaction roTx0 = txManager.begin(hybridTimestampTracker, false, true); - InternalTransaction roTx1 = txManager.begin(hybridTimestampTracker, false, true); + InternalTransaction roTx0 = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); + InternalTransaction roTx1 = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); CompletableFuture readOnlyTxsFuture = lowWatermark.updateAndNotify(roTx1.readTimestamp()); assertFalse(readOnlyTxsFuture.isDone()); @@ -274,8 +289,8 @@ void testUpdateLowerWatermark() { assertTrue(readOnlyTxsFuture.isDone()); // Let's check only RW transactions. - txManager.begin(hybridTimestampTracker, false); - txManager.begin(hybridTimestampTracker, false); + txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); + txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); assertThat(lowWatermark.updateAndNotify(clockService.now()), willSucceedFast()); } @@ -291,7 +306,7 @@ public void testRepeatedCommitRollbackAfterCommit() throws Exception { when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) .thenReturn(completedFuture(new TransactionResult(TxState.COMMITTED, commitTimestamp))); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); @@ -312,7 +327,7 @@ public void testRepeatedCommitRollbackAfterRollback() throws Exception { when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) .thenReturn(completedFuture(new TransactionResult(TxState.ABORTED, null))); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); @@ -340,7 +355,7 @@ void testRepeatedCommitRollbackAfterCommitWithException() throws Exception { ) ))); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); @@ -368,7 +383,7 @@ public void testRepeatedCommitRollbackAfterRollbackWithException() throws Except ) ))); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); @@ -385,12 +400,12 @@ public void testRepeatedCommitRollbackAfterRollbackWithException() throws Except @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testTestOnlyPendingCommit(boolean startReadOnlyTransaction) { + public void testOnlyPendingCommit(boolean startReadOnlyTransaction) { assertEquals(0, txManager.pending()); assertEquals(0, txManager.finished()); // Start transaction. - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false, true); + InternalTransaction tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); assertEquals(1, txManager.pending()); assertEquals(0, txManager.finished()); @@ -412,14 +427,14 @@ public void testTestOnlyPendingCommit(boolean startReadOnlyTransaction) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testTestOnlyPendingRollback(boolean startReadOnlyTransaction) { + public void testOnlyPendingRollback(boolean startReadOnlyTransaction) { assertEquals(0, txManager.pending()); assertEquals(0, txManager.finished()); // Start transaction. InternalTransaction tx = - startReadOnlyTransaction ? txManager.begin(hybridTimestampTracker, false, true) - : txManager.begin(hybridTimestampTracker, false); + startReadOnlyTransaction ? txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()) + : txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); assertEquals(1, txManager.pending()); assertEquals(0, txManager.finished()); @@ -447,14 +462,14 @@ public void testObservableTimestamp() { HybridTimestamp now = clockService.now(); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false, true); + InternalTransaction tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); assertTrue(abs(now.getPhysical() - tx.readTimestamp().getPhysical()) > compareThreshold); tx.commit(); hybridTimestampTracker.update(now); - tx = txManager.begin(hybridTimestampTracker, false, true); + tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); assertTrue(abs(now.getPhysical() - tx.readTimestamp().getPhysical()) < compareThreshold); tx.commit(); @@ -468,7 +483,7 @@ public void testObservableTimestamp() { hybridTimestampTracker.update(timestampInPast); - tx = txManager.begin(hybridTimestampTracker, false, true); + tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); long readTime = now.getPhysical() - idleSafeTimePropagationPeriodMsSupplier.getAsLong() - clockService.maxClockSkewMillis(); @@ -485,7 +500,7 @@ public void testObservableTimestampLocally() { HybridTimestamp now = clockService.now(); - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false, true); + InternalTransaction tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); HybridTimestamp firstReadTs = tx.readTimestamp(); @@ -495,7 +510,7 @@ public void testObservableTimestampLocally() { + idleSafeTimePropagationPeriodMsSupplier.getAsLong() + clockService.maxClockSkewMillis()); tx.commit(); - tx = txManager.begin(hybridTimestampTracker, false, true); + tx = txManager.beginExplicit(hybridTimestampTracker, true, InternalTxOptions.defaults()); assertTrue(firstReadTs.compareTo(tx.readTimestamp()) <= 0); @@ -607,7 +622,7 @@ public void testExpiredExceptionDoesNotShadeResponseExceptions() { @Test public void testOnlyPrimaryExpirationAffectsTransaction() { // Prepare transaction. - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); ClusterNode node = mock(ClusterNode.class); @@ -687,7 +702,7 @@ public void testFinishExpiredWithDifferentEnlistmentConsistencyToken() { @ParameterizedTest(name = "readOnly = {0}") @ValueSource(booleans = {true, false}) void testIncrementLocalRwTxCounterOnBeginTransaction(boolean readOnly) { - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false, readOnly); + InternalTransaction tx = txManager.beginExplicit(hybridTimestampTracker, readOnly, InternalTxOptions.defaults()); VerificationMode verificationMode = readOnly ? never() : times(1); @@ -698,7 +713,7 @@ void testIncrementLocalRwTxCounterOnBeginTransaction(boolean readOnly) { @ParameterizedTest(name = "readOnly = {0}, commit = {1}") @MethodSource("txTypeAndWayCompleteTx") void testDecrementLocalRwTxCounterOnCompleteTransaction(boolean readOnly, boolean commit) { - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false, readOnly); + InternalTransaction tx = txManager.beginExplicit(hybridTimestampTracker, readOnly, InternalTxOptions.defaults()); clearInvocations(localRwTxCounter); @@ -745,11 +760,11 @@ void testCreateBeginTsInsideInUpdateRwTxCount() { return result; }).when(localRwTxCounter).inUpdateRwTxCountLock(any()); - txManager.begin(hybridTimestampTracker, false, false); + txManager.beginExplicit(hybridTimestampTracker, false, InternalTxOptions.defaults()); } private InternalTransaction prepareTransaction() { - InternalTransaction tx = txManager.begin(hybridTimestampTracker, false); + InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryTest.java new file mode 100644 index 00000000000..6cdb0e2e227 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TransactionExpirationRegistryTest extends BaseIgniteAbstractTest { + private final TransactionExpirationRegistry registry = new TransactionExpirationRegistry(); + + @Mock + private InternalTransaction tx1; + + @Mock + private InternalTransaction tx2; + + @BeforeEach + void configureMocks() { + lenient().when(tx1.rollbackAsync()).thenReturn(nullCompletedFuture()); + lenient().when(tx2.rollbackAsync()).thenReturn(nullCompletedFuture()); + } + + @Test + void abortsTransactionsBeforeExpirationTime() { + registry.register(tx1, new HybridTimestamp(1000, 0)); + registry.register(tx2, new HybridTimestamp(2000, 0)); + + registry.expireUpTo(new HybridTimestamp(3000, 0)); + + verify(tx1).rollbackAsync(); + verify(tx2).rollbackAsync(); + } + + @Test + void abortsTransactionsExactlyOnExpirationTime() { + registry.register(tx1, new HybridTimestamp(1000, 0)); + + registry.expireUpTo(new HybridTimestamp(1000, 0)); + + verify(tx1).rollbackAsync(); + } + + @Test + void doesNotAbortTransactionsAfterExpirationTime() { + registry.register(tx1, new HybridTimestamp(1000, 1)); + + registry.expireUpTo(new HybridTimestamp(1000, 0)); + + verify(tx1, never()).rollbackAsync(); + } + + @Test + void abortsTransactionsExpiredAfterFewExpirations() { + registry.register(tx1, new HybridTimestamp(1000, 1)); + + registry.expireUpTo(new HybridTimestamp(1000, 0)); + registry.expireUpTo(new HybridTimestamp(2000, 0)); + + verify(tx1).rollbackAsync(); + } + + @Test + void abortsAlreadyExpiredTransactionOnRegistration() { + registry.expireUpTo(new HybridTimestamp(2000, 0)); + + registry.register(tx1, new HybridTimestamp(1000, 0)); + registry.register(tx2, new HybridTimestamp(2000, 0)); + + verify(tx1).rollbackAsync(); + verify(tx2).rollbackAsync(); + } + + @Test + void abortsAlreadyExpiredTransactionJustOnce() { + registry.expireUpTo(new HybridTimestamp(2000, 0)); + + registry.register(tx1, new HybridTimestamp(1000, 0)); + registry.register(tx2, new HybridTimestamp(2000, 0)); + + registry.expireUpTo(new HybridTimestamp(2000, 0)); + + verify(tx1, times(1)).rollbackAsync(); + verify(tx2, times(1)).rollbackAsync(); + } + + @Test + void abortsAllRegistered() { + registry.register(tx1, new HybridTimestamp(1000, 0)); + registry.register(tx2, HybridTimestamp.MAX_VALUE); + + registry.abortAllRegistered(); + + verify(tx1).rollbackAsync(); + verify(tx2).rollbackAsync(); + } + + @Test + void abortsOnRegistrationAfterAbortingAllRegistered() { + registry.abortAllRegistered(); + + registry.register(tx1, new HybridTimestamp(1000, 0)); + registry.register(tx2, HybridTimestamp.MAX_VALUE); + + verify(tx1).rollbackAsync(); + verify(tx2).rollbackAsync(); + } + + @Test + void removesTransactionOnUnregister() { + registry.register(tx1, new HybridTimestamp(1000, 0)); + + registry.unregister(tx1); + + registry.expireUpTo(new HybridTimestamp(2000, 0)); + + // Should not be aborted due to expiration as we removed the transaction. + verify(tx1, never()).rollbackAsync(); + } + + @Test + void unregisterIsIdempotent() { + registry.register(tx1, new HybridTimestamp(1000, 0)); + + registry.unregister(tx1); + registry.unregister(tx1); + } +}