HDFS-12498. Journal Syncer is not started in Federated + HA cluster. Contributed by Bharat Viswanadham.

This commit is contained in:
Arpit Agarwal 2017-11-10 16:30:38 -08:00
parent 1d6f8bebe9
commit 6d201f77c7
2 changed files with 147 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
@ -51,6 +52,8 @@ import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
/** /**
@ -263,25 +266,63 @@ public class JournalNodeSyncer {
} }
private List<InetSocketAddress> getOtherJournalNodeAddrs() { private List<InetSocketAddress> getOtherJournalNodeAddrs() {
URI uri = null; String uriStr = "";
try { try {
String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); uriStr = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
if (uriStr == null || uriStr.isEmpty()) { if (uriStr == null || uriStr.isEmpty()) {
LOG.warn("Could not construct Shared Edits Uri"); if (nameServiceId != null) {
return null; uriStr = conf.getTrimmed(DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + nameServiceId);
}
} }
uri = new URI(uriStr);
return Util.getLoggerAddresses(uri, if (uriStr == null || uriStr.isEmpty()) {
Sets.newHashSet(jn.getBoundIpcAddress())); HashSet<String> sharedEditsUri = Sets.newHashSet();
if (nameServiceId != null) {
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(
conf, nameServiceId);
for (String nnId : nnIds) {
String suffix = nameServiceId + "." + nnId;
uriStr = conf.getTrimmed(DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "." + suffix);
sharedEditsUri.add(uriStr);
}
if (sharedEditsUri.size() > 1) {
uriStr = null;
LOG.error("The conf property " + DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly, " +
"it has been configured with different journalnode values " +
sharedEditsUri.toString() + " for a" +
" single nameserviceId" + nameServiceId);
}
}
}
if (uriStr == null || uriStr.isEmpty()) {
LOG.error("Could not construct Shared Edits Uri");
return null;
} else {
return getJournalAddrList(uriStr);
}
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.error("The conf property " + DFSConfigKeys LOG.error("The conf property " + DFSConfigKeys
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly."); .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly.");
} catch (IOException e) { } catch (IOException e) {
LOG.error("Could not parse JournalNode addresses: " + uri); LOG.error("Could not parse JournalNode addresses: " + uriStr);
} }
return null; return null;
} }
private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
URISyntaxException,
IOException {
URI uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));
}
private JournalIdProto convertJournalId(String journalId) { private JournalIdProto convertJournalId(String journalId) {
return QJournalProtocolProtos.JournalIdProto.newBuilder() return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(journalId) .setIdentifier(journalId)

View File

@ -106,6 +106,24 @@ public class TestJournalNode {
"testJournalNodeSyncerNotStartWhenSyncEnabled")) { "testJournalNodeSyncerNotStartWhenSyncEnabled")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901"); "qjournal://jn0:9900;jn1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeConfigWithNameServiceId")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1",
"qjournal://journalnode0:9900;journalnode0:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeConfigWithNamenodeId")) {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
"qjournal://journalnode0:9900;journalnode1:9901");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
"qjournal://journalnode0:9900;journalnode1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId")) {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn1",
"qjournal://journalnode0:9900;journalnode1:9901");
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
"qjournal://journalnode0:9902;journalnode1:9903");
} }
jn = new JournalNode(); jn = new JournalNode();
jn.setConf(conf); jn.setConf(conf);
@ -387,7 +405,7 @@ public class TestJournalNode {
@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncDisabled() public void testJournalNodeSyncerNotStartWhenSyncDisabled()
throws IOException{ throws IOException {
//JournalSyncer will not be started, as journalsync is not enabled //JournalSyncer will not be started, as journalsync is not enabled
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false); conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false);
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
@ -408,7 +426,7 @@ public class TestJournalNode {
@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI() public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()
throws IOException{ throws IOException {
//JournalSyncer will not be started, //JournalSyncer will not be started,
// as shared edits hostnames are not resolved // as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
@ -431,7 +449,7 @@ public class TestJournalNode {
@Test @Test
public void testJournalNodeSyncerNotStartWhenSyncEnabled() public void testJournalNodeSyncerNotStartWhenSyncEnabled()
throws IOException{ throws IOException {
//JournalSyncer will not be started, //JournalSyncer will not be started,
// as shared edits hostnames are not resolved // as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId); jn.getOrCreateJournal(journalId);
@ -452,9 +470,84 @@ public class TestJournalNode {
} }
private void setupStaticHostResolution(int nameServiceIdCount,
@Test
public void testJournalNodeSyncwithFederationTypeConfigWithNameServiceId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started
jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void testJournalNodeSyncwithFederationTypeConfigWithNamenodeId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId +
// namenodeId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started
jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void
testJournalNodeSyncwithFederationTypeIncorrectConfigWithNamenodeId()
throws IOException {
//JournalSyncer will not be started, as nameserviceId passed is null,
// but configured shared edits dir is appended with nameserviceId +
// namenodeId
setupStaticHostResolution(2, "journalnode");
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and journalnode syncer will not be started
// as for each nnId, different shared Edits dir value is configured
jn.getOrCreateJournal(journalId, "ns1");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
private void setupStaticHostResolution(int journalNodeCount,
String hostname) { String hostname) {
for (int i = 0; i < nameServiceIdCount; i++) { for (int i = 0; i < journalNodeCount; i++) {
NetUtils.addStaticResolution(hostname + i, NetUtils.addStaticResolution(hostname + i,
"localhost"); "localhost");
} }