HDDS-461. Container remains in CLOSING state in SCM forever. Contributed by Shashikant Banerjee.

This commit is contained in:
Nanda kumar 2018-09-19 21:43:44 +05:30
parent b3c5221f30
commit 61a4b07bda
8 changed files with 175 additions and 161 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -50,7 +52,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@ -206,9 +207,18 @@ public class StateContext {
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
List<GeneratedMessage> reportList = new ArrayList<>();
synchronized (reports) {
return reports.parallelStream().limit(maxLimit)
.collect(Collectors.toList());
if (!reports.isEmpty()) {
int size = reports.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
GeneratedMessage report = reports.poll();
Preconditions.checkNotNull(report);
reportList.add(report);
}
}
return reportList;
}
}
@ -254,9 +264,20 @@ public class StateContext {
* @return List<ContainerAction>
*/
public List<ContainerAction> getPendingContainerAction(int maxLimit) {
List<ContainerAction> containerActionList = new ArrayList<>();
synchronized (containerActions) {
return containerActions.parallelStream().limit(maxLimit)
.collect(Collectors.toList());
if (!containerActions.isEmpty()) {
int size = containerActions.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
// we need to remove the action from the containerAction queue
// as well
ContainerAction action = containerActions.poll();
Preconditions.checkNotNull(action);
containerActionList.add(action);
}
}
return containerActionList;
}
}
@ -295,9 +316,16 @@ public class StateContext {
* @return List<ContainerAction>
*/
public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
List<PipelineAction> pipelineActionList = new ArrayList<>();
synchronized (pipelineActions) {
return pipelineActions.parallelStream().limit(maxLimit)
.collect(Collectors.toList());
if (!pipelineActions.isEmpty()) {
int size = pipelineActions.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
pipelineActionList.add(pipelineActions.poll());
}
}
return pipelineActionList;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -89,9 +90,21 @@ public class CloseContainerCommandHandler implements CommandHandler {
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType, pipelineID);
cmdExecuted = true;
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);
if (e instanceof NotLeaderException) {
// If the particular datanode is not the Ratis leader, the close
// container command will not be executed by the follower but will be
// executed by Ratis stateMachine transactions via leader to follower.
// There can also be case where the datanode is in candidate state.
// In these situations, NotLeaderException is thrown. Remove the status
// from cmdStatus Map here so that it will be retried only by SCM if the
// leader could not not close the container after a certain time.
context.removeCommandStatus(containerID);
LOG.info(e.getLocalizedMessage());
} else {
LOG.error("Can't close container " + containerID, e);
cmdExecuted = false;
}
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
long endTime = Time.monotonicNow();

View File

@ -21,12 +21,15 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -128,8 +131,13 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
@Override
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
HddsProtos.PipelineID pipelineID) throws IOException {
ContainerProtos.ContainerCommandResponseProto response =
storageContainer.dispatch(request);
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
throw new StorageContainerException(response.getMessage(),
response.getResult());
}
}
@Override

View File

