HDFS-14891. RBF: namenode links in NameFederation Health page (federationhealth.html) cannot use https scheme. Contributed by Xieming Li
This commit is contained in:
parent
b643a1cbe8
commit
7901062707
|
@ -44,9 +44,16 @@ public interface FederationNamenodeContext {
|
||||||
String getLifelineAddress();
|
String getLifelineAddress();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the HTTP server address of the namenode.
|
* Get the Scheme of web address of the namenode.
|
||||||
*
|
*
|
||||||
* @return HTTP address in the form of host:port.
|
* @return Scheme of web address (HTTP/HTTPS).
|
||||||
|
*/
|
||||||
|
String getWebScheme();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the HTTP(s) server address of the namenode.
|
||||||
|
*
|
||||||
|
* @return HTTP(s) address in the form of host:port.
|
||||||
*/
|
*/
|
||||||
String getWebAddress();
|
String getWebAddress();
|
||||||
|
|
||||||
|
|
|
@ -260,7 +260,8 @@ public class MembershipNamenodeResolver
|
||||||
routerId, report.getNameserviceId(), report.getNamenodeId(),
|
routerId, report.getNameserviceId(), report.getNamenodeId(),
|
||||||
report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
|
report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
|
||||||
report.getServiceAddress(), report.getLifelineAddress(),
|
report.getServiceAddress(), report.getLifelineAddress(),
|
||||||
report.getWebAddress(), report.getState(), report.getSafemode());
|
report.getWebScheme(), report.getWebAddress(), report.getState(),
|
||||||
|
report.getSafemode());
|
||||||
|
|
||||||
if (report.statsValid()) {
|
if (report.statsValid()) {
|
||||||
MembershipStats stats = MembershipStats.newInstance();
|
MembershipStats stats = MembershipStats.newInstance();
|
||||||
|
|
|
@ -34,6 +34,7 @@ public class NamenodeStatusReport {
|
||||||
private String serviceAddress = "";
|
private String serviceAddress = "";
|
||||||
private String lifelineAddress = "";
|
private String lifelineAddress = "";
|
||||||
private String webAddress = "";
|
private String webAddress = "";
|
||||||
|
private String webScheme = "";
|
||||||
|
|
||||||
/** Namenode state. */
|
/** Namenode state. */
|
||||||
private HAServiceState status = HAServiceState.STANDBY;
|
private HAServiceState status = HAServiceState.STANDBY;
|
||||||
|
@ -76,12 +77,13 @@ public class NamenodeStatusReport {
|
||||||
private boolean haStateValid = false;
|
private boolean haStateValid = false;
|
||||||
|
|
||||||
public NamenodeStatusReport(String ns, String nn, String rpc, String service,
|
public NamenodeStatusReport(String ns, String nn, String rpc, String service,
|
||||||
String lifeline, String web) {
|
String lifeline, String webScheme, String web) {
|
||||||
this.nameserviceId = ns;
|
this.nameserviceId = ns;
|
||||||
this.namenodeId = nn;
|
this.namenodeId = nn;
|
||||||
this.rpcAddress = rpc;
|
this.rpcAddress = rpc;
|
||||||
this.serviceAddress = service;
|
this.serviceAddress = service;
|
||||||
this.lifelineAddress = lifeline;
|
this.lifelineAddress = lifeline;
|
||||||
|
this.webScheme = webScheme;
|
||||||
this.webAddress = web;
|
this.webAddress = web;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,6 +201,15 @@ public class NamenodeStatusReport {
|
||||||
return this.webAddress;
|
return this.webAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the scheme of web address.
|
||||||
|
*
|
||||||
|
* @return The scheme of web address.
|
||||||
|
*/
|
||||||
|
public String getWebScheme() {
|
||||||
|
return this.webScheme;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the HA service state.
|
* Set the HA service state.
|
||||||
*
|
*
|
||||||
|
|
|
@ -248,7 +248,8 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
*/
|
*/
|
||||||
protected NamenodeStatusReport getNamenodeStatusReport() {
|
protected NamenodeStatusReport getNamenodeStatusReport() {
|
||||||
NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
|
NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
|
||||||
namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
|
namenodeId, rpcAddress, serviceAddress,
|
||||||
|
lifelineAddress, scheme, webAddress);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LOG.debug("Probing NN at service address: {}", serviceAddress);
|
LOG.debug("Probing NN at service address: {}", serviceAddress);
|
||||||
|
|
|
@ -89,14 +89,16 @@ public abstract class MembershipState extends BaseRecord
|
||||||
* @param rpcAddress RPC address.
|
* @param rpcAddress RPC address.
|
||||||
* @param serviceAddress Service RPC address.
|
* @param serviceAddress Service RPC address.
|
||||||
* @param lifelineAddress Lifeline RPC address.
|
* @param lifelineAddress Lifeline RPC address.
|
||||||
* @param webAddress HTTP address.
|
* @param webScheme Scheme of Web Address, HTTP or HTTPS.
|
||||||
|
* @param webAddress HTTP(s) address.
|
||||||
* @param state State of the federation.
|
* @param state State of the federation.
|
||||||
* @param safemode If the safe mode is enabled.
|
* @param safemode If the safe mode is enabled.
|
||||||
* @return Membership instance.
|
* @return Membership instance.
|
||||||
*/
|
*/
|
||||||
public static MembershipState newInstance(String router, String nameservice,
|
public static MembershipState newInstance(String router, String nameservice,
|
||||||
String namenode, String clusterId, String blockPoolId, String rpcAddress,
|
String namenode, String clusterId, String blockPoolId, String rpcAddress,
|
||||||
String serviceAddress, String lifelineAddress, String webAddress,
|
String serviceAddress, String lifelineAddress,
|
||||||
|
String webScheme, String webAddress,
|
||||||
FederationNamenodeServiceState state, boolean safemode) {
|
FederationNamenodeServiceState state, boolean safemode) {
|
||||||
|
|
||||||
MembershipState record = MembershipState.newInstance();
|
MembershipState record = MembershipState.newInstance();
|
||||||
|
@ -111,6 +113,7 @@ public abstract class MembershipState extends BaseRecord
|
||||||
record.setState(state);
|
record.setState(state);
|
||||||
record.setClusterId(clusterId);
|
record.setClusterId(clusterId);
|
||||||
record.setBlockPoolId(blockPoolId);
|
record.setBlockPoolId(blockPoolId);
|
||||||
|
record.setWebScheme(webScheme);
|
||||||
record.validate();
|
record.validate();
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
@ -139,6 +142,8 @@ public abstract class MembershipState extends BaseRecord
|
||||||
|
|
||||||
public abstract void setState(FederationNamenodeServiceState state);
|
public abstract void setState(FederationNamenodeServiceState state);
|
||||||
|
|
||||||
|
public abstract void setWebScheme(String webScheme);
|
||||||
|
|
||||||
public abstract String getNameserviceId();
|
public abstract String getNameserviceId();
|
||||||
|
|
||||||
public abstract String getNamenodeId();
|
public abstract String getNamenodeId();
|
||||||
|
@ -157,6 +162,8 @@ public abstract class MembershipState extends BaseRecord
|
||||||
|
|
||||||
public abstract boolean getIsSafeMode();
|
public abstract boolean getIsSafeMode();
|
||||||
|
|
||||||
|
public abstract String getWebScheme();
|
||||||
|
|
||||||
public abstract FederationNamenodeServiceState getState();
|
public abstract FederationNamenodeServiceState getState();
|
||||||
|
|
||||||
public abstract void setStats(MembershipStats stats);
|
public abstract void setStats(MembershipStats stats);
|
||||||
|
|
|
@ -165,6 +165,16 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWebScheme(String webScheme) {
|
||||||
|
Builder builder = this.translator.getBuilder();
|
||||||
|
if (webScheme == null) {
|
||||||
|
builder.clearWebScheme();
|
||||||
|
} else {
|
||||||
|
builder.setWebScheme(webScheme);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRouterId() {
|
public String getRouterId() {
|
||||||
NamenodeMembershipRecordProtoOrBuilder proto =
|
NamenodeMembershipRecordProtoOrBuilder proto =
|
||||||
|
@ -277,6 +287,16 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWebScheme() {
|
||||||
|
NamenodeMembershipRecordProtoOrBuilder proto =
|
||||||
|
this.translator.getProtoOrBuilder();
|
||||||
|
if (!proto.hasWebScheme()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return this.translator.getProtoOrBuilder().getWebScheme();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setStats(MembershipStats stats) {
|
public void setStats(MembershipStats stats) {
|
||||||
if (stats instanceof MembershipStatsPBImpl) {
|
if (stats instanceof MembershipStatsPBImpl) {
|
||||||
|
|
|
@ -68,6 +68,7 @@ message NamenodeMembershipRecordProto {
|
||||||
optional bool isSafeMode = 14;
|
optional bool isSafeMode = 14;
|
||||||
|
|
||||||
optional NamenodeMembershipStatsRecordProto stats = 15;
|
optional NamenodeMembershipStatsRecordProto stats = 15;
|
||||||
|
optional string webScheme = 16;
|
||||||
}
|
}
|
||||||
|
|
||||||
message FederationNamespaceInfoProto {
|
message FederationNamespaceInfoProto {
|
||||||
|
|
|
@ -166,8 +166,8 @@
|
||||||
{#Nameservices}
|
{#Nameservices}
|
||||||
<tr>
|
<tr>
|
||||||
<td class="federationhealth-namenode-icon federationhealth-namenode-{iconState}" title="{title}"></td>
|
<td class="federationhealth-namenode-icon federationhealth-namenode-{iconState}" title="{title}"></td>
|
||||||
<td><a href="http://{webAddress}">{nameserviceId}</a></td>
|
<td><a href="{webScheme}://{webAddress}">{nameserviceId}</a></td>
|
||||||
<td><a href="http://{webAddress}">{namenodeId}</a></td>
|
<td><a href="{webScheme}://{webAddress}">{namenodeId}</a></td>
|
||||||
<td>{lastHeartbeat}</td>
|
<td>{lastHeartbeat}</td>
|
||||||
<td ng-value="{usedPercentage}" style="width:210px">
|
<td ng-value="{usedPercentage}" style="width:210px">
|
||||||
<div>
|
<div>
|
||||||
|
@ -235,7 +235,7 @@
|
||||||
<td class="federationhealth-namenode-icon federationhealth-namenode-{iconState}" title="{title}"></td>
|
<td class="federationhealth-namenode-icon federationhealth-namenode-{iconState}" title="{title}"></td>
|
||||||
<td>{nameserviceId}</td>
|
<td>{nameserviceId}</td>
|
||||||
<td>{namenodeId}</td>
|
<td>{namenodeId}</td>
|
||||||
<td><a href="http://{webAddress}">{webAddress}</a></td>
|
<td><a href="{webScheme}://{webAddress}">{webScheme}://{webAddress}</a></td>
|
||||||
<td>{lastHeartbeat}</td>
|
<td>{lastHeartbeat}</td>
|
||||||
<td ng-value="{usedPercentage}" style="width:210px">
|
<td ng-value="{usedPercentage}" style="width:210px">
|
||||||
<div>
|
<div>
|
||||||
|
|
|
@ -140,7 +140,8 @@ public final class FederationTestUtils {
|
||||||
Random rand = new Random();
|
Random rand = new Random();
|
||||||
NamenodeStatusReport report = new NamenodeStatusReport(ns, nn,
|
NamenodeStatusReport report = new NamenodeStatusReport(ns, nn,
|
||||||
"localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000),
|
"localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000),
|
||||||
"localhost:" + rand.nextInt(10000), "testwebaddress-" + ns + nn);
|
"localhost:" + rand.nextInt(10000), "http",
|
||||||
|
"testwebaddress-" + ns + nn);
|
||||||
if (state == null) {
|
if (state == null) {
|
||||||
// Unavailable, no additional info
|
// Unavailable, no additional info
|
||||||
return report;
|
return report;
|
||||||
|
|
|
@ -859,7 +859,7 @@ public class MiniRouterDFSCluster {
|
||||||
NamenodeStatusReport report = new NamenodeStatusReport(
|
NamenodeStatusReport report = new NamenodeStatusReport(
|
||||||
nn.nameserviceId, nn.namenodeId,
|
nn.nameserviceId, nn.namenodeId,
|
||||||
nn.getRpcAddress(), nn.getServiceAddress(),
|
nn.getRpcAddress(), nn.getServiceAddress(),
|
||||||
nn.getLifelineAddress(), nn.getWebAddress());
|
nn.getLifelineAddress(), "http", nn.getWebAddress());
|
||||||
FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
|
FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
|
||||||
NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
|
NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
|
||||||
report.setNamespaceInfo(nsInfo);
|
report.setNamespaceInfo(nsInfo);
|
||||||
|
|
|
@ -539,8 +539,10 @@ public class MockNamenode {
|
||||||
String nsId = nn.getNameserviceId();
|
String nsId = nn.getNameserviceId();
|
||||||
String rpcAddress = "localhost:" + nn.getRPCPort();
|
String rpcAddress = "localhost:" + nn.getRPCPort();
|
||||||
String httpAddress = "localhost:" + nn.getHTTPPort();
|
String httpAddress = "localhost:" + nn.getHTTPPort();
|
||||||
|
String scheme = "http";
|
||||||
NamenodeStatusReport report = new NamenodeStatusReport(
|
NamenodeStatusReport report = new NamenodeStatusReport(
|
||||||
nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
|
nsId, null, rpcAddress, rpcAddress,
|
||||||
|
rpcAddress, scheme, httpAddress);
|
||||||
if (unavailableSubclusters.contains(nsId)) {
|
if (unavailableSubclusters.contains(nsId)) {
|
||||||
LOG.info("Register {} as UNAVAILABLE", nsId);
|
LOG.info("Register {} as UNAVAILABLE", nsId);
|
||||||
report.setRegistrationValid(false);
|
report.setRegistrationValid(false);
|
||||||
|
|
|
@ -155,12 +155,14 @@ public class MockResolver
|
||||||
return Collections.unmodifiableList(new ArrayList<>(namenodes));
|
return Collections.unmodifiableList(new ArrayList<>(namenodes));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("checkstyle:ParameterNumber")
|
||||||
private static class MockNamenodeContext
|
private static class MockNamenodeContext
|
||||||
implements FederationNamenodeContext {
|
implements FederationNamenodeContext {
|
||||||
|
|
||||||
private String namenodeId;
|
private String namenodeId;
|
||||||
private String nameserviceId;
|
private String nameserviceId;
|
||||||
|
|
||||||
|
private String webScheme;
|
||||||
private String webAddress;
|
private String webAddress;
|
||||||
private String rpcAddress;
|
private String rpcAddress;
|
||||||
private String serviceAddress;
|
private String serviceAddress;
|
||||||
|
@ -170,11 +172,12 @@ public class MockResolver
|
||||||
private long dateModified;
|
private long dateModified;
|
||||||
|
|
||||||
MockNamenodeContext(
|
MockNamenodeContext(
|
||||||
String rpc, String service, String lifeline, String web,
|
String rpc, String service, String lifeline, String scheme, String web,
|
||||||
String ns, String nn, FederationNamenodeServiceState state) {
|
String ns, String nn, FederationNamenodeServiceState state) {
|
||||||
this.rpcAddress = rpc;
|
this.rpcAddress = rpc;
|
||||||
this.serviceAddress = service;
|
this.serviceAddress = service;
|
||||||
this.lifelineAddress = lifeline;
|
this.lifelineAddress = lifeline;
|
||||||
|
this.webScheme = scheme;
|
||||||
this.webAddress = web;
|
this.webAddress = web;
|
||||||
this.namenodeId = nn;
|
this.namenodeId = nn;
|
||||||
this.nameserviceId = ns;
|
this.nameserviceId = ns;
|
||||||
|
@ -202,6 +205,11 @@ public class MockResolver
|
||||||
return lifelineAddress;
|
return lifelineAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWebScheme() {
|
||||||
|
return webScheme;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getWebAddress() {
|
public String getWebAddress() {
|
||||||
return webAddress;
|
return webAddress;
|
||||||
|
@ -242,8 +250,9 @@ public class MockResolver
|
||||||
|
|
||||||
MockNamenodeContext context = new MockNamenodeContext(
|
MockNamenodeContext context = new MockNamenodeContext(
|
||||||
report.getRpcAddress(), report.getServiceAddress(),
|
report.getRpcAddress(), report.getServiceAddress(),
|
||||||
report.getLifelineAddress(), report.getWebAddress(),
|
report.getLifelineAddress(), report.getWebScheme(),
|
||||||
report.getNameserviceId(), report.getNamenodeId(), report.getState());
|
report.getWebAddress(), report.getNameserviceId(),
|
||||||
|
report.getNamenodeId(), report.getState());
|
||||||
|
|
||||||
String nsId = report.getNameserviceId();
|
String nsId = report.getNameserviceId();
|
||||||
String bpId = report.getBlockPoolId();
|
String bpId = report.getBlockPoolId();
|
||||||
|
|
|
@ -250,7 +250,8 @@ public class TestMetricsBase {
|
||||||
MembershipState record =
|
MembershipState record =
|
||||||
MembershipState.newInstance(routerId, ns, nn, "testcluster",
|
MembershipState.newInstance(routerId, ns, nn, "testcluster",
|
||||||
"testblock-" + ns, "testrpc-" + ns + nn, "testservice-" + ns + nn,
|
"testblock-" + ns, "testrpc-" + ns + nn, "testservice-" + ns + nn,
|
||||||
"testlifeline-" + ns + nn, "testweb-" + ns + nn, state, false);
|
"testlifeline-" + ns + nn, "http", "testweb-" + ns + nn,
|
||||||
|
state, false);
|
||||||
NamenodeHeartbeatRequest request =
|
NamenodeHeartbeatRequest request =
|
||||||
NamenodeHeartbeatRequest.newInstance(record);
|
NamenodeHeartbeatRequest.newInstance(record);
|
||||||
NamenodeHeartbeatResponse response =
|
NamenodeHeartbeatResponse response =
|
||||||
|
|
|
@ -156,6 +156,7 @@ public class TestRBFMetrics extends TestMetricsBase {
|
||||||
stats.getNumOfEnteringMaintenanceDataNodes());
|
stats.getNumOfEnteringMaintenanceDataNodes());
|
||||||
assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
|
assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
|
||||||
assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
|
assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
|
||||||
|
assertEquals(json.getString("webScheme"), mockEntry.getWebScheme());
|
||||||
assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
|
assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
|
||||||
nnsFound++;
|
nnsFound++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,7 @@ public class TestLocalResolver {
|
||||||
private MembershipState newMembershipState(String addr, String nsId) {
|
private MembershipState newMembershipState(String addr, String nsId) {
|
||||||
return MembershipState.newInstance(
|
return MembershipState.newInstance(
|
||||||
"routerId", nsId, "nn0", "cluster0", "blockPool0",
|
"routerId", nsId, "nn0", "cluster0", "blockPool0",
|
||||||
addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
|
addr + ":8001", addr + ":8002", addr + ":8003", "http", addr + ":8004",
|
||||||
FederationNamenodeServiceState.ACTIVE, false);
|
FederationNamenodeServiceState.ACTIVE, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,205 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the scheme of Http address of Namenodes displayed in Router.
|
||||||
|
* This feature is managed by {@link DFSConfigKeys#DFS_HTTP_POLICY_KEY}
|
||||||
|
*/
|
||||||
|
public class TestRouterNamenodeWebScheme {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestRouterNamenodeWebScheme.class);
|
||||||
|
|
||||||
|
/** Router for the test. */
|
||||||
|
private Router router;
|
||||||
|
/** Namenodes in the cluster. */
|
||||||
|
private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
|
||||||
|
/** Nameservices in the federated cluster. */
|
||||||
|
private List<String> nsIds = asList("ns0", "ns1");
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
LOG.info("Initialize the Mock Namenodes to monitor");
|
||||||
|
for (String nsId : nsIds) {
|
||||||
|
nns.put(nsId, new HashMap<>());
|
||||||
|
for (String nnId : asList("nn0", "nn1")) {
|
||||||
|
nns.get(nsId).put(nnId, new MockNamenode(nsId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Set nn0 to active for all nameservices");
|
||||||
|
for (Map<String, MockNamenode> nnNS : nns.values()) {
|
||||||
|
nnNS.get("nn0").transitionToActive();
|
||||||
|
nnNS.get("nn1").transitionToStandby();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() throws Exception {
|
||||||
|
for (Map<String, MockNamenode> nnNS : nns.values()) {
|
||||||
|
for (MockNamenode nn : nnNS.values()) {
|
||||||
|
nn.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nns.clear();
|
||||||
|
|
||||||
|
if (router != null) {
|
||||||
|
router.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the configuration of the cluster which contains all the Namenodes and
|
||||||
|
* their addresses.
|
||||||
|
* @return Configuration containing all the Namenodes.
|
||||||
|
*/
|
||||||
|
private Configuration getNamenodesConfig() {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMESERVICES,
|
||||||
|
StringUtils.join(",", nns.keySet()));
|
||||||
|
for (String nsId : nns.keySet()) {
|
||||||
|
Set<String> nnIds = nns.get(nsId).keySet();
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
|
||||||
|
sb.append(".").append(nsId);
|
||||||
|
conf.set(sb.toString(), StringUtils.join(",", nnIds));
|
||||||
|
|
||||||
|
for (String nnId : nnIds) {
|
||||||
|
final MockNamenode nn = nns.get(nsId).get(nnId);
|
||||||
|
|
||||||
|
sb = new StringBuilder();
|
||||||
|
sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
|
sb.append(".").append(nsId);
|
||||||
|
sb.append(".").append(nnId);
|
||||||
|
conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
|
||||||
|
|
||||||
|
sb = new StringBuilder();
|
||||||
|
sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
|
sb.append(".").append(nsId);
|
||||||
|
sb.append(".").append(nnId);
|
||||||
|
conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWebSchemeHttp() throws IOException {
|
||||||
|
testWebScheme(HttpConfig.Policy.HTTP_ONLY, "http");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWebSchemeHttps() throws IOException {
|
||||||
|
testWebScheme(HttpConfig.Policy.HTTPS_ONLY, "https");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testWebScheme(HttpConfig.Policy httpPolicy,
|
||||||
|
String expectedScheme) throws IOException {
|
||||||
|
Configuration nsConf = getNamenodesConfig();
|
||||||
|
|
||||||
|
// Setup the State Store for the Router to use
|
||||||
|
Configuration stateStoreConfig = getStateStoreConfiguration();
|
||||||
|
stateStoreConfig.setClass(
|
||||||
|
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
|
||||||
|
MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
|
||||||
|
stateStoreConfig.setClass(
|
||||||
|
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
|
||||||
|
MountTableResolver.class, FileSubclusterResolver.class);
|
||||||
|
|
||||||
|
Configuration routerConf = new RouterConfigBuilder(nsConf)
|
||||||
|
.enableLocalHeartbeat(true)
|
||||||
|
.heartbeat()
|
||||||
|
.stateStore()
|
||||||
|
.rpc()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// set "dfs.http.policy" to "HTTPS_ONLY"
|
||||||
|
routerConf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, httpPolicy.name());
|
||||||
|
|
||||||
|
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
|
||||||
|
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
|
||||||
|
"ns1.nn0,ns1.nn1");
|
||||||
|
routerConf.addResource(stateStoreConfig);
|
||||||
|
|
||||||
|
// Specify local node (ns0.nn1) to monitor
|
||||||
|
routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
|
||||||
|
routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
|
||||||
|
|
||||||
|
// Start the Router with the namenodes to monitor
|
||||||
|
router = new Router();
|
||||||
|
router.init(routerConf);
|
||||||
|
router.start();
|
||||||
|
|
||||||
|
// Manually trigger the heartbeat and update the values
|
||||||
|
Collection<NamenodeHeartbeatService> heartbeatServices =
|
||||||
|
router.getNamenodeHeartbeatServices();
|
||||||
|
for (NamenodeHeartbeatService service : heartbeatServices) {
|
||||||
|
service.periodicInvoke();
|
||||||
|
}
|
||||||
|
MembershipNamenodeResolver resolver =
|
||||||
|
(MembershipNamenodeResolver) router.getNamenodeResolver();
|
||||||
|
resolver.loadCache(true);
|
||||||
|
|
||||||
|
// Check that the webSchemes are "https"
|
||||||
|
final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
|
||||||
|
for (String nsId : nns.keySet()) {
|
||||||
|
List<? extends FederationNamenodeContext> nnReports =
|
||||||
|
resolver.getNamenodesForNameserviceId(nsId);
|
||||||
|
namespaceInfo.addAll(nnReports);
|
||||||
|
}
|
||||||
|
for (FederationNamenodeContext nnInfo : namespaceInfo) {
|
||||||
|
assertEquals("Unexpected scheme for Policy: " + httpPolicy.name(),
|
||||||
|
nnInfo.getWebScheme(), expectedScheme);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -173,7 +173,7 @@ public class TestRouterRPCClientRetries {
|
||||||
NamenodeStatusReport report = new NamenodeStatusReport(ns0,
|
NamenodeStatusReport report = new NamenodeStatusReport(ns0,
|
||||||
nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
|
nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
|
||||||
nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
|
nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
|
||||||
nnInfo.getWebAddress());
|
nnInfo.getWebScheme(), nnInfo.getWebAddress());
|
||||||
report.setRegistrationValid(false);
|
report.setRegistrationValid(false);
|
||||||
assertTrue(resolver.registerNamenode(report));
|
assertTrue(resolver.registerNamenode(report));
|
||||||
resolver.loadCache(true);
|
resolver.loadCache(true);
|
||||||
|
|
|
@ -260,7 +260,8 @@ public final class FederationStateStoreTestUtils {
|
||||||
FederationNamenodeServiceState state) throws IOException {
|
FederationNamenodeServiceState state) throws IOException {
|
||||||
MembershipState entry = MembershipState.newInstance(
|
MembershipState entry = MembershipState.newInstance(
|
||||||
"routerId", nameserviceId, namenodeId, "clusterId", "test",
|
"routerId", nameserviceId, namenodeId, "clusterId", "test",
|
||||||
"0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", state, false);
|
"0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "http", "0.0.0.0:0",
|
||||||
|
state, false);
|
||||||
MembershipStats stats = MembershipStats.newInstance();
|
MembershipStats stats = MembershipStats.newInstance();
|
||||||
stats.setNumOfActiveDatanodes(100);
|
stats.setNumOfActiveDatanodes(100);
|
||||||
stats.setNumOfDeadDatanodes(10);
|
stats.setNumOfDeadDatanodes(10);
|
||||||
|
|
|
@ -170,7 +170,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
router, ns,
|
router, ns,
|
||||||
nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn,
|
nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn,
|
||||||
"testservice-"+ ns + nn, "testlifeline-"+ ns + nn,
|
"testservice-"+ ns + nn, "testlifeline-"+ ns + nn,
|
||||||
"testweb-" + ns + nn, state, false);
|
"http", "testweb-" + ns + nn, state, false);
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,34 +238,35 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
String lifelineAddress = "testlifelineaddress";
|
String lifelineAddress = "testlifelineaddress";
|
||||||
String blockPoolId = "testblockpool";
|
String blockPoolId = "testblockpool";
|
||||||
String clusterId = "testcluster";
|
String clusterId = "testcluster";
|
||||||
|
String webScheme = "http";
|
||||||
String webAddress = "testwebaddress";
|
String webAddress = "testwebaddress";
|
||||||
boolean safemode = false;
|
boolean safemode = false;
|
||||||
|
|
||||||
// Active
|
// Active
|
||||||
MembershipState record = MembershipState.newInstance(
|
MembershipState record = MembershipState.newInstance(
|
||||||
ROUTERS[0], ns, nn, clusterId, blockPoolId,
|
ROUTERS[0], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme,
|
||||||
FederationNamenodeServiceState.ACTIVE, safemode);
|
webAddress, FederationNamenodeServiceState.ACTIVE, safemode);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[1], ns, nn, clusterId, blockPoolId,
|
ROUTERS[1], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
webAddress, FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[2], ns, nn, clusterId, blockPoolId,
|
ROUTERS[2], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[3], ns, nn, clusterId, blockPoolId,
|
ROUTERS[3], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
registerAndLoadRegistrations(registrationList);
|
registerAndLoadRegistrations(registrationList);
|
||||||
|
@ -293,6 +294,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
String lifelineAddress = "testlifelineaddress";
|
String lifelineAddress = "testlifelineaddress";
|
||||||
String blockPoolId = "testblockpool";
|
String blockPoolId = "testblockpool";
|
||||||
String clusterId = "testcluster";
|
String clusterId = "testcluster";
|
||||||
|
String webScheme = "http";
|
||||||
String webAddress = "testwebaddress";
|
String webAddress = "testwebaddress";
|
||||||
boolean safemode = false;
|
boolean safemode = false;
|
||||||
long startingTime = Time.now();
|
long startingTime = Time.now();
|
||||||
|
@ -300,7 +302,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
// Expired
|
// Expired
|
||||||
MembershipState record = MembershipState.newInstance(
|
MembershipState record = MembershipState.newInstance(
|
||||||
ROUTERS[0], ns, nn, clusterId, blockPoolId,
|
ROUTERS[0], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, webAddress, lifelineAddress, webAddress,
|
rpcAddress, webAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
record.setDateModified(startingTime - 10000);
|
record.setDateModified(startingTime - 10000);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
@ -308,7 +310,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[1], ns, nn, clusterId, blockPoolId,
|
ROUTERS[1], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
record.setDateModified(startingTime);
|
record.setDateModified(startingTime);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
@ -316,7 +318,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[2], ns, nn, clusterId, blockPoolId,
|
ROUTERS[2], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
record.setDateModified(startingTime);
|
record.setDateModified(startingTime);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
@ -324,7 +326,7 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
||||||
// Expired
|
// Expired
|
||||||
record = MembershipState.newInstance(
|
record = MembershipState.newInstance(
|
||||||
ROUTERS[3], ns, nn, clusterId, blockPoolId,
|
ROUTERS[3], ns, nn, clusterId, blockPoolId,
|
||||||
rpcAddress, serviceAddress, lifelineAddress, webAddress,
|
rpcAddress, serviceAddress, lifelineAddress, webScheme, webAddress,
|
||||||
FederationNamenodeServiceState.EXPIRED, safemode);
|
FederationNamenodeServiceState.EXPIRED, safemode);
|
||||||
record.setDateModified(startingTime);
|
record.setDateModified(startingTime);
|
||||||
registrationList.add(record);
|
registrationList.add(record);
|
||||||
|
|
|
@ -127,8 +127,9 @@ public class TestStateStoreDriverBase {
|
||||||
generateRandomString(), generateRandomString(),
|
generateRandomString(), generateRandomString(),
|
||||||
generateRandomString(), generateRandomString(),
|
generateRandomString(), generateRandomString(),
|
||||||
generateRandomString(), generateRandomString(),
|
generateRandomString(), generateRandomString(),
|
||||||
generateRandomString(), generateRandomString(),
|
generateRandomString(), "http", generateRandomString(),
|
||||||
generateRandomEnum(FederationNamenodeServiceState.class), false);
|
generateRandomEnum(FederationNamenodeServiceState.class),
|
||||||
|
false);
|
||||||
} else if (recordClass == MountTable.class) {
|
} else if (recordClass == MountTable.class) {
|
||||||
String src = "/" + generateRandomString();
|
String src = "/" + generateRandomString();
|
||||||
Map<String, String> destMap = Collections.singletonMap(
|
Map<String, String> destMap = Collections.singletonMap(
|
||||||
|
|
|
@ -40,6 +40,7 @@ public class TestMembershipState {
|
||||||
private static final String LIFELINE_ADDRESS = "lifelineaddress";
|
private static final String LIFELINE_ADDRESS = "lifelineaddress";
|
||||||
private static final String WEB_ADDRESS = "webaddress";
|
private static final String WEB_ADDRESS = "webaddress";
|
||||||
private static final boolean SAFE_MODE = false;
|
private static final boolean SAFE_MODE = false;
|
||||||
|
private static final String SCHEME = "http";
|
||||||
|
|
||||||
private static final long DATE_CREATED = 100;
|
private static final long DATE_CREATED = 100;
|
||||||
private static final long DATE_MODIFIED = 200;
|
private static final long DATE_MODIFIED = 200;
|
||||||
|
@ -68,7 +69,7 @@ public class TestMembershipState {
|
||||||
MembershipState record = MembershipState.newInstance(
|
MembershipState record = MembershipState.newInstance(
|
||||||
ROUTER, NAMESERVICE, NAMENODE, CLUSTER_ID,
|
ROUTER, NAMESERVICE, NAMENODE, CLUSTER_ID,
|
||||||
BLOCKPOOL_ID, RPC_ADDRESS, SERVICE_ADDRESS, LIFELINE_ADDRESS,
|
BLOCKPOOL_ID, RPC_ADDRESS, SERVICE_ADDRESS, LIFELINE_ADDRESS,
|
||||||
WEB_ADDRESS, STATE, SAFE_MODE);
|
SCHEME, WEB_ADDRESS, STATE, SAFE_MODE);
|
||||||
record.setDateCreated(DATE_CREATED);
|
record.setDateCreated(DATE_CREATED);
|
||||||
record.setDateModified(DATE_MODIFIED);
|
record.setDateModified(DATE_MODIFIED);
|
||||||
|
|
||||||
|
@ -98,6 +99,7 @@ public class TestMembershipState {
|
||||||
assertEquals(CLUSTER_ID, record.getClusterId());
|
assertEquals(CLUSTER_ID, record.getClusterId());
|
||||||
assertEquals(BLOCKPOOL_ID, record.getBlockPoolId());
|
assertEquals(BLOCKPOOL_ID, record.getBlockPoolId());
|
||||||
assertEquals(RPC_ADDRESS, record.getRpcAddress());
|
assertEquals(RPC_ADDRESS, record.getRpcAddress());
|
||||||
|
assertEquals(SCHEME, record.getWebScheme());
|
||||||
assertEquals(WEB_ADDRESS, record.getWebAddress());
|
assertEquals(WEB_ADDRESS, record.getWebAddress());
|
||||||
assertEquals(STATE, record.getState());
|
assertEquals(STATE, record.getState());
|
||||||
assertEquals(SAFE_MODE, record.getIsSafeMode());
|
assertEquals(SAFE_MODE, record.getIsSafeMode());
|
||||||
|
|
Loading…
Reference in New Issue