HDDS-1558. IllegalArgumentException while processing container Reports.

Signed-off-by: Nanda kumar <nanda@apache.org>
This commit is contained in:
Shashikant Banerjee 2019-06-04 00:59:02 +05:30 committed by Nanda kumar
parent bd2590d71b
commit f3271126fc
5 changed files with 125 additions and 4 deletions

View File

@ -67,6 +67,7 @@ import io.opentracing.Scope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -299,8 +300,18 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
State containerState = container.getContainerData().getState();
Preconditions.checkState(
containerState == State.OPEN || containerState == State.CLOSING);
container.getContainerData()
.setState(ContainerDataProto.State.UNHEALTHY);
// mark and persist the container state to be unhealthy
try {
handler.markContainerUhealthy(container);
} catch (IOException ioe) {
// just log the error here in case marking the container fails,
// Return the actual failure response to the client
LOG.error("Failed to mark container " + containerID + " UNHEALTHY. ",
ioe);
}
// in any case, the in memory state of the container should be unhealthy
Preconditions.checkArgument(
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}

View File

@ -129,6 +129,15 @@ public abstract class Handler {
public abstract void markContainerForClose(Container container)
throws IOException;
/**
 * Marks the container unhealthy. Moves the container to UNHEALTHY state.
 *
 * <p>NOTE(review): the method name is spelled "Uhealthy" (missing "n");
 * the spelling is part of the abstract API that subclasses override, so it
 * is kept as is here.
 *
 * @param container container to update
 * @throws IOException in case of exception
 */
public abstract void markContainerUhealthy(Container container)
throws IOException;
/**
* Moves the Container to QUASI_CLOSED state.
*

View File

@ -339,8 +339,10 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
updateContainerFile(containerFile);
} catch (StorageContainerException ex) {
if (oldState != null) {
// Failed to update .container file. Reset the state to CLOSING
if (oldState != null
&& containerData.getState() != ContainerDataProto.State.UNHEALTHY) {
// Failed to update .container file. Reset the state to old state only
// if the current state is not unhealthy.
containerData.setState(oldState);
}
throw ex;

View File

@ -892,6 +892,14 @@ public class KeyValueHandler extends Handler {
}
}
/**
 * Marks the given container unhealthy by delegating to
 * {@code container.markContainerUnhealthy()}.
 *
 * @param container container to mark as unhealthy
 * @throws IOException declared by the overridden method; presumably thrown
 *         when the container fails to apply the state change — confirm
 *         against the Container implementation
 */
@Override
public void markContainerUhealthy(Container container)
throws IOException {
// this will mark the container unhealthy and a close container action will
// be sent from the dispatcher to SCM to close down this container.
container.markContainerUnhealthy();
}
@Override
public void quasiCloseContainer(Container container)
throws IOException {
@ -920,6 +928,12 @@ public class KeyValueHandler extends Handler {
if (state == State.CLOSED) {
return;
}
if (state == State.UNHEALTHY) {
throw new StorageContainerException(
"Cannot close container #" + container.getContainerData()
.getContainerID() + " while in " + state + " state.",
ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
// The container has to be either in CLOSING or in QUASI_CLOSED state.
if (state != State.CLOSING && state != State.QUASI_CLOSED) {
ContainerProtos.Result error = state == State.INVALID ?

View File

@ -24,12 +24,16 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -49,10 +53,13 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
OZONE_SCM_STALENODE_INTERVAL;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
* Tests the containerStateMachine failure handling.
@ -185,4 +192,82 @@ public class TestContainerStateMachineFailures {
Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
dispatcher.dispatch(request.build(), null).getResult());
}
/**
 * Verifies that a container is marked UNHEALTHY when a write to it fails
 * because its chunks directory has been deleted.
 *
 * <p>The test checks that:
 * <ul>
 *   <li>closing the key fails with an UNHEALTHY container state error,</li>
 *   <li>the in-memory container state on the datanode is UNHEALTHY,</li>
 *   <li>the UNHEALTHY state is persisted in the .container file and is
 *       still there after the datanode is restarted,</li>
 *   <li>a CloseContainer request dispatched for the container is rejected
 *       with CONTAINER_UNHEALTHY.</li>
 * </ul>
 *
 * @throws Exception if the cluster interaction fails unexpectedly
 */
@Test
public void testUnhealthyContainer() throws Exception {
  OzoneOutputStream key =
      objectStore.getVolume(volumeName).getBucket(bucketName)
          .createKey("ratis", 1024, ReplicationType.RATIS,
              ReplicationFactor.ONE, new HashMap<>());
  // First write and flush creates a container in the datanode
  key.write("ratis".getBytes());
  key.flush();
  // Write again so that close() below has to touch the container once more
  key.write("ratis".getBytes());

  // Find the container that backs the key
  KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
  List<OmKeyLocationInfo> locationInfoList =
      groupOutputStream.getLocationInfoList();
  Assert.assertEquals(1, locationInfoList.size());
  OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
  long containerID = omKeyLocationInfo.getContainerID();

  ContainerData containerData =
      cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
          .getContainer().getContainerSet()
          .getContainer(containerID)
          .getContainerData();
  Assert.assertTrue(containerData instanceof KeyValueContainerData);
  KeyValueContainerData keyValueContainerData =
      (KeyValueContainerData) containerData;

  // Delete the chunks directory of the container so that the pending
  // write fails on the datanode
  FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
  try {
    key.close();
    Assert.fail("Expected key close to fail once the container's chunks "
        + "directory has been deleted");
  } catch (IOException ioe) {
    Assert.assertTrue(ioe.getMessage().contains(
        "Requested operation not allowed as ContainerState is UNHEALTHY"));
  }

  // Make sure the container is marked unhealthy in memory
  Assert.assertEquals(UNHEALTHY,
      cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
          .getContainer().getContainerSet().getContainer(containerID)
          .getContainerState());

  // Check that the state was persisted in the .container file
  File containerFile = new File(keyValueContainerData.getMetadataPath(),
      containerID + OzoneConsts.CONTAINER_EXTENSION);
  keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
      .readContainerFile(containerFile);
  assertThat(keyValueContainerData.getState(), is(UNHEALTHY));

  // Restart the hdds datanode; the persisted state must survive the restart
  cluster.restartHddsDatanode(0, true);
  keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
      .readContainerFile(containerFile);
  assertThat(keyValueContainerData.getState(), is(UNHEALTHY));

  // Look the datanode up again after the restart instead of reusing any
  // reference obtained before it
  OzoneContainer ozoneContainer =
      cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
          .getContainer();
  HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();

  // A close request for an unhealthy container must be rejected
  ContainerProtos.ContainerCommandRequestProto.Builder request =
      ContainerProtos.ContainerCommandRequestProto.newBuilder();
  request.setCmdType(ContainerProtos.Type.CloseContainer);
  request.setContainerID(containerID);
  request.setCloseContainer(
      ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
  request.setDatanodeUuid(
      cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
  Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
      dispatcher.dispatch(request.build(), null).getResult());
}
}