HDFS-16684. Exclude the current JournalNode (#4723)
Exclude bound local addresses, including the use of a wildcard address in the bound host configurations. * Allow sync attempts with unresolved addresses * Update the comments. * Remove unused import Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
c294a414b9
commit
a9e5fb3313
|
@ -124,6 +124,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||||
return journal;
|
return journal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public JournalNodeSyncer getJournalSyncer(String jid) {
|
||||||
|
return journalSyncersById.get(jid);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public boolean getJournalSyncerStatus(String jid) {
|
public boolean getJournalSyncerStatus(String jid) {
|
||||||
if (journalSyncersById.get(jid) != null) {
|
if (journalSyncersById.get(jid) != null) {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.qjournal.server;
|
package org.apache.hadoop.hdfs.qjournal.server;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.Lists;
|
import org.apache.hadoop.util.Lists;
|
||||||
|
import org.apache.hadoop.util.Sets;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -50,10 +52,10 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Journal Sync thread runs through the lifetime of the JN. It periodically
|
* A Journal Sync thread runs through the lifetime of the JN. It periodically
|
||||||
|
@ -153,6 +155,9 @@ public class JournalNodeSyncer {
|
||||||
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
|
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Check if there are any other JournalNodes before starting the sync. Although some proxies
|
||||||
|
// may be unresolved now, the act of attempting to sync will instigate resolution when the
|
||||||
|
// servers become available.
|
||||||
if (otherJNProxies.isEmpty()) {
|
if (otherJNProxies.isEmpty()) {
|
||||||
LOG.error("Cannot sync as there is no other JN available for sync.");
|
LOG.error("Cannot sync as there is no other JN available for sync.");
|
||||||
return false;
|
return false;
|
||||||
|
@ -310,12 +315,24 @@ public class JournalNodeSyncer {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
|
@VisibleForTesting
|
||||||
|
protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
|
||||||
URISyntaxException,
|
URISyntaxException,
|
||||||
IOException {
|
IOException {
|
||||||
URI uri = new URI(uriStr);
|
URI uri = new URI(uriStr);
|
||||||
return Util.getLoggerAddresses(uri,
|
|
||||||
new HashSet<>(Arrays.asList(jn.getBoundIpcAddress())), conf);
|
InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
|
||||||
|
Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
|
||||||
|
List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded, conf);
|
||||||
|
|
||||||
|
// Exclude the current JournalNode instance (a local address and the same port). If the address
|
||||||
|
// is bound to a local address on the same port, then remove it to handle scenarios where a
|
||||||
|
// wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses
|
||||||
|
// since we may be running multiple servers on the same host.
|
||||||
|
addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
|
||||||
|
&& boundIpcAddress.getPort() == addr.getPort());
|
||||||
|
|
||||||
|
return addrList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
|
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.qjournal.server;
|
package org.apache.hadoop.hdfs.qjournal.server;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
|
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
|
||||||
.getLogFile;
|
.getLogFile;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
@ -96,12 +99,45 @@ public class TestJournalNodeSync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the "self exclusion" works when there are multiple JournalNode instances running on
|
||||||
|
* the same server, but on different ports.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testJournalNodeExcludesSelfMultilpePorts() throws URISyntaxException, IOException {
|
||||||
|
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
|
||||||
|
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
|
||||||
|
|
||||||
|
// Test: Get the Journal address list for the default configuration
|
||||||
|
List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);
|
||||||
|
|
||||||
|
// Verify: One of the addresses should be excluded so that the node isn't syncing with itself
|
||||||
|
assertEquals(2, addrList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the "self exclusion" works when there a host uses a wildcard address.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
|
||||||
|
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
|
||||||
|
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
|
||||||
|
|
||||||
|
// Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly
|
||||||
|
// used as a bind host.
|
||||||
|
String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0");
|
||||||
|
List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);
|
||||||
|
|
||||||
|
// Verify: One of the address should be excluded so that the node isn't syncing with itself
|
||||||
|
assertEquals(2, boundHostAddrList.size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testJournalNodeSync() throws Exception {
|
public void testJournalNodeSync() throws Exception {
|
||||||
|
|
||||||
//As by default 3 journal nodes are started;
|
//As by default 3 journal nodes are started;
|
||||||
for(int i=0; i<3; i++) {
|
for(int i=0; i<3; i++) {
|
||||||
Assert.assertEquals(true,
|
assertEquals(true,
|
||||||
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
|
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,13 +422,13 @@ public class TestJournalNodeSync {
|
||||||
HdfsConstants.RollingUpgradeAction.PREPARE);
|
HdfsConstants.RollingUpgradeAction.PREPARE);
|
||||||
|
|
||||||
//query rolling upgrade
|
//query rolling upgrade
|
||||||
Assert.assertEquals(info, dfsActive.rollingUpgrade(
|
assertEquals(info, dfsActive.rollingUpgrade(
|
||||||
HdfsConstants.RollingUpgradeAction.QUERY));
|
HdfsConstants.RollingUpgradeAction.QUERY));
|
||||||
|
|
||||||
// Restart the Standby NN with rollingUpgrade option
|
// Restart the Standby NN with rollingUpgrade option
|
||||||
dfsCluster.restartNameNode(standbyNNindex, true,
|
dfsCluster.restartNameNode(standbyNNindex, true,
|
||||||
"-rollingUpgrade", "started");
|
"-rollingUpgrade", "started");
|
||||||
Assert.assertEquals(info, dfsActive.rollingUpgrade(
|
assertEquals(info, dfsActive.rollingUpgrade(
|
||||||
HdfsConstants.RollingUpgradeAction.QUERY));
|
HdfsConstants.RollingUpgradeAction.QUERY));
|
||||||
|
|
||||||
// Do some edits and delete some edit logs
|
// Do some edits and delete some edit logs
|
||||||
|
@ -420,7 +456,7 @@ public class TestJournalNodeSync {
|
||||||
// Restart the current standby NN (previously active)
|
// Restart the current standby NN (previously active)
|
||||||
dfsCluster.restartNameNode(standbyNNindex, true,
|
dfsCluster.restartNameNode(standbyNNindex, true,
|
||||||
"-rollingUpgrade", "started");
|
"-rollingUpgrade", "started");
|
||||||
Assert.assertEquals(info, dfsActive.rollingUpgrade(
|
assertEquals(info, dfsActive.rollingUpgrade(
|
||||||
HdfsConstants.RollingUpgradeAction.QUERY));
|
HdfsConstants.RollingUpgradeAction.QUERY));
|
||||||
dfsCluster.waitActive();
|
dfsCluster.waitActive();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue