diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 00d8bf4a79d..55764e0877a 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -291,6 +291,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i | `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds | | `HeartbeatsNumOps` | Total number of heartbeats | | `HeartbeatsAvgTime` | Average heartbeat time in milliseconds | +| `LifelinesNumOps` | Total number of lifeline messages | +| `LifelinesAvgTime` | Average lifeline message processing time in milliseconds | | `BlockReportsNumOps` | Total number of block report operations | | `BlockReportsAvgTime` | Average time of block report operations in milliseconds | | `IncrementalBlockReportsNumOps` | Total number of incremental block report operations | diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 2d7032f974c..5d7bbd95e30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -342,6 +342,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> HdfsServer.proto DatanodeProtocol.proto + DatanodeLifelineProtocol.proto HAZKInfo.proto InterDatanodeProtocol.proto JournalProtocol.proto diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c0a7bd517b9..feb72674e32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -477,6 +477,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; + public static final String DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY = + "dfs.datanode.lifeline.interval.seconds"; public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms"; public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval"; @@ -487,8 +489,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100; public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count"; public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10; - public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1; - public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count"; + public static final String DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY = + "dfs.namenode.lifeline.handler.ratio"; + public static final float DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT = + 0.1f; + public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = + "dfs.namenode.lifeline.handler.count"; public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count"; public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index f6d8b3eb8a8..22e92df97c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; @@ -570,6 +571,40 @@ public class DFSUtil { return addressList; } + /** + * Returns list of InetSocketAddresses corresponding to lifeline RPC servers + * at namenodes from the configuration. + * + * @param conf configuration + * @return list of InetSocketAddress + * @throws IOException on error + */ + public static Map> + getNNLifelineRpcAddressesForCluster(Configuration conf) + throws IOException { + + Collection parentNameServices = conf.getTrimmedStringCollection( + DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); + + if (parentNameServices.isEmpty()) { + parentNameServices = conf.getTrimmedStringCollection( + DFSConfigKeys.DFS_NAMESERVICES); + } else { + // Ensure that the internal service is indeed in the list of all available + // nameservices. + Set availableNameServices = Sets.newHashSet(conf + .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES)); + for (String nsId : parentNameServices) { + if (!availableNameServices.contains(nsId)) { + throw new IOException("Unknown nameservice: " + nsId); + } + } + } + + return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null, + DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY); + } + /** * Map a logical namenode ID to its lifeline address. Use the given * nameservice if specified, or the configured one if none is given. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java new file mode 100644 index 00000000000..5c323ebf167 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto; +import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcClientUtil; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is the client side translator to translate the requests made on + * {@link DatanodeLifelineProtocol} interfaces to the RPC server implementing + * {@link DatanodeLifelineProtocolPB}. + */ +@InterfaceAudience.Private +public class DatanodeLifelineProtocolClientSideTranslatorPB implements + ProtocolMetaInterface, DatanodeLifelineProtocol, Closeable { + + /** RpcController is not used and hence is set to null. */ + private static final RpcController NULL_CONTROLLER = null; + + private final DatanodeLifelineProtocolPB rpcProxy; + + public DatanodeLifelineProtocolClientSideTranslatorPB( + InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, DatanodeLifelineProtocolPB.class, + ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + rpcProxy = createNamenode(nameNodeAddr, conf, ugi); + } + + private static DatanodeLifelineProtocolPB createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf, + UserGroupInformation ugi) throws IOException { + return RPC.getProxy(DatanodeLifelineProtocolPB.class, + RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), nameNodeAddr, + ugi, conf, + NetUtils.getSocketFactory(conf, DatanodeLifelineProtocolPB.class)); + } + + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + @Override + public void sendLifeline(DatanodeRegistration registration, + StorageReport[] reports, long cacheCapacity, long cacheUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary) throws IOException { + HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)) + .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) + .setFailedVolumes(failedVolumes); + builder.addAllReports(PBHelperClient.convertStorageReports(reports)); + if (cacheCapacity != 0) { + builder.setCacheCapacity(cacheCapacity); + } + if (cacheUsed != 0) { + builder.setCacheUsed(cacheUsed); + } + if (volumeFailureSummary != null) { + builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary( + volumeFailureSummary)); + } + try { + rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build()); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + } + + @Override // ProtocolMetaInterface + public boolean isMethodSupported(String methodName) + throws IOException { + return RpcClientUtil.isMethodSupported(rpcProxy, + DatanodeLifelineProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), methodName); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java new file mode 100644 index 00000000000..a17a6b5649b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; + +/** + * Protocol used by a DataNode to send lifeline messages to a NameNode. + */ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) +@ProtocolInfo( + protocolName = + "org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface DatanodeLifelineProtocolPB extends + DatanodeLifelineProtocolService.BlockingInterface { +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..83119933e23 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.LifelineResponseProto; +import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link DatanodeLifelineProtocolPB} to the + * {@link DatanodeLifelineProtocol} server implementation. + */ +@InterfaceAudience.Private +public class DatanodeLifelineProtocolServerSideTranslatorPB implements + DatanodeLifelineProtocolPB { + + private static final LifelineResponseProto VOID_LIFELINE_RESPONSE_PROTO = + LifelineResponseProto.newBuilder().build(); + + private final DatanodeLifelineProtocol impl; + + public DatanodeLifelineProtocolServerSideTranslatorPB( + DatanodeLifelineProtocol impl) { + this.impl = impl; + } + + @Override + public LifelineResponseProto sendLifeline(RpcController controller, + HeartbeatRequestProto request) throws ServiceException { + try { + final StorageReport[] report = PBHelperClient.convertStorageReports( + request.getReportsList()); + VolumeFailureSummary volumeFailureSummary = + request.hasVolumeFailureSummary() ? + PBHelper.convertVolumeFailureSummary( + request.getVolumeFailureSummary()) : null; + impl.sendLifeline(PBHelper.convert(request.getRegistration()), report, + request.getCacheCapacity(), request.getCacheUsed(), + request.getXmitsInProgress(), request.getXceiverCount(), + request.getFailedVolumes(), volumeFailureSummary); + return VOID_LIFELINE_RESPONSE_PROTO; + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index b41afdb313b..60e6610914f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1474,6 +1474,47 @@ public class DatanodeManager { return new DatanodeCommand[0]; } + /** + * Handles a lifeline message sent by a DataNode. + * + * @param nodeReg registration info for DataNode sending the lifeline + * @param reports storage reports from DataNode + * @param blockPoolId block pool ID + * @param cacheCapacity cache capacity at DataNode + * @param cacheUsed cache used at DataNode + * @param xceiverCount estimated count of transfer threads running at DataNode + * @param maxTransfers count of transfers running at DataNode + * @param failedVolumes count of failed volumes at DataNode + * @param volumeFailureSummary info on failed volumes at DataNode + * @throws IOException if there is an error + */ + public void handleLifeline(DatanodeRegistration nodeReg, + StorageReport[] reports, String blockPoolId, long cacheCapacity, + long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes, + VolumeFailureSummary volumeFailureSummary) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Received handleLifeline from nodeReg = " + nodeReg); + } + DatanodeDescriptor nodeinfo = getDatanode(nodeReg); + if (nodeinfo == null) { + // This is null if the DataNode has not yet registered. We expect this + // will never happen, because the DataNode has logic to prevent sending + // lifeline messages until after initial registration is successful. + // Lifeline message handling can't send commands back to the DataNode to + // tell it to register, so simply exit. + return; + } + if (nodeinfo.isDisallowed()) { + // This is highly unlikely, because heartbeat handling is much more + // frequent and likely would have already sent the disallowed error. + // Lifeline messages are not intended to send any kind of control response + // back to the DataNode, so simply exit. + return; + } + heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed, + xceiverCount, failedVolumes, volumeFailureSummary); + } + /** * Convert a CachedBlockList into a DatanodeCommand with a list of blocks. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index b8d30437729..cec4a1ad2c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -240,6 +240,20 @@ class HeartbeatManager implements DatanodeStatistics { stats.add(node); } + synchronized void updateLifeline(final DatanodeDescriptor node, + StorageReport[] reports, long cacheCapacity, long cacheUsed, + int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary) { + stats.subtract(node); + // This intentionally calls updateHeartbeatState instead of + // updateHeartbeat, because we don't want to modify the + // heartbeatedSinceRegistration flag. Arrival of a lifeline message does + // not count as arrival of the first heartbeat. + node.updateHeartbeatState(reports, cacheCapacity, cacheUsed, + xceiverCount, failedVolumes, volumeFailureSummary); + stats.add(node); + } + synchronized void startDecommission(final DatanodeDescriptor node) { if (!node.isAlive()) { LOG.info("Dead node {} is decommissioned immediately.", node); @@ -416,4 +430,4 @@ class HeartbeatManager implements DatanodeStatistics { } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index b4d89fc4ebc..5b652321f82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -118,17 +118,22 @@ class BPOfferService { mWriteLock.unlock(); } - BPOfferService(List nnAddrs, DataNode dn) { + BPOfferService(List nnAddrs, + List lifelineNnAddrs, DataNode dn) { Preconditions.checkArgument(!nnAddrs.isEmpty(), "Must pass at least one NN."); + Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(), + "Must pass same number of NN addresses and lifeline addresses."); this.dn = dn; - for (InetSocketAddress addr : nnAddrs) { - this.bpServices.add(new BPServiceActor(addr, this)); + for (int i = 0; i < nnAddrs.size(); ++i) { + this.bpServices.add(new BPServiceActor(nnAddrs.get(i), + lifelineNnAddrs.get(i), this)); } } - void refreshNNList(ArrayList addrs) throws IOException { + void refreshNNList(ArrayList addrs, + ArrayList lifelineAddrs) throws IOException { Set oldAddrs = Sets.newHashSet(); for (BPServiceActor actor : bpServices) { oldAddrs.add(actor.getNNSocketAddress()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 6d42e34cb4d..ebf54963088 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.util.Time.monotonicNow; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; @@ -28,6 +29,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -103,14 +106,20 @@ class BPServiceActor implements Runnable { final LinkedList bpThreadQueue = new LinkedList(); - BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) { + BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr, + BPOfferService bpos) { this.bpos = bpos; this.dn = bpos.getDataNode(); this.nnAddr = nnAddr; + this.lifelineSender = lifelineNnAddr != null ? + new LifelineSender(lifelineNnAddr) : null; + this.initialRegistrationComplete = lifelineNnAddr != null ? + new CountDownLatch(1) : null; this.dnConf = dn.getDnConf(); this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval); prevBlockReportId = ThreadLocalRandom.current().nextLong(); - scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval); + scheduler = new Scheduler(dnConf.heartBeatInterval, + dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval); } public DatanodeRegistration getBpRegistration() { @@ -138,6 +147,9 @@ class BPServiceActor implements Runnable { return nnAddr; } + private final CountDownLatch initialRegistrationComplete; + private final LifelineSender lifelineSender; + /** * Used to inject a spy NN in the unit tests. */ @@ -151,6 +163,20 @@ class BPServiceActor implements Runnable { return bpNamenode; } + /** + * Used to inject a spy NN in the unit tests. + */ + @VisibleForTesting + void setLifelineNameNode( + DatanodeLifelineProtocolClientSideTranslatorPB dnLifelineProtocol) { + lifelineSender.lifelineNamenode = dnLifelineProtocol; + } + + @VisibleForTesting + DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() { + return lifelineSender.lifelineNamenode; + } + /** * Perform the first part of the handshake with the NameNode. * This calls versionRequest to determine the NN's @@ -420,29 +446,39 @@ class BPServiceActor implements Runnable { //Thread is started already return; } - bpThread = new Thread(this, formatThreadName()); + bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr)); bpThread.setDaemon(true); // needed for JUnit testing bpThread.start(); + + if (lifelineSender != null) { + lifelineSender.start(); + } } - private String formatThreadName() { + private String formatThreadName(String action, InetSocketAddress addr) { Collection dataDirs = DataNode.getStorageLocations(dn.getConf()); - return "DataNode: [" + dataDirs.toString() + "] " + - " heartbeating to " + nnAddr; + return "DataNode: [" + dataDirs.toString() + "] " + + action + " to " + addr; } //This must be called only by blockPoolManager. void stop() { shouldServiceRun = false; + if (lifelineSender != null) { + lifelineSender.stop(); + } if (bpThread != null) { - bpThread.interrupt(); + bpThread.interrupt(); } } //This must be called only by blockPoolManager void join() { try { + if (lifelineSender != null) { + lifelineSender.join(); + } if (bpThread != null) { bpThread.join(); } @@ -454,6 +490,7 @@ class BPServiceActor implements Runnable { shouldServiceRun = false; IOUtils.cleanup(null, bpNamenode); + IOUtils.cleanup(null, lifelineSender); bpos.shutdownActor(this); } @@ -480,7 +517,9 @@ class BPServiceActor implements Runnable { + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec" + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec" + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec" - + "; heartBeatInterval=" + dnConf.heartBeatInterval); + + "; heartBeatInterval=" + dnConf.heartBeatInterval + + (lifelineSender != null ? + "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : "")); long fullBlockReportLeaseId = 0; // @@ -684,6 +723,9 @@ class BPServiceActor implements Runnable { } runningState = RunningState.RUNNING; + if (initialRegistrationComplete != null) { + initialRegistrationComplete.countDown(); + } while (shouldRun()) { try { @@ -797,6 +839,135 @@ class BPServiceActor implements Runnable { return scheduler; } + private final class LifelineSender implements Runnable, Closeable { + + private final InetSocketAddress lifelineNnAddr; + private Thread lifelineThread; + private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode; + + public LifelineSender(InetSocketAddress lifelineNnAddr) { + this.lifelineNnAddr = lifelineNnAddr; + } + + @Override + public void close() { + stop(); + try { + join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + IOUtils.cleanup(null, lifelineNamenode); + } + + @Override + public void run() { + // The lifeline RPC depends on registration with the NameNode, so wait for + // initial registration to complete. + while (shouldRun()) { + try { + initialRegistrationComplete.await(); + break; + } catch (InterruptedException e) { + // The only way thread interruption can happen while waiting on this + // latch is if the state of the actor has been updated to signal + // shutdown. The next loop's call to shouldRun() will return false, + // and the thread will finish. + Thread.currentThread().interrupt(); + } + } + + // After initial NameNode registration has completed, execute the main + // loop for sending periodic lifeline RPCs if needed. This is done in a + // second loop to avoid a pointless wait on the above latch in every + // iteration of the main loop. + while (shouldRun()) { + try { + if (lifelineNamenode == null) { + lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr); + } + sendLifelineIfDue(); + Thread.sleep(scheduler.getLifelineWaitTime()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOG.warn("IOException in LifelineSender for " + BPServiceActor.this, + e); + } + } + + LOG.info("LifelineSender for " + BPServiceActor.this + " exiting."); + } + + public void start() { + lifelineThread = new Thread(this, formatThreadName("lifeline", + lifelineNnAddr)); + lifelineThread.setDaemon(true); + lifelineThread.setUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable t) { + LOG.error(thread + " terminating on unexpected exception", t); + } + }); + lifelineThread.start(); + } + + public void stop() { + if (lifelineThread != null) { + lifelineThread.interrupt(); + } + } + + public void join() throws InterruptedException { + if (lifelineThread != null) { + lifelineThread.join(); + } + } + + private void sendLifelineIfDue() throws IOException { + long startTime = scheduler.monotonicNow(); + if (!scheduler.isLifelineDue(startTime)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping sending lifeline for " + BPServiceActor.this + + ", because it is not due."); + } + return; + } + if (dn.areHeartbeatsDisabledForTests()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping sending lifeline for " + BPServiceActor.this + + ", because heartbeats are disabled for tests."); + } + return; + } + sendLifeline(); + dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime); + scheduler.scheduleNextLifeline(scheduler.monotonicNow()); + } + + private void sendLifeline() throws IOException { + StorageReport[] reports = + dn.getFSDataset().getStorageReports(bpos.getBlockPoolId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending lifeline with " + reports.length + " storage " + + " reports from service actor: " + BPServiceActor.this); + } + VolumeFailureSummary volumeFailureSummary = dn.getFSDataset() + .getVolumeFailureSummary(); + int numFailedVolumes = volumeFailureSummary != null ? + volumeFailureSummary.getFailedStorageLocations().length : 0; + lifelineNamenode.sendLifeline(bpRegistration, + reports, + dn.getFSDataset().getCacheCapacity(), + dn.getFSDataset().getCacheUsed(), + dn.getXmitsInProgress(), + dn.getXceiverCount(), + numFailedVolumes, + volumeFailureSummary); + } + } + /** * Utility class that wraps the timestamp computations for scheduling * heartbeats and block reports. @@ -811,6 +982,9 @@ class BPServiceActor implements Runnable { @VisibleForTesting volatile long nextHeartbeatTime = monotonicNow(); + @VisibleForTesting + volatile long nextLifelineTime = monotonicNow(); + @VisibleForTesting boolean resetBlockReportTime = true; @@ -818,10 +992,13 @@ class BPServiceActor implements Runnable { new AtomicBoolean(false); private final long heartbeatIntervalMs; + private final long lifelineIntervalMs; private final long blockReportIntervalMs; - Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) { + Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs, + long blockReportIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; + this.lifelineIntervalMs = lifelineIntervalMs; this.blockReportIntervalMs = blockReportIntervalMs; } @@ -835,19 +1012,31 @@ class BPServiceActor implements Runnable { // Blockreport. long scheduleHeartbeat() { nextHeartbeatTime = monotonicNow(); + scheduleNextLifeline(nextHeartbeatTime); return nextHeartbeatTime; } long scheduleNextHeartbeat() { // Numerical overflow is possible here and is okay. nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs; + scheduleNextLifeline(nextHeartbeatTime); return nextHeartbeatTime; } + long scheduleNextLifeline(long baseTime) { + // Numerical overflow is possible here and is okay. + nextLifelineTime = baseTime + lifelineIntervalMs; + return nextLifelineTime; + } + boolean isHeartbeatDue(long startTime) { return (nextHeartbeatTime - startTime <= 0); } + boolean isLifelineDue(long startTime) { + return (nextLifelineTime - startTime <= 0); + } + boolean isBlockReportDue(long curTime) { return nextBlockReportTime - curTime <= 0; } @@ -903,6 +1092,10 @@ class BPServiceActor implements Runnable { return nextHeartbeatTime - monotonicNow(); } + long getLifelineWaitTime() { + return nextLifelineTime - monotonicNow(); + } + /** * Wrapped for testing. * @return diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index 08b2fb0e253..e94bbb762f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -151,14 +151,18 @@ class BlockPoolManager { Map> newAddressMap = DFSUtil .getNNServiceRpcAddressesForCluster(conf); + Map> newLifelineAddressMap = DFSUtil + .getNNLifelineRpcAddressesForCluster(conf); synchronized (refreshNamenodesLock) { - doRefreshNamenodes(newAddressMap); + doRefreshNamenodes(newAddressMap, newLifelineAddressMap); } } private void doRefreshNamenodes( - Map> addrMap) throws IOException { + Map> addrMap, + Map> lifelineAddrMap) + throws IOException { assert Thread.holdsLock(refreshNamenodesLock); Set toRefresh = Sets.newLinkedHashSet(); @@ -195,9 +199,19 @@ class BlockPoolManager { Joiner.on(",").useForNull("").join(toAdd)); for (String nsToAdd : toAdd) { + Map nnIdToAddr = addrMap.get(nsToAdd); + Map nnIdToLifelineAddr = + lifelineAddrMap.get(nsToAdd); ArrayList addrs = - Lists.newArrayList(addrMap.get(nsToAdd).values()); - BPOfferService bpos = createBPOS(addrs); + Lists.newArrayListWithCapacity(nnIdToAddr.size()); + ArrayList lifelineAddrs = + Lists.newArrayListWithCapacity(nnIdToAddr.size()); + for (String nnId : nnIdToAddr.keySet()) { + addrs.add(nnIdToAddr.get(nnId)); + lifelineAddrs.add(nnIdToLifelineAddr != null ? + nnIdToLifelineAddr.get(nnId) : null); + } + BPOfferService bpos = createBPOS(addrs, lifelineAddrs); bpByNameserviceId.put(nsToAdd, bpos); offerServices.add(bpos); } @@ -227,9 +241,19 @@ class BlockPoolManager { for (String nsToRefresh : toRefresh) { BPOfferService bpos = bpByNameserviceId.get(nsToRefresh); + Map nnIdToAddr = addrMap.get(nsToRefresh); + Map nnIdToLifelineAddr = + lifelineAddrMap.get(nsToRefresh); ArrayList addrs = - Lists.newArrayList(addrMap.get(nsToRefresh).values()); - bpos.refreshNNList(addrs); + Lists.newArrayListWithCapacity(nnIdToAddr.size()); + ArrayList lifelineAddrs = + Lists.newArrayListWithCapacity(nnIdToAddr.size()); + for (String nnId : nnIdToAddr.keySet()) { + addrs.add(nnIdToAddr.get(nnId)); + lifelineAddrs.add(nnIdToLifelineAddr != null ? + nnIdToLifelineAddr.get(nnId) : null); + } + bpos.refreshNNList(addrs, lifelineAddrs); } } } @@ -237,7 +261,8 @@ class BlockPoolManager { /** * Extracted out for test purposes. */ - protected BPOfferService createBPOS(List nnAddrs) { - return new BPOfferService(nnAddrs, dn); + protected BPOfferService createBPOS(List nnAddrs, + List lifelineNnAddrs) { + return new BPOfferService(nnAddrs, lifelineNnAddrs, dn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index a1e72140dfd..85132f76d42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; @@ -87,6 +88,7 @@ public class DNConf { final long readaheadLength; final long heartBeatInterval; + private final long lifelineIntervalMs; final long blockReportInterval; final long blockReportSplitThreshold; final long ibrInterval; @@ -185,6 +187,20 @@ public class DNConf { heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; + long confLifelineIntervalMs = + conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, + 3 * conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, + DFS_HEARTBEAT_INTERVAL_DEFAULT)) * 1000L; + if (confLifelineIntervalMs <= heartBeatInterval) { + confLifelineIntervalMs = 3 * heartBeatInterval; + DataNode.LOG.warn( + String.format("%s must be set to a value greater than %s. " + + "Resetting value to 3 * %s, which is %d milliseconds.", + DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, + DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_KEY, + confLifelineIntervalMs)); + } + lifelineIntervalMs = confLifelineIntervalMs; // do we need to sync block file contents to disk when blockfile is closed? this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, @@ -311,4 +327,13 @@ public class DNConf { public long getBpReadyTimeout() { return bpReadyTimeout; } + + /** + * Returns the interval in milliseconds between sending lifeline messages. + * + * @return interval in milliseconds between sending lifeline messages + */ + public long getLifelineIntervalMs() { + return lifelineIntervalMs; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index eeb5f89300a..a209c1ce05d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -144,6 +144,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; @@ -1633,6 +1634,19 @@ public class DataNode extends ReconfigurableBase return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf); } + /** + * Connect to the NN for the lifeline protocol. This is separated out for + * easier testing. + * + * @param lifelineNnAddr address of lifeline RPC server + * @return lifeline RPC proxy + */ + DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN( + InetSocketAddress lifelineNnAddr) throws IOException { + return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr, + conf); + } + public static InterDatanodeProtocol createInterDataNodeProtocolProxy( DatanodeID datanodeid, final Configuration conf, final int socketTimeout, final boolean connectToDnViaHostname) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index a0f25dae012..aa518fb7688 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -107,6 +107,7 @@ public class DataNodeMetrics { @Metric MutableRate copyBlockOp; @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; + @Metric MutableRate lifelines; @Metric MutableRate blockReports; @Metric MutableRate incrementalBlockReports; @Metric MutableRate cacheReports; @@ -199,6 +200,10 @@ public class DataNodeMetrics { heartbeats.add(latency); } + public void addLifeline(long latency) { + lifelines.add(latency); + } + public void addBlockReport(long latency) { blockReports.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index dbf360dbc94..80a440b3f6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3588,6 +3588,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } + /** + * Handles a lifeline message sent by a DataNode. This method updates contact + * information and statistics for the DataNode, so that it doesn't time out. + * Unlike a heartbeat, this method does not dispatch any commands back to the + * DataNode for local execution. This method also cannot request a lease for + * sending a full block report. Lifeline messages are used only as a fallback + * in case something prevents successful delivery of heartbeat messages. + * Therefore, the implementation of this method must remain lightweight + * compared to heartbeat handling. It should avoid lock contention and + * expensive computation. + * + * @param nodeReg registration info for DataNode sending the lifeline + * @param reports storage reports from DataNode + * @param cacheCapacity cache capacity at DataNode + * @param cacheUsed cache used at DataNode + * @param xceiverCount estimated count of transfer threads running at DataNode + * @param xmitsInProgress count of transfers running at DataNode + * @param failedVolumes count of failed volumes at DataNode + * @param volumeFailureSummary info on failed volumes at DataNode + * @throws IOException if there is an error + */ + void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports, + long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, + int failedVolumes, VolumeFailureSummary volumeFailureSummary) + throws IOException { + int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress; + blockManager.getDatanodeManager().handleLifeline(nodeReg, reports, + getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer, + failedVolumes, volumeFailureSummary); + } + /** * Returns whether or not there were available resources at the last check of * resources. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 554ea101123..220547c7ca2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -19,8 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; @@ -110,11 +111,14 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; @@ -255,6 +259,11 @@ class NameNodeRpcServer implements NamenodeProtocols { BlockingService dnProtoPbService = DatanodeProtocolService .newReflectiveBlockingService(dnProtoPbTranslator); + DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator = + new DatanodeLifelineProtocolServerSideTranslatorPB(this); + BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService + .newReflectiveBlockingService(lifelineProtoPbTranslator); + NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); BlockingService NNPbService = NamenodeProtocolService @@ -369,9 +378,14 @@ class NameNodeRpcServer implements NamenodeProtocols { lifelineRpcAddr.getPort()); int lifelineHandlerCount = conf.getInt( - DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, - DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT); - + DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0); + if (lifelineHandlerCount <= 0) { + float lifelineHandlerRatio = conf.getFloat( + DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY, + DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT); + lifelineHandlerCount = Math.max( + (int)(handlerCount * lifelineHandlerRatio), 1); + } lifelineRpcServer = new RPC.Builder(conf) .setProtocol(HAServiceProtocolPB.class) .setInstance(haPbService) @@ -382,6 +396,9 @@ class NameNodeRpcServer implements NamenodeProtocols { .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build(); + DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class, + lifelineProtoPbService, lifelineRpcServer); + // Update the address with the correct port InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress(); lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(), @@ -1502,6 +1519,17 @@ class NameNodeRpcServer implements NamenodeProtocols { return namesystem.getNamespaceInfo(); } + @Override // DatanodeLifelineProtocol + public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report, + long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, + int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary) throws IOException { + checkNNStartup(); + verifyRequest(nodeReg); + namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed, + xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary); + } + /** * Verifies the given registration. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java new file mode 100644 index 00000000000..b30e60b45d5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.security.KerberosInfo; + +/** + * Protocol used by a DataNode to send lifeline messages to a NameNode. + */ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) +@InterfaceAudience.Private +public interface DatanodeLifelineProtocol { + + @Idempotent + void sendLifeline(DatanodeRegistration registration, StorageReport[] reports, + long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, + int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java index 4a3d83dee7e..d874e8f75dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java @@ -34,6 +34,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocol; public interface NamenodeProtocols extends ClientProtocol, DatanodeProtocol, + DatanodeLifelineProtocol, NamenodeProtocol, RefreshAuthorizationPolicyProtocol, ReconfigurationProtocol, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto new file mode 100644 index 00000000000..b6ab75653a1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. + */ + +option java_package = "org.apache.hadoop.hdfs.protocol.proto"; +option java_outer_classname = "DatanodeLifelineProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdfs.datanodelifeline; + +import "DatanodeProtocol.proto"; + +// The lifeline protocol does not use a new request message type. Instead, it +// reuses the existing heartbeat request message. + +// Unlike heartbeats, the response is empty. There is no command dispatch. +message LifelineResponseProto { +} + +service DatanodeLifelineProtocolService { + rpc sendLifeline(hadoop.hdfs.datanode.HeartbeatRequestProto) + returns(LifelineResponseProto); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ac7ad8477e8..997589791b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -704,6 +704,22 @@ Determines datanode heartbeat interval in seconds. + + dfs.datanode.lifeline.interval.seconds + + + Sets the interval in seconds between sending DataNode Lifeline Protocol + messages from the DataNode to the NameNode. The value must be greater than + the value of dfs.heartbeat.interval. If this property is not defined, then + the default behavior is to calculate the interval as 3x the value of + dfs.heartbeat.interval. Note that normal heartbeat processing may cause the + DataNode to postpone sending lifeline messages if they are not required. + Under normal operations with speedy heartbeat processing, it is possible + that no lifeline messages will need to be sent at all. This property has no + effect if dfs.namenode.lifeline.rpc-address is not defined. + + + dfs.namenode.handler.count 10 @@ -725,14 +741,34 @@ - dfs.namenode.lifeline.handler.count - 1 + dfs.namenode.lifeline.handler.ratio + 0.10 - Sets number of RPC server threads the NameNode runs for handling the - lifeline RPC server. The default value is 1, because this RPC server - handles only HA health check requests from ZKFC. These are lightweight - requests that run single-threaded from the ZKFC client side. This property - has no effect if dfs.namenode.lifeline.rpc-address is not defined. + A ratio applied to the value of dfs.namenode.handler.count, which then + provides the number of RPC server threads the NameNode runs for handling the + lifeline RPC server. For example, if dfs.namenode.handler.count is 100, and + dfs.namenode.lifeline.handler.factor is 0.10, then the NameNode starts + 100 * 0.10 = 10 threads for handling the lifeline RPC server. It is common + to tune the value of dfs.namenode.handler.count as a function of the number + of DataNodes in a cluster. Using this property allows for the lifeline RPC + server handler threads to be tuned automatically without needing to touch a + separate property. Lifeline message processing is lightweight, so it is + expected to require many fewer threads than the main NameNode RPC server. + This property is not used if dfs.namenode.lifeline.handler.count is defined, + which sets an absolute thread count. This property has no effect if + dfs.namenode.lifeline.rpc-address is not defined. + + + + + dfs.namenode.lifeline.handler.count + + + Sets an absolute number of RPC server threads the NameNode runs for handling + the DataNode Lifeline Protocol and HA health check requests from ZKFC. If + this property is defined, then it overrides the behavior of + dfs.namenode.lifeline.handler.ratio. By default, it is not defined. This + property has no effect if dfs.namenode.lifeline.rpc-address is not defined. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 1421f0fda09..95a103e2e83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -396,7 +397,8 @@ public class TestBPOfferService { Mockito.eq(new InetSocketAddress(port))); } - return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn); + return new BPOfferService(Lists.newArrayList(nnMap.keySet()), + Collections.nCopies(nnMap.size(), null), mockDn); } private void waitForInitialization(final BPOfferService bpos) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java index 27e99dbf9ea..48006dcb06c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java @@ -51,7 +51,8 @@ public class TestBlockPoolManager { bpm = new BlockPoolManager(mockDN){ @Override - protected BPOfferService createBPOS(List nnAddrs) { + protected BPOfferService createBPOS(List nnAddrs, + List lifelineNnAddrs) { final int idx = mockIdx++; doLog("create #" + idx); final BPOfferService bpos = Mockito.mock(BPOfferService.class); @@ -66,6 +67,7 @@ public class TestBlockPoolManager { return null; } }).when(bpos).refreshNNList( + Mockito.>any(), Mockito.>any()); } catch (IOException e) { throw new RuntimeException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java index efdd87c8575..76885e417ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -49,6 +49,7 @@ public class TestBpServiceActorScheduler { public Timeout timeout = new Timeout(300000); private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds + private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS; private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds private final Random random = new Random(System.nanoTime()); @@ -166,9 +167,23 @@ public class TestBpServiceActorScheduler { } } + @Test + public void testScheduleLifeline() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + scheduler.scheduleNextLifeline(now); + assertFalse(scheduler.isLifelineDue(now)); + assertThat(scheduler.getLifelineWaitTime(), is(LIFELINE_INTERVAL_MS)); + scheduler.scheduleNextLifeline(now - LIFELINE_INTERVAL_MS); + assertTrue(scheduler.isLifelineDue(now)); + assertThat(scheduler.getLifelineWaitTime(), is(0L)); + } + } + private Scheduler makeMockScheduler(long now) { LOG.info("Using now = " + now); - Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS)); + Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, + LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS)); doReturn(now).when(mockScheduler).monotonicNow(); mockScheduler.nextBlockReportTime = now; mockScheduler.nextHeartbeatTime = now; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java new file mode 100644 index 00000000000..fd661156e54 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.test.GenericTestUtils; + +import org.apache.log4j.Level; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Supplier; + +/** + * Test suite covering lifeline protocol handling in the DataNode. + */ +public class TestDataNodeLifeline { + + private static final Logger LOG = LoggerFactory.getLogger( + TestDataNodeLifeline.class); + + static { + GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); + } + + @Rule + public Timeout timeout = new Timeout(60000); + + private MiniDFSCluster cluster; + private HdfsConfiguration conf; + private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode; + private DataNodeMetrics metrics; + private DatanodeProtocolClientSideTranslatorPB namenode; + private FSNamesystem namesystem; + + @Before + public void setup() throws Exception { + // Configure cluster with lifeline RPC server enabled, and down-tune + // heartbeat timings to try to force quick dead/stale DataNodes. + conf = new HdfsConfiguration(); + conf.setInt(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 2); + conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0"); + conf.setInt(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 6 * 1000); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + namesystem = cluster.getNameNode().getNamesystem(); + + // Set up spies on RPC proxies so that we can inject failures. + DataNode dn = cluster.getDataNodes().get(0); + metrics = dn.getMetrics(); + assertNotNull(metrics); + List allBpos = dn.getAllBpOs(); + assertNotNull(allBpos); + assertEquals(1, allBpos.size()); + + BPOfferService bpos = allBpos.get(0); + List allBpsa = bpos.getBPServiceActors(); + assertNotNull(allBpsa); + assertEquals(1, allBpsa.size()); + + final BPServiceActor bpsa = allBpsa.get(0); + assertNotNull(bpsa); + + // Lifeline RPC proxy gets created on separate thread, so poll until found. + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + if (bpsa.getLifelineNameNodeProxy() != null) { + lifelineNamenode = spy(bpsa.getLifelineNameNodeProxy()); + bpsa.setLifelineNameNode(lifelineNamenode); + } + return lifelineNamenode != null; + } + }, 100, 10000); + + assertNotNull(bpsa.getNameNodeProxy()); + namenode = spy(bpsa.getNameNodeProxy()); + bpsa.setNameNode(namenode); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + GenericTestUtils.assertNoThreadsMatching(".*lifeline.*"); + } + } + + @Test + public void testSendLifelineIfHeartbeatBlocked() throws Exception { + // Run the test for the duration of sending 10 lifeline RPC messages. + int numLifelines = 10; + CountDownLatch lifelinesSent = new CountDownLatch(numLifelines); + + // Intercept heartbeat to inject an artificial delay, until all expected + // lifeline RPC messages have been sent. + doAnswer(new LatchAwaitingAnswer(lifelinesSent)) + .when(namenode).sendHeartbeat( + any(DatanodeRegistration.class), + any(StorageReport[].class), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + any(VolumeFailureSummary.class), + anyBoolean()); + + // Intercept lifeline to trigger latch count-down on each call. + doAnswer(new LatchCountingAnswer(lifelinesSent)) + .when(lifelineNamenode).sendLifeline( + any(DatanodeRegistration.class), + any(StorageReport[].class), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + any(VolumeFailureSummary.class)); + + // While waiting on the latch for the expected number of lifeline messages, + // poll DataNode tracking information. Thanks to the lifeline, we expect + // that the DataNode always stays alive, and never goes stale or dead. + while (!lifelinesSent.await(1, SECONDS)) { + assertEquals("Expect DataNode to be kept alive by lifeline.", 1, + namesystem.getNumLiveDataNodes()); + assertEquals("Expect DataNode not marked dead due to lifeline.", 0, + namesystem.getNumDeadDataNodes()); + assertEquals("Expect DataNode not marked stale due to lifeline.", 0, + namesystem.getNumStaleDataNodes()); + } + + // Verify that we did in fact call the lifeline RPC. + verify(lifelineNamenode, atLeastOnce()).sendLifeline( + any(DatanodeRegistration.class), + any(StorageReport[].class), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + any(VolumeFailureSummary.class)); + + // Also verify lifeline call through metrics. We expect at least + // numLifelines, guaranteed by waiting on the latch. There is a small + // possibility of extra lifeline calls depending on timing, so we allow + // slack in the assertion. + assertTrue("Expect metrics to count at least " + numLifelines + " calls.", + getLongCounter("LifelinesNumOps", getMetrics(metrics.name())) >= + numLifelines); + } + + @Test + public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception { + // Run the test for the duration of sending 10 heartbeat RPC messages. + int numHeartbeats = 10; + CountDownLatch heartbeatsSent = new CountDownLatch(numHeartbeats); + + // Intercept heartbeat to trigger latch count-down on each call. + doAnswer(new LatchCountingAnswer(heartbeatsSent)) + .when(namenode).sendHeartbeat( + any(DatanodeRegistration.class), + any(StorageReport[].class), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + any(VolumeFailureSummary.class), + anyBoolean()); + + // While waiting on the latch for the expected number of heartbeat messages, + // poll DataNode tracking information. We expect that the DataNode always + // stays alive, and never goes stale or dead. + while (!heartbeatsSent.await(1, SECONDS)) { + assertEquals("Expect DataNode to be kept alive by lifeline.", 1, + namesystem.getNumLiveDataNodes()); + assertEquals("Expect DataNode not marked dead due to lifeline.", 0, + namesystem.getNumDeadDataNodes()); + assertEquals("Expect DataNode not marked stale due to lifeline.", 0, + namesystem.getNumStaleDataNodes()); + } + + // Verify that we did not call the lifeline RPC. + verify(lifelineNamenode, never()).sendLifeline( + any(DatanodeRegistration.class), + any(StorageReport[].class), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + any(VolumeFailureSummary.class)); + + // Also verify no lifeline calls through metrics. + assertEquals("Expect metrics to count no lifeline calls.", 0, + getLongCounter("LifelinesNumOps", getMetrics(metrics.name()))); + } + + /** + * Waits on a {@link CountDownLatch} before calling through to the method. + */ + private final class LatchAwaitingAnswer implements Answer { + private final CountDownLatch latch; + + public LatchAwaitingAnswer(CountDownLatch latch) { + this.latch = latch; + } + + @Override + @SuppressWarnings("unchecked") + public T answer(InvocationOnMock invocation) + throws Throwable { + LOG.info("Awaiting, remaining latch count is {}.", latch.getCount()); + latch.await(); + return (T)invocation.callRealMethod(); + } + } + + /** + * Counts on a {@link CountDownLatch} after each call through to the method. + */ + private final class LatchCountingAnswer implements Answer { + private final CountDownLatch latch; + + public LatchCountingAnswer(CountDownLatch latch) { + this.latch = latch; + } + + @Override + @SuppressWarnings("unchecked") + public T answer(InvocationOnMock invocation) + throws Throwable { + T result = (T)invocation.callRealMethod(); + latch.countDown(); + LOG.info("Countdown, remaining latch count is {}.", latch.getCount()); + return result; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java index 216ff3d177c..f2a5d089bca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java @@ -61,7 +61,7 @@ public class TestDatanodeRegister { BPOfferService mockBPOS = mock(BPOfferService.class); doReturn(mockDN).when(mockBPOS).getDataNode(); - actor = new BPServiceActor(INVALID_ADDR, mockBPOS); + actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS); fakeNsInfo = mock(NamespaceInfo.class); // Return a a good software version.