HDFS-13347. RBF: Cache datanode reports. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-03-28 11:00:08 +08:00
parent d1e378d02b
commit a71656c1c1
6 changed files with 136 additions and 15 deletions

View File

@ -434,7 +434,7 @@ public class FederationMetrics implements FederationMBean {
try {
RouterRpcServer rpcServer = this.router.getRpcServer();
DatanodeInfo[] live = rpcServer.getDatanodeReport(
DatanodeReportType.LIVE, TIME_OUT);
DatanodeReportType.LIVE, false, TIME_OUT);
if (live.length > 0) {
float totalDfsUsed = 0;

View File

@ -25,6 +25,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -32,6 +34,7 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -39,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
@ -56,6 +60,10 @@ import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* Expose the Namenode metrics as the Router was one.
*/
@ -65,6 +73,22 @@ public class NamenodeBeanMetrics
private static final Logger LOG =
LoggerFactory.getLogger(NamenodeBeanMetrics.class);
/** Prevent holding the page from loading too long. */
private static final String DN_REPORT_TIME_OUT =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out";
/** We only wait for 1 second. */
private static final long DN_REPORT_TIME_OUT_DEFAULT =
TimeUnit.SECONDS.toMillis(1);
/** Time to cache the DN information. */
public static final String DN_REPORT_CACHE_EXPIRE =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire";
/** We cache the DN information for 10 seconds by default. */
public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
/** Instance of the Router being monitored. */
private final Router router;
/** FSNamesystem bean. */
@ -76,6 +100,11 @@ public class NamenodeBeanMetrics
/** NameNodeStatus bean. */
private ObjectName nnStatusBeanName;
/** Timeout to get the DN report. */
private final long dnReportTimeOut;
/** DN type -> full DN report in JSON. */
private final LoadingCache<DatanodeReportType, String> dnCache;
public NamenodeBeanMetrics(Router router) {
this.router = router;
@ -114,6 +143,23 @@ public class NamenodeBeanMetrics
} catch (NotCompliantMBeanException e) {
throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
}
// Initialize the cache for the DN reports
Configuration conf = router.getConfig();
this.dnReportTimeOut = conf.getTimeDuration(
DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS);
long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
this.dnCache = CacheBuilder.newBuilder()
.expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
.build(
new CacheLoader<DatanodeReportType, String>() {
@Override
public String load(DatanodeReportType type) throws Exception {
return getNodesImpl(type);
}
});
}
/**
@ -299,16 +345,32 @@ public class NamenodeBeanMetrics
}
/**
* Get all the nodes in the federation from a particular type.
* TODO this is expensive, we may want to cache it.
* Get all the nodes in the federation from a particular type. Getting this
* information is expensive and we use a cache.
* @param type Type of the datanodes to check.
* @return JSON with the nodes.
*/
private String getNodes(DatanodeReportType type) {
private String getNodes(final DatanodeReportType type) {
try {
return this.dnCache.get(type);
} catch (ExecutionException e) {
LOG.error("Cannot get the DN storage report for {}", type, e);
}
// If we cannot get the report, return empty JSON
return "{}";
}
/**
* Get all the nodes in the federation from a particular type.
* @param type Type of the datanodes to check.
* @return JSON with the nodes.
*/
private String getNodesImpl(final DatanodeReportType type) {
final Map<String, Map<String, Object>> info = new HashMap<>();
try {
RouterRpcServer rpcServer = this.router.getRpcServer();
DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
DatanodeInfo[] datanodes =
rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
for (DatanodeInfo node : datanodes) {
Map<String, Object> innerinfo = new HashMap<>();
innerinfo.put("infoAddr", node.getInfoAddr());

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
@ -552,6 +553,18 @@ public class Router extends CompositeService {
return null;
}
/**
* Get the Namenode metrics.
*
* @return Namenode metrics.
*/
public NamenodeBeanMetrics getNamenodeMetrics() {
if (this.metrics != null) {
return this.metrics.getNamenodeMetrics();
}
return null;
}
/**
* Get the subcluster resolver for files.
*

View File

@ -94,6 +94,15 @@ public class RouterMetricsService extends AbstractService {
return this.federationMetrics;
}
/**
* Get the Namenode metrics.
*
* @return Namenode metrics.
*/
public NamenodeBeanMetrics getNamenodeMetrics() {
return this.nnMetrics;
}
/**
* Get the JVM metrics for the Router.
*

View File

@ -1237,18 +1237,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);
return getDatanodeReport(type, 0);
return getDatanodeReport(type, true, 0);
}
/**
* Get the datanode report with a timeout.
* @param type Type of the datanode.
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getDatanodeReport(
DatanodeReportType type, long timeOutMs) throws IOException {
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);
Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
@ -1257,8 +1259,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(
nss, method, true, false, timeOutMs, DatanodeInfo[].class);
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
@ -1278,9 +1280,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
}
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()];
combinedData = datanodes.toArray(combinedData);
return combinedData;
return toArray(datanodes, DatanodeInfo.class);
}
@Override // ClientProtocol
@ -2295,7 +2295,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @param clazz Class of the values.
* @return Array with the values in set.
*/
private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
private static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
@SuppressWarnings("unchecked")
T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
combinedData = set.toArray(combinedData);

View File

@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -45,6 +46,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
@ -71,10 +73,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
@ -83,6 +86,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -150,7 +154,14 @@ public class TestRouterRpc {
cluster.startCluster();
// Start routers with only an RPC service
cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.build();
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
cluster.addRouterOverrides(routerConf);
cluster.startRouters();
// Register and verify all NNs with all routers
@ -1032,6 +1043,32 @@ public class TestRouterRpc {
assertEquals(statsNamenode.toString(), statsRouter.toString());
}
@Test
public void testNamenodeMetrics() throws Exception {
final NamenodeBeanMetrics metrics =
router.getRouter().getNamenodeMetrics();
final String jsonString0 = metrics.getLiveNodes();
// We should have 12 nodes in total
JSONObject jsonObject = new JSONObject(jsonString0);
assertEquals(12, jsonObject.names().length());
// We should be caching this information
String jsonString1 = metrics.getLiveNodes();
assertEquals(jsonString0, jsonString1);
// We wait until the cached value is updated
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !jsonString0.equals(metrics.getLiveNodes());
}
}, 500, 5 * 1000);
// The cache should be updated now
assertNotEquals(jsonString0, metrics.getLiveNodes());
}
/**
* Check the erasure coding policies in the Router and the Namenode.
* @return The erasure coding policies.