From 8655abb353811eba349dd0703d702deb02532242 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 6 Mar 2019 10:00:16 +0530 Subject: [PATCH] HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee. (cherry picked from commit 62e89dc275f120f54967744393e2ddde15575096) --- .../proto/DatanodeContainerProtocol.proto | 1 + .../server/ratis/ContainerStateMachine.java | 60 ++++++++++--------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 3b78835b99c..7396eb31eef 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -143,6 +143,7 @@ enum Result { BCSID_MISMATCH = 38; CONTAINER_NOT_OPEN = 39; CONTAINER_MISSING = 40; + BLOCK_TOKEN_VERIFICATION_FAILED = 41; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 759f9574567..ed7e099a650 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import io.opentracing.Scope; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; @@ -350,13 +352,20 @@ public class ContainerStateMachine extends BaseStateMachine { } private ContainerCommandResponseProto dispatchCommand( - ContainerCommandRequestProto requestProto, - DispatcherContext context) throws IOException { + ContainerCommandRequestProto requestProto, DispatcherContext context) { LOG.trace("dispatch {}", requestProto); - if(isBlockTokenEnabled) { - // ServerInterceptors intercepts incoming request and creates ugi. - tokenVerifier.verify(UserGroupInformation.getCurrentUser() - .getShortUserName(), requestProto.getEncodedToken()); + if (isBlockTokenEnabled) { + try { + // ServerInterceptors intercepts incoming request and creates ugi. + tokenVerifier + .verify(UserGroupInformation.getCurrentUser().getShortUserName(), + requestProto.getEncodedToken()); + } catch (IOException ioe) { + StorageContainerException sce = new StorageContainerException( + "Block token verification failed. " + ioe.getMessage(), ioe, + ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED); + return ContainerUtils.logAndReturnError(LOG, sce, requestProto); + } } ContainerCommandResponseProto response = dispatcher.dispatch(requestProto, context); @@ -365,7 +374,7 @@ public class ContainerStateMachine extends BaseStateMachine { } private Message runCommand(ContainerCommandRequestProto requestProto, - DispatcherContext context) throws IOException { + DispatcherContext context) { return dispatchCommand(requestProto, context)::toByteString; } @@ -394,14 +403,10 @@ public class ContainerStateMachine extends BaseStateMachine { .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) .setCreateContainerSet(createContainerSet) .build(); - CompletableFuture writeChunkFuture; - try { - Message msg = runCommand(requestProto, context); - writeChunkFuture = CompletableFuture - .supplyAsync(() -> msg, chunkExecutor); - }catch(IOException ie) { - writeChunkFuture = completeExceptionally(ie); - } + // ensure the write chunk happens asynchronously in writeChunkExecutor pool + // thread. + CompletableFuture writeChunkFuture = CompletableFuture + .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor); writeChunkFutureMap.put(entryIndex, writeChunkFuture); LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() @@ -567,16 +572,18 @@ public class ContainerStateMachine extends BaseStateMachine { // readStateMachineData should only be called for "write" to Ratis. Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); if (requestProto.getCmdType() == Type.WriteChunk) { - CompletableFuture future = new CompletableFuture<>(); - return future.supplyAsync(() -> { + final CompletableFuture future = new CompletableFuture<>(); + CompletableFuture.supplyAsync(() -> { try { - return getCachedStateMachineData(entry.getIndex(), entry.getTerm(), - requestProto); + future.complete( + getCachedStateMachineData(entry.getIndex(), entry.getTerm(), + requestProto)); } catch (ExecutionException e) { future.completeExceptionally(e); - return null; } + return future; }, chunkExecutor); + return future; } else { throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() + " cannot have state machine data"); @@ -627,7 +634,6 @@ public class ContainerStateMachine extends BaseStateMachine { ContainerCommandRequestProto requestProto = getRequestProto(trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); - CompletableFuture future; // Make sure that in write chunk, the user data is not set if (cmdType == Type.WriteChunk) { Preconditions @@ -638,13 +644,11 @@ public class ContainerStateMachine extends BaseStateMachine { if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) { builder.setCreateContainerSet(createContainerSet); } - try { - Message msg = runCommand(requestProto, builder.build()); - future = CompletableFuture.supplyAsync(() -> msg, - getCommandExecutor(requestProto)); - } catch (IOException ie) { - future = completeExceptionally(ie); - } + // Ensure the command gets executed in a separate thread than + // stateMachineUpdater thread which is calling applyTransaction here. + CompletableFuture future = CompletableFuture + .supplyAsync(() -> runCommand(requestProto, builder.build()), + getCommandExecutor(requestProto)); lastIndex = index; future.thenAccept(m -> {