diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1f194d3956a..8fe609c519a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -90,6 +90,14 @@ public final class ScmConfigKeys {
"dfs.container.ratis.statemachinedata.sync.retries";
public static final int
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT = -1;
+ public static final String
+ DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS =
+ "dfs.container.ratis.statemachine.max.pending.apply-transactions";
+ // Default max pending apply transactions; intended to match the default
+ // snapshot threshold. NOTE(review): ozone-default.xml ships 10000 - confirm.
+ public static final int
+ DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS_DEFAULT =
+ 100000;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
"dfs.container.ratis.log.queue.num-elements";
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b2f820b373d..a88dd82a238 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -186,6 +186,15 @@
taken.
+
+ dfs.container.ratis.statemachine.max.pending.apply-transactions
+ 10000
+ OZONE, RATIS
+ Maximum number of pending apply transactions in a data
+ pipeline. The default value is kept the same as the default snapshot
+ threshold, dfs.ratis.snapshot.threshold.
+
+
dfs.container.ratis.num.write.chunk.threads
60
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index d82d1142307..872cc8abd45 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -79,6 +80,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -146,6 +148,8 @@ public class ContainerStateMachine extends BaseStateMachine {
private final Cache stateMachineDataCache;
private final boolean isBlockTokenEnabled;
private final TokenVerifier tokenVerifier;
+
+ private final Semaphore applyTransactionSemaphore;
/**
* CSM metrics.
*/
@@ -175,6 +179,12 @@ public class ContainerStateMachine extends BaseStateMachine {
final int numContainerOpExecutors = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
+ int maxPendingApplyTransactions = conf.getInt(
+ ScmConfigKeys.
+ DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS,
+ ScmConfigKeys.
+ DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS_DEFAULT);
+ applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
this.executors = new ExecutorService[numContainerOpExecutors];
for (int i = 0; i < numContainerOpExecutors; i++) {
final int index = i;
@@ -626,6 +636,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setLogIndex(index);
try {
+ applyTransactionSemaphore.acquire();
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
getContainerCommandRequestProto(
@@ -663,9 +674,18 @@ public class ContainerStateMachine extends BaseStateMachine {
requestProto.getWriteChunk().getChunkData().getLen());
}
updateLastApplied();
- });
+ }).whenComplete((r, t) -> applyTransactionSemaphore.release());
return future;
- } catch (IOException e) {
+ } catch (InterruptedException e) {
+ // acquire() was interrupted, so no permit is held; restore the
+ // interrupt status for the calling thread.
+ Thread.currentThread().interrupt();
+ metrics.incNumApplyTransactionsFails();
+ return completeExceptionally(e);
+ } catch (IOException e) {
+ // A permit is held but the release hook on the future was never
+ // attached, so release it here to avoid leaking the permit.
+ applyTransactionSemaphore.release();
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
}