From a619d120a6c44bde2a846d61505a94f896e58e46 Mon Sep 17 00:00:00 2001 From: Jitendra Pandey Date: Sat, 13 Oct 2018 19:15:01 -0700 Subject: [PATCH] HDDS-579. ContainerStateMachine should fail subsequent transactions per container in case one fails. Contributed by Shashikant Banerjee. --- .../proto/DatanodeContainerProtocol.proto | 4 +- .../container/common/impl/HddsDispatcher.java | 63 ++++-- .../container/keyvalue/KeyValueHandler.java | 20 +- .../StorageContainerDatanodeProtocol.proto | 1 + .../TestContainerStateMachineFailures.java | 185 ++++++++++++++++++ 5 files changed, 242 insertions(+), 31 deletions(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 662df8f3aa5..da55db3e220 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -139,6 +139,7 @@ enum Result { CONTAINER_CHECKSUM_ERROR = 33; UNKNOWN_CONTAINER_TYPE = 34; BLOCK_NOT_COMMITTED = 35; + CONTAINER_UNHEALTHY = 36; } /** @@ -161,7 +162,8 @@ enum ContainerLifeCycleState { OPEN = 1; CLOSING = 2; CLOSED = 3; - INVALID = 4; + UNHEALTHY = 4; + INVALID = 5; } message ContainerCommandRequestProto {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index bb5002ae69e..1849841cf4b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -142,6 +142,26 @@ public ContainerCommandResponseProto dispatch( responseProto = handler.handle(msg, container); if (responseProto != null) { metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime); +
+ // If the request is of Write Type and the container operation
+ // is unsuccessful, it implies the applyTransaction on the container
+ // failed. All subsequent transactions on the container should fail and
+ // hence the replica will be marked unhealthy here. In this case, a close
+ // container action will be sent to SCM to close the container.
+ if (!HddsUtils.isReadOnly(msg)
+ && responseProto.getResult() != ContainerProtos.Result.SUCCESS) {
+ // If the container is open and the container operation has failed,
+ // it should first be marked unhealthy and then the close container
+ // action should be initiated. This also implies this is the first
+ // transaction which has failed, so the container is marked unhealthy
+ // right here. Once the container is marked unhealthy, all subsequent
+ // write transactions will fail with a CONTAINER_UNHEALTHY exception.
+ if (container.getContainerState() == ContainerLifeCycleState.OPEN) {
+ container.getContainerData()
+ .setState(ContainerLifeCycleState.UNHEALTHY);
+ sendCloseContainerActionIfNeeded(container);
+ }
+ } return responseProto; } else { return ContainerUtils.unsupportedRequest(msg); @@ -149,31 +169,46 @@ public ContainerCommandResponseProto dispatch( } /** - * If the container usage reaches the close threshold we send Close - * ContainerAction to SCM.
- * + * If the container usage reaches the close threshold or the container is + * marked unhealthy we send Close ContainerAction to SCM. * @param container current state of container */ private void sendCloseContainerActionIfNeeded(Container container) { // We have to find a more efficient way to close a container. - Boolean isOpen = Optional.ofNullable(container) + boolean isSpaceFull = isContainerFull(container); + boolean shouldClose = isSpaceFull || isContainerUnhealthy(container); + if (shouldClose) { + ContainerData containerData = container.getContainerData(); + ContainerAction.Reason reason = + isSpaceFull ? ContainerAction.Reason.CONTAINER_FULL : + ContainerAction.Reason.CONTAINER_UNHEALTHY; + ContainerAction action = ContainerAction.newBuilder() + .setContainerID(containerData.getContainerID()) + .setAction(ContainerAction.Action.CLOSE).setReason(reason).build(); + context.addContainerActionIfAbsent(action); + } + } + + private boolean isContainerFull(Container container) { + boolean isOpen = Optional.ofNullable(container) .map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN) .orElse(Boolean.FALSE); if (isOpen) { ContainerData containerData = container.getContainerData(); - double containerUsedPercentage = 1.0f * containerData.getBytesUsed() / - containerData.getMaxSize(); - if (containerUsedPercentage >= containerCloseThreshold) { - ContainerAction action = ContainerAction.newBuilder() - .setContainerID(containerData.getContainerID()) - .setAction(ContainerAction.Action.CLOSE) - .setReason(ContainerAction.Reason.CONTAINER_FULL) - .build(); - context.addContainerActionIfAbsent(action); - } + double containerUsedPercentage = + 1.0f * containerData.getBytesUsed() / containerData.getMaxSize(); + return containerUsedPercentage >= containerCloseThreshold; + } else { + return false; } } + private boolean isContainerUnhealthy(Container container) { + return Optional.ofNullable(container).map( + cont -> (cont.getContainerState() == ContainerLifeCycleState.UNHEALTHY)) + .orElse(Boolean.FALSE); + } + @Override public Handler getHandler(ContainerProtos.ContainerType containerType) { return handlers.get(containerType); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 922db2ad888..4c87b1950f2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -79,22 +79,7 @@ import com.google.protobuf.ByteString; import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.BLOCK_NOT_COMMITTED; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CLOSED_CONTAINER_IO; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.DELETE_ON_OPEN_CONTAINER; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.GET_SMALL_FILE_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.INVALID_CONTAINER_STATE; -import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.IO_EXCEPTION; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.PUT_SMALL_FILE_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Stage; import static org.apache.hadoop.ozone.OzoneConfigKeys @@ -819,6 +804,9 @@ private void checkContainerOpen(KeyValueContainer kvContainer) case CLOSED: result = CLOSED_CONTAINER_IO; break; + case UNHEALTHY: + result = CONTAINER_UNHEALTHY; + break; case INVALID: result = INVALID_CONTAINER_STATE; break; diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index f8fb32d995e..72d48a6a214 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -159,6 +159,7 @@ message ContainerAction { enum Reason { CONTAINER_FULL = 1; + CONTAINER_UNHEALTHY = 2; } required int64 containerID = 1; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java new file mode 100644 index 00000000000..0e593fb2172 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +
+package org.apache.hadoop.ozone.client.rpc; +
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.ContainerAction.Action;
+import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.ContainerAction.Reason;
+import org.apache.hadoop.hdds.scm.container. common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test; +
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit; +
+import static org.apache.hadoop.hdds.HddsConfigKeys. HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys. HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys. HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys. OZONE_SCM_STALENODE_INTERVAL; +
+/** + * Tests the ContainerStateMachine failure handling. + */ +
+public class TestContainerStateMachineFailures { +
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static String volumeName;
+ private static String bucketName;
+ private static String path;
+ private static int chunkSize; +
+ /** + * Create a MiniOzoneCluster for testing.
+ * + * @throws IOException + */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ path = GenericTestUtils
+ .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
+ File baseDir = new File(path);
+ baseDir.mkdirs(); +
+ chunkSize = (int) OzoneConsts.MB; +
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setQuietMode(false);
+ cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
+ .build();
+ cluster.waitForClusterToBeReady();
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getClient(conf);
+ objectStore = client.getObjectStore();
+ volumeName = "testcontainerstatemachinefailures";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ } +
+ /** + * Shutdown MiniOzoneCluster. + */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ } +
+ @Test
+ public void testContainerStateMachineFailures() throws Exception {
+ OzoneOutputStream key =
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE);
+ key.write("ratis".getBytes()); +
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+ setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+ .build();
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList =
+ groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); +
+ long containerID = omKeyLocationInfo.getContainerID();
+ // delete the container dir
+ FileUtil.fullyDelete(new File(
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
+ .getContainerPath()));
+ try {
+ // flush will throw an exception
+ key.flush();
+ Assert.fail("Expected exception not thrown");
+ } catch (IOException ioe) {
+ Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
+ } +
+ // Make sure the container is marked unhealthy
+ Assert.assertTrue(
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerState()
+ == ContainerProtos.ContainerLifeCycleState.UNHEALTHY);
+ try {
+ // subsequent requests will fail with unhealthy container exception
+ key.close();
+ Assert.fail("Expected exception not thrown");
+ } catch (IOException ioe) {
+ Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
+ Assert.assertTrue(((StorageContainerException) ioe.getCause()).getResult()
+ == ContainerProtos.Result.CONTAINER_UNHEALTHY);
+ }
+ StorageContainerDatanodeProtocolProtos.ContainerAction action =
+ StorageContainerDatanodeProtocolProtos.ContainerAction.newBuilder()
+ .setContainerID(containerID).setAction(Action.CLOSE)
+ .setReason(Reason.CONTAINER_UNHEALTHY)
+ .build(); + + // Make sure the close container action has been queued for SCM. + Assert.assertTrue( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext() .getAllPendingContainerActions().contains(action)); + } +}
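
Reviewer note, not part of the patch: the behavior the HddsDispatcher and KeyValueHandler changes enforce is "the first failed write on an OPEN replica marks it UNHEALTHY and queues a single CLOSE action for SCM; every later write is rejected with CONTAINER_UNHEALTHY". The standalone Java sketch below is an illustration only, using hypothetical simplified stand-ins (UnhealthyContainerSketch, FakeContainer, State, Result, dispatchWrite are made-up names, not classes from this patch or from Ozone).

// Illustration-only sketch of the unhealthy-container transition; the real
// logic lives in HddsDispatcher.dispatch() and KeyValueHandler.checkContainerOpen().
import java.util.LinkedHashSet;
import java.util.Set;

public class UnhealthyContainerSketch {

  enum State { OPEN, CLOSING, CLOSED, UNHEALTHY, INVALID }
  enum Result { SUCCESS, IO_EXCEPTION, CONTAINER_UNHEALTHY }

  // Hypothetical stand-in for a container replica.
  static class FakeContainer {
    State state = State.OPEN;
  }

  // Stand-in for the pending ContainerAction queue kept in the datanode
  // context; a Set mimics addContainerActionIfAbsent (queued at most once).
  static final Set<String> pendingActions = new LinkedHashSet<>();

  // A failed write on an OPEN replica marks it UNHEALTHY and queues a CLOSE
  // action with reason CONTAINER_UNHEALTHY; subsequent writes fail fast.
  static Result dispatchWrite(FakeContainer container, boolean applySucceeds) {
    if (container.state == State.UNHEALTHY) {
      return Result.CONTAINER_UNHEALTHY;
    }
    if (!applySucceeds) {
      if (container.state == State.OPEN) {
        container.state = State.UNHEALTHY;
        pendingActions.add("CLOSE(reason=CONTAINER_UNHEALTHY)");
      }
      return Result.IO_EXCEPTION;
    }
    return Result.SUCCESS;
  }

  public static void main(String[] args) {
    FakeContainer c = new FakeContainer();
    System.out.println(dispatchWrite(c, true));   // SUCCESS
    System.out.println(dispatchWrite(c, false));  // IO_EXCEPTION; replica now UNHEALTHY
    System.out.println(dispatchWrite(c, true));   // CONTAINER_UNHEALTHY
    System.out.println(pendingActions);           // exactly one CLOSE action queued
  }
}

This mirrors what TestContainerStateMachineFailures asserts end to end: the first flush() fails with the underlying error, the replica then reports UNHEALTHY, the next request fails with CONTAINER_UNHEALTHY, and a single CLOSE action with reason CONTAINER_UNHEALTHY sits in the pending-action queue for SCM.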