HDFS-13347. RBF: Cache datanode reports. Contributed by Inigo Goiri.
(cherry picked from commit a71656c1c1
)
This commit is contained in:
parent
c6939e9330
commit
c903efeb51
|
@ -429,7 +429,7 @@ public class FederationMetrics implements FederationMBean {
|
|||
try {
|
||||
RouterRpcServer rpcServer = this.router.getRpcServer();
|
||||
DatanodeInfo[] live = rpcServer.getDatanodeReport(
|
||||
DatanodeReportType.LIVE, TIME_OUT);
|
||||
DatanodeReportType.LIVE, false, TIME_OUT);
|
||||
|
||||
if (live.length > 0) {
|
||||
float totalDfsUsed = 0;
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -32,6 +34,7 @@ import javax.management.NotCompliantMBeanException;
|
|||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
@ -39,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||
|
@ -56,6 +60,10 @@ import org.eclipse.jetty.util.ajax.JSON;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
|
||||
/**
|
||||
* Expose the Namenode metrics as the Router was one.
|
||||
*/
|
||||
|
@ -65,6 +73,22 @@ public class NamenodeBeanMetrics
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NamenodeBeanMetrics.class);
|
||||
|
||||
/** Prevent holding the page from loading too long. */
|
||||
private static final String DN_REPORT_TIME_OUT =
|
||||
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out";
|
||||
/** We only wait for 1 second. */
|
||||
private static final long DN_REPORT_TIME_OUT_DEFAULT =
|
||||
TimeUnit.SECONDS.toMillis(1);
|
||||
|
||||
/** Time to cache the DN information. */
|
||||
public static final String DN_REPORT_CACHE_EXPIRE =
|
||||
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire";
|
||||
/** We cache the DN information for 10 seconds by default. */
|
||||
public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT =
|
||||
TimeUnit.SECONDS.toMillis(10);
|
||||
|
||||
|
||||
/** Instance of the Router being monitored. */
|
||||
private final Router router;
|
||||
|
||||
/** FSNamesystem bean. */
|
||||
|
@ -76,6 +100,11 @@ public class NamenodeBeanMetrics
|
|||
/** NameNodeStatus bean. */
|
||||
private ObjectName nnStatusBeanName;
|
||||
|
||||
/** Timeout to get the DN report. */
|
||||
private final long dnReportTimeOut;
|
||||
/** DN type -> full DN report in JSON. */
|
||||
private final LoadingCache<DatanodeReportType, String> dnCache;
|
||||
|
||||
|
||||
public NamenodeBeanMetrics(Router router) {
|
||||
this.router = router;
|
||||
|
@ -114,6 +143,23 @@ public class NamenodeBeanMetrics
|
|||
} catch (NotCompliantMBeanException e) {
|
||||
throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
|
||||
}
|
||||
|
||||
// Initialize the cache for the DN reports
|
||||
Configuration conf = router.getConfig();
|
||||
this.dnReportTimeOut = conf.getTimeDuration(
|
||||
DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
long dnCacheExpire = conf.getTimeDuration(
|
||||
DN_REPORT_CACHE_EXPIRE,
|
||||
DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
this.dnCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
|
||||
.build(
|
||||
new CacheLoader<DatanodeReportType, String>() {
|
||||
@Override
|
||||
public String load(DatanodeReportType type) throws Exception {
|
||||
return getNodesImpl(type);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -294,16 +340,32 @@ public class NamenodeBeanMetrics
|
|||
}
|
||||
|
||||
/**
|
||||
* Get all the nodes in the federation from a particular type.
|
||||
* TODO this is expensive, we may want to cache it.
|
||||
* Get all the nodes in the federation from a particular type. Getting this
|
||||
* information is expensive and we use a cache.
|
||||
* @param type Type of the datanodes to check.
|
||||
* @return JSON with the nodes.
|
||||
*/
|
||||
private String getNodes(DatanodeReportType type) {
|
||||
private String getNodes(final DatanodeReportType type) {
|
||||
try {
|
||||
return this.dnCache.get(type);
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Cannot get the DN storage report for {}", type, e);
|
||||
}
|
||||
// If we cannot get the report, return empty JSON
|
||||
return "{}";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the nodes in the federation from a particular type.
|
||||
* @param type Type of the datanodes to check.
|
||||
* @return JSON with the nodes.
|
||||
*/
|
||||
private String getNodesImpl(final DatanodeReportType type) {
|
||||
final Map<String, Map<String, Object>> info = new HashMap<>();
|
||||
try {
|
||||
RouterRpcServer rpcServer = this.router.getRpcServer();
|
||||
DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
|
||||
DatanodeInfo[] datanodes =
|
||||
rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
|
||||
for (DatanodeInfo node : datanodes) {
|
||||
Map<String, Object> innerinfo = new HashMap<>();
|
||||
innerinfo.put("infoAddr", node.getInfoAddr());
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||
|
@ -541,6 +542,18 @@ public class Router extends CompositeService {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Namenode metrics.
|
||||
*
|
||||
* @return Namenode metrics.
|
||||
*/
|
||||
public NamenodeBeanMetrics getNamenodeMetrics() {
|
||||
if (this.metrics != null) {
|
||||
return this.metrics.getNamenodeMetrics();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the subcluster resolver for files.
|
||||
*
|
||||
|
|
|
@ -94,6 +94,15 @@ public class RouterMetricsService extends AbstractService {
|
|||
return this.federationMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Namenode metrics.
|
||||
*
|
||||
* @return Namenode metrics.
|
||||
*/
|
||||
public NamenodeBeanMetrics getNamenodeMetrics() {
|
||||
return this.nnMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the JVM metrics for the Router.
|
||||
*
|
||||
|
|
|
@ -1220,18 +1220,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
return getDatanodeReport(type, 0);
|
||||
return getDatanodeReport(type, true, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the datanode report with a timeout.
|
||||
* @param type Type of the datanode.
|
||||
* @param requireResponse If we require all the namespaces to report.
|
||||
* @param timeOutMs Time out for the reply in milliseconds.
|
||||
* @return List of datanodes.
|
||||
* @throws IOException If it cannot get the report.
|
||||
*/
|
||||
public DatanodeInfo[] getDatanodeReport(
|
||||
DatanodeReportType type, long timeOutMs) throws IOException {
|
||||
DatanodeReportType type, boolean requireResponse, long timeOutMs)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
|
||||
Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
|
||||
|
@ -1240,8 +1242,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
|
||||
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
||||
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
|
||||
rpcClient.invokeConcurrent(
|
||||
nss, method, true, false, timeOutMs, DatanodeInfo[].class);
|
||||
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
|
||||
timeOutMs, DatanodeInfo[].class);
|
||||
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
|
||||
results.entrySet()) {
|
||||
FederationNamespaceInfo ns = entry.getKey();
|
||||
|
@ -1261,9 +1263,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
}
|
||||
// Map -> Array
|
||||
Collection<DatanodeInfo> datanodes = datanodesMap.values();
|
||||
DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()];
|
||||
combinedData = datanodes.toArray(combinedData);
|
||||
return combinedData;
|
||||
return toArray(datanodes, DatanodeInfo.class);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -2268,7 +2268,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
* @param clazz Class of the values.
|
||||
* @return Array with the values in set.
|
||||
*/
|
||||
private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
|
||||
private static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
|
||||
@SuppressWarnings("unchecked")
|
||||
T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
|
||||
combinedData = set.toArray(combinedData);
|
||||
|
|
|
@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST
|
|||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -45,6 +46,7 @@ import java.util.Map.Entry;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
|
@ -71,10 +73,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
|
@ -83,6 +86,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -150,7 +154,14 @@ public class TestRouterRpc {
|
|||
cluster.startCluster();
|
||||
|
||||
// Start routers with only an RPC service
|
||||
cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
|
||||
Configuration routerConf = new RouterConfigBuilder()
|
||||
.metrics()
|
||||
.rpc()
|
||||
.build();
|
||||
// We decrease the DN cache times to make the test faster
|
||||
routerConf.setTimeDuration(
|
||||
NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
|
||||
cluster.addRouterOverrides(routerConf);
|
||||
cluster.startRouters();
|
||||
|
||||
// Register and verify all NNs with all routers
|
||||
|
@ -1032,6 +1043,32 @@ public class TestRouterRpc {
|
|||
assertEquals(statsNamenode.toString(), statsRouter.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamenodeMetrics() throws Exception {
|
||||
final NamenodeBeanMetrics metrics =
|
||||
router.getRouter().getNamenodeMetrics();
|
||||
final String jsonString0 = metrics.getLiveNodes();
|
||||
|
||||
// We should have 12 nodes in total
|
||||
JSONObject jsonObject = new JSONObject(jsonString0);
|
||||
assertEquals(12, jsonObject.names().length());
|
||||
|
||||
// We should be caching this information
|
||||
String jsonString1 = metrics.getLiveNodes();
|
||||
assertEquals(jsonString0, jsonString1);
|
||||
|
||||
// We wait until the cached value is updated
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return !jsonString0.equals(metrics.getLiveNodes());
|
||||
}
|
||||
}, 500, 5 * 1000);
|
||||
|
||||
// The cache should be updated now
|
||||
assertNotEquals(jsonString0, metrics.getLiveNodes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the erasure coding policies in the Router and the Namenode.
|
||||
* @return The erasure coding policies.
|
||||
|
|
Loading…
Reference in New Issue