HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.
(cherry picked from commit928f8dab52
) (cherry picked from commitd8c8107332
)
This commit is contained in:
parent
366bf3c5dc
commit
6f787d262c
|
@ -1039,6 +1039,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
FEDERATION_ROUTER_PREFIX + "rpc.enable";
|
||||
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
|
||||
|
||||
// HDFS Router heartbeat
|
||||
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
|
||||
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
|
||||
public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
|
||||
public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
|
||||
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
|
||||
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
|
||||
TimeUnit.SECONDS.toMillis(5);
|
||||
public static final String DFS_ROUTER_MONITOR_NAMENODE =
|
||||
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
|
||||
public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
|
||||
FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
|
||||
public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
|
||||
|
||||
// HDFS Router NN client
|
||||
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
|
||||
FEDERATION_ROUTER_PREFIX + "connection.pool-size";
|
||||
|
|
|
@ -1238,6 +1238,44 @@ public class DFSUtil {
|
|||
return serviceRpcAddr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a logical namenode ID to its web address. Use the given nameservice if
|
||||
* specified, or the configured one if none is given.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nsId which nameservice nnId is a part of, optional
|
||||
* @param nnId the namenode ID to get the service addr for
|
||||
* @return the service addr, null if it could not be determined
|
||||
*/
|
||||
public static String getNamenodeWebAddr(final Configuration conf, String nsId,
|
||||
String nnId) {
|
||||
|
||||
if (nsId == null) {
|
||||
nsId = getOnlyNameServiceIdOrNull(conf);
|
||||
}
|
||||
|
||||
String webAddrKey = DFSUtilClient.concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
String webAddr =
|
||||
conf.get(webAddrKey, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
|
||||
return webAddr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of the Web addresses of the individual NNs in a given nameservice.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nsId the nameservice whose NNs addresses we want.
|
||||
* @param defaultValue default address to return in case key is not found.
|
||||
* @return A map from nnId -> Web address of each NN in the nameservice.
|
||||
*/
|
||||
public static Map<String, InetSocketAddress> getWebAddressesForNameserviceId(
|
||||
Configuration conf, String nsId, String defaultValue) {
|
||||
return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
|
||||
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* If the configuration refers to only a single nameservice, return the
|
||||
* name of that nameservice. If it refers to 0 or more than 1, return null.
|
||||
|
|
|
@ -39,8 +39,29 @@ public class NamenodeStatusReport {
|
|||
private HAServiceState status = HAServiceState.STANDBY;
|
||||
private boolean safeMode = false;
|
||||
|
||||
/** Datanodes stats. */
|
||||
private int liveDatanodes = -1;
|
||||
private int deadDatanodes = -1;
|
||||
/** Decommissioning datanodes. */
|
||||
private int decomDatanodes = -1;
|
||||
/** Live decommissioned datanodes. */
|
||||
private int liveDecomDatanodes = -1;
|
||||
/** Dead decommissioned datanodes. */
|
||||
private int deadDecomDatanodes = -1;
|
||||
|
||||
/** Space stats. */
|
||||
private long availableSpace = -1;
|
||||
private long numOfFiles = -1;
|
||||
private long numOfBlocks = -1;
|
||||
private long numOfBlocksMissing = -1;
|
||||
private long numOfBlocksPendingReplication = -1;
|
||||
private long numOfBlocksUnderReplicated = -1;
|
||||
private long numOfBlocksPendingDeletion = -1;
|
||||
private long totalSpace = -1;
|
||||
|
||||
/** If the fields are valid. */
|
||||
private boolean registrationValid = false;
|
||||
private boolean statsValid = false;
|
||||
private boolean haStateValid = false;
|
||||
|
||||
public NamenodeStatusReport(String ns, String nn, String rpc, String service,
|
||||
|
@ -53,6 +74,15 @@ public class NamenodeStatusReport {
|
|||
this.webAddress = web;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the statistics are valid.
|
||||
*
|
||||
* @return If the statistics are valid.
|
||||
*/
|
||||
public boolean statsValid() {
|
||||
return this.statsValid;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the registration is valid.
|
||||
*
|
||||
|
@ -187,6 +217,169 @@ public class NamenodeStatusReport {
|
|||
return this.safeMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the datanode information.
|
||||
*
|
||||
* @param numLive Number of live nodes.
|
||||
* @param numDead Number of dead nodes.
|
||||
* @param numDecom Number of decommissioning nodes.
|
||||
* @param numLiveDecom Number of decommissioned live nodes.
|
||||
* @param numDeadDecom Number of decommissioned dead nodes.
|
||||
*/
|
||||
public void setDatanodeInfo(int numLive, int numDead, int numDecom,
|
||||
int numLiveDecom, int numDeadDecom) {
|
||||
this.liveDatanodes = numLive;
|
||||
this.deadDatanodes = numDead;
|
||||
this.decomDatanodes = numDecom;
|
||||
this.liveDecomDatanodes = numLiveDecom;
|
||||
this.deadDecomDatanodes = numDeadDecom;
|
||||
this.statsValid = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of live blocks.
|
||||
*
|
||||
* @return The number of dead nodes.
|
||||
*/
|
||||
public int getNumLiveDatanodes() {
|
||||
return this.liveDatanodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of dead blocks.
|
||||
*
|
||||
* @return The number of dead nodes.
|
||||
*/
|
||||
public int getNumDeadDatanodes() {
|
||||
return this.deadDatanodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of decommissionining nodes.
|
||||
*
|
||||
* @return The number of decommissionining nodes.
|
||||
*/
|
||||
public int getNumDecommissioningDatanodes() {
|
||||
return this.decomDatanodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of live decommissioned nodes.
|
||||
*
|
||||
* @return The number of live decommissioned nodes.
|
||||
*/
|
||||
public int getNumDecomLiveDatanodes() {
|
||||
return this.liveDecomDatanodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of dead decommissioned nodes.
|
||||
*
|
||||
* @return The number of dead decommissioned nodes.
|
||||
*/
|
||||
public int getNumDecomDeadDatanodes() {
|
||||
return this.deadDecomDatanodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the filesystem information.
|
||||
*
|
||||
* @param available Available capacity.
|
||||
* @param total Total capacity.
|
||||
* @param numFiles Number of files.
|
||||
* @param numBlocks Total number of blocks.
|
||||
* @param numBlocksMissing Number of missing blocks.
|
||||
* @param numOfBlocksPendingReplication Number of blocks pending replication.
|
||||
* @param numOfBlocksUnderReplicated Number of blocks under replication.
|
||||
* @param numOfBlocksPendingDeletion Number of blocks pending deletion.
|
||||
*/
|
||||
public void setNamesystemInfo(long available, long total,
|
||||
long numFiles, long numBlocks, long numBlocksMissing,
|
||||
long numBlocksPendingReplication, long numBlocksUnderReplicated,
|
||||
long numBlocksPendingDeletion) {
|
||||
this.totalSpace = total;
|
||||
this.availableSpace = available;
|
||||
this.numOfBlocks = numBlocks;
|
||||
this.numOfBlocksMissing = numBlocksMissing;
|
||||
this.numOfBlocksPendingReplication = numBlocksPendingReplication;
|
||||
this.numOfBlocksUnderReplicated = numBlocksUnderReplicated;
|
||||
this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
|
||||
this.numOfFiles = numFiles;
|
||||
this.statsValid = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of blocks.
|
||||
*
|
||||
* @return The number of blocks.
|
||||
*/
|
||||
public long getNumBlocks() {
|
||||
return this.numOfBlocks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of files.
|
||||
*
|
||||
* @return The number of files.
|
||||
*/
|
||||
public long getNumFiles() {
|
||||
return this.numOfFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total space.
|
||||
*
|
||||
* @return The total space.
|
||||
*/
|
||||
public long getTotalSpace() {
|
||||
return this.totalSpace;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the available space.
|
||||
*
|
||||
* @return The available space.
|
||||
*/
|
||||
public long getAvailableSpace() {
|
||||
return this.availableSpace;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of missing blocks.
|
||||
*
|
||||
* @return Number of missing blocks.
|
||||
*/
|
||||
public long getNumBlocksMissing() {
|
||||
return this.numOfBlocksMissing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending replication blocks.
|
||||
*
|
||||
* @return Number of pending replication blocks.
|
||||
*/
|
||||
public long getNumOfBlocksPendingReplication() {
|
||||
return this.numOfBlocksPendingReplication;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of under replicated blocks.
|
||||
*
|
||||
* @return Number of under replicated blocks.
|
||||
*/
|
||||
public long getNumOfBlocksUnderReplicated() {
|
||||
return this.numOfBlocksUnderReplicated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending deletion blocks.
|
||||
*
|
||||
* @return Number of pending deletion blocks.
|
||||
*/
|
||||
public long getNumOfBlocksPendingDeletion() {
|
||||
return this.numOfBlocksPendingDeletion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s-%s:%s",
|
||||
|
|
|
@ -17,13 +17,22 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.codehaus.jettison.json.JSONArray;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -39,6 +48,63 @@ public final class FederationUtil {
|
|||
// Utility Class
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a JMX data from a web endpoint.
|
||||
*
|
||||
* @param beanQuery JMX bean.
|
||||
* @param webAddress Web address of the JMX endpoint.
|
||||
* @return JSON with the JMX data
|
||||
*/
|
||||
public static JSONArray getJmx(String beanQuery, String webAddress) {
|
||||
JSONArray ret = null;
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
String host = webAddress;
|
||||
int port = -1;
|
||||
if (webAddress.indexOf(":") > 0) {
|
||||
String[] webAddressSplit = webAddress.split(":");
|
||||
host = webAddressSplit[0];
|
||||
port = Integer.parseInt(webAddressSplit[1]);
|
||||
}
|
||||
URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
|
||||
URLConnection conn = jmxURL.openConnection();
|
||||
conn.setConnectTimeout(5 * 1000);
|
||||
conn.setReadTimeout(5 * 1000);
|
||||
InputStream in = conn.getInputStream();
|
||||
InputStreamReader isr = new InputStreamReader(in, "UTF-8");
|
||||
reader = new BufferedReader(isr);
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line = null;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
sb.append(line);
|
||||
}
|
||||
String jmxOutput = sb.toString();
|
||||
|
||||
// Parse JSON
|
||||
JSONObject json = new JSONObject(jmxOutput);
|
||||
ret = json.getJSONArray("beans");
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot read JMX bean {} from server {}: {}",
|
||||
beanQuery, webAddress, e.getMessage());
|
||||
} catch (JSONException e) {
|
||||
LOG.error("Cannot parse JMX output for {} from server {}: {}",
|
||||
beanQuery, webAddress, e.getMessage());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot parse JMX output for {} from server {}: {}",
|
||||
beanQuery, webAddress, e);
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
try {
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Problem closing {}", webAddress, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance of an interface with a constructor using a context.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,350 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
|
||||
import org.codehaus.jettison.json.JSONArray;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The {@link Router} periodically checks the state of a Namenode (usually on
|
||||
* the same server) and reports their high availability (HA) state and
|
||||
* load/space status to the
|
||||
* {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}
|
||||
* . Note that this is an optional role as a Router can be independent of any
|
||||
* subcluster.
|
||||
* <p>
|
||||
* For performance with Namenode HA, the Router uses the high availability state
|
||||
* information in the State Store to forward the request to the Namenode that is
|
||||
* most likely to be active.
|
||||
* <p>
|
||||
* Note that this service can be embedded into the Namenode itself to simplify
|
||||
* the operation.
|
||||
*/
|
||||
public class NamenodeHeartbeatService extends PeriodicService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NamenodeHeartbeatService.class);
|
||||
|
||||
|
||||
/** Configuration for the heartbeat. */
|
||||
private Configuration conf;
|
||||
|
||||
/** Router performing the heartbeating. */
|
||||
private final ActiveNamenodeResolver resolver;
|
||||
|
||||
/** Interface to the tracked NN. */
|
||||
private final String nameserviceId;
|
||||
private final String namenodeId;
|
||||
|
||||
/** Namenode HA target. */
|
||||
private NNHAServiceTarget localTarget;
|
||||
/** RPC address for the namenode. */
|
||||
private String rpcAddress;
|
||||
/** Service RPC address for the namenode. */
|
||||
private String serviceAddress;
|
||||
/** Service RPC address for the namenode. */
|
||||
private String lifelineAddress;
|
||||
/** HTTP address for the namenode. */
|
||||
private String webAddress;
|
||||
|
||||
/**
|
||||
* Create a new Namenode status updater.
|
||||
* @param resolver Namenode resolver service to handle NN registration.
|
||||
* @param nameserviceId Identifier of the nameservice.
|
||||
* @param namenodeId Identifier of the namenode in HA.
|
||||
*/
|
||||
public NamenodeHeartbeatService(
|
||||
ActiveNamenodeResolver resolver, String nsId, String nnId) {
|
||||
super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
|
||||
nnId);
|
||||
|
||||
this.resolver = resolver;
|
||||
|
||||
this.nameserviceId = nsId;
|
||||
this.namenodeId = nnId;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration configuration) throws Exception {
|
||||
|
||||
this.conf = configuration;
|
||||
|
||||
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
|
||||
this.localTarget = new NNHAServiceTarget(
|
||||
conf, nameserviceId, namenodeId);
|
||||
} else {
|
||||
this.localTarget = null;
|
||||
}
|
||||
|
||||
// Get the RPC address for the clients to connect
|
||||
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
|
||||
LOG.info("{}-{} RPC address: {}",
|
||||
nameserviceId, namenodeId, rpcAddress);
|
||||
|
||||
// Get the Service RPC address for monitoring
|
||||
this.serviceAddress =
|
||||
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
|
||||
if (this.serviceAddress == null) {
|
||||
LOG.error("Cannot locate RPC service address for NN {}-{}, " +
|
||||
"using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
|
||||
this.serviceAddress = this.rpcAddress;
|
||||
}
|
||||
LOG.info("{}-{} Service RPC address: {}",
|
||||
nameserviceId, namenodeId, serviceAddress);
|
||||
|
||||
// Get the Lifeline RPC address for faster monitoring
|
||||
this.lifelineAddress =
|
||||
DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
|
||||
if (this.lifelineAddress == null) {
|
||||
this.lifelineAddress = this.serviceAddress;
|
||||
}
|
||||
LOG.info("{}-{} Lifeline RPC address: {}",
|
||||
nameserviceId, namenodeId, lifelineAddress);
|
||||
|
||||
// Get the Web address for UI
|
||||
this.webAddress =
|
||||
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
|
||||
LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
|
||||
|
||||
this.setIntervalMs(conf.getLong(
|
||||
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
|
||||
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
|
||||
|
||||
|
||||
super.serviceInit(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void periodicInvoke() {
|
||||
updateState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the RPC address for a Namenode.
|
||||
* @param conf Configuration.
|
||||
* @param nsId Name service identifier.
|
||||
* @param nnId Name node identifier.
|
||||
* @return RPC address in format hostname:1234.
|
||||
*/
|
||||
private static String getRpcAddress(
|
||||
Configuration conf, String nsId, String nnId) {
|
||||
|
||||
// Get it from the regular RPC setting
|
||||
String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
String ret = conf.get(confKey);
|
||||
|
||||
if (nsId != null && nnId != null) {
|
||||
// Get if for the proper nameservice and namenode
|
||||
confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
|
||||
ret = conf.get(confKey);
|
||||
|
||||
// If not available, get it from the map
|
||||
if (ret == null) {
|
||||
Map<String, InetSocketAddress> rpcAddresses =
|
||||
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
|
||||
if (rpcAddresses.containsKey(nnId)) {
|
||||
InetSocketAddress sockAddr = rpcAddresses.get(nnId);
|
||||
InetAddress addr = sockAddr.getAddress();
|
||||
ret = addr.getHostAddress() + ":" + sockAddr.getPort();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the state of the Namenode.
|
||||
*/
|
||||
private void updateState() {
|
||||
NamenodeStatusReport report = getNamenodeStatusReport();
|
||||
if (!report.registrationValid()) {
|
||||
// Not operational
|
||||
LOG.error("Namenode is not operational: {}", getNamenodeDesc());
|
||||
} else if (report.haStateValid()) {
|
||||
// block and HA status available
|
||||
LOG.debug("Received service state: {} from HA namenode: {}",
|
||||
report.getState(), getNamenodeDesc());
|
||||
} else if (localTarget == null) {
|
||||
// block info available, HA status not expected
|
||||
LOG.debug(
|
||||
"Reporting non-HA namenode as operational: " + getNamenodeDesc());
|
||||
} else {
|
||||
// block info available, HA status should be available, but was not
|
||||
// fetched do nothing and let the current state stand
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (!resolver.registerNamenode(report)) {
|
||||
LOG.warn("Cannot register namenode {}", report);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Cannot register namenode in the State Store");
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Unhandled exception updating NN registration for {}",
|
||||
getNamenodeDesc(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the status report for the Namenode monitored by this heartbeater.
|
||||
* @return Namenode status report.
|
||||
*/
|
||||
protected NamenodeStatusReport getNamenodeStatusReport() {
|
||||
NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
|
||||
namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
|
||||
|
||||
try {
|
||||
LOG.debug("Probing NN at service address: {}", serviceAddress);
|
||||
|
||||
URI serviceURI = new URI("hdfs://" + serviceAddress);
|
||||
// Read the filesystem info from RPC (required)
|
||||
NamenodeProtocol nn = NameNodeProxies
|
||||
.createProxy(this.conf, serviceURI, NamenodeProtocol.class)
|
||||
.getProxy();
|
||||
|
||||
if (nn != null) {
|
||||
NamespaceInfo info = nn.versionRequest();
|
||||
if (info != null) {
|
||||
report.setNamespaceInfo(info);
|
||||
}
|
||||
}
|
||||
if (!report.registrationValid()) {
|
||||
return report;
|
||||
}
|
||||
|
||||
// Check for safemode from the client protocol. Currently optional, but
|
||||
// should be required at some point for QoS
|
||||
try {
|
||||
ClientProtocol client = NameNodeProxies
|
||||
.createProxy(this.conf, serviceURI, ClientProtocol.class)
|
||||
.getProxy();
|
||||
if (client != null) {
|
||||
boolean isSafeMode = client.setSafeMode(
|
||||
SafeModeAction.SAFEMODE_GET, false);
|
||||
report.setSafeMode(isSafeMode);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
|
||||
}
|
||||
|
||||
// Read the stats from JMX (optional)
|
||||
updateJMXParameters(webAddress, report);
|
||||
|
||||
if (localTarget != null) {
|
||||
// Try to get the HA status
|
||||
try {
|
||||
// Determine if NN is active
|
||||
// TODO: dynamic timeout
|
||||
HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000);
|
||||
HAServiceStatus status = haProtocol.getServiceStatus();
|
||||
report.setHAServiceState(status.getState());
|
||||
} catch (Throwable e) {
|
||||
// Failed to fetch HA status, ignoring failure
|
||||
LOG.error("Cannot fetch HA status for {}: {}",
|
||||
getNamenodeDesc(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} catch(IOException e) {
|
||||
LOG.error("Cannot communicate with {}: {}",
|
||||
getNamenodeDesc(), e.getMessage());
|
||||
} catch(Throwable e) {
|
||||
// Generic error that we don't know about
|
||||
LOG.error("Unexpected exception while communicating with {}: {}",
|
||||
getNamenodeDesc(), e.getMessage(), e);
|
||||
}
|
||||
return report;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the description of the Namenode to monitor.
|
||||
* @return Description of the Namenode to monitor.
|
||||
*/
|
||||
public String getNamenodeDesc() {
|
||||
if (namenodeId != null && !namenodeId.isEmpty()) {
|
||||
return nameserviceId + "-" + namenodeId + ":" + serviceAddress;
|
||||
} else {
|
||||
return nameserviceId + ":" + serviceAddress;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the parameters for a Namenode from JMX and add them to the report.
|
||||
* @param webAddress Web interface of the Namenode to monitor.
|
||||
* @param report Namenode status report to update with JMX data.
|
||||
*/
|
||||
private void updateJMXParameters(
|
||||
String address, NamenodeStatusReport report) {
|
||||
try {
|
||||
// TODO part of this should be moved to its own utility
|
||||
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
|
||||
JSONArray aux = FederationUtil.getJmx(query, address);
|
||||
if (aux != null) {
|
||||
for (int i = 0; i < aux.length(); i++) {
|
||||
JSONObject jsonObject = aux.getJSONObject(i);
|
||||
String name = jsonObject.getString("name");
|
||||
if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
|
||||
report.setDatanodeInfo(
|
||||
jsonObject.getInt("NumLiveDataNodes"),
|
||||
jsonObject.getInt("NumDeadDataNodes"),
|
||||
jsonObject.getInt("NumDecommissioningDataNodes"),
|
||||
jsonObject.getInt("NumDecomLiveDataNodes"),
|
||||
jsonObject.getInt("NumDecomDeadDataNodes"));
|
||||
} else if (name.equals(
|
||||
"Hadoop:service=NameNode,name=FSNamesystem")) {
|
||||
report.setNamesystemInfo(
|
||||
jsonObject.getLong("CapacityRemaining"),
|
||||
jsonObject.getLong("CapacityTotal"),
|
||||
jsonObject.getLong("FilesTotal"),
|
||||
jsonObject.getLong("BlocksTotal"),
|
||||
jsonObject.getLong("MissingBlocks"),
|
||||
jsonObject.getLong("PendingReplicationBlocks"),
|
||||
jsonObject.getLong("UnderReplicatedBlocks"),
|
||||
jsonObject.getLong("PendingDeletionBlocks"));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,12 +25,16 @@ import java.io.IOException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
|
@ -85,6 +89,8 @@ public class Router extends CompositeService {
|
|||
|
||||
/** Interface to identify the active NN for a nameservice or blockpool ID. */
|
||||
private ActiveNamenodeResolver namenodeResolver;
|
||||
/** Updates the namenode status in the namenode resolver. */
|
||||
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
|
||||
|
||||
|
||||
/** Usage string for help message. */
|
||||
|
@ -133,6 +139,22 @@ public class Router extends CompositeService {
|
|||
this.setRpcServerAddress(rpcServer.getRpcAddress());
|
||||
}
|
||||
|
||||
if (conf.getBoolean(
|
||||
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
||||
|
||||
// Create status updater for each monitored Namenode
|
||||
this.namenodeHearbeatServices = createNamenodeHearbeatServices();
|
||||
for (NamenodeHeartbeatService hearbeatService :
|
||||
this.namenodeHearbeatServices) {
|
||||
addService(hearbeatService);
|
||||
}
|
||||
|
||||
if (this.namenodeHearbeatServices.isEmpty()) {
|
||||
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
||||
}
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -242,6 +264,96 @@ public class Router extends CompositeService {
|
|||
return this.rpcAddress;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// Namenode heartbeat monitors
|
||||
/////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Create each of the services that will monitor a Namenode.
|
||||
*
|
||||
* @return List of heartbeat services.
|
||||
*/
|
||||
protected Collection<NamenodeHeartbeatService>
|
||||
createNamenodeHearbeatServices() {
|
||||
|
||||
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
|
||||
|
||||
if (conf.getBoolean(
|
||||
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
|
||||
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
|
||||
// Create a local heartbet service
|
||||
NamenodeHeartbeatService localHeartbeatService =
|
||||
createLocalNamenodeHearbeatService();
|
||||
if (localHeartbeatService != null) {
|
||||
String nnDesc = localHeartbeatService.getNamenodeDesc();
|
||||
ret.put(nnDesc, localHeartbeatService);
|
||||
}
|
||||
}
|
||||
|
||||
// Create heartbeat services for a list specified by the admin
|
||||
String namenodes = this.conf.get(
|
||||
DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
|
||||
if (namenodes != null) {
|
||||
for (String namenode : namenodes.split(",")) {
|
||||
String[] namenodeSplit = namenode.split("\\.");
|
||||
String nsId = null;
|
||||
String nnId = null;
|
||||
if (namenodeSplit.length == 2) {
|
||||
nsId = namenodeSplit[0];
|
||||
nnId = namenodeSplit[1];
|
||||
} else if (namenodeSplit.length == 1) {
|
||||
nsId = namenode;
|
||||
} else {
|
||||
LOG.error("Wrong Namenode to monitor: {}", namenode);
|
||||
}
|
||||
if (nsId != null) {
|
||||
NamenodeHeartbeatService heartbeatService =
|
||||
createNamenodeHearbeatService(nsId, nnId);
|
||||
if (heartbeatService != null) {
|
||||
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new status updater for the local Namenode.
|
||||
*
|
||||
* @return Updater of the status for the local Namenode.
|
||||
*/
|
||||
protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
|
||||
// Detect NN running in this machine
|
||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
String nnId = null;
|
||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
||||
nnId = HAUtil.getNameNodeId(conf, nsId);
|
||||
if (nnId == null) {
|
||||
LOG.error("Cannot find namenode id for local {}", nsId);
|
||||
}
|
||||
}
|
||||
|
||||
return createNamenodeHearbeatService(nsId, nnId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a heartbeat monitor for a particular Namenode.
|
||||
*
|
||||
* @param nsId Identifier of the nameservice to monitor.
|
||||
* @param nnId Identifier of the namenode (HA) to monitor.
|
||||
* @return Updater of the status for the specified Namenode.
|
||||
*/
|
||||
protected NamenodeHeartbeatService createNamenodeHearbeatService(
|
||||
String nsId, String nnId) {
|
||||
|
||||
LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
|
||||
NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
|
||||
namenodeResolver, nsId, nnId);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// Submodule getters
|
||||
/////////////////////////////////////////////////////////
|
||||
|
|
|
@ -4505,4 +4505,36 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.heartbeat.enable</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Enables the Router to heartbeat into the State Store.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.heartbeat.interval</name>
|
||||
<value>5000</value>
|
||||
<description>
|
||||
How often the Router should heartbeat into the State Store in milliseconds.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.monitor.namenode</name>
|
||||
<value></value>
|
||||
<description>
|
||||
The identifier of the namenodes to monitor and heartbeat.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.monitor.localnamenode.enable</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
If the Router should monitor the namenode in the local machine.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -581,6 +581,14 @@ public class MiniDFSCluster implements AutoCloseable {
|
|||
public void setStartOpt(StartupOption startOpt) {
|
||||
this.startOpt = startOpt;
|
||||
}
|
||||
|
||||
public String getNameserviceId() {
|
||||
return this.nameserviceId;
|
||||
}
|
||||
|
||||
public String getNamenodeId() {
|
||||
return this.nnId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -56,9 +56,16 @@ public class MockResolver
|
|||
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
|
||||
private String defaultNamespace = null;
|
||||
|
||||
public MockResolver() {
|
||||
this.cleanRegistrations();
|
||||
}
|
||||
|
||||
public MockResolver(Configuration conf) {
|
||||
this();
|
||||
}
|
||||
|
||||
public MockResolver(Configuration conf, StateStoreService store) {
|
||||
this.cleanRegistrations();
|
||||
this();
|
||||
}
|
||||
|
||||
public void addLocation(String mount, String nsId, String location) {
|
||||
|
|
|
@ -28,6 +28,8 @@ public class RouterConfigBuilder {
|
|||
private Configuration conf;
|
||||
|
||||
private boolean enableRpcServer = false;
|
||||
private boolean enableHeartbeat = false;
|
||||
private boolean enableLocalHeartbeat = false;
|
||||
|
||||
public RouterConfigBuilder(Configuration configuration) {
|
||||
this.conf = configuration;
|
||||
|
@ -39,6 +41,13 @@ public class RouterConfigBuilder {
|
|||
|
||||
public RouterConfigBuilder all() {
|
||||
this.enableRpcServer = true;
|
||||
this.enableHeartbeat = true;
|
||||
this.enableLocalHeartbeat = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RouterConfigBuilder enableLocalHeartbeat(boolean enable) {
|
||||
this.enableLocalHeartbeat = enable;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -47,12 +56,25 @@ public class RouterConfigBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public RouterConfigBuilder heartbeat(boolean enable) {
|
||||
this.enableHeartbeat = enable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RouterConfigBuilder rpc() {
|
||||
return this.rpc(true);
|
||||
}
|
||||
|
||||
public RouterConfigBuilder heartbeat() {
|
||||
return this.heartbeat(true);
|
||||
}
|
||||
|
||||
public Configuration build() {
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||
this.enableHeartbeat);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
|
||||
this.enableLocalHeartbeat);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DFSClient;
|
|||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
|
||||
|
@ -753,6 +754,48 @@ public class RouterDFSCluster {
|
|||
return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Switch a namenode in a nameservice to be the active.
|
||||
* @param nsId Nameservice identifier.
|
||||
* @param nnId Namenode identifier.
|
||||
*/
|
||||
public void switchToActive(String nsId, String nnId) {
|
||||
try {
|
||||
int total = cluster.getNumNameNodes();
|
||||
NameNodeInfo[] nns = cluster.getNameNodeInfos();
|
||||
for (int i = 0; i < total; i++) {
|
||||
NameNodeInfo nn = nns[i];
|
||||
if (nn.getNameserviceId().equals(nsId) &&
|
||||
nn.getNamenodeId().equals(nnId)) {
|
||||
cluster.transitionToActive(i);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Cannot transition to active", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Switch a namenode in a nameservice to be in standby.
|
||||
* @param nsId Nameservice identifier.
|
||||
* @param nnId Namenode identifier.
|
||||
*/
|
||||
public void switchToStandby(String nsId, String nnId) {
|
||||
try {
|
||||
int total = cluster.getNumNameNodes();
|
||||
NameNodeInfo[] nns = cluster.getNameNodeInfos();
|
||||
for (int i = 0; i < total; i++) {
|
||||
NameNodeInfo nn = nns[i];
|
||||
if (nn.getNameserviceId().equals(nsId) &&
|
||||
nn.getNamenodeId().equals(nnId)) {
|
||||
cluster.transitionToStandby(i);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Cannot transition to standby", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the federated HDFS cluster.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.MockResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
/**
|
||||
* Test the service that heartbeats the state of the namenodes to the State
|
||||
* Store.
|
||||
*/
|
||||
public class TestNamenodeHeartbeat {
|
||||
|
||||
private static RouterDFSCluster cluster;
|
||||
private static ActiveNamenodeResolver namenodeResolver;
|
||||
private static List<NamenodeHeartbeatService> services;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void globalSetUp() throws Exception {
|
||||
|
||||
cluster = new RouterDFSCluster(true, 2);
|
||||
|
||||
// Start NNs and DNs and wait until ready
|
||||
cluster.startCluster();
|
||||
|
||||
// Mock locator that records the heartbeats
|
||||
List<String> nss = cluster.getNameservices();
|
||||
String ns = nss.get(0);
|
||||
Configuration conf = cluster.generateNamenodeConfiguration(ns);
|
||||
namenodeResolver = new MockResolver(conf);
|
||||
namenodeResolver.setRouterId("testrouter");
|
||||
|
||||
// Create one heartbeat service per NN
|
||||
services = new ArrayList<>();
|
||||
for (NamenodeContext nn : cluster.getNamenodes()) {
|
||||
String nsId = nn.getNameserviceId();
|
||||
String nnId = nn.getNamenodeId();
|
||||
NamenodeHeartbeatService service = new NamenodeHeartbeatService(
|
||||
namenodeResolver, nsId, nnId);
|
||||
service.init(conf);
|
||||
service.start();
|
||||
services.add(service);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
cluster.shutdown();
|
||||
for (NamenodeHeartbeatService service: services) {
|
||||
service.stop();
|
||||
service.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamenodeHeartbeatService() throws IOException {
|
||||
|
||||
RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
|
||||
Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
|
||||
NAMESERVICES[0]);
|
||||
NamenodeHeartbeatService server = new NamenodeHeartbeatService(
|
||||
namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
|
||||
server.init(heartbeatConfig);
|
||||
assertEquals(STATE.INITED, server.getServiceState());
|
||||
server.start();
|
||||
assertEquals(STATE.STARTED, server.getServiceState());
|
||||
server.stop();
|
||||
assertEquals(STATE.STOPPED, server.getServiceState());
|
||||
server.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHearbeat() throws InterruptedException, IOException {
|
||||
|
||||
// Set NAMENODE1 to active for all nameservices
|
||||
if (cluster.isHighAvailability()) {
|
||||
for (String ns : cluster.getNameservices()) {
|
||||
cluster.switchToActive(ns, NAMENODES[0]);
|
||||
cluster.switchToStandby(ns, NAMENODES[1]);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for heartbeats to record
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Verify the locator has matching NN entries for each NS
|
||||
for (String ns : cluster.getNameservices()) {
|
||||
List<? extends FederationNamenodeContext> nns =
|
||||
namenodeResolver.getNamenodesForNameserviceId(ns);
|
||||
|
||||
// Active
|
||||
FederationNamenodeContext active = nns.get(0);
|
||||
assertEquals(NAMENODES[0], active.getNamenodeId());
|
||||
|
||||
// Standby
|
||||
FederationNamenodeContext standby = nns.get(1);
|
||||
assertEquals(NAMENODES[1], standby.getNamenodeId());
|
||||
}
|
||||
|
||||
// Switch active NNs in 1/2 nameservices
|
||||
List<String> nss = cluster.getNameservices();
|
||||
String failoverNS = nss.get(0);
|
||||
String normalNs = nss.get(1);
|
||||
|
||||
cluster.switchToStandby(failoverNS, NAMENODES[0]);
|
||||
cluster.switchToActive(failoverNS, NAMENODES[1]);
|
||||
|
||||
// Wait for heartbeats to record
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Verify the locator has recorded the failover for the failover NS
|
||||
List<? extends FederationNamenodeContext> failoverNSs =
|
||||
namenodeResolver.getNamenodesForNameserviceId(failoverNS);
|
||||
// Active
|
||||
FederationNamenodeContext active = failoverNSs.get(0);
|
||||
assertEquals(NAMENODES[1], active.getNamenodeId());
|
||||
|
||||
// Standby
|
||||
FederationNamenodeContext standby = failoverNSs.get(1);
|
||||
assertEquals(NAMENODES[0], standby.getNamenodeId());
|
||||
|
||||
// Verify the locator has the same records for the other ns
|
||||
List<? extends FederationNamenodeContext> normalNss =
|
||||
namenodeResolver.getNamenodesForNameserviceId(normalNs);
|
||||
// Active
|
||||
active = normalNss.get(0);
|
||||
assertEquals(NAMENODES[0], active.getNamenodeId());
|
||||
// Standby
|
||||
standby = normalNss.get(1);
|
||||
assertEquals(NAMENODES[1], standby.getNamenodeId());
|
||||
}
|
||||
}
|
|
@ -98,6 +98,9 @@ public class TestRouter {
|
|||
// Rpc only
|
||||
testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
|
||||
|
||||
// Heartbeat only
|
||||
testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
|
||||
|
||||
// Run with all services
|
||||
testRouterStartup(new RouterConfigBuilder(conf).all().build());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue