HDDS-1798. Propagate failure in writeStateMachineData to Ratis. Contributed by Supratim Deka (#1113)

supratimdeka 2019-08-05 13:43:41 +05:30 committed by bshashikant
parent 9680a8b237
commit f8ea6e1ce1
4 changed files with 78 additions and 51 deletions
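At its core, the patch fixes a silent-failure path: handleWriteChunk used to hand Ratis the same future that ran the chunk write, so a ContainerCommandResponseProto carrying a failure result still completed that future normally and Ratis treated the write as successful. The fix decouples the future returned to Ratis from the dispatcher future and completes it exceptionally on a non-SUCCESS result. A distilled sketch of the pattern (types are those in the patch; dispatch, request, and executor are simplified stand-ins for the surrounding code):

    // Future handed to Ratis, decoupled from the worker future.
    CompletableFuture<Message> raftFuture = new CompletableFuture<>();
    CompletableFuture<ContainerCommandResponseProto> workFuture =
        CompletableFuture.supplyAsync(
            () -> dispatch(request), executor);  // simplified stand-ins

    workFuture.thenApply(r -> {
      if (r.getResult() != ContainerProtos.Result.SUCCESS) {
        // Surface the failed response to Ratis as an exceptional completion.
        raftFuture.completeExceptionally(
            new StorageContainerException(r.getMessage(), r.getResult()));
      } else {
        raftFuture.complete(r::toByteString);
      }
      return r;
    });
    return raftFuture;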

View File

@@ -52,6 +52,7 @@ public class CSMMetrics {
   // Failure Metrics
   private @Metric MutableCounterLong numWriteStateMachineFails;
+  private @Metric MutableCounterLong numWriteDataFails;
   private @Metric MutableCounterLong numQueryStateMachineFails;
   private @Metric MutableCounterLong numApplyTransactionFails;
   private @Metric MutableCounterLong numReadStateMachineFails;
@@ -97,6 +98,10 @@ public void incNumWriteStateMachineFails() {
     numWriteStateMachineFails.incr();
   }
 
+  public void incNumWriteDataFails() {
+    numWriteDataFails.incr();
+  }
+
   public void incNumQueryStateMachineFails() {
     numQueryStateMachineFails.incr();
   }
@@ -141,6 +146,11 @@ public long getNumWriteStateMachineFails() {
     return numWriteStateMachineFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumWriteDataFails() {
+    return numWriteDataFails.value();
+  }
+
   @VisibleForTesting
   public long getNumQueryStateMachineFails() {
     return numQueryStateMachineFails.value();
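The metric additions follow the existing CSMMetrics convention: an incNum* mutator invoked on the failure path plus a @VisibleForTesting getter. A hypothetical check of the kind a test could make (the surrounding setup is illustrative, not from this patch):

    // Illustrative only: exercise the new counter pair.
    long before = metrics.getNumWriteDataFails();
    metrics.incNumWriteDataFails();  // invoked when writeStateMachineData fails
    Assert.assertEquals(before + 1, metrics.getNumWriteDataFails());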

View File

@@ -137,8 +137,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      writeChunkFutureMap;
+  private final ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
   // keeps track of the containers created per pipeline
   private final Set<Long> createContainerSet;
@@ -385,9 +385,15 @@ private ContainerCommandResponseProto dispatchCommand(
     return response;
   }
 
+  private ContainerCommandResponseProto runCommandGetResponse(
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return dispatchCommand(requestProto, context);
+  }
+
   private Message runCommand(ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
-    return dispatchCommand(requestProto, context)::toByteString;
+    return runCommandGetResponse(requestProto, context)::toByteString;
   }
 
   private ExecutorService getCommandExecutor(
@ -417,8 +423,11 @@ private CompletableFuture<Message> handleWriteChunk(
.build(); .build();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool // ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread. // thread.
CompletableFuture<Message> writeChunkFuture = CompletableFuture CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
.supplyAsync(() -> runCommand(requestProto, context), chunkExecutor); CompletableFuture.supplyAsync(() ->
runCommandGetResponse(requestProto, context), chunkExecutor);
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
writeChunkFutureMap.put(entryIndex, writeChunkFuture); writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " + LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@@ -427,15 +436,29 @@ private CompletableFuture<Message> handleWriteChunk(
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
-      metrics.incNumBytesWrittenCount(
-          requestProto.getWriteChunk().getChunkData().getLen());
+      if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+        StorageContainerException sce =
+            new StorageContainerException(r.getMessage(), r.getResult());
+        LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName() + " Error message: " +
+            r.getMessage() + " Container Result: " + r.getResult());
+        metrics.incNumWriteDataFails();
+        raftFuture.completeExceptionally(sce);
+      } else {
+        metrics.incNumBytesWrittenCount(
+            requestProto.getWriteChunk().getChunkData().getLen());
+        LOG.debug(gid +
+            ": writeChunk writeStateMachineData completed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName());
+        raftFuture.complete(r::toByteString);
+      }
       writeChunkFutureMap.remove(entryIndex);
-      LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
-          write.getBlockID() + " logIndex " + entryIndex + " chunkName "
-          + write.getChunkData().getChunkName());
       return r;
     });
-    return writeChunkFuture;
+    return raftFuture;
   }
 
   /*
@@ -544,7 +567,7 @@ private ByteString getCachedStateMachineData(Long logIndex, long term,
    */
   @Override
   public CompletableFuture<Void> flushStateMachineData(long index) {
-    List<CompletableFuture<Message>> futureList =
+    List<CompletableFuture<ContainerCommandResponseProto>> futureList =
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
             .map(Map.Entry::getValue).collect(Collectors.toList());
     return CompletableFuture.allOf(
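For context on the flush path: flushStateMachineData gathers every pending write future at or below the given log index and joins them with CompletableFuture.allOf, which completes only once all members complete, and exceptionally if any member does. A small self-contained illustration of that allOf contract (not code from the patch):

    import java.util.concurrent.CompletableFuture;

    public class AllOfDemo {
      public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("write failed"));

        // allOf completes exceptionally because one member did.
        CompletableFuture.allOf(ok, bad).whenComplete((v, t) ->
            System.out.println(
                t == null ? "flushed" : "flush failed: " + t.getCause()));
      }
    }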

View File

@@ -139,13 +139,7 @@ public void testContainerStateMachineFailures() throws Exception {
         .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
         .getContainerPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
 
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()

View File

@@ -28,6 +28,7 @@
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -58,6 +59,8 @@
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+    OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -96,6 +99,8 @@ public static void init() throws Exception {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -126,10 +131,14 @@ public void testContainerStateMachineFailures() throws Exception {
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    long written = 0;
     // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
     key.flush();
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
@@ -148,13 +157,7 @@ public void testContainerStateMachineFailures() throws Exception {
         .getContainer().getContainerSet()
         .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
         .getContainerPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
 
     long containerID = omKeyLocationInfo.getContainerID();
     // Make sure the container is marked unhealthy
@@ -170,27 +173,28 @@ public void testContainerStateMachineFailures() throws Exception {
     HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
     Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
 
-    // restart the hdds datanode and see if the container is listed in the
-    // in the missing container set and not in the regular set
+    // restart the hdds datanode, container should not be in the regular set
     cluster.restartHddsDatanode(0, true);
     ozoneContainer = cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().getContainer();
-    dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-    Assert.assertTrue(dispatcher.getMissingContainerSet()
-        .contains(containerID));
-    ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setContainerID(containerID);
-    request.setCreateContainer(
-        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
-    request.setDatanodeUuid(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
-    Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
-        dispatcher.dispatch(request.build(), null).getResult());
+
+    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).getKey("ratis");
+
+    /**
+     * Ensure length of data stored in key is equal to number of bytes written.
+     */
+    Assert.assertTrue("Number of bytes stored in the key is not equal " +
+        "to number of bytes written.", keyDetails.getDataSize() == written);
+
+    /**
+     * Pending data from the second write should get written to a new container
+     * during key.close() because the first container is UNHEALTHY by that time
+     */
+    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -224,13 +228,9 @@ public void testUnhealthyContainer() throws Exception {
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
 
     long containerID = omKeyLocationInfo.getContainerID();
     // Make sure the container is marked unhealthy