diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index bb1a804442f..270e164234a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -116,8 +116,7 @@ public class ContainerStateMachine extends BaseStateMachine { private final XceiverServerRatis ratisServer; private final ConcurrentHashMap> writeChunkFutureMap; - private final ConcurrentHashMap> - createContainerFutureMap; + private final ConcurrentHashMap createContainerResponseMap; private ExecutorService[] executors; private final int numExecutors; private final Map containerCommandCompletionMap; @@ -137,7 +136,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, this.numExecutors = executors.size(); this.executors = executors.toArray(new ExecutorService[numExecutors]); this.writeChunkFutureMap = new ConcurrentHashMap<>(); - this.createContainerFutureMap = new ConcurrentHashMap<>(); + this.createContainerResponseMap = new ConcurrentHashMap<>(); containerCommandCompletionMap = new ConcurrentHashMap<>(); } @@ -289,17 +288,8 @@ private ExecutorService getCommandExecutor( private CompletableFuture handleWriteChunk( ContainerCommandRequestProto requestProto, long entryIndex) { final WriteChunkRequestProto write = requestProto.getWriteChunk(); - long containerID = write.getBlockID().getContainerID(); - CompletableFuture future = - createContainerFutureMap.get(containerID); - CompletableFuture writeChunkFuture; - if (future != null) { - writeChunkFuture = future.thenApplyAsync( - v -> runCommand(requestProto), chunkExecutor); - } else { - writeChunkFuture = CompletableFuture.supplyAsync( - () -> runCommand(requestProto), chunkExecutor); - } + CompletableFuture writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), chunkExecutor); writeChunkFutureMap.put(entryIndex, writeChunkFuture); LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData() @@ -319,9 +309,9 @@ private CompletableFuture handleWriteChunk( private CompletableFuture handleCreateContainer( ContainerCommandRequestProto requestProto) { long containerID = requestProto.getContainerID(); - createContainerFutureMap. - computeIfAbsent(containerID, k -> new CompletableFuture<>()); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + Message response = runCommand(requestProto); + createContainerResponseMap.put(containerID, response); + return CompletableFuture.completedFuture(response); } /* @@ -508,6 +498,10 @@ public CompletableFuture applyTransaction(TransactionContext trx) { future = CompletableFuture .supplyAsync(() -> runCommand(containerCommandRequestProto), getCommandExecutor(requestProto)); + } else if (cmdType == Type.CreateContainer) { + long containerID = requestProto.getContainerID(); + return CompletableFuture.completedFuture( + createContainerResponseMap.get(containerID)); } else { // Make sure that in write chunk, the user data is not set if (cmdType == Type.WriteChunk) { @@ -517,18 +511,6 @@ public CompletableFuture applyTransaction(TransactionContext trx) { future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), getCommandExecutor(requestProto)); } - // Mark the createContainerFuture complete so that writeStateMachineData - // for WriteChunk gets unblocked - if (cmdType == Type.CreateContainer) { - long containerID = requestProto.getContainerID(); - future.thenApply( - r -> { - createContainerFutureMap.remove(containerID).complete(null); - LOG.info("create Container Transaction completed for container " + - containerID + " log index " + index); - return r; - }); - } future.thenAccept(m -> { final Long previous =