diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index ebbec4de66a..ccf57cb2f36 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -52,6 +52,7 @@ public class CSMMetrics {
 
   // Failure Metrics
   private @Metric MutableCounterLong numWriteStateMachineFails;
+  private @Metric MutableCounterLong numWriteDataFails;
   private @Metric MutableCounterLong numQueryStateMachineFails;
   private @Metric MutableCounterLong numApplyTransactionFails;
   private @Metric MutableCounterLong numReadStateMachineFails;
@@ -97,6 +98,10 @@ public class CSMMetrics {
     numWriteStateMachineFails.incr();
   }
 
+  public void incNumWriteDataFails() {
+    numWriteDataFails.incr();
+  }
+
   public void incNumQueryStateMachineFails() {
     numQueryStateMachineFails.incr();
   }
@@ -141,6 +146,11 @@
     return numWriteStateMachineFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumWriteDataFails() {
+    return numWriteDataFails.value();
+  }
+
   @VisibleForTesting
   public long getNumQueryStateMachineFails() {
     return numQueryStateMachineFails.value();
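Note: the counter added above follows the Hadoop metrics2 pattern CSMMetrics already uses: an annotated `MutableCounterLong` field that the metrics system instantiates when the source object is registered. A minimal self-contained sketch of that pattern (the class name, registration strings, and `create()` helper below are illustrative stand-ins, not code from this patch):

```java
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

// Illustrative stand-in for CSMMetrics, not patch code.
@Metrics(about = "Write failure counters", context = "dfs")
public final class ExampleCsmMetrics {
  // The metrics system injects an instance for each @Metric field when the
  // source object is registered below.
  private @Metric MutableCounterLong numWriteDataFails;

  public static ExampleCsmMetrics create() {
    return DefaultMetricsSystem.instance().register(
        "ExampleCsmMetrics", "Example CSM metrics", new ExampleCsmMetrics());
  }

  public void incNumWriteDataFails() {
    numWriteDataFails.incr();
  }

  public long getNumWriteDataFails() {
    return numWriteDataFails.value();
  }
}
```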
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 768b37bf258..f4d4744d5a6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -137,8 +137,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      writeChunkFutureMap;
+  private final ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
 
   // keeps track of the containers created per pipeline
   private final Set<Long> createContainerSet;
@@ -385,9 +385,15 @@
     return response;
   }
 
+  private ContainerCommandResponseProto runCommandGetResponse(
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return dispatchCommand(requestProto, context);
+  }
+
   private Message runCommand(ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
-    return dispatchCommand(requestProto, context)::toByteString;
+    return runCommandGetResponse(requestProto, context)::toByteString;
   }
 
   private ExecutorService getCommandExecutor(
@@ -417,8 +423,11 @@
         .build();
     // ensure the write chunk happens asynchronously in writeChunkExecutor pool
     // thread.
-    CompletableFuture<Message> writeChunkFuture = CompletableFuture
-        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
+    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
+        CompletableFuture.supplyAsync(() ->
+            runCommandGetResponse(requestProto, context), chunkExecutor);
+
+    CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@@ -427,15 +436,29 @@
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
-      metrics.incNumBytesWrittenCount(
-          requestProto.getWriteChunk().getChunkData().getLen());
+      if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+        StorageContainerException sce =
+            new StorageContainerException(r.getMessage(), r.getResult());
+        LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName() + " Error message: " +
+            r.getMessage() + " Container Result: " + r.getResult());
+        metrics.incNumWriteDataFails();
+        raftFuture.completeExceptionally(sce);
+      } else {
+        metrics.incNumBytesWrittenCount(
+            requestProto.getWriteChunk().getChunkData().getLen());
+        LOG.debug(gid +
+            ": writeChunk writeStateMachineData completed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName());
+        raftFuture.complete(r::toByteString);
+      }
+
       writeChunkFutureMap.remove(entryIndex);
-      LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
-          write.getBlockID() + " logIndex " + entryIndex + " chunkName "
-              + write.getChunkData().getChunkName());
       return r;
     });
-    return writeChunkFuture;
+    return raftFuture;
   }
 
   /*
@@ -544,7 +567,7 @@
    */
   @Override
   public CompletableFuture<Void> flushStateMachineData(long index) {
-    List<CompletableFuture<Message>> futureList =
+    List<CompletableFuture<ContainerCommandResponseProto>> futureList =
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
             .map(Map.Entry::getValue).collect(Collectors.toList());
     return CompletableFuture.allOf(
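Note on the behavioural change above: writeStateMachineData() previously handed the dispatcher's future straight back to Ratis, so a failed chunk write still completed the Raft future with an ordinary Message. The patch splits the two futures, keeping the dispatcher future in writeChunkFutureMap (so flushStateMachineData() still waits on it) while completing a separate raftFuture exceptionally whenever the response is not SUCCESS. A JDK-only sketch of that pattern, with a plain Response class standing in for ContainerCommandResponseProto and String for the Ratis Message type (names here are illustrative, not patch code):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// JDK-only sketch of the future-splitting pattern used above.
public final class FutureSplitSketch {

  static final class Response {
    final boolean success;
    final String message;

    Response(boolean success, String message) {
      this.success = success;
      this.message = message;
    }
  }

  private final ConcurrentHashMap<Long, CompletableFuture<Response>>
      writeChunkFutureMap = new ConcurrentHashMap<>();
  private final ExecutorService chunkExecutor =
      Executors.newSingleThreadExecutor();

  CompletableFuture<String> writeStateMachineData(
      long entryIndex, Supplier<Response> dispatch) {
    // The dispatcher future is what flush waits on; it is tracked in the
    // map whether the write succeeds or fails.
    CompletableFuture<Response> writeChunkFuture =
        CompletableFuture.supplyAsync(dispatch, chunkExecutor);
    // A separate future is handed back to Ratis.
    CompletableFuture<String> raftFuture = new CompletableFuture<>();
    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
    writeChunkFuture.thenApply(r -> {
      if (!r.success) {
        // Surface the failure to Ratis instead of a normal-looking response.
        raftFuture.completeExceptionally(
            new CompletionException(new IOException(r.message)));
      } else {
        raftFuture.complete(r.message);
      }
      writeChunkFutureMap.remove(entryIndex);
      return r;
    });
    return raftFuture;
  }
}
```

Completing the Ratis-facing future exceptionally is what surfaces the write failure to the ring; the raw dispatcher future has to stay in the map so flushStateMachineData() still waits for the chunk executor to finish either way.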
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 13e3eff825e..2c3cfab045e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -139,13 +139,7 @@
         .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
         .getContainerPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
 
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
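Note: with the try/catch removed, key.close() is now expected to succeed, and the test asserts the UNHEALTHY state directly afterwards. If that transition ever proved racy, a polling variant is possible; the fragment below is an assumption, not patch code. It uses Hadoop's GenericTestUtils.waitFor inside the test method and presumes the same state accessor chain as the assertion above, i.e. that Container#getContainerState() returns ContainerProtos.ContainerDataProto.State:

```java
// Hypothetical hardening of the assertion: poll for the UNHEALTHY transition
// for up to 10 seconds instead of checking it once.
GenericTestUtils.waitFor(() ->
        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
            .getContainer().getContainerSet()
            .getContainer(omKeyLocationInfo.getContainerID())
            .getContainerState()
            == ContainerProtos.ContainerDataProto.State.UNHEALTHY,
    100, 10000);
```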
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 744f687286a..469eeb0adee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -58,6 +59,8 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+    OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -96,6 +99,8 @@ public class TestContainerStateMachineFailures {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -126,10 +131,14 @@
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    long written = 0;
     // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
     key.flush();
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
@@ -148,13 +157,7 @@
         .getContainer().getContainerSet()
         .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
         .getContainerPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
@@ -170,27 +173,28 @@
     HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer
         .getDispatcher();
     Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
-    // restart the hdds datanode and see if the container is listed in the
-    // in the missing container set and not in the regular set
+    // restart the hdds datanode; the container should not be in the regular set
     cluster.restartHddsDatanode(0, true);
     ozoneContainer = cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().getContainer();
-    dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
-
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-    Assert.assertTrue(dispatcher.getMissingContainerSet()
-        .contains(containerID));
-    ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setContainerID(containerID);
-    request.setCreateContainer(
-        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
-    request.setDatanodeUuid(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
-    Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
-        dispatcher.dispatch(request.build(), null).getResult());
+
+    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).getKey("ratis");
+
+    /**
+     * Ensure the data size stored for the key equals the number of bytes written.
+     */
+    Assert.assertTrue("Number of bytes stored in the key is not equal " +
+        "to number of bytes written.", keyDetails.getDataSize() == written);
+
+    /**
+     * Pending data from the second write should get written to a new container
+     * during key.close(), because the first container is UNHEALTHY by then.
+     */
+    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -224,13 +228,9 @@
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+
+    key.close();
+
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
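Note: since both rewritten tests now close the key successfully, a natural extra check (a sketch under assumptions, not part of this patch) is to read the key back and verify the bytes survived the switch to a second container. The fragment below would sit at the end of the first test method; it uses the standard OzoneBucket.readKey() API, org.apache.hadoop.io.IOUtils, and the `written` counter introduced above:

```java
// Hypothetical follow-up assertion: verify the content, not just the data
// size and the location count.
byte[] readBack = new byte[(int) written];
try (OzoneInputStream is = objectStore.getVolume(volumeName)
    .getBucket(bucketName).readKey("ratis")) {
  org.apache.hadoop.io.IOUtils.readFully(is, readBack, 0, readBack.length);
}
// Both writes sent the same 5 bytes ("ratis"), so the key must read back as
// the 10-byte concatenation.
Assert.assertArrayEquals("ratisratis".getBytes(), readBack);
```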