HDFS-2861. checkpointing should verify that the dfs.http.address has been configured to a non-loopback for peer NN. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1239886 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4324e1bcd7
commit
32c313d51c
|
@ -151,3 +151,5 @@ HDFS-2742. HA: observed dataloss in replication stress test. (todd via eli)
|
|||
HDFS-2870. Fix log level for block debug info in processMisReplicatedBlocks (todd)
|
||||
|
||||
HDFS-2859. LOCAL_ADDRESS_MATCHER.match has NPE when called from DFSUtil.getSuffixIDs when the host is incorrect (Bikas Saha via todd)
|
||||
|
||||
HDFS-2861. checkpointing should verify that the dfs.http.address has been configured to a non-loopback for peer NN (todd)
|
||||
|
|
|
@ -676,6 +676,33 @@ public class DFSUtil {
|
|||
return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Substitute a default host in the case that an address has been configured
|
||||
* with a wildcard. This is used, for example, when determining the HTTP
|
||||
* address of the NN -- if it's configured to bind to 0.0.0.0, we want to
|
||||
* substitute the hostname from the filesystem URI rather than trying to
|
||||
* connect to 0.0.0.0.
|
||||
* @param configuredAddress the address found in the configuration
|
||||
* @param defaultHost the host to substitute with, if configuredAddress
|
||||
* is a local/wildcard address.
|
||||
* @return the substituted address
|
||||
* @throws IOException if it is a wildcard address and security is enabled
|
||||
*/
|
||||
public static String substituteForWildcardAddress(String configuredAddress,
|
||||
String defaultHost) throws IOException {
|
||||
InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
|
||||
if (sockAddr.getAddress().isAnyLocalAddress()) {
|
||||
if(UserGroupInformation.isSecurityEnabled()) {
|
||||
throw new IOException("Cannot use a wildcard address with security. " +
|
||||
"Must explicitly set bind address for Kerberos");
|
||||
}
|
||||
return defaultHost + ":" + sockAddr.getPort();
|
||||
} else {
|
||||
return configuredAddress;
|
||||
}
|
||||
}
|
||||
|
||||
private static String getSuffixedConf(Configuration conf,
|
||||
String key, String defaultVal, String[] suffixes) {
|
||||
String ret = conf.get(DFSUtil.addKeySuffixes(key, suffixes));
|
||||
|
|
|
@ -129,16 +129,28 @@ public class HAUtil {
|
|||
Configuration myConf) {
|
||||
|
||||
String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
|
||||
Preconditions.checkArgument(nsId != null,
|
||||
"Could not determine namespace id. Please ensure that this " +
|
||||
"machine is one of the machines listed as a NN RPC address, " +
|
||||
"or configure " + DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID);
|
||||
|
||||
Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
|
||||
String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
|
||||
Preconditions.checkArgument(nnIds != null,
|
||||
"Could not determine namenode ids in namespace '%s'",
|
||||
"Could not determine namenode ids in namespace '%s'. " +
|
||||
"Please configure " +
|
||||
DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY,
|
||||
nsId),
|
||||
nsId);
|
||||
Preconditions.checkArgument(nnIds.size() == 2,
|
||||
"Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
|
||||
Joiner.on("','").join(nnIds));
|
||||
"Expected exactly 2 NameNodes in namespace '%s'. " +
|
||||
"Instead, got only %s (NN ids were '%s'",
|
||||
nsId, nnIds.size(), Joiner.on("','").join(nnIds));
|
||||
Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
|
||||
"Could not determine own NN ID");
|
||||
"Could not determine own NN ID in namespace '%s'. Please " +
|
||||
"ensure that this node is one of the machines listed as an " +
|
||||
"NN RPC address, or configure " + DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY,
|
||||
nsId);
|
||||
|
||||
ArrayList<String> nnSet = Lists.newArrayList(nnIds);
|
||||
nnSet.remove(myNNId);
|
||||
|
|
|
@ -567,10 +567,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (leaseManager != null) {
|
||||
leaseManager.stopMonitor();
|
||||
}
|
||||
dir.fsImage.editLog.close();
|
||||
// Update the fsimage with the last txid that we wrote
|
||||
// so that the tailer starts from the right spot.
|
||||
dir.fsImage.updateLastAppliedTxIdFromWritten();
|
||||
if (dir != null && dir.fsImage != null) {
|
||||
if (dir.fsImage.editLog != null) {
|
||||
dir.fsImage.editLog.close();
|
||||
}
|
||||
// Update the fsimage with the last txid that we wrote
|
||||
// so that the tailer starts from the right spot.
|
||||
dir.fsImage.updateLastAppliedTxIdFromWritten();
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
@ -612,7 +616,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (editLogTailer != null) {
|
||||
editLogTailer.stop();
|
||||
}
|
||||
dir.fsImage.editLog.close();
|
||||
if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
|
||||
dir.fsImage.editLog.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -450,19 +450,10 @@ public class SecondaryNameNode implements Runnable {
|
|||
}
|
||||
|
||||
String configuredAddress = DFSUtil.getInfoServer(null, conf, true);
|
||||
InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
|
||||
if (sockAddr.getAddress().isAnyLocalAddress()) {
|
||||
if(UserGroupInformation.isSecurityEnabled()) {
|
||||
throw new IOException("Cannot use a wildcard address with security. " +
|
||||
"Must explicitly set bind address for Kerberos");
|
||||
}
|
||||
return fsName.getHost() + ":" + sockAddr.getPort();
|
||||
} else {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("configuredAddress = " + configuredAddress);
|
||||
}
|
||||
return configuredAddress;
|
||||
}
|
||||
String address = DFSUtil.substituteForWildcardAddress(configuredAddress,
|
||||
fsName.getHost());
|
||||
LOG.debug("Will connect to NameNode at HTTP address: " + address);
|
||||
return address;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.HAUtil;
|
|||
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -75,12 +76,11 @@ public class StandbyCheckpointer {
|
|||
*/
|
||||
private void setNameNodeAddresses(Configuration conf) {
|
||||
// Look up our own address.
|
||||
String myAddrString = DFSUtil.getInfoServer(null, conf, true);
|
||||
String myAddrString = getHttpAddress(conf);
|
||||
|
||||
// Look up the active node's address
|
||||
Configuration confForActive = HAUtil.getConfForOtherNode(conf);
|
||||
activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);
|
||||
|
||||
activeNNAddress = getHttpAddress(confForActive);
|
||||
|
||||
// Sanity-check.
|
||||
Preconditions.checkArgument(checkAddress(activeNNAddress),
|
||||
|
@ -90,13 +90,28 @@ public class StandbyCheckpointer {
|
|||
myNNAddress = NetUtils.createSocketAddr(myAddrString);
|
||||
}
|
||||
|
||||
private String getHttpAddress(Configuration conf) {
|
||||
String configuredAddr = DFSUtil.getInfoServer(null, conf, true);
|
||||
|
||||
// Use the hostname from the RPC address as a default, in case
|
||||
// the HTTP address is configured to 0.0.0.0.
|
||||
String hostnameFromRpc = NameNode.getServiceAddress(
|
||||
conf, true).getHostName();
|
||||
try {
|
||||
return DFSUtil.substituteForWildcardAddress(
|
||||
configuredAddr, hostnameFromRpc);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that the given address is valid and has a port
|
||||
* specified.
|
||||
*/
|
||||
private boolean checkAddress(String addrStr) {
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(addrStr);
|
||||
return addr.getPort() != 0;
|
||||
return addr.getPort() != 0 && !addr.getAddress().isAnyLocalAddress();
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
@ -287,4 +302,9 @@ public class StandbyCheckpointer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
String getActiveNNAddress() {
|
||||
return activeNNAddress;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -586,13 +586,13 @@ public class MiniDFSCluster {
|
|||
conf.set(FS_DEFAULT_NAME_KEY, "127.0.0.1:" + onlyNN.getIpcPort());
|
||||
}
|
||||
|
||||
// If we have more than one nameservice, need to enumerate them in the
|
||||
// config.
|
||||
if (federation) {
|
||||
List<String> allNsIds = Lists.newArrayList();
|
||||
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
||||
List<String> allNsIds = Lists.newArrayList();
|
||||
for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
|
||||
if (nameservice.getId() != null) {
|
||||
allNsIds.add(nameservice.getId());
|
||||
}
|
||||
}
|
||||
if (!allNsIds.isEmpty()) {
|
||||
conf.set(DFS_FEDERATION_NAMESERVICES, Joiner.on(",").join(allNsIds));
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ public class MiniDFSNNTopology {
|
|||
*/
|
||||
public static MiniDFSNNTopology simpleHATopology() {
|
||||
return new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(null)
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("minidfs-ns")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1"))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2")));
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
@ -43,6 +44,15 @@ import org.apache.hadoop.fs.BlockLocation;
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
|
||||
public class TestDFSUtil {
|
||||
|
||||
/**
|
||||
* Reset to default UGI settings since some tests change them.
|
||||
*/
|
||||
@Before
|
||||
public void resetUGI() {
|
||||
UserGroupInformation.setConfiguration(new Configuration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test conversion of LocatedBlock to BlockLocation
|
||||
*/
|
||||
|
@ -398,4 +408,11 @@ public class TestDFSUtil {
|
|||
assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubstituteForWildcardAddress() throws IOException {
|
||||
assertEquals("foo:12345",
|
||||
DFSUtil.substituteForWildcardAddress("0.0.0.0:12345", "foo"));
|
||||
assertEquals("127.0.0.1:12345",
|
||||
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
|
||||
}
|
||||
}
|
|
@ -125,7 +125,7 @@ public class TestEditLogTailer {
|
|||
|
||||
// Have to specify IPC ports so the NNs can talk to each other.
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(null)
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ public class TestFailureToReadEdits {
|
|||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(null)
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Test cases that the HA configuration is reasonably validated and
|
||||
* interpreted in various places. These should be proper unit tests
|
||||
* which don't start daemons.
|
||||
*/
|
||||
public class TestHAConfiguration {
|
||||
private static final String NSID = "ns1";
|
||||
private static String HOST_A = "1.2.3.1";
|
||||
private static String HOST_B = "1.2.3.2";
|
||||
|
||||
private FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||
private Configuration conf = new Configuration();
|
||||
|
||||
@Test
|
||||
public void testCheckpointerValidityChecks() throws Exception {
|
||||
try {
|
||||
new StandbyCheckpointer(conf, fsn);
|
||||
fail("Bad config did not throw an error");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Invalid URI for NameNode address", iae);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOtherNNHttpAddress() {
|
||||
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, NSID);
|
||||
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, NSID);
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFSConfigKeys.DFS_HA_NAMENODES_KEY, NSID),
|
||||
"nn1,nn2");
|
||||
conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NSID, "nn1"),
|
||||
HOST_A + ":12345");
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||
NSID, "nn2"),
|
||||
HOST_B + ":12345");
|
||||
NameNode.initializeGenericKeys(conf, NSID, "nn1");
|
||||
|
||||
// Since we didn't configure the HTTP address, and the default is
|
||||
// 0.0.0.0, it should substitute the address from the RPC configuratoin
|
||||
// above.
|
||||
StandbyCheckpointer checkpointer = new StandbyCheckpointer(conf, fsn);
|
||||
assertEquals(HOST_B + ":" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
|
||||
checkpointer.getActiveNNAddress());
|
||||
}
|
||||
}
|
|
@ -60,7 +60,7 @@ public class TestStandbyCheckpoints {
|
|||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(null)
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
|
||||
|
||||
|
|
Loading…
Reference in New Issue