@ -74,7 +74,6 @@ import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -319,39 +318,35 @@ public final class XceiverServerRatis implements XceiverServerSpi {
return server;
}
private void processReply(RaftClientReply reply) {
private void processReply(RaftClientReply reply) throws IOException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. The request will be rejected
// and will eventually be executed once the request comnes via the leader
// and will eventually be executed once the request comes via the leader
// node.
NotLeaderException notLeaderException = reply.getNotLeaderException();
if (notLeaderException != null) {
LOG.info(reply.getNotLeaderException().getLocalizedMessage());
throw notLeaderException;
}
StateMachineException stateMachineException =
reply.getStateMachineException();
if (stateMachineException != null) {
// In case the request could not be completed, StateMachine Exception
// will be thrown. For now, Just log the message.
// If the container could not be closed, SCM will come to know
// via containerReports. CloseContainer should be re tried via SCM.
LOG.error(stateMachineException.getLocalizedMessage());
throw stateMachineException;
}
}
@Override
public void submitRequest(
ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
throws IOException {
// ReplicationLevel.MAJORITY ensures the transactions corresponding to
// the request here are applied on all the raft servers.
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) throws IOException {
RaftClientReply reply;
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
RaftClientRequest.writeRequestType(replicationLevel));
CompletableFuture<RaftClientReply> reply =
server.submitClientRequestAsync(raftClientRequest);
reply.thenAccept(this::processReply);
try {
reply = server.submitClientRequestAsync(raftClientRequest).get();
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
processReply(reply);
}
private RaftClientRequest createRaftClientRequest(

View File

@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
/**
* Ozone main class sets up the network server and initializes the container
* Ozone main class sets up the network servers and initializes the container
* layer.
*/
public class OzoneContainer {
@ -64,7 +67,7 @@ public class OzoneContainer {
private final OzoneConfiguration config;
private final VolumeSet volumeSet;
private final ContainerSet containerSet;
private final XceiverServerSpi[] server;
private final Map<ReplicationType, XceiverServerSpi> servers;
/**
* Construct OzoneContainer object.
@ -82,14 +85,13 @@ public class OzoneContainer {
buildContainerSet();
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
context);
server = new XceiverServerSpi[]{
new XceiverServerGrpc(datanodeDetails, this.config, this
.hddsDispatcher, createReplicationService()),
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
.config, hddsDispatcher, context)
};
servers = new HashMap<>();
servers.put(ReplicationType.STAND_ALONE,
new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
createReplicationService()));
servers.put(ReplicationType.RATIS, XceiverServerRatis
.newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
context));
}
private GrpcReplicationService createReplicationService() {
@ -133,7 +135,7 @@ public class OzoneContainer {
*/
public void start() throws IOException {
LOG.info("Attempting to start container services.");
for (XceiverServerSpi serverinstance : server) {
for (XceiverServerSpi serverinstance : servers.values()) {
serverinstance.start();
}
hddsDispatcher.init();
@ -145,7 +147,7 @@ public class OzoneContainer {
public void stop() {
//TODO: at end of container IO integration work.
LOG.info("Attempting to stop container services.");
for(XceiverServerSpi serverinstance: server) {
for(XceiverServerSpi serverinstance: servers.values()) {
serverinstance.stop();
}
hddsDispatcher.shutdown();
@ -169,7 +171,7 @@ public class OzoneContainer {
public PipelineReportsProto getPipelineReport() {
PipelineReportsProto.Builder pipelineReportsProto =
PipelineReportsProto.newBuilder();
for (XceiverServerSpi serverInstance : server) {
for (XceiverServerSpi serverInstance : servers.values()) {
pipelineReportsProto
.addAllPipelineReport(serverInstance.getPipelineReport());
}
@ -181,82 +183,38 @@ public class OzoneContainer {
* @param request
* @param replicationType
* @param pipelineID
* @throws IOException
*/
public void submitContainerRequest(
ContainerProtos.ContainerCommandRequestProto request,
HddsProtos.ReplicationType replicationType,
HddsProtos.PipelineID pipelineID) throws IOException {
XceiverServerSpi serverInstance;
long containerId = getContainerIdForCmd(request);
if (replicationType == HddsProtos.ReplicationType.RATIS) {
serverInstance = getRatisSerer();
Preconditions.checkNotNull(serverInstance);
serverInstance.submitRequest(request, pipelineID);
LOG.info("submitting {} request over RATIS server for container {}",
request.getCmdType(), containerId);
} else {
serverInstance = getStandaAloneSerer();
Preconditions.checkNotNull(serverInstance);
getStandaAloneSerer().submitRequest(request, pipelineID);
LOG.info(
"submitting {} request over STAND_ALONE server for container {}",
request.getCmdType(), containerId);
ReplicationType replicationType,
PipelineID pipelineID) throws IOException {
if (containerSet.getContainer(request.getContainerID())
.getContainerData().isClosed()) {
LOG.debug("Container {} is already closed", request.getContainerID());
// It might happen that the where the first attempt of closing the
// container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will
// retry to check the container got really closed via Ratis.
// In such cases of the retry attempt, if the container is already closed
// via Ratis, we should just return.
}
LOG.info("submitting {} request over {} server for container {}",
request.getCmdType(), replicationType, request.getContainerID());
Preconditions.checkState(servers.containsKey(replicationType));
servers.get(replicationType).submitRequest(request, pipelineID);
}
private long getContainerIdForCmd(
ContainerProtos.ContainerCommandRequestProto request)
throws IllegalArgumentException {
ContainerProtos.Type type = request.getCmdType();
switch (type) {
case CloseContainer:
return request.getContainerID();
// Right now, we handle only closeContainer via queuing it over the
// over the XceiVerServer. For all other commands we throw Illegal
// argument exception here. Will need to extend the switch cases
// in case we want add another commands here.
default:
throw new IllegalArgumentException("Cmd " + request.getCmdType()
+ " not supported over HearBeat Response");
}
}
private XceiverServerSpi getRatisSerer() {
for (XceiverServerSpi serverInstance : server) {
if (serverInstance instanceof XceiverServerRatis) {
return serverInstance;
}
}
return null;
}
private XceiverServerSpi getStandaAloneSerer() {
for (XceiverServerSpi serverInstance : server) {
if (!(serverInstance instanceof XceiverServerRatis)) {
return serverInstance;
}
}
return null;
}
private int getPortbyType(HddsProtos.ReplicationType replicationType) {
for (XceiverServerSpi serverinstance : server) {
if (serverinstance.getServerType() == replicationType) {
return serverinstance.getIPCPort();
}
}
return INVALID_PORT;
private int getPortByType(ReplicationType replicationType) {
return servers.containsKey(replicationType) ?
servers.get(replicationType).getIPCPort() : INVALID_PORT;
}
/**
* Returns the container server IPC port.
* Returns the container servers IPC port.
*
* @return Container server IPC port.
* @return Container servers IPC port.
*/
public int getContainerServerPort() {
return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
return getPortByType(ReplicationType.STAND_ALONE);
}
/**
@ -265,7 +223,7 @@ public class OzoneContainer {
* @return Ratis port.
*/
public int getRatisContainerServerPort() {
return getPortbyType(HddsProtos.ReplicationType.RATIS);
return getPortByType(ReplicationType.RATIS);
}
/**

View File

@ -30,25 +30,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
public class CloseContainerCommand
extends SCMCommand<CloseContainerCommandProto> {
private long containerID;
private HddsProtos.ReplicationType replicationType;
private PipelineID pipelineID;
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType,
PipelineID pipelineID) {
super();
this.containerID = containerID;
this.replicationType = replicationType;
this.pipelineID = pipelineID;
}
// Should be called only for protobuf conversion
private CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType,
PipelineID pipelineID, long id) {
super(id);
this.containerID = containerID;
super(containerID);
this.replicationType = replicationType;
this.pipelineID = pipelineID;
}
@ -75,7 +63,7 @@ public class CloseContainerCommand
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID)
.setContainerID(getId())
.setCmdId(getId())
.setReplicationType(replicationType)
.setPipelineID(pipelineID.getProtobuf())
@ -85,13 +73,12 @@ public class CloseContainerCommand
public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID(),
return new CloseContainerCommand(closeContainerProto.getCmdId(),
closeContainerProto.getReplicationType(),
PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
closeContainerProto.getCmdId());
PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
}
public long getContainerID() {
return containerID;
return getId();
}
}

View File

@ -17,10 +17,10 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
@ -57,7 +57,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
LOG.info("Close container Event triggered for container : {}",
containerID.getId());
ContainerWithPipeline containerWithPipeline = null;
ContainerWithPipeline containerWithPipeline;
ContainerInfo info;
try {
containerWithPipeline =
@ -74,42 +74,66 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
return;
}
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
for (DatanodeDetails datanode :
containerWithPipeline.getPipeline().getMachines()) {
CommandForDatanode closeContainerCommand = new CommandForDatanode<>(
datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
info.getReplicationType(), info.getPipelineID()));
publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
CloseContainerRetryableReq(containerID));
}
try {
// Finalize event will make sure the state of the container transitions
// from OPEN to CLOSING in containerStateManager.
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.FINALIZE);
} catch (IOException ex) {
LOG.error("Failed to update the container state to FINALIZE for"
+ "container : {}" + containerID, ex);
}
} else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) {
try {
// Create event will make sure the state of the container transitions
// from OPEN to CREATING in containerStateManager, this will move
// the container out of active allocation path.
HddsProtos.LifeCycleState state = info.getState();
try {
switch (state) {
case ALLOCATED:
// We cannot close a container in ALLOCATED state, moving the
// container to CREATING state, this should eventually
// timeout and the container will be moved to DELETING state.
LOG.debug("Closing container {} in {} state", containerID, state);
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.CREATE);
} catch (IOException ex) {
LOG.error("Failed to update the container state to CREATE for"
+ "container:{}" + containerID, ex);
break;
case CREATING:
// We cannot close a container in CREATING state, it will eventually
// timeout and moved to DELETING state.
LOG.debug("Closing container {} in {} state", containerID, state);
break;
case OPEN:
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.FINALIZE);
fireCloseContainerEvents(containerWithPipeline, info, publisher);
break;
case CLOSING:
fireCloseContainerEvents(containerWithPipeline, info, publisher);
break;
case CLOSED:
case DELETING:
case DELETED:
LOG.info(
"container with id : {} is in {} state and need not be closed.",
containerID.getId(), info.getState());
break;
default:
throw new IOException(
"Invalid container state for container " + containerID);
}
} else {
LOG.info("container with id : {} is in {} state and need not be closed.",
containerID.getId(), info.getState());
} catch (IOException ex) {
LOG.error("Failed to update the container state for" + "container : {}"
+ containerID, ex);
}
}
private void fireCloseContainerEvents(
ContainerWithPipeline containerWithPipeline, ContainerInfo info,
EventPublisher publisher) {
ContainerID containerID = info.containerID();
// fire events.
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(containerID.getId(),
info.getReplicationType(), info.getPipelineID());
Pipeline pipeline = containerWithPipeline.getPipeline();
pipeline.getMachines().stream().map(
datanode -> new CommandForDatanode<>(datanode.getUuid(),
closeContainerCommand)).forEach((command) -> {
publisher.fireEvent(DATANODE_COMMAND, command);
});
publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
new CloseContainerRetryableReq(containerID));
LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
pipeline, containerID);
}
/**

View File

@ -132,7 +132,6 @@ public class TestCloseContainerByPipeline {
// Make sure the closeContainerCommandHandler is Invoked
Assert.assertTrue(
closeContainerHandler.getInvocationCount() > lastInvocationCount);
}
@Test
@ -190,6 +189,7 @@ public class TestCloseContainerByPipeline {
Assert.assertFalse((logCapturer.getOutput().contains(
"submitting CloseContainer request over RATIS server for container "
+ containerID)));
logCapturer.stopCapturing();
}
@Test
@ -239,13 +239,14 @@ public class TestCloseContainerByPipeline {
Assert.assertTrue(isContainerClosed(cluster,
containerID, datanodeDetails));
}
// Make sure it was really closed via Ratis not STAND_ALONE server
Assert.assertFalse(logCapturer.getOutput().contains(
"submitting CloseContainer request over STAND_ALONE "
+ "server for container " + containerID));
// Make sure it was really closed via StandAlone not Ratis server
Assert.assertTrue((logCapturer.getOutput().contains(
"submitting CloseContainer request over RATIS server for container "
+ containerID)));
logCapturer.stopCapturing();
}
private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,