diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c780429fdc3..f3e4391046b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -681,6 +681,11 @@ public class ContainerStateMachine extends BaseStateMachine { evictStateMachineCache(); } + @Override + public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) { + ratisServer.handleNodeLogFailure(gid, t); + } + @Override public CompletableFuture notifyInstallSnapshotFromLeader( RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 246d58af201..f6ecb54d9af 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -545,18 +545,28 @@ public final class XceiverServerRatis extends XceiverServer { + roleInfoProto.getRole()); } + triggerPipelineClose(groupId, msg, + ClosePipelineInfo.Reason.PIPELINE_FAILED, false); + } + + private void triggerPipelineClose(RaftGroupId groupId, String detail, + ClosePipelineInfo.Reason reasonCode, boolean triggerHB) { PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); ClosePipelineInfo.Builder closePipelineInfo = ClosePipelineInfo.newBuilder() .setPipelineID(pipelineID.getProtobuf()) - .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED) - .setDetailedReason(msg); + .setReason(reasonCode) + .setDetailedReason(detail); PipelineAction action = PipelineAction.newBuilder() .setClosePipeline(closePipelineInfo) .setAction(PipelineAction.Action.CLOSE) .build(); context.addPipelineActionIfAbsent(action); + // wait for the next HB timeout or right away? + if (triggerHB) { + context.getParent().triggerHeartbeat(); + } LOG.debug( "pipeline Action " + action.getAction() + " on pipeline " + pipelineID + ".Reason : " + action.getClosePipeline().getDetailedReason()); @@ -628,4 +638,20 @@ public final class XceiverServerRatis extends XceiverServer { firstTermIndexInLog, groupId); handlePipelineFailure(groupId, roleInfoProto); } + + /** + * Notify the Datanode Ratis endpoint of Ratis log failure. + * Expected to be invoked from the Container StateMachine + * @param groupId the Ratis group/pipeline for which log has failed + * @param t exception encountered at the time of the failure + * + */ + @VisibleForTesting + public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) { + String msg = (t == null) ? "Unspecified failure reported in Ratis log" + : t.getMessage(); + + triggerPipelineClose(groupId, msg, + ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true); + } } diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 718a999ee10..73dc1cc5337 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -211,6 +211,7 @@ message PipelineActionsProto { message ClosePipelineInfo { enum Reason { PIPELINE_FAILED = 1; + PIPELINE_LOG_FAILED = 2; } required PipelineID pipelineID = 1; optional Reason reason = 3; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index eb4dba52809..c583559fd3a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; @@ -29,20 +30,27 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.RaftGroupId; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -180,4 +188,77 @@ public class TestPipelineClose { } catch (PipelineNotFoundException e) { } } + + @Test + public void testPipelineCloseWithLogFailure() throws IOException { + + EventQueue eventQ = (EventQueue) scm.getEventQueue(); + PipelineActionHandler pipelineActionTest = + Mockito.mock(PipelineActionHandler.class); + eventQ.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionTest); + ArgumentCaptor actionCaptor = + ArgumentCaptor.forClass(PipelineActionsFromDatanode.class); + + ContainerInfo containerInfo = containerManager + .allocateContainer(RATIS, THREE, "testOwner"); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(containerInfo, + pipelineManager.getPipeline(containerInfo.getPipelineID())); + Pipeline openPipeline = containerWithPipeline.getPipeline(); + RaftGroupId groupId = RaftGroupId.valueOf(openPipeline.getId().getId()); + + try { + pipelineManager.getPipeline(openPipeline.getId()); + } catch (PipelineNotFoundException e) { + Assert.assertTrue("pipeline should exist", false); + } + + DatanodeDetails datanodeDetails = openPipeline.getNodes().get(0); + int index = cluster.getHddsDatanodeIndex(datanodeDetails); + + XceiverServerRatis xceiverRatis = + (XceiverServerRatis) cluster.getHddsDatanodes().get(index) + .getDatanodeStateMachine().getContainer().getWriteChannel(); + + /** + * Notify Datanode Ratis Server endpoint of a Ratis log failure. + * This is expected to trigger an immediate pipeline actions report to SCM + */ + xceiverRatis.handleNodeLogFailure(groupId, null); + + // verify SCM receives a pipeline action report "immediately" + Mockito.verify(pipelineActionTest, Mockito.timeout(100)) + .onMessage( + actionCaptor.capture(), + Mockito.any(EventPublisher.class)); + + PipelineActionsFromDatanode actionsFromDatanode = + actionCaptor.getValue(); + + // match the pipeline id + verifyCloseForPipeline(openPipeline, actionsFromDatanode); + } + + private boolean verifyCloseForPipeline(Pipeline pipeline, + PipelineActionsFromDatanode report) { + UUID uuidToFind = pipeline.getId().getId(); + + boolean found = false; + for (StorageContainerDatanodeProtocolProtos.PipelineAction action : + report.getReport().getPipelineActionsList()) { + if (action.getAction() == + StorageContainerDatanodeProtocolProtos.PipelineAction.Action.CLOSE) { + PipelineID closedPipelineId = PipelineID. + getFromProtobuf(action.getClosePipeline().getPipelineID()); + + if (closedPipelineId.getId().equals(uuidToFind)) { + found = true; + } + } + } + + Assert.assertTrue("SCM did not receive a Close action for the Pipeline", + found); + return found; + } }