diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java index 50226182622..5f2fe266e77 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java @@ -186,15 +186,17 @@ public class XceiverClient extends XceiverClientSpi { /** * Create a pipeline. - * - * @param ignored - pipeline to be created. */ @Override - public void createPipeline(Pipeline ignored) + public void createPipeline() throws IOException { // For stand alone pipeline, there is no notion called setup pipeline. } + public void destroyPipeline() { + // For stand alone pipeline, there is no notion called destroy pipeline. + } + /** * Returns pipeline Type. * diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 1622ddbf054..3cdbc7cc998 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -216,15 +216,16 @@ public class XceiverClientGrpc extends XceiverClientSpi { /** * Create a pipeline. - * - * @param ignored - pipeline to be created. */ @Override - public void createPipeline(Pipeline ignored) - throws IOException { + public void createPipeline() { // For stand alone pipeline, there is no notion called setup pipeline. } + public void destroyPipeline() { + // For stand alone pipeline, there is no notion called destroy pipeline. + } + /** * Returns pipeline Type. * diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 2cb319f1d4a..499f94d2baf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -88,13 +88,27 @@ public final class XceiverClientRatis extends XceiverClientSpi { /** * {@inheritDoc} */ - public void createPipeline(Pipeline pipeline) + public void createPipeline() throws IOException { RaftGroupId groupId = pipeline.getId().getRaftGroupID(); RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines()); LOG.debug("initializing pipeline:{} with nodes:{}", pipeline.getId(), group.getPeers()); - reinitialize(pipeline.getMachines(), group); + reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group); + } + + /** + * {@inheritDoc} + */ + public void destroyPipeline() + throws IOException { + RaftGroupId groupId = pipeline.getId().getRaftGroupID(); + RaftGroup currentGroup = + RatisHelper.newRaftGroup(groupId, pipeline.getMachines()); + LOG.debug("destroying pipeline:{} with nodes:{}", + pipeline.getId(), currentGroup.getPeers()); + reinitialize(pipeline.getMachines(), currentGroup, + RatisHelper.emptyRaftGroup()); } /** @@ -107,8 +121,8 @@ public final class XceiverClientRatis extends XceiverClientSpi { return HddsProtos.ReplicationType.RATIS; } - private void reinitialize(List datanodes, RaftGroup group) - throws IOException { + private void reinitialize(List datanodes, RaftGroup oldGroup, + RaftGroup newGroup) throws IOException { if (datanodes.isEmpty()) { return; } @@ -116,7 +130,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi { IOException exception = null; for (DatanodeDetails d : datanodes) { try { - reinitialize(d, group); + reinitialize(d, oldGroup, newGroup); } catch (IOException ioe) { if (exception == null) { exception = new IOException( @@ -135,14 +149,18 @@ public final class XceiverClientRatis extends XceiverClientSpi { * Adds a new peers to the Ratis Ring. * * @param datanode - new datanode - * @param group - Raft group + * @param oldGroup - previous Raft group + * @param newGroup - new Raft group * @throws IOException - on Failure. */ - private void reinitialize(DatanodeDetails datanode, RaftGroup group) + private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup, + RaftGroup newGroup) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(datanode); - try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { - client.reinitialize(group, p.getId()); + try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ? + RatisHelper.newRaftClient(rpcType, p) : + RatisHelper.newRaftClient(rpcType, p, oldGroup)) { + client.reinitialize(newGroup, p.getId()); } catch (IOException ioe) { LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", p, datanode, ioe); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 8c8cb95d5c5..fed589c81de 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -180,7 +180,7 @@ public class ContainerOperationClient implements ScmClient { // ObjectStageChangeRequestProto.Op.create, // ObjectStageChangeRequestProto.Stage.begin); - client.createPipeline(pipeline); + client.createPipeline(); //storageContainerLocationClient.notifyObjectStageChange( // ObjectStageChangeRequestProto.Type.pipeline, diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 3bda29fcabe..4dc7e0a51c3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -56,6 +56,11 @@ public final class HddsConfigKeys { public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT = 20; + public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT = + "hdds.pipeline.action.max.limit"; + public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT = + 20; + // Configuration to allow volume choosing policy. 
public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY = "hdds.datanode.volume.choosing.policy"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 62d9ef5df59..22ba71409d3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.TimeDuration; import java.util.concurrent.TimeUnit; @@ -57,6 +58,10 @@ public final class ScmConfigKeys { = "dfs.container.ratis.num.write.chunk.threads"; public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT = 60; + public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY + = "dfs.container.ratis.replication.level"; + public static final ReplicationLevel + DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY; public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = "dfs.container.ratis.segment.size"; public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT = @@ -76,6 +81,12 @@ public final class ScmConfigKeys { DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = + "dfs.ratis.server.failure.duration"; + public static final TimeDuration + DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT = + TimeDuration.valueOf(120, TimeUnit.SECONDS); + // TODO : this is copied from OzoneConsts, may need to move to a better place public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size"; // 16 MB by default diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index b3b0da26e4e..e8ef5c572c2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -111,10 +111,14 @@ public abstract class XceiverClientSpi implements Closeable { /** * Create a pipeline. - * - * @param pipeline - pipeline to be created. */ - public abstract void createPipeline(Pipeline pipeline) throws IOException; + public abstract void createPipeline() throws IOException; + + /** + * Destroy a pipeline. + * @throws IOException + */ + public abstract void destroyPipeline() throws IOException; /** * Returns pipeline Type. 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 0f2b1088489..f07d59955d5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.TimeDuration; /** @@ -214,6 +215,11 @@ public final class OzoneConfigKeys { = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY; public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT; + public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY; + public static final ReplicationLevel + DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT; public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY; public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT @@ -237,6 +243,12 @@ public final class OzoneConfigKeys { DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT = ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT; + public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = + ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY; + public static final TimeDuration + DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT = + ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT; + public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL = "ozone.web.authentication.kerberos.principal"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 9c25e204252..48fdd6477d2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -48,8 +50,19 @@ public interface RatisHelper { Logger LOG = LoggerFactory.getLogger(RatisHelper.class); static String toRaftPeerIdString(DatanodeDetails id) { - return id.getUuidString() + "_" + - id.getPort(DatanodeDetails.Port.Name.RATIS).getValue(); + return id.getUuidString(); + } + + static UUID toDatanodeId(String peerIdString) { + return UUID.fromString(peerIdString); + } + + static UUID toDatanodeId(RaftPeerId peerId) { + return toDatanodeId(peerId.toString()); + } + + static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) { + return toDatanodeId(RaftPeerId.valueOf(peerId.getId())); } static String toRaftPeerAddressString(DatanodeDetails id) { @@ -117,6 +130,11 @@ public interface 
RatisHelper { newRaftGroup(new ArrayList<>(Arrays.asList(leader)))); } + static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, + RaftGroup group) { + return newRaftClient(rpcType, leader.getId(), group); + } + static RaftClient newRaftClient( RpcType rpcType, RaftPeerId leader, RaftGroup group) { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 2112ae33074..778d641fb1a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -126,6 +126,15 @@ will use for writing chunks (60 by default). + + dfs.container.ratis.replication.level + MAJORITY + OZONE, RATIS + Replication level to be used by datanode for submitting a + container command to ratis. Available replication levels are ALL and + MAJORITY; MAJORITY is used as the default replication level. + + dfs.container.ratis.segment.size 1073741824 @@ -154,6 +163,15 @@ OZONE, RATIS, MANAGEMENT The timeout duration for ratis server request. + + dfs.ratis.server.failure.duration + 120s + OZONE, RATIS, MANAGEMENT + The timeout duration for ratis server failure detection; + once the threshold has been reached, the ratis state machine will be informed + about the failure in the ratis ring. + + hdds.node.report.interval 60000ms @@ -1104,6 +1122,15 @@ + + hdds.pipeline.action.max.limit + 20 + DATANODE + + Maximum number of Pipeline Actions sent by the datanode to SCM in a + single heartbeat. + + hdds.scm.watcher.timeout 10m diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index a342294bd1d..c2d54213476 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -20,6 +20,8 @@ import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.protocol.proto @@ -66,6 +68,7 @@ public class StateContext { private final Configuration conf; private final Queue reports; private final Queue containerActions; + private final Queue pipelineActions; private DatanodeStateMachine.DatanodeStates state; /** @@ -91,6 +94,7 @@ public class StateContext { cmdStatusMap = new ConcurrentHashMap<>(); reports = new LinkedList<>(); containerActions = new LinkedList<>(); + pipelineActions = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); } @@ -256,6 +260,47 @@ public class StateContext { } } + /** + * Add PipelineAction to PipelineAction queue if it's not present. + * + * @param pipelineAction PipelineAction to be added + */ + public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { + synchronized (pipelineActions) { + /** + * If the pipelineAction queue already contains an entry for the pipeline id + * with the same action, we should just return. 
+ * Note: We should not use pipelineActions.contains(pipelineAction) here + * as, pipelineAction has a msg string. So even if two msgs differ though + * action remains same on the given pipeline, it will end up adding it + * multiple times here. + */ + for (PipelineAction pipelineActionIter : pipelineActions) { + if (pipelineActionIter.getAction() == pipelineAction.getAction() + && pipelineActionIter.hasClosePipeline() && pipelineAction + .hasClosePipeline() + && pipelineActionIter.getClosePipeline().getPipelineID() + == pipelineAction.getClosePipeline().getPipelineID()) { + return; + } + } + pipelineActions.add(pipelineAction); + } + } + + /** + * Returns pending PipelineActions from the PipelineAction queue with a + * max limit on list size, or empty list if the queue is empty. + * + * @return List + */ + public List getPendingPipelineAction(int maxLimit) { + synchronized (pipelineActions) { + return pipelineActions.parallelStream().limit(maxLimit) + .collect(Collectors.toList()); + } + } + /** * Returns the next task to get executed by the datanode state machine. * @return A callable that will be executed by the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 020fb714263..5769e6d2f64 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -24,6 +24,10 @@ import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; import org.apache.hadoop.hdds.protocol.proto @@ -57,6 +61,10 @@ import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_CONTAINER_ACTION_MAX_LIMIT; import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT; +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_PIPELINE_ACTION_MAX_LIMIT; +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT; /** * Heartbeat class for SCMs. @@ -70,6 +78,7 @@ public class HeartbeatEndpointTask private DatanodeDetailsProto datanodeDetailsProto; private StateContext context; private int maxContainerActionsPerHB; + private int maxPipelineActionsPerHB; /** * Constructs a SCM heart beat. 
@@ -83,6 +92,8 @@ public class HeartbeatEndpointTask this.context = context; this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT, HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT); + this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT, + HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT); } /** @@ -121,6 +132,7 @@ public class HeartbeatEndpointTask .setDatanodeDetails(datanodeDetailsProto); addReports(requestBuilder); addContainerActions(requestBuilder); + addPipelineActions(requestBuilder); SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() .sendHeartbeat(requestBuilder.build()); processResponse(reponse, datanodeDetailsProto); @@ -169,6 +181,22 @@ public class HeartbeatEndpointTask } } + /** + * Adds all the pending PipelineActions to the heartbeat. + * + * @param requestBuilder builder to which the report has to be added. + */ + private void addPipelineActions( + SCMHeartbeatRequestProto.Builder requestBuilder) { + List actions = context.getPendingPipelineAction( + maxPipelineActionsPerHB); + if (!actions.isEmpty()) { + PipelineActionsProto pap = PipelineActionsProto.newBuilder() + .addAllPipelineActions(actions) + .build(); + requestBuilder.setPipelineActions(pap); + } + } /** * Returns a builder class for HeartbeatEndpointTask task. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 68d6d5bb144..1636f24a95d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.shaded.com.google.protobuf @@ -42,6 +43,7 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; @@ -115,6 +117,7 @@ public class ContainerStateMachine extends BaseStateMachine { = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; private ThreadPoolExecutor chunkExecutor; + private final XceiverServerRatis ratisServer; private final ConcurrentHashMap> writeChunkFutureMap; private final ConcurrentHashMap stateMachineMap; @@ -124,9 +127,10 @@ public class ContainerStateMachine extends BaseStateMachine { private final CSMMetrics metrics; public ContainerStateMachine(ContainerDispatcher dispatcher, - ThreadPoolExecutor chunkExecutor) { + ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) { this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; + this.ratisServer = ratisServer; 
this.writeChunkFutureMap = new ConcurrentHashMap<>(); this.stateMachineMap = new ConcurrentHashMap<>(); metrics = CSMMetrics.create(); @@ -400,6 +404,17 @@ public class ContainerStateMachine extends BaseStateMachine { return future; } + @Override + public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { + ratisServer.handleNodeSlowness(group, roleInfoProto); + } + + @Override + public void notifyExtendedNoLeader(RaftGroup group, + RoleInfoProto roleInfoProto) { + ratisServer.handleNoLeader(group, roleInfoProto); + } + @Override public void close() throws IOException { } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 82567225be0..f7753968ddb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -26,9 +26,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server .XceiverServerSpi; import org.apache.ratis.RaftConfigKeys; @@ -43,10 +48,15 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.StateMachineException; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; @@ -59,6 +69,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -81,24 +92,72 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final RaftServer server; private ThreadPoolExecutor chunkExecutor; private ClientId clientId = ClientId.randomId(); + private final StateContext context; + private final ReplicationLevel replicationLevel; + private long nodeFailureTimeoutMs; private XceiverServerRatis(DatanodeDetails dd, int port, String 
storageDir, - ContainerDispatcher dispatcher, Configuration conf) throws IOException { + ContainerDispatcher dispatcher, Configuration conf, StateContext context) + throws IOException { + Objects.requireNonNull(dd, "id == null"); + this.port = port; + RaftProperties serverProperties = newRaftProperties(conf, storageDir); + final int numWriteChunkThreads = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); + chunkExecutor = + new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, + 100, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.context = context; + this.replicationLevel = + conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); + ContainerStateMachine stateMachine = + new ContainerStateMachine(dispatcher, chunkExecutor, this); + this.server = RaftServer.newBuilder() + .setServerId(RatisHelper.toRaftPeerId(dd)) + .setGroup(RatisHelper.emptyRaftGroup()) + .setProperties(serverProperties) + .setStateMachine(stateMachine) + .build(); + } + + private RaftProperties newRaftProperties(Configuration conf, + String storageDir) { + final RaftProperties properties = new RaftProperties(); + + // Set rpc type final String rpcType = conf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + RaftConfigKeys.Rpc.setType(properties, rpc); + + // set raft segment size final int raftSegmentSize = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties, + SizeInBytes.valueOf(raftSegmentSize)); + + // set raft segment pre-allocated size final int raftSegmentPreallocatedSize = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT); + RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.setPreallocatedSize(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + + // Set max write buffer size, which is the scm chunk size final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; - final int numWriteChunkThreads = conf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); + RaftServerConfigKeys.Log.setWriteBufferSize(properties, + SizeInBytes.valueOf(maxChunkSize)); + + // Set the client requestTimeout TimeUnit timeUnit = OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getUnit(); @@ -108,6 +167,10 @@ public final class XceiverServerRatis implements XceiverServerSpi { .getDuration(), timeUnit); final TimeDuration clientRequestTimeout = TimeDuration.valueOf(duration, timeUnit); + RaftClientConfigKeys.Rpc + .setRequestTimeout(properties, clientRequestTimeout); + + // Set the server Request timeout timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT .getUnit(); duration = conf.getTimeDuration( @@ -116,61 +179,44 @@ public final class XceiverServerRatis implements XceiverServerSpi { .getDuration(), timeUnit); final TimeDuration serverRequestTimeout = TimeDuration.valueOf(duration, 
timeUnit); - - Objects.requireNonNull(dd, "id == null"); - this.port = port; - RaftProperties serverProperties = - newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize, - raftSegmentPreallocatedSize); - setRequestTimeout(serverProperties, clientRequestTimeout, - serverRequestTimeout); - - chunkExecutor = - new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, - 100, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), - new ThreadPoolExecutor.CallerRunsPolicy()); - ContainerStateMachine stateMachine = - new ContainerStateMachine(dispatcher, chunkExecutor); - this.server = RaftServer.newBuilder() - .setServerId(RatisHelper.toRaftPeerId(dd)) - .setGroup(RatisHelper.emptyRaftGroup()) - .setProperties(serverProperties) - .setStateMachine(stateMachine) - .build(); - } - - private static void setRequestTimeout(RaftProperties serverProperties, - TimeDuration clientRequestTimeout, TimeDuration serverRequestTimeout) { - RaftClientConfigKeys.Rpc - .setRequestTimeout(serverProperties, clientRequestTimeout); RaftServerConfigKeys.Rpc - .setRequestTimeout(serverProperties, serverRequestTimeout); - } + .setRequestTimeout(properties, serverRequestTimeout); - private static RaftProperties newRaftProperties( - RpcType rpc, int port, String storageDir, int scmChunkSize, - int raftSegmentSize, int raftSegmentPreallocatedSize) { - final RaftProperties properties = new RaftProperties(); + // Enable batch append on raft server RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); - RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, - SizeInBytes.valueOf(raftSegmentPreallocatedSize)); - RaftServerConfigKeys.Log.setWriteBufferSize(properties, - SizeInBytes.valueOf(scmChunkSize)); - RaftServerConfigKeys.Log.setPreallocatedSize(properties, - SizeInBytes.valueOf(raftSegmentPreallocatedSize)); - RaftServerConfigKeys.Log.setSegmentSizeMax(properties, - SizeInBytes.valueOf(raftSegmentSize)); - RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); - RaftConfigKeys.Rpc.setType(properties, rpc); + // Set the maximum cache segments RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); - GrpcConfigKeys.setMessageSizeMax(properties, - SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize)); + + // Set the ratis leader election timeout RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(800, TimeUnit.MILLISECONDS)); RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS)); + + // set the node failure timeout + timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT + .getUnit(); + duration = conf.getTimeDuration( + OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, + OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT + .getDuration(), timeUnit); + final TimeDuration nodeFailureTimeout = + TimeDuration.valueOf(duration, timeUnit); + RaftServerConfigKeys.setLeaderElectionTimeout(properties, + nodeFailureTimeout); + RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, + nodeFailureTimeout); + nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS); + + // Set the ratis storage directory + RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); + + // For grpc set the maximum message size + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize)); + + // Set the ratis port number if (rpc == SupportedRpcType.GRPC) { GrpcConfigKeys.Server.setPort(properties, port); } 
else if (rpc == SupportedRpcType.NETTY) { @@ -181,7 +227,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { public static XceiverServerRatis newXceiverServerRatis( DatanodeDetails datanodeDetails, Configuration ozoneConf, - ContainerDispatcher dispatcher) throws IOException { + ContainerDispatcher dispatcher, StateContext context) throws IOException { final String ratisDir = File.separator + "ratis"; int localPort = ozoneConf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, @@ -226,7 +272,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { datanodeDetails.setPort( DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort)); return new XceiverServerRatis(datanodeDetails, localPort, storageDir, - dispatcher, ozoneConf); + dispatcher, ozoneConf, context); } @Override @@ -296,7 +342,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { // the request here are applied on all the raft servers. RaftClientRequest raftClientRequest = createRaftClientRequest(request, pipelineID, - RaftClientRequest.writeRequestType(ReplicationLevel.ALL)); + RaftClientRequest.writeRequestType(replicationLevel)); CompletableFuture reply = server.submitClientRequestAsync(raftClientRequest); reply.thenAccept(this::processReply); @@ -309,4 +355,57 @@ public final class XceiverServerRatis implements XceiverServerSpi { PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(), nextCallId(), 0, Message.valueOf(request.toByteString()), type); } + + private void handlePipelineFailure(RaftGroupId groupId, + RoleInfoProto roleInfoProto) { + String msg; + UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf()); + RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId()); + switch (roleInfoProto.getRole()) { + case CANDIDATE: + msg = datanode + " is in candidate state for " + + roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms"; + break; + case LEADER: + StringBuilder sb = new StringBuilder(); + sb.append(datanode).append(" has not seen follower/s"); + for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo() + .getFollowerInfoList()) { + if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) { + sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId())) + .append(" for ").append(follower.getLastRpcElapsedTimeMs()) + .append("ms"); + } + } + msg = sb.toString(); + break; + default: + LOG.error("unknown state:" + roleInfoProto.getRole()); + throw new IllegalStateException("node" + id + " is in illegal role " + + roleInfoProto.getRole()); + } + + PipelineID pipelineID = PipelineID.valueOf(groupId); + ClosePipelineInfo.Builder closePipelineInfo = + ClosePipelineInfo.newBuilder() + .setPipelineID(pipelineID.getProtobuf()) + .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED) + .setDetailedReason(msg); + + PipelineAction action = PipelineAction.newBuilder() + .setClosePipeline(closePipelineInfo) + .setAction(PipelineAction.Action.CLOSE) + .build(); + context.addPipelineActionIfAbsent(action); + } + + void handleNodeSlowness( + RaftGroup group, RoleInfoProto roleInfoProto) { + handlePipelineFailure(group.getGroupId(), roleInfoProto); + } + + void handleNoLeader( + RaftGroup group, RoleInfoProto roleInfoProto) { + handlePipelineFailure(group.getGroupId(), roleInfoProto); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index b1bf38129c6..72a5804a9ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -84,7 +84,7 @@ public class OzoneContainer { new XceiverServerGrpc(datanodeDetails, this.config, this .hddsDispatcher, createReplicationService()), XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this - .config, hddsDispatcher) + .config, hddsDispatcher, context) }; diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 1a3496de059..0a6934342e4 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -81,6 +81,7 @@ message SCMHeartbeatRequestProto { optional ContainerReportsProto containerReport = 3; optional CommandStatusReportsProto commandStatusReport = 4; optional ContainerActionsProto containerActions = 5; + optional PipelineActionsProto pipelineActions = 6; } /* @@ -162,6 +163,31 @@ message ContainerAction { optional Reason reason = 3; } +message PipelineActionsProto { + repeated PipelineAction pipelineActions = 1; +} + +message ClosePipelineInfo { + enum Reason { + PIPELINE_FAILED = 1; + } + required PipelineID pipelineID = 1; + optional Reason reason = 3; + optional string detailedReason = 4; +} + +message PipelineAction { + enum Action { + CLOSE = 1; + } + + /** + * Action will be used to identify the correct pipeline action. + */ + required Action action = 1; + optional ClosePipelineInfo closePipeline = 2; +} + /** A container report contains the following information. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 8f5d8d61f96..35543397745 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -200,8 +200,7 @@ public class ContainerMapping implements Mapping { Pipeline pipeline; if (contInfo.isContainerOpen()) { // If pipeline with given pipeline Id already exist return it - pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(), - contInfo.getReplicationType()); + pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); if (pipeline == null) { pipeline = pipelineSelector .getReplicationPipeline(contInfo.getReplicationType(), @@ -389,8 +388,7 @@ public class ContainerMapping implements Mapping { .updateContainerState(containerInfo, event); if (!updatedContainer.isContainerOpen()) { Pipeline pipeline = pipelineSelector - .getPipeline(containerInfo.getPipelineID(), - containerInfo.getReplicationType()); + .getPipeline(containerInfo.getPipelineID()); pipelineSelector.closePipelineIfNoOpenContainers(pipeline); } containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); @@ -470,8 +468,7 @@ public class ContainerMapping implements Mapping { return null; } Pipeline pipeline = pipelineSelector - .getPipeline(containerInfo.getPipelineID(), - containerInfo.getReplicationType()); + .getPipeline(containerInfo.getPipelineID()); if (pipeline == null) { pipeline = pipelineSelector .getReplicationPipeline(containerInfo.getReplicationType(), @@ -480,6 +477,24 @@ public class ContainerMapping implements Mapping { return new ContainerWithPipeline(containerInfo, pipeline); } + public void handlePipelineClose(PipelineID pipelineID) { + try { + Pipeline pipeline = pipelineSelector.getPipeline(pipelineID); + if (pipeline != null) { + pipelineSelector.finalizePipeline(pipeline); + } else { + LOG.debug("pipeline:{} not found", pipelineID); + } + } catch (Exception e) { + LOG.info("failed to close pipeline:{}", pipelineID, e); + } + } + + public Set getPipelineOnDatanode( + DatanodeDetails datanodeDetails) { + return pipelineSelector.getPipelineId(datanodeDetails.getUuid()); + } + /** * Process container report from Datanode. *

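For reference, a minimal Java sketch (not part of this patch; the class and variable names below are placeholders) of how the new Mapping APIs above are meant to be used together, closing every pipeline hosted on a failed or stale datanode:

import java.util.Set;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

public final class PipelineCloseSketch {
  private PipelineCloseSketch() {
  }

  /** Close every pipeline that the given (e.g. stale) datanode participates in. */
  public static void closePipelinesOn(Mapping mapping, DatanodeDetails datanode) {
    Set<PipelineID> pipelines = mapping.getPipelineOnDatanode(datanode);
    for (PipelineID id : pipelines) {
      // handlePipelineClose finalizes the pipeline if SCM still knows about it
      // and only logs when the pipeline has already been removed.
      mapping.handlePipelineClose(id);
    }
  }
}
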
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 7afed42b884..421d34e7001 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -486,10 +486,9 @@ public class ContainerStateManager implements Closeable { * @throws IOException */ public ContainerWithPipeline getContainer(PipelineSelector selector, - ContainerID containerID) throws IOException { + ContainerID containerID) { ContainerInfo info = containers.getContainerInfo(containerID.getId()); - Pipeline pipeline = selector.getPipeline(info.getPipelineID(), - info.getReplicationType()); + Pipeline pipeline = selector.getPipeline(info.getPipelineID()); return new ContainerWithPipeline(info, pipeline); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java index f4b5bb22f6f..1b0c57c3522 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; /** * Mapping class contains the mapping from a name to a pipeline mapping. This is @@ -135,4 +137,16 @@ public interface Mapping extends Closeable { ContainerWithPipeline getMatchingContainerWithPipeline(long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException; + + /** + * Handle a pipeline close event. + * @param pipelineID pipeline id + */ + void handlePipelineClose(PipelineID pipelineID); + + /** + * Get set of pipeline for a specific datanode. + * @param datanodeDetails datanode for which pipelines needs to be fetched. 
+ */ + Set getPipelineOnDatanode(DatanodeDetails datanodeDetails); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 9a4f887cd7d..03df8eb0ae6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher @@ -72,6 +75,23 @@ public final class SCMEvents { public static final TypedEvent CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class, "Container_Actions"); + + /** + * PipelineActions are sent by Datanode. This event is received by + * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated. + */ + public static final TypedEvent + PIPELINE_ACTIONS = new TypedEvent<>(PipelineActionsFromDatanode.class, + "Pipeline_Actions"); + + /** + * Pipeline close events are triggered to close a pipeline because of failure, + * stale node, decommissioning etc. + */ + public static final TypedEvent + PIPELINE_CLOSE = new TypedEvent<>(PipelineID.class, + "Pipeline_Close"); + /** * A Command status report will be sent by datanodes. This report is received * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. @@ -155,7 +175,7 @@ */ public static final Event DELETE_BLOCK_STATUS = - new TypedEvent(DeleteBlockCommandStatus.class, + new TypedEvent<>(DeleteBlockCommandStatus.class, "DeleteBlockCommandStatus"); /** @@ -164,7 +184,7 @@ * deleteTransactionID on SCM. */ public static final Event PENDING_DELETE_STATUS = - new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus"); + new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus"); /** * This is the command for ReplicationManager to handle under/over diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index b37dd93978a..0bd93395c31 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -19,24 +19,36 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import java.util.Set; + /** * Handles Stale node event. 
*/ public class StaleNodeHandler implements EventHandler { private final Node2ContainerMap node2ContainerMap; + private final Mapping containerManager; - public StaleNodeHandler(Node2ContainerMap node2ContainerMap) { + public StaleNodeHandler(Node2ContainerMap node2ContainerMap, + Mapping containerManager) { this.node2ContainerMap = node2ContainerMap; + this.containerManager = containerManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - //TODO: logic to handle stale node. + Set pipelineIDs = + containerManager.getPipelineOnDatanode(datanodeDetails); + for (PipelineID id : pipelineIDs) { + publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java index 4a7fa81c75b..363ce715566 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.pipelines; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import java.util.Collections; import java.util.HashSet; @@ -30,8 +30,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE; - /** * This data structure maintains the list of pipelines which the given datanode is a part of. This * information will be added whenever a new pipeline allocation happens. @@ -39,7 +37,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUP *

TODO: this information needs to be regenerated from pipeline reports on SCM restart */ public class Node2PipelineMap { - private final Map> dn2PipelineMap; + private final Map> dn2PipelineMap; /** Constructs a Node2PipelineMap Object. */ public Node2PipelineMap() { @@ -57,20 +55,6 @@ public class Node2PipelineMap { return dn2PipelineMap.containsKey(datanodeID); } - /** - * Insert a new datanode into Node2Pipeline Map. - * - * @param datanodeID -- Datanode UUID - * @param pipelines - set of pipelines. - */ - private void insertNewDatanode(UUID datanodeID, Set pipelines) throws SCMException { - Preconditions.checkNotNull(pipelines); - Preconditions.checkNotNull(datanodeID); - if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) { - throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE); - } - } - /** * Removes datanode Entry from the map. * @@ -87,9 +71,10 @@ public class Node2PipelineMap { * @param datanode - UUID * @return Set of pipelines or Null. */ - public Set getPipelines(UUID datanode) { + public Set getPipelines(UUID datanode) { Preconditions.checkNotNull(datanode); - return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> Collections.unmodifiableSet(v)); + final Set s = dn2PipelineMap.get(datanode); + return s != null? Collections.unmodifiableSet(s): Collections.emptySet(); } /** @@ -100,9 +85,8 @@ public class Node2PipelineMap { public synchronized void addPipeline(Pipeline pipeline) { for (DatanodeDetails details : pipeline.getDatanodes().values()) { UUID dnId = details.getUuid(); - dn2PipelineMap - .computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>())) - .add(pipeline); + dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>()) + .add(pipeline.getId()); } } @@ -112,13 +96,13 @@ public class Node2PipelineMap { dn2PipelineMap.computeIfPresent( dnId, (k, v) -> { - v.remove(pipeline); + v.remove(pipeline.getId()); return v; }); } } - public Map> getDn2PipelineMap() { + public Map> getDn2PipelineMap() { return Collections.unmodifiableMap(dn2PipelineMap); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java new file mode 100644 index 00000000000..54c240037f9 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineActionsFromDatanode; + +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles pipeline actions from datanode. + */ +public class PipelineActionEventHandler implements + EventHandler { + + public static final Logger LOG = LoggerFactory.getLogger( + PipelineActionEventHandler.class); + + public PipelineActionEventHandler() { + + } + + @Override + public void onMessage(PipelineActionsFromDatanode report, + EventPublisher publisher) { + for (PipelineAction action : report.getReport().getPipelineActionsList()) { + switch (action.getAction()) { + case CLOSE: + PipelineID pipelineID = PipelineID. + getFromProtobuf(action.getClosePipeline().getPipelineID()); + publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID); + break; + default: + LOG.error("unknown pipeline action:{}" + action.getAction()); + } + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java new file mode 100644 index 00000000000..733dec56339 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +/** + * Handles pipeline close event. + */ +public class PipelineCloseHandler implements EventHandler { + private final Mapping mapping; + public PipelineCloseHandler(Mapping mapping) { + this.mapping = mapping; + } + + @Override + public void onMessage(PipelineID pipelineID, EventPublisher publisher) { + mapping.handlePipelineClose(pipelineID); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 5b1a7f704c8..102df8a9e8c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipelines; import java.util.LinkedList; import java.util.Map; -import java.util.WeakHashMap; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; @@ -43,11 +42,12 @@ public abstract class PipelineManager { private final AtomicInteger pipelineIndex; private final Node2PipelineMap node2PipelineMap; - public PipelineManager(Node2PipelineMap map) { + public PipelineManager(Node2PipelineMap map, + Map pipelineMap) { activePipelines = new LinkedList<>(); pipelineIndex = new AtomicInteger(0); - pipelineMap = new WeakHashMap<>(); - node2PipelineMap = map; + this.pipelineMap = pipelineMap; + this.node2PipelineMap = map; } /** @@ -187,7 +187,7 @@ public abstract class PipelineManager { * * @param pipeline */ - public void closePipeline(Pipeline pipeline) { + public void closePipeline(Pipeline pipeline) throws IOException { pipelineMap.remove(pipeline.getId()); node2PipelineMap.removePipeline(pipeline); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index b02beb3e753..63afbaa9331 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -55,6 +55,8 @@ import java.util.HashSet; import java.util.List; import java.util.NavigableSet; import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -77,6 +79,7 @@ public class PipelineSelector { private final StandaloneManagerImpl standaloneManager; private final long containerSize; private final Node2PipelineMap node2PipelineMap; + private final Map pipelineMap; private final LeaseManager pipelineLeaseManager; private 
final StateMachine stateMachine; @@ -99,12 +102,13 @@ public class PipelineSelector { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); node2PipelineMap = new Node2PipelineMap(); + pipelineMap = new ConcurrentHashMap<>(); this.standaloneManager = new StandaloneManagerImpl(this.nodeManager, placementPolicy, - containerSize, node2PipelineMap); + containerSize, node2PipelineMap, pipelineMap); this.ratisManager = new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, - conf, node2PipelineMap); + conf, node2PipelineMap, pipelineMap); // Initialize the container state machine. Set finalStates = new HashSet(); long pipelineCreationLeaseTimeout = conf.getTimeDuration( @@ -303,19 +307,10 @@ public class PipelineSelector { } /** - * This function to return pipeline for given pipeline name and replication - * type. + * Returns the pipeline for the given pipeline id. */ - public Pipeline getPipeline(PipelineID pipelineID, - ReplicationType replicationType) throws IOException { - if (pipelineID == null) { - return null; - } - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Getting replication pipeline forReplicationType {} :" + - " pipelineName:{}", replicationType, pipelineID); - return manager.getPipeline(pipelineID); + public Pipeline getPipeline(PipelineID pipelineID) { + return pipelineMap.get(pipelineID); } /** @@ -324,9 +319,18 @@ public class PipelineSelector { public void finalizePipeline(Pipeline pipeline) throws IOException { PipelineManager manager = getPipelineManager(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId()); + if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING || + pipeline.getLifeCycleState() == LifeCycleState.CLOSED) { + LOG.debug("pipeline:{} already in closing/closed state, skipping", + pipeline.getId()); + // already in closing/closed state + return; + } + // Remove the pipeline from active allocation manager.finalizePipeline(pipeline); + + LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId()); updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); closePipelineIfNoOpenContainers(pipeline); } @@ -350,7 +354,7 @@ public class PipelineSelector { /** * Close a given pipeline. */ - private void closePipeline(Pipeline pipeline) { + private void closePipeline(Pipeline pipeline) throws IOException { PipelineManager manager = getPipelineManager(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Closing pipeline. 
pipelineID: {}", pipeline.getId()); @@ -400,14 +404,8 @@ public class PipelineSelector { return node2PipelineMap; } - public void removePipeline(UUID dnId) { - Set pipelineSet = - node2PipelineMap.getPipelines(dnId); - for (Pipeline pipeline : pipelineSet) { - getPipelineManager(pipeline.getType()) - .closePipeline(pipeline); - } - node2PipelineMap.removeDatanode(dnId); + public Set getPipelineId(UUID dnId) { + return node2PipelineMap.getPipelines(dnId); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index 8b144834ef8..150802e2b18 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.Map; /** * Implementation of {@link PipelineManager}. @@ -59,8 +60,8 @@ public class RatisManagerImpl extends PipelineManager { */ public RatisManagerImpl(NodeManager nodeManager, ContainerPlacementPolicy placementPolicy, long size, Configuration conf, - Node2PipelineMap map) { - super(map); + Node2PipelineMap map, Map pipelineMap) { + super(map, pipelineMap); this.conf = conf; this.nodeManager = nodeManager; ratisMembers = new HashSet<>(); @@ -101,20 +102,23 @@ public class RatisManagerImpl extends PipelineManager { //TODO:move the initialization from SCM to client try (XceiverClientRatis client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.createPipeline(pipeline); + client.createPipeline(); } } /** * Close the pipeline. */ - public void closePipeline(Pipeline pipeline) { + public void closePipeline(Pipeline pipeline) throws IOException { super.closePipeline(pipeline); for (DatanodeDetails node : pipeline.getMachines()) { // A node should always be the in ratis members list. Preconditions.checkArgument(ratisMembers.remove(node)); } - //TODO: should the raft ring also be destroyed as well? 
+ try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.destroyPipeline(); + } } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index f1b23f50eff..2573b9c480f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.Map; /** * Standalone Manager Impl to prove that pluggable interface @@ -58,8 +59,8 @@ public class StandaloneManagerImpl extends PipelineManager { */ public StandaloneManagerImpl(NodeManager nodeManager, ContainerPlacementPolicy placementPolicy, long containerSize, - Node2PipelineMap map) { - super(map); + Node2PipelineMap map, Map pipelineMap) { + super(map, pipelineMap); this.nodeManager = nodeManager; this.placementPolicy = placementPolicy; this.containerSize = containerSize; @@ -103,7 +104,7 @@ public class StandaloneManagerImpl extends PipelineManager { /** * Close the pipeline. */ - public void closePipeline(Pipeline pipeline) { + public void closePipeline(Pipeline pipeline) throws IOException { super.closePipeline(pipeline); for (DatanodeDetails node : pipeline.getMachines()) { // A node should always be the in standalone members list. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index c2591418e94..a651f62371a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; import org.apache.hadoop.hdds.protocol.proto. @@ -43,6 +45,8 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS; + /** * This class is responsible for dispatching heartbeat from datanode to * appropriate EventHandler at SCM. 
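The PIPELINE_ACTIONS event dispatched in the following hunk is consumed on the SCM side by the PipelineActionEventHandler that StorageContainerManager registers later in this patch; that handler's source is not included in this section. The sketch below only illustrates how such a handler could turn a datanode CLOSE action into a PIPELINE_CLOSE event; the PipelineAction message and the getReport()/getPipelineActionsList()/getClosePipeline() accessor names are assumptions, not confirmed by this diff.

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;

/**
 * Sketch of a pipeline-action handler: CLOSE actions reported by a datanode
 * are re-published as PIPELINE_CLOSE events carrying the PipelineID.
 */
public class PipelineActionEventHandler implements
    EventHandler<PipelineActionsFromDatanode> {

  @Override
  public void onMessage(PipelineActionsFromDatanode report,
      EventPublisher publisher) {
    // PipelineAction, getReport(), getPipelineActionsList() and
    // getClosePipeline() are assumed names, not confirmed by this diff.
    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
      if (action.getAction() == PipelineAction.Action.CLOSE) {
        PipelineID pipelineID = PipelineID.getFromProtobuf(
            action.getClosePipeline().getPipelineID());
        publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
      }
    }
  }
}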
@@ -99,6 +103,13 @@ public final class SCMDatanodeHeartbeatDispatcher { heartbeat.getContainerActions())); } + if (heartbeat.hasPipelineActions()) { + LOG.debug("Dispatching Pipeline Actions."); + eventPublisher.fireEvent(PIPELINE_ACTIONS, + new PipelineActionsFromDatanode(datanodeDetails, + heartbeat.getPipelineActions())); + } + if (heartbeat.hasCommandStatusReport()) { eventPublisher.fireEvent(CMD_STATUS_REPORT, new CommandStatusReportFromDatanode(datanodeDetails, @@ -167,6 +178,18 @@ public final class SCMDatanodeHeartbeatDispatcher { } } + /** + * Pipeline action event payload with origin. + */ + public static class PipelineActionsFromDatanode + extends ReportFromDatanode { + + public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails, + PipelineActionsProto actions) { + super(datanodeDetails, actions); + } + } + /** * Container report event payload with origin. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 061ff7855ef..b84f399eec1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -62,6 +62,8 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler; +import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdfs.DFSUtil; @@ -218,7 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl new CommandStatusReportHandler(); NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); - StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); + StaleNodeHandler staleNodeHandler = + new StaleNodeHandler(node2ContainerMap, scmContainerManager); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap, getScmContainerManager().getStateManager()); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); @@ -229,6 +232,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl new ContainerReportHandler(scmContainerManager, node2ContainerMap, replicationStatus); + PipelineActionEventHandler pipelineActionEventHandler = + new PipelineActionEventHandler(); + + PipelineCloseHandler pipelineCloseHandler = + new PipelineCloseHandler(scmContainerManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); @@ -242,6 +250,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus); eventQueue .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); + eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, + pipelineActionEventHandler); + eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index c0cd2935129..b8cb99701bc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; @@ -97,10 +97,10 @@ public class TestNode2PipelineMap { Assert.assertEquals(3, dns.size()); // get pipeline details by dnid - Set pipelines = mapping.getPipelineSelector() + Set pipelines = mapping.getPipelineSelector() .getNode2PipelineMap().getPipelines(dns.get(0).getUuid()); Assert.assertEquals(1, pipelines.size()); - pipelines.forEach(p -> Assert.assertEquals(p.getId(), + pipelines.forEach(p -> Assert.assertEquals(p, ratisContainer.getPipeline().getId())); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java new file mode 100644 index 00000000000..7e3969c52bc --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers + .ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationType.RATIS; + +/** + * Test Node failure detection and handling in Ratis. + */ +public class TestNodeFailure { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static ContainerWithPipeline ratisContainer1; + private static ContainerWithPipeline ratisContainer2; + private static ContainerMapping mapping; + private static long timeForFailure; + + /** + * Create a MiniOzoneCluster for testing. + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, + 10, TimeUnit.SECONDS); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, + 10, TimeUnit.SECONDS); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(6) + .setHbInterval(1000) + .setHbProcessorInterval(1000) + .build(); + cluster.waitForClusterToBeReady(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + mapping = (ContainerMapping)scm.getScmContainerManager(); + ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner"); + ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner"); + // At this stage, there should be 2 pipelines, each with 1 open container. + // Try closing both the pipelines, one with a closed container and + // the other with an open container. + timeForFailure = conf.getTimeDuration( + OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, + OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT + .getDuration(), TimeUnit.MILLISECONDS); + } + + /** + * Shutdown MiniOzoneCluster. 
+ */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testPipelineFail() throws InterruptedException, IOException, + TimeoutException { + Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(), + HddsProtos.LifeCycleState.OPEN); + Pipeline pipelineToFail = ratisContainer1.getPipeline(); + DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0); + cluster.shutdownHddsDatanode(dnToFail); + + // wait for sufficient time for the callback to be triggered + Thread.sleep(3 * timeForFailure); + + Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED, + ratisContainer1.getPipeline().getLifeCycleState()); + Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, + ratisContainer2.getPipeline().getLifeCycleState()); + Assert.assertNull( + mapping.getPipelineSelector().getPipeline(pipelineToFail.getId())); + // Now restart the datanode and make sure that a new pipeline is created. + cluster.restartHddsDatanode(dnToFail); + ContainerWithPipeline ratisContainer3 = + mapping.allocateContainer(RATIS, THREE, "testOwner"); + //Assert that new container is not created from the ratis 2 pipeline + Assert.assertNotEquals(ratisContainer3.getPipeline().getId(), + ratisContainer2.getPipeline().getId()); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index eb15396c69e..0f8f925475e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -112,8 +112,7 @@ public class TestPipelineClose { pipelineSelector.finalizePipeline(ratisContainer1.getPipeline()); Pipeline pipeline1 = pipelineSelector - .getPipeline(ratisContainer1.getPipeline().getId(), - ratisContainer1.getContainerInfo().getReplicationType()); + .getPipeline(ratisContainer1.getPipeline().getId()); Assert.assertNull(pipeline1); Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(), HddsProtos.LifeCycleState.CLOSED); @@ -140,8 +139,7 @@ public class TestPipelineClose { Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(), HddsProtos.LifeCycleState.CLOSING); Pipeline pipeline2 = pipelineSelector - .getPipeline(ratisContainer2.getPipeline().getId(), - ratisContainer2.getContainerInfo().getReplicationType()); + .getPipeline(ratisContainer2.getPipeline().getId()); Assert.assertEquals(pipeline2.getLifeCycleState(), HddsProtos.LifeCycleState.CLOSING); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index ae6a91ebbe0..e11cf9bdcf1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.om.OzoneManager; @@ -155,6 +156,13 @@ public 
interface MiniOzoneCluster { void restartHddsDatanode(int i) throws InterruptedException, TimeoutException; + /** + * Restart a particular HddsDatanode. + * + * @param dn HddsDatanode in the MiniOzoneCluster + */ + void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException, + TimeoutException, IOException; /** * Shutdown a particular HddsDatanode. * @@ -162,6 +170,13 @@ public interface MiniOzoneCluster { */ void shutdownHddsDatanode(int i); + /** + * Shutdown a particular HddsDatanode. + * + * @param dn HddsDatanode in the MiniOzoneCluster + */ + void shutdownHddsDatanode(DatanodeDetails dn) throws IOException; + /** * Shutdown the MiniOzoneCluster. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index e06e2f6c7c2..7b9bb0e808d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -157,6 +157,16 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { return hddsDatanodes; } + private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException { + for (HddsDatanodeService service : hddsDatanodes) { + if (service.getDatanodeDetails().equals(dn)) { + return hddsDatanodes.indexOf(service); + } + } + throw new IOException( + "Not able to find datanode with datanode Id " + dn.getUuid()); + } + @Override public OzoneClient getClient() throws IOException { return OzoneClientFactory.getClient(conf); @@ -242,11 +252,22 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { waitForClusterToBeReady(); } + @Override + public void restartHddsDatanode(DatanodeDetails dn) + throws InterruptedException, TimeoutException, IOException { + restartHddsDatanode(getHddsDatanodeIndex(dn)); + } + @Override public void shutdownHddsDatanode(int i) { hddsDatanodes.get(i).stop(); } + @Override + public void shutdownHddsDatanode(DatanodeDetails dn) throws IOException { + shutdownHddsDatanode(getHddsDatanodeIndex(dn)); + } + @Override public void shutdown() { try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java index 8b324b5341d..b53e6833263 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java @@ -156,7 +156,8 @@ public class TestCSMMetrics { conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); final ContainerDispatcher dispatcher = new TestContainerDispatcher(); - return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher); + return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher, + null); } static void initXceiverServerRatis( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index ebcc9302c30..3abc8f861d3 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -138,7 +138,8 @@ public class TestContainerServer { conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); final ContainerDispatcher dispatcher = new TestContainerDispatcher(); - return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher); + return XceiverServerRatis + .newXceiverServerRatis(dn, conf, dispatcher, null); } static void initXceiverServerRatis( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java index 448742eb236..8c83fd3830a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java @@ -69,7 +69,7 @@ public class TestContainerStateMachine { new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy()); private ContainerStateMachine stateMachine = - new ContainerStateMachine(new TestContainerDispatcher(), executor); + new ContainerStateMachine(new TestContainerDispatcher(), executor, null); @Test
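A closing note on TestNodeFailure above: it waits for the Ratis failure callback with a fixed Thread.sleep(3 * timeForFailure), which can be brittle. The helper below is only a sketch of a polling alternative, assuming org.apache.hadoop.test.GenericTestUtils (already used by other Ozone integration tests) and the fields defined in that test; it is not part of this patch.

import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.test.GenericTestUtils;

/** Sketch: poll until the container's pipeline reaches CLOSED. */
final class PipelineWaitUtil {
  private PipelineWaitUtil() { }

  static void waitForPipelineClosed(ContainerWithPipeline container,
      long timeoutMillis) throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> container.getPipeline().getLifeCycleState()
            == HddsProtos.LifeCycleState.CLOSED,
        500,                    // re-check every 500 ms
        (int) timeoutMillis);   // overall budget, e.g. 3 * timeForFailure
  }
}

In testPipelineFail, the sleep could then be replaced with waitForPipelineClosed(ratisContainer1, 3 * timeForFailure).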