HDFS-4445. All BKJM ledgers are not checked while tailing, So failover will fail. Contributed Vinay.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1441935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8590564dc5
commit
52e6f5a276
|
@ -750,6 +750,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HDFS-4452. getAdditionalBlock() can create multiple blocks if the client
|
HDFS-4452. getAdditionalBlock() can create multiple blocks if the client
|
||||||
times out and retries. (shv)
|
times out and retries. (shv)
|
||||||
|
|
||||||
|
HDFS-4445. All BKJM ledgers are not checked while tailing, So failover will fail.
|
||||||
|
(Vinay via umamahesh)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-3077 SUBTASKS
|
BREAKDOWN OF HDFS-3077 SUBTASKS
|
||||||
|
|
||||||
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
|
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
|
||||||
|
|
|
@ -503,7 +503,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
@Override
|
@Override
|
||||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||||
long fromTxId, boolean inProgressOk) throws IOException {
|
long fromTxId, boolean inProgressOk) throws IOException {
|
||||||
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(inProgressOk);
|
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
|
||||||
|
inProgressOk);
|
||||||
try {
|
try {
|
||||||
BookKeeperEditLogInputStream elis = null;
|
BookKeeperEditLogInputStream elis = null;
|
||||||
for (EditLogLedgerMetadata l : currentLedgerList) {
|
for (EditLogLedgerMetadata l : currentLedgerList) {
|
||||||
|
@ -511,6 +512,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
if (l.isInProgress()) {
|
if (l.isInProgress()) {
|
||||||
lastTxId = recoverLastTxId(l, false);
|
lastTxId = recoverLastTxId(l, false);
|
||||||
}
|
}
|
||||||
|
// Check once again, required in case of InProgress and is case of any
|
||||||
|
// gap.
|
||||||
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
||||||
LedgerHandle h;
|
LedgerHandle h;
|
||||||
if (l.isInProgress()) { // we don't want to fence the current journal
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
||||||
|
@ -523,6 +526,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
elis = new BookKeeperEditLogInputStream(h, l);
|
elis = new BookKeeperEditLogInputStream(h, l);
|
||||||
elis.skipTo(fromTxId);
|
elis.skipTo(fromTxId);
|
||||||
} else {
|
} else {
|
||||||
|
// If mismatches then there might be some gap, so we should not check
|
||||||
|
// further.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
streams.add(elis);
|
streams.add(elis);
|
||||||
|
@ -732,6 +737,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
*/
|
*/
|
||||||
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
return getLedgerList(-1, inProgressOk);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
|
||||||
|
boolean inProgressOk) throws IOException {
|
||||||
List<EditLogLedgerMetadata> ledgers
|
List<EditLogLedgerMetadata> ledgers
|
||||||
= new ArrayList<EditLogLedgerMetadata>();
|
= new ArrayList<EditLogLedgerMetadata>();
|
||||||
try {
|
try {
|
||||||
|
@ -744,6 +754,12 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||||
try {
|
try {
|
||||||
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
||||||
.read(zkc, legderMetadataPath);
|
.read(zkc, legderMetadataPath);
|
||||||
|
if (editLogLedgerMetadata.getLastTxId() != HdfsConstants.INVALID_TXID
|
||||||
|
&& editLogLedgerMetadata.getLastTxId() < fromTxId) {
|
||||||
|
// exclude already read closed edits, but include inprogress edits
|
||||||
|
// as this will be handled in caller
|
||||||
|
continue;
|
||||||
|
}
|
||||||
ledgers.add(editLogLedgerMetadata);
|
ledgers.add(editLogLedgerMetadata);
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
LOG.warn("ZNode: " + legderMetadataPath
|
LOG.warn("ZNode: " + legderMetadataPath
|
||||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.*;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
|
||||||
|
@ -34,11 +33,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -352,4 +349,42 @@ public class TestBookKeeperAsHASharedDir {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NameNode should load the edits correctly if the applicable edits are
|
||||||
|
* present in the BKJM.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
|
||||||
|
.createJournalURI("/correctEditLogSelection").toString());
|
||||||
|
BKJMUtil.addJournalManagerDefinition(conf);
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0)
|
||||||
|
.manageNameDfsSharedDirs(false).build();
|
||||||
|
NameNode nn1 = cluster.getNameNode(0);
|
||||||
|
NameNode nn2 = cluster.getNameNode(1);
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
nn1.getRpcServer().rollEditLog(); // Roll Edits from current Active.
|
||||||
|
// Transition to standby current active gracefully.
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
// Make the other Active and Roll edits multiple times
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
nn2.getRpcServer().rollEditLog();
|
||||||
|
nn2.getRpcServer().rollEditLog();
|
||||||
|
// Now One more failover. So NN1 should be able to failover successfully.
|
||||||
|
cluster.transitionToStandby(1);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue