HDFS-13043. RBF: Expose the state of the Routers in the federation. Contributed by Inigo Goiri.
(cherry picked from commit 6ca7204ceb
)
This commit is contained in:
parent
38c3e17117
commit
6769c78cdd
|
@ -46,6 +46,12 @@ public interface FederationMBean {
|
||||||
*/
|
*/
|
||||||
String getMountTable();
|
String getMountTable();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the latest state of all routers.
|
||||||
|
* @return JSON with all of the known routers or null if failure.
|
||||||
|
*/
|
||||||
|
String getRouters();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the total capacity of the federated cluster.
|
* Get the total capacity of the federated cluster.
|
||||||
* @return Total capacity of the federated cluster.
|
* @return Total capacity of the federated cluster.
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||||
|
@ -61,10 +62,14 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegist
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
|
@ -73,6 +78,8 @@ import org.mortbay.util.ajax.JSON;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the Router metrics collector.
|
* Implementation of the Router metrics collector.
|
||||||
*/
|
*/
|
||||||
|
@ -103,6 +110,8 @@ public class FederationMetrics implements FederationMBean {
|
||||||
private MembershipStore membershipStore;
|
private MembershipStore membershipStore;
|
||||||
/** Mount table store. */
|
/** Mount table store. */
|
||||||
private MountTableStore mountTableStore;
|
private MountTableStore mountTableStore;
|
||||||
|
/** Router state store. */
|
||||||
|
private RouterStore routerStore;
|
||||||
|
|
||||||
|
|
||||||
public FederationMetrics(Router router) throws IOException {
|
public FederationMetrics(Router router) throws IOException {
|
||||||
|
@ -128,6 +137,8 @@ public class FederationMetrics implements FederationMBean {
|
||||||
MembershipStore.class);
|
MembershipStore.class);
|
||||||
this.mountTableStore = stateStore.getRegisteredRecordStore(
|
this.mountTableStore = stateStore.getRegisteredRecordStore(
|
||||||
MountTableStore.class);
|
MountTableStore.class);
|
||||||
|
this.routerStore = stateStore.getRegisteredRecordStore(
|
||||||
|
RouterStore.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,6 +269,63 @@ public class FederationMetrics implements FederationMBean {
|
||||||
return JSON.toString(info);
|
return JSON.toString(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRouters() {
|
||||||
|
final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
|
||||||
|
try {
|
||||||
|
// Get all the routers in order
|
||||||
|
GetRouterRegistrationsRequest request =
|
||||||
|
GetRouterRegistrationsRequest.newInstance();
|
||||||
|
GetRouterRegistrationsResponse response =
|
||||||
|
routerStore.getRouterRegistrations(request);
|
||||||
|
final List<RouterState> routers = response.getRouters();
|
||||||
|
List<RouterState> routersOrder = new ArrayList<>(routers);
|
||||||
|
Collections.sort(routersOrder);
|
||||||
|
|
||||||
|
// Dump router information into JSON
|
||||||
|
for (RouterState record : routersOrder) {
|
||||||
|
Map<String, Object> innerInfo = new HashMap<>();
|
||||||
|
Map<String, Object> map = getJson(record);
|
||||||
|
innerInfo.putAll(map);
|
||||||
|
long dateModified = record.getDateModified();
|
||||||
|
long lastHeartbeat = getSecondsSince(dateModified);
|
||||||
|
innerInfo.put("lastHeartbeat", lastHeartbeat);
|
||||||
|
|
||||||
|
StateStoreVersion stateStoreVersion = record.getStateStoreVersion();
|
||||||
|
if (stateStoreVersion == null) {
|
||||||
|
LOG.error("Cannot get State Store versions");
|
||||||
|
} else {
|
||||||
|
setStateStoreVersions(innerInfo, stateStoreVersion);
|
||||||
|
}
|
||||||
|
|
||||||
|
info.put(record.getPrimaryKey(),
|
||||||
|
Collections.unmodifiableMap(innerInfo));
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Cannot get Routers JSON from the State Store", e);
|
||||||
|
return "{}";
|
||||||
|
}
|
||||||
|
return JSON.toString(info);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populate the map with the State Store versions.
|
||||||
|
*
|
||||||
|
* @param innerInfo Map with the information.
|
||||||
|
* @param version State Store versions.
|
||||||
|
*/
|
||||||
|
private static void setStateStoreVersions(
|
||||||
|
Map<String, Object> map, StateStoreVersion version) {
|
||||||
|
|
||||||
|
long membershipVersion = version.getMembershipVersion();
|
||||||
|
String lastMembershipUpdate = getDateString(membershipVersion);
|
||||||
|
map.put("lastMembershipUpdate", lastMembershipUpdate);
|
||||||
|
|
||||||
|
long mountTableVersion = version.getMountTableVersion();
|
||||||
|
String lastMountTableDate = getDateString(mountTableVersion);
|
||||||
|
map.put("lastMountTableUpdate", lastMountTableDate);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getTotalCapacity() {
|
public long getTotalCapacity() {
|
||||||
return getNameserviceAggregatedLong("getTotalSpace");
|
return getNameserviceAggregatedLong("getTotalSpace");
|
||||||
|
@ -631,7 +699,8 @@ public class FederationMetrics implements FederationMBean {
|
||||||
* @param time Seconds since 1970.
|
* @param time Seconds since 1970.
|
||||||
* @return String representing the date.
|
* @return String representing the date.
|
||||||
*/
|
*/
|
||||||
private static String getDateString(long time) {
|
@VisibleForTesting
|
||||||
|
static String getDateString(long time) {
|
||||||
if (time <= 0) {
|
if (time <= 0) {
|
||||||
return "-";
|
return "-";
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,12 +107,12 @@ public final class FederationUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch the compile timestamp for this jar.
|
* Fetch the Hadoop version string for this jar.
|
||||||
*
|
*
|
||||||
* @return Date compiled.
|
* @return Hadoop version string, e.g., 3.0.1.
|
||||||
*/
|
*/
|
||||||
public static String getBuildVersion() {
|
public static String getVersion() {
|
||||||
return VersionInfo.getBuildVersion();
|
return VersionInfo.getVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -60,7 +60,7 @@ public abstract class RouterState extends BaseRecord {
|
||||||
record.setAddress(addr);
|
record.setAddress(addr);
|
||||||
record.setStatus(status);
|
record.setStatus(status);
|
||||||
record.setCompileInfo(FederationUtil.getCompileInfo());
|
record.setCompileInfo(FederationUtil.getCompileInfo());
|
||||||
record.setBuildVersion(FederationUtil.getBuildVersion());
|
record.setVersion(FederationUtil.getVersion());
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,9 +78,9 @@ public abstract class RouterState extends BaseRecord {
|
||||||
|
|
||||||
public abstract void setStatus(RouterServiceState newStatus);
|
public abstract void setStatus(RouterServiceState newStatus);
|
||||||
|
|
||||||
public abstract String getBuildVersion();
|
public abstract String getVersion();
|
||||||
|
|
||||||
public abstract void setBuildVersion(String version);
|
public abstract void setVersion(String version);
|
||||||
|
|
||||||
public abstract String getCompileInfo();
|
public abstract String getCompileInfo();
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ public abstract class RouterState extends BaseRecord {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getAddress() + " -> " + getStatus() + "," + getBuildVersion();
|
return getAddress() + " -> " + getStatus() + "," + getVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -133,21 +133,21 @@ public class RouterStatePBImpl extends RouterState implements PBRecord {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getBuildVersion() {
|
public String getVersion() {
|
||||||
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
|
RouterRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
|
||||||
if (!proto.hasBuildVersion()) {
|
if (!proto.hasVersion()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return proto.getBuildVersion();
|
return proto.getVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBuildVersion(String version) {
|
public void setVersion(String version) {
|
||||||
RouterRecordProto.Builder builder = this.translator.getBuilder();
|
RouterRecordProto.Builder builder = this.translator.getBuilder();
|
||||||
if (version == null) {
|
if (version == null) {
|
||||||
builder.clearBuildVersion();
|
builder.clearVersion();
|
||||||
} else {
|
} else {
|
||||||
builder.setBuildVersion(version);
|
builder.setVersion(version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -184,7 +184,7 @@ message RouterRecordProto {
|
||||||
optional string address = 3;
|
optional string address = 3;
|
||||||
optional string status = 4;
|
optional string status = 4;
|
||||||
optional StateStoreVersionRecordProto stateStoreVersion = 5;
|
optional StateStoreVersionRecordProto stateStoreVersion = 5;
|
||||||
optional string buildVersion = 6;
|
optional string version = 6;
|
||||||
optional string compileInfo = 7;
|
optional string compileInfo = 7;
|
||||||
optional uint64 dateStarted = 8;
|
optional uint64 dateStarted = 8;
|
||||||
}
|
}
|
||||||
|
|
|
@ -245,6 +245,47 @@
|
||||||
</small>
|
</small>
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
|
<div class="page-header"><h1>Routers Information</h1></div>
|
||||||
|
<div>
|
||||||
|
<ul class="dfshealth-node-legend">
|
||||||
|
<li class="dfshealth-node-icon dfshealth-node-alive">Active</li>
|
||||||
|
<li class="dfshealth-node-icon dfshealth-node-decommisioned">Safe mode</li>
|
||||||
|
<li class="dfshealth-node-icon dfshealth-node-down">Unavailable</li>
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
<table class="table">
|
||||||
|
<thead>
|
||||||
|
<tr>
|
||||||
|
<th colspan="4"></th>
|
||||||
|
<th colspan="2">Last update</th>
|
||||||
|
<th></th>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th></th>
|
||||||
|
<th>Address</th>
|
||||||
|
<th>Status</th>
|
||||||
|
<th>Heartbeat</th>
|
||||||
|
<th>Membership</th>
|
||||||
|
<th>Mount table</th>
|
||||||
|
<th>Version</th>
|
||||||
|
</tr>
|
||||||
|
</thead>
|
||||||
|
<tbody>
|
||||||
|
{#Routers}
|
||||||
|
<tr>
|
||||||
|
<td class="dfshealth-node-icon dfshealth-node-{iconState}" title="{title}"></td>
|
||||||
|
<td>{address}</td>
|
||||||
|
<td>{status}</td>
|
||||||
|
<td>{lastHeartbeat} sec ago</td>
|
||||||
|
<td>{lastMembershipUpdate}</td>
|
||||||
|
<td>{lastMountTableUpdate}</td>
|
||||||
|
<td>{version}</td>
|
||||||
|
</tr>
|
||||||
|
{/Routers}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</script>
|
||||||
|
|
||||||
<!-- Datanodes -->
|
<!-- Datanodes -->
|
||||||
<script type="text/x-dust-template" id="tmpl-datanode">
|
<script type="text/x-dust-template" id="tmpl-datanode">
|
||||||
<div class="page-header"><h1>Datanode Information</h1></div>
|
<div class="page-header"><h1>Datanode Information</h1></div>
|
||||||
|
|
|
@ -133,10 +133,36 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function augment_routers(nodes) {
|
||||||
|
for (var i = 0, e = nodes.length; i < e; ++i) {
|
||||||
|
var n = nodes[i];
|
||||||
|
n.title = "Unavailable"
|
||||||
|
n.iconState = "down";
|
||||||
|
if (n.status === "INITIALIZING") {
|
||||||
|
n.title = capitalise(n.status);
|
||||||
|
n.iconState = "alive";
|
||||||
|
} else if (n.status === "RUNNING") {
|
||||||
|
n.title = capitalise(n.status);
|
||||||
|
n.iconState = "alive";
|
||||||
|
} else if (n.status === "SAFEMODE") {
|
||||||
|
n.title = capitalise(n.status);
|
||||||
|
n.iconState = "down-decommisioned";
|
||||||
|
} else if (n.status === "STOPPING") {
|
||||||
|
n.title = capitalise(n.status);
|
||||||
|
n.iconState = "decommisioned";
|
||||||
|
} else if (n.status === "SHUTDOWN") {
|
||||||
|
n.title = capitalise(n.status);
|
||||||
|
n.iconState = "down";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
r.Nameservices = node_map_to_array(JSON.parse(r.Nameservices));
|
r.Nameservices = node_map_to_array(JSON.parse(r.Nameservices));
|
||||||
augment_namenodes(r.Nameservices);
|
augment_namenodes(r.Nameservices);
|
||||||
r.Namenodes = node_map_to_array(JSON.parse(r.Namenodes));
|
r.Namenodes = node_map_to_array(JSON.parse(r.Namenodes));
|
||||||
augment_namenodes(r.Namenodes);
|
augment_namenodes(r.Namenodes);
|
||||||
|
r.Routers = node_map_to_array(JSON.parse(r.Routers));
|
||||||
|
augment_routers(r.Routers);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
||||||
import org.codehaus.jettison.json.JSONArray;
|
import org.codehaus.jettison.json.JSONArray;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
|
@ -195,11 +197,59 @@ public class TestFederationMetrics extends TestMetricsBase {
|
||||||
assertEquals(getNameservices().size(), nameservicesFound);
|
assertEquals(getNameservices().size(), nameservicesFound);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterStatsDataSource() throws IOException, JSONException {
|
||||||
|
|
||||||
|
FederationMetrics metrics = getRouter().getMetrics();
|
||||||
|
String jsonString = metrics.getRouters();
|
||||||
|
JSONObject jsonObject = new JSONObject(jsonString);
|
||||||
|
Iterator<?> keys = jsonObject.keys();
|
||||||
|
int routersFound = 0;
|
||||||
|
while (keys.hasNext()) {
|
||||||
|
JSONObject json = jsonObject.getJSONObject((String) keys.next());
|
||||||
|
String address = json.getString("address");
|
||||||
|
assertNotNullAndNotEmpty(address);
|
||||||
|
RouterState router = findMockRouter(address);
|
||||||
|
assertNotNull(router);
|
||||||
|
|
||||||
|
assertEquals(router.getStatus().toString(), json.getString("status"));
|
||||||
|
assertEquals(router.getCompileInfo(), json.getString("compileInfo"));
|
||||||
|
assertEquals(router.getVersion(), json.getString("version"));
|
||||||
|
assertEquals(router.getDateStarted(), json.getLong("dateStarted"));
|
||||||
|
assertEquals(router.getDateCreated(), json.getLong("dateCreated"));
|
||||||
|
assertEquals(router.getDateModified(), json.getLong("dateModified"));
|
||||||
|
|
||||||
|
StateStoreVersion version = router.getStateStoreVersion();
|
||||||
|
assertEquals(
|
||||||
|
FederationMetrics.getDateString(version.getMembershipVersion()),
|
||||||
|
json.get("lastMembershipUpdate"));
|
||||||
|
assertEquals(
|
||||||
|
FederationMetrics.getDateString(version.getMountTableVersion()),
|
||||||
|
json.get("lastMountTableUpdate"));
|
||||||
|
assertEquals(version.getMembershipVersion(),
|
||||||
|
json.get("membershipVersion"));
|
||||||
|
assertEquals(version.getMountTableVersion(),
|
||||||
|
json.get("mountTableVersion"));
|
||||||
|
routersFound++;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(getMockRouters().size(), routersFound);
|
||||||
|
}
|
||||||
|
|
||||||
private void assertNotNullAndNotEmpty(String field) {
|
private void assertNotNullAndNotEmpty(String field) {
|
||||||
assertNotNull(field);
|
assertNotNull(field);
|
||||||
assertTrue(field.length() > 0);
|
assertTrue(field.length() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RouterState findMockRouter(String routerId) {
|
||||||
|
for (RouterState router : getMockRouters()) {
|
||||||
|
if (router.getAddress().equals(routerId)) {
|
||||||
|
return router;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private void validateClusterStatsBean(FederationMBean bean)
|
private void validateClusterStatsBean(FederationMBean bean)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
|
|
@ -34,11 +34,19 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -49,12 +57,14 @@ public class TestMetricsBase {
|
||||||
|
|
||||||
private StateStoreService stateStore;
|
private StateStoreService stateStore;
|
||||||
private MembershipStore membershipStore;
|
private MembershipStore membershipStore;
|
||||||
|
private RouterStore routerStore;
|
||||||
private Router router;
|
private Router router;
|
||||||
private Configuration routerConfig;
|
private Configuration routerConfig;
|
||||||
|
|
||||||
private List<MembershipState> activeMemberships;
|
private List<MembershipState> activeMemberships;
|
||||||
private List<MembershipState> standbyMemberships;
|
private List<MembershipState> standbyMemberships;
|
||||||
private List<MountTable> mockMountTable;
|
private List<MountTable> mockMountTable;
|
||||||
|
private List<RouterState> mockRouters;
|
||||||
private List<String> nameservices;
|
private List<String> nameservices;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -74,6 +84,7 @@ public class TestMetricsBase {
|
||||||
|
|
||||||
membershipStore =
|
membershipStore =
|
||||||
stateStore.getRegisteredRecordStore(MembershipStore.class);
|
stateStore.getRegisteredRecordStore(MembershipStore.class);
|
||||||
|
routerStore = stateStore.getRegisteredRecordStore(RouterStore.class);
|
||||||
|
|
||||||
// Read all data and load all caches
|
// Read all data and load all caches
|
||||||
waitStateStore(stateStore, 10000);
|
waitStateStore(stateStore, 10000);
|
||||||
|
@ -123,6 +134,36 @@ public class TestMetricsBase {
|
||||||
// Add 2 mount table memberships
|
// Add 2 mount table memberships
|
||||||
mockMountTable = createMockMountTable(nameservices);
|
mockMountTable = createMockMountTable(nameservices);
|
||||||
synchronizeRecords(stateStore, mockMountTable, MountTable.class);
|
synchronizeRecords(stateStore, mockMountTable, MountTable.class);
|
||||||
|
|
||||||
|
// Add 2 router memberships in addition to the running router.
|
||||||
|
long t1 = Time.now();
|
||||||
|
mockRouters = new ArrayList<>();
|
||||||
|
RouterState router1 = RouterState.newInstance(
|
||||||
|
"router1", t1, RouterServiceState.RUNNING);
|
||||||
|
router1.setStateStoreVersion(StateStoreVersion.newInstance(
|
||||||
|
t1 - 1000, t1 - 2000));
|
||||||
|
RouterHeartbeatRequest heartbeatRequest =
|
||||||
|
RouterHeartbeatRequest.newInstance(router1);
|
||||||
|
assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus());
|
||||||
|
|
||||||
|
GetRouterRegistrationRequest getRequest =
|
||||||
|
GetRouterRegistrationRequest.newInstance("router1");
|
||||||
|
GetRouterRegistrationResponse getResponse =
|
||||||
|
routerStore.getRouterRegistration(getRequest);
|
||||||
|
RouterState routerState1 = getResponse.getRouter();
|
||||||
|
mockRouters.add(routerState1);
|
||||||
|
|
||||||
|
long t2 = Time.now();
|
||||||
|
RouterState router2 = RouterState.newInstance(
|
||||||
|
"router2", t2, RouterServiceState.RUNNING);
|
||||||
|
router2.setStateStoreVersion(StateStoreVersion.newInstance(
|
||||||
|
t2 - 6000, t2 - 7000));
|
||||||
|
heartbeatRequest.setRouter(router2);
|
||||||
|
assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus());
|
||||||
|
getRequest.setRouterId("router2");
|
||||||
|
getResponse = routerStore.getRouterRegistration(getRequest);
|
||||||
|
RouterState routerState2 = getResponse.getRouter();
|
||||||
|
mockRouters.add(routerState2);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Router getRouter() {
|
protected Router getRouter() {
|
||||||
|
@ -145,6 +186,10 @@ public class TestMetricsBase {
|
||||||
return nameservices;
|
return nameservices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected List<RouterState> getMockRouters() {
|
||||||
|
return mockRouters;
|
||||||
|
}
|
||||||
|
|
||||||
protected StateStoreService getStateStore() {
|
protected StateStoreService getStateStore() {
|
||||||
return stateStore;
|
return stateStore;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.store;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
|
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -125,7 +126,7 @@ public class TestStateStoreRouterState extends TestStateStoreBase {
|
||||||
assertEquals(address, record.getAddress());
|
assertEquals(address, record.getAddress());
|
||||||
assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo());
|
assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo());
|
||||||
// Build version may vary a bit
|
// Build version may vary a bit
|
||||||
assertTrue(record.getBuildVersion().length() > 0);
|
assertFalse(record.getVersion().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -31,7 +31,7 @@ import org.junit.Test;
|
||||||
public class TestRouterState {
|
public class TestRouterState {
|
||||||
|
|
||||||
private static final String ADDRESS = "address";
|
private static final String ADDRESS = "address";
|
||||||
private static final String BUILD_VERSION = "buildVersion";
|
private static final String VERSION = "version";
|
||||||
private static final String COMPILE_INFO = "compileInfo";
|
private static final String COMPILE_INFO = "compileInfo";
|
||||||
private static final long START_TIME = 100;
|
private static final long START_TIME = 100;
|
||||||
private static final long DATE_MODIFIED = 200;
|
private static final long DATE_MODIFIED = 200;
|
||||||
|
@ -42,7 +42,7 @@ public class TestRouterState {
|
||||||
|
|
||||||
private RouterState generateRecord() throws IOException {
|
private RouterState generateRecord() throws IOException {
|
||||||
RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
|
RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
|
||||||
record.setBuildVersion(BUILD_VERSION);
|
record.setVersion(VERSION);
|
||||||
record.setCompileInfo(COMPILE_INFO);
|
record.setCompileInfo(COMPILE_INFO);
|
||||||
record.setDateCreated(DATE_CREATED);
|
record.setDateCreated(DATE_CREATED);
|
||||||
record.setDateModified(DATE_MODIFIED);
|
record.setDateModified(DATE_MODIFIED);
|
||||||
|
@ -58,7 +58,7 @@ public class TestRouterState {
|
||||||
assertEquals(START_TIME, record.getDateStarted());
|
assertEquals(START_TIME, record.getDateStarted());
|
||||||
assertEquals(STATE, record.getStatus());
|
assertEquals(STATE, record.getStatus());
|
||||||
assertEquals(COMPILE_INFO, record.getCompileInfo());
|
assertEquals(COMPILE_INFO, record.getCompileInfo());
|
||||||
assertEquals(BUILD_VERSION, record.getBuildVersion());
|
assertEquals(VERSION, record.getVersion());
|
||||||
|
|
||||||
StateStoreVersion version = record.getStateStoreVersion();
|
StateStoreVersion version = record.getStateStoreVersion();
|
||||||
assertEquals(FILE_RESOLVER_VERSION, version.getMountTableVersion());
|
assertEquals(FILE_RESOLVER_VERSION, version.getMountTableVersion());
|
||||||
|
|
Loading…
Reference in New Issue