HDDS-1603. Handle Ratis Append Failure in Container State Machine. Contributed by Supratim Deka (#1019)
This commit is contained in:
parent
030307226a
commit
ac7a8accdf
|
@ -681,6 +681,11 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|||
evictStateMachineCache();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
|
||||
ratisServer.handleNodeLogFailure(gid, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
|
||||
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
|
||||
|
|
|
@ -545,18 +545,28 @@ public final class XceiverServerRatis extends XceiverServer {
|
|||
+ roleInfoProto.getRole());
|
||||
}
|
||||
|
||||
triggerPipelineClose(groupId, msg,
|
||||
ClosePipelineInfo.Reason.PIPELINE_FAILED, false);
|
||||
}
|
||||
|
||||
private void triggerPipelineClose(RaftGroupId groupId, String detail,
|
||||
ClosePipelineInfo.Reason reasonCode, boolean triggerHB) {
|
||||
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
|
||||
ClosePipelineInfo.Builder closePipelineInfo =
|
||||
ClosePipelineInfo.newBuilder()
|
||||
.setPipelineID(pipelineID.getProtobuf())
|
||||
.setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
|
||||
.setDetailedReason(msg);
|
||||
.setReason(reasonCode)
|
||||
.setDetailedReason(detail);
|
||||
|
||||
PipelineAction action = PipelineAction.newBuilder()
|
||||
.setClosePipeline(closePipelineInfo)
|
||||
.setAction(PipelineAction.Action.CLOSE)
|
||||
.build();
|
||||
context.addPipelineActionIfAbsent(action);
|
||||
// wait for the next HB timeout or right away?
|
||||
if (triggerHB) {
|
||||
context.getParent().triggerHeartbeat();
|
||||
}
|
||||
LOG.debug(
|
||||
"pipeline Action " + action.getAction() + " on pipeline " + pipelineID
|
||||
+ ".Reason : " + action.getClosePipeline().getDetailedReason());
|
||||
|
@ -628,4 +638,20 @@ public final class XceiverServerRatis extends XceiverServer {
|
|||
firstTermIndexInLog, groupId);
|
||||
handlePipelineFailure(groupId, roleInfoProto);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the Datanode Ratis endpoint of Ratis log failure.
|
||||
* Expected to be invoked from the Container StateMachine
|
||||
* @param groupId the Ratis group/pipeline for which log has failed
|
||||
* @param t exception encountered at the time of the failure
|
||||
*
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) {
|
||||
String msg = (t == null) ? "Unspecified failure reported in Ratis log"
|
||||
: t.getMessage();
|
||||
|
||||
triggerPipelineClose(groupId, msg,
|
||||
ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,6 +211,7 @@ message PipelineActionsProto {
|
|||
message ClosePipelineInfo {
|
||||
enum Reason {
|
||||
PIPELINE_FAILED = 1;
|
||||
PIPELINE_LOG_FAILED = 2;
|
||||
}
|
||||
required PipelineID pipelineID = 1;
|
||||
optional Reason reason = 3;
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
|
|||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
|
@ -29,20 +30,27 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
|||
import org.apache.hadoop.hdds.scm.container.ContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -180,4 +188,77 @@ public class TestPipelineClose {
|
|||
} catch (PipelineNotFoundException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPipelineCloseWithLogFailure() throws IOException {
|
||||
|
||||
EventQueue eventQ = (EventQueue) scm.getEventQueue();
|
||||
PipelineActionHandler pipelineActionTest =
|
||||
Mockito.mock(PipelineActionHandler.class);
|
||||
eventQ.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionTest);
|
||||
ArgumentCaptor<PipelineActionsFromDatanode> actionCaptor =
|
||||
ArgumentCaptor.forClass(PipelineActionsFromDatanode.class);
|
||||
|
||||
ContainerInfo containerInfo = containerManager
|
||||
.allocateContainer(RATIS, THREE, "testOwner");
|
||||
ContainerWithPipeline containerWithPipeline =
|
||||
new ContainerWithPipeline(containerInfo,
|
||||
pipelineManager.getPipeline(containerInfo.getPipelineID()));
|
||||
Pipeline openPipeline = containerWithPipeline.getPipeline();
|
||||
RaftGroupId groupId = RaftGroupId.valueOf(openPipeline.getId().getId());
|
||||
|
||||
try {
|
||||
pipelineManager.getPipeline(openPipeline.getId());
|
||||
} catch (PipelineNotFoundException e) {
|
||||
Assert.assertTrue("pipeline should exist", false);
|
||||
}
|
||||
|
||||
DatanodeDetails datanodeDetails = openPipeline.getNodes().get(0);
|
||||
int index = cluster.getHddsDatanodeIndex(datanodeDetails);
|
||||
|
||||
XceiverServerRatis xceiverRatis =
|
||||
(XceiverServerRatis) cluster.getHddsDatanodes().get(index)
|
||||
.getDatanodeStateMachine().getContainer().getWriteChannel();
|
||||
|
||||
/**
|
||||
* Notify Datanode Ratis Server endpoint of a Ratis log failure.
|
||||
* This is expected to trigger an immediate pipeline actions report to SCM
|
||||
*/
|
||||
xceiverRatis.handleNodeLogFailure(groupId, null);
|
||||
|
||||
// verify SCM receives a pipeline action report "immediately"
|
||||
Mockito.verify(pipelineActionTest, Mockito.timeout(100))
|
||||
.onMessage(
|
||||
actionCaptor.capture(),
|
||||
Mockito.any(EventPublisher.class));
|
||||
|
||||
PipelineActionsFromDatanode actionsFromDatanode =
|
||||
actionCaptor.getValue();
|
||||
|
||||
// match the pipeline id
|
||||
verifyCloseForPipeline(openPipeline, actionsFromDatanode);
|
||||
}
|
||||
|
||||
private boolean verifyCloseForPipeline(Pipeline pipeline,
|
||||
PipelineActionsFromDatanode report) {
|
||||
UUID uuidToFind = pipeline.getId().getId();
|
||||
|
||||
boolean found = false;
|
||||
for (StorageContainerDatanodeProtocolProtos.PipelineAction action :
|
||||
report.getReport().getPipelineActionsList()) {
|
||||
if (action.getAction() ==
|
||||
StorageContainerDatanodeProtocolProtos.PipelineAction.Action.CLOSE) {
|
||||
PipelineID closedPipelineId = PipelineID.
|
||||
getFromProtobuf(action.getClosePipeline().getPipelineID());
|
||||
|
||||
if (closedPipelineId.getId().equals(uuidToFind)) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue("SCM did not receive a Close action for the Pipeline",
|
||||
found);
|
||||
return found;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue