diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0535763537c..7b638a3cd8c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
-  private final AtomicBoolean isStateMachineHealthy;
+  private final AtomicBoolean stateMachineHealthy;
   private final Semaphore applyTransactionSemaphore;
 
   /**
@@ -190,7 +190,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
-    isStateMachineHealthy = new AtomicBoolean(true);
+    stateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -271,11 +271,22 @@ public class ContainerStateMachine extends BaseStateMachine {
     IOUtils.write(builder.build().toByteArray(), out);
   }
 
+  /**
+   * Reports whether this state machine is still healthy. The flag starts
+   * out true and is cleared once an applyTransaction result is treated as
+   * a failure.
+   *
+   * @return true if the state machine is healthy, false otherwise.
+   */
+  public boolean isStateMachineHealthy() {
+    return stateMachineHealthy.get();
+  }
+
   @Override
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
-    if (!isStateMachineHealthy.get()) {
+    if (!isStateMachineHealthy()) {
       String msg =
           "Failed to take snapshot " + " for " + gid + " as the stateMachine"
               + " is unhealthy. The last applied index is at " + ti;
@@ -731,7 +742,12 @@ public class ContainerStateMachine extends BaseStateMachine {
             metrics.incPipelineLatency(cmdType,
                 Time.monotonicNowNanos() - startTime);
           }
-          if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+          // A result that only reports that the container is no longer open
+          // (CONTAINER_NOT_OPEN, CLOSED_CONTAINER_IO) is not treated as a
+          // failure and must not mark the stateMachine unhealthy.
+          if (r.getResult() != ContainerProtos.Result.SUCCESS
+              && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+              && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
             StorageContainerException sce =
                 new StorageContainerException(r.getMessage(), r.getResult());
             LOG.error(
@@ -744,7 +760,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             // caught in stateMachineUpdater in Ratis and ratis server will
             // shutdown.
             applyTransactionFuture.completeExceptionally(sce);
-            isStateMachineHealthy.compareAndSet(true, false);
+            stateMachineHealthy.compareAndSet(true, false);
             ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
           } else {
             LOG.debug(
@@ -759,7 +775,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           // add the entry to the applyTransactionCompletionMap only if the
           // stateMachine is healthy i.e, there has been no applyTransaction
           // failures before.
-          if (isStateMachineHealthy.get()) {
+          if (isStateMachineHealthy()) {
             final Long previous = applyTransactionCompletionMap
                 .put(index, trx.getLogEntry().getTerm());
             Preconditions.checkState(previous == null);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 7b908151f48..9ac45b88116 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -353,6 +353,74 @@ public class TestContainerStateMachineFailures {
     Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
   }
 
+  /**
+   * Closes a container through the pipeline after a key has been written to
+   * it and verifies that the state machine stays healthy and can still take
+   * a new snapshot.
+   */
+  @Test
+  public void testApplyTransactionIdempotencyWithClosedContainer()
+      throws Exception {
+    final byte[] data =
+        "ratis".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write(data);
+    key.flush();
+    key.write(data);
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    long containerID = omKeyLocationInfo.getContainerID();
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(containerID)
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    key.close();
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    // The snapshot threshold is set to 1 and transactions have been applied,
+    // so a snapshot must already exist.
+    Assert.assertNotNull(storage.findLatestSnapshot());
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    Path snapshotPath = snapshot.getPath();
+    Assert.assertTrue(
+        snapshotPath.getParent().toFile().listFiles().length > 0);
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    // NOTE(review): the acquired client is never released in this test;
+    // confirm whether it should be handed back to xceiverClientManager.
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    // Any exception propagates and fails the test with its full stack trace.
+    xceiverClient.sendCommand(request.build());
+    Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState());
+    Assert.assertTrue(stateMachine.isStateMachineHealthy());
+    stateMachine.takeSnapshot();
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotEquals(snapshotPath, latestSnapshot.getPath());
+  }
+
   @Test
   public void testValidateBCSIDOnDnRestart() throws Exception {
     OzoneOutputStream key =