HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId file in the current directory. Contributed by Jing Zhao.
This commit is contained in:
parent
8269bfa613
commit
e9c37de485
|
@ -961,6 +961,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7127. TestLeaseRecovery leaks MiniDFSCluster instances. (cnauroth)
|
HDFS-7127. TestLeaseRecovery leaks MiniDFSCluster instances. (cnauroth)
|
||||||
|
|
||||||
|
HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
|
||||||
|
file in the current directory. (jing9)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -990,7 +990,7 @@ public class Journal implements Closeable {
|
||||||
public synchronized void doPreUpgrade() throws IOException {
|
public synchronized void doPreUpgrade() throws IOException {
|
||||||
// Do not hold file lock on committedTxnId, because the containing
|
// Do not hold file lock on committedTxnId, because the containing
|
||||||
// directory will be renamed. It will be reopened lazily on next access.
|
// directory will be renamed. It will be reopened lazily on next access.
|
||||||
committedTxnId.close();
|
IOUtils.cleanup(LOG, committedTxnId);
|
||||||
storage.getJournalManager().doPreUpgrade();
|
storage.getJournalManager().doPreUpgrade();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1015,14 +1015,25 @@ public class Journal implements Closeable {
|
||||||
new File(previousDir, LAST_PROMISED_FILENAME), 0);
|
new File(previousDir, LAST_PROMISED_FILENAME), 0);
|
||||||
PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
|
PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
|
||||||
new File(previousDir, LAST_WRITER_EPOCH), 0);
|
new File(previousDir, LAST_WRITER_EPOCH), 0);
|
||||||
|
BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(
|
||||||
|
new File(previousDir, COMMITTED_TXID_FILENAME),
|
||||||
|
HdfsConstants.INVALID_TXID);
|
||||||
|
|
||||||
lastPromisedEpoch = new PersistentLongFile(
|
lastPromisedEpoch = new PersistentLongFile(
|
||||||
new File(currentDir, LAST_PROMISED_FILENAME), 0);
|
new File(currentDir, LAST_PROMISED_FILENAME), 0);
|
||||||
lastWriterEpoch = new PersistentLongFile(
|
lastWriterEpoch = new PersistentLongFile(
|
||||||
new File(currentDir, LAST_WRITER_EPOCH), 0);
|
new File(currentDir, LAST_WRITER_EPOCH), 0);
|
||||||
|
committedTxnId = new BestEffortLongFile(
|
||||||
|
new File(currentDir, COMMITTED_TXID_FILENAME),
|
||||||
|
HdfsConstants.INVALID_TXID);
|
||||||
|
|
||||||
lastPromisedEpoch.set(prevLastPromisedEpoch.get());
|
try {
|
||||||
lastWriterEpoch.set(prevLastWriterEpoch.get());
|
lastPromisedEpoch.set(prevLastPromisedEpoch.get());
|
||||||
|
lastWriterEpoch.set(prevLastWriterEpoch.get());
|
||||||
|
committedTxnId.set(prevCommittedTxnId.get());
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, prevCommittedTxnId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void doFinalize() throws IOException {
|
public synchronized void doFinalize() throws IOException {
|
||||||
|
@ -1043,7 +1054,7 @@ public class Journal implements Closeable {
|
||||||
public synchronized void doRollback() throws IOException {
|
public synchronized void doRollback() throws IOException {
|
||||||
// Do not hold file lock on committedTxnId, because the containing
|
// Do not hold file lock on committedTxnId, because the containing
|
||||||
// directory will be renamed. It will be reopened lazily on next access.
|
// directory will be renamed. It will be reopened lazily on next access.
|
||||||
committedTxnId.close();
|
IOUtils.cleanup(LOG, committedTxnId);
|
||||||
storage.getJournalManager().doRollback();
|
storage.getJournalManager().doRollback();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Map;
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -93,7 +94,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||||
return journal;
|
return journal;
|
||||||
}
|
}
|
||||||
|
|
||||||
Journal getOrCreateJournal(String jid) throws IOException {
|
@VisibleForTesting
|
||||||
|
public Journal getOrCreateJournal(String jid) throws IOException {
|
||||||
return getOrCreateJournal(jid, StartupOption.REGULAR);
|
return getOrCreateJournal(jid, StartupOption.REGULAR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,19 +38,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
|
||||||
import org.apache.hadoop.hdfs.qjournal.server.Journal;
|
import org.apache.hadoop.hdfs.qjournal.server.Journal;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
|
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
|
||||||
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for upgrading with HA enabled.
|
* Tests for upgrading with HA enabled.
|
||||||
|
@ -294,6 +298,16 @@ public class TestDFSUpgradeWithHA {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster)
|
||||||
|
throws IOException {
|
||||||
|
Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0)
|
||||||
|
.getOrCreateJournal(MiniQJMHACluster.NAMESERVICE);
|
||||||
|
BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox
|
||||||
|
.getInternalState(journal1, "committedTxnId");
|
||||||
|
return committedTxnId != null ? committedTxnId.get() :
|
||||||
|
HdfsConstants.INVALID_TXID;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make sure that an HA NN can successfully upgrade when configured using
|
* Make sure that an HA NN can successfully upgrade when configured using
|
||||||
* JournalNodes.
|
* JournalNodes.
|
||||||
|
@ -321,6 +335,9 @@ public class TestDFSUpgradeWithHA {
|
||||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||||
|
|
||||||
|
// get the value of the committedTxnId in journal nodes
|
||||||
|
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||||
|
|
||||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||||
// flag.
|
// flag.
|
||||||
cluster.shutdownNameNode(1);
|
cluster.shutdownNameNode(1);
|
||||||
|
@ -331,6 +348,8 @@ public class TestDFSUpgradeWithHA {
|
||||||
checkNnPreviousDirExistence(cluster, 1, false);
|
checkNnPreviousDirExistence(cluster, 1, false);
|
||||||
checkJnPreviousDirExistence(qjCluster, true);
|
checkJnPreviousDirExistence(qjCluster, true);
|
||||||
|
|
||||||
|
assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
|
||||||
|
|
||||||
// NN0 should come up in the active state when given the -upgrade option,
|
// NN0 should come up in the active state when given the -upgrade option,
|
||||||
// so no need to transition it to active.
|
// so no need to transition it to active.
|
||||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||||
|
@ -343,6 +362,8 @@ public class TestDFSUpgradeWithHA {
|
||||||
cluster.transitionToActive(0);
|
cluster.transitionToActive(0);
|
||||||
assertTrue(fs.mkdirs(new Path("/foo3")));
|
assertTrue(fs.mkdirs(new Path("/foo3")));
|
||||||
|
|
||||||
|
assertTrue(getCommittedTxnIdValue(qjCluster) > cidBeforeUpgrade);
|
||||||
|
|
||||||
// Now bootstrap the standby with the upgraded info.
|
// Now bootstrap the standby with the upgraded info.
|
||||||
int rc = BootstrapStandby.run(
|
int rc = BootstrapStandby.run(
|
||||||
new String[]{"-force"},
|
new String[]{"-force"},
|
||||||
|
@ -389,11 +410,14 @@ public class TestDFSUpgradeWithHA {
|
||||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||||
|
|
||||||
|
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||||
|
|
||||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||||
// flag.
|
// flag.
|
||||||
cluster.shutdownNameNode(1);
|
cluster.shutdownNameNode(1);
|
||||||
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
|
cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
|
||||||
cluster.restartNameNode(0, false);
|
cluster.restartNameNode(0, false);
|
||||||
|
assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
|
||||||
|
|
||||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||||
|
|
||||||
|
@ -409,8 +433,12 @@ public class TestDFSUpgradeWithHA {
|
||||||
|
|
||||||
cluster.restartNameNode(1);
|
cluster.restartNameNode(1);
|
||||||
|
|
||||||
|
final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||||
|
assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
|
||||||
|
|
||||||
runFinalizeCommand(cluster);
|
runFinalizeCommand(cluster);
|
||||||
|
|
||||||
|
assertEquals(cidDuringUpgrade, getCommittedTxnIdValue(qjCluster));
|
||||||
checkClusterPreviousDirExistence(cluster, false);
|
checkClusterPreviousDirExistence(cluster, false);
|
||||||
checkJnPreviousDirExistence(qjCluster, false);
|
checkJnPreviousDirExistence(qjCluster, false);
|
||||||
assertCTimesEqual(cluster);
|
assertCTimesEqual(cluster);
|
||||||
|
@ -615,6 +643,8 @@ public class TestDFSUpgradeWithHA {
|
||||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
assertTrue(fs.mkdirs(new Path("/foo1")));
|
assertTrue(fs.mkdirs(new Path("/foo1")));
|
||||||
|
|
||||||
|
final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||||
|
|
||||||
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
// Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
|
||||||
// flag.
|
// flag.
|
||||||
cluster.shutdownNameNode(1);
|
cluster.shutdownNameNode(1);
|
||||||
|
@ -629,6 +659,9 @@ public class TestDFSUpgradeWithHA {
|
||||||
// so no need to transition it to active.
|
// so no need to transition it to active.
|
||||||
assertTrue(fs.mkdirs(new Path("/foo2")));
|
assertTrue(fs.mkdirs(new Path("/foo2")));
|
||||||
|
|
||||||
|
final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
|
||||||
|
assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
|
||||||
|
|
||||||
// Now bootstrap the standby with the upgraded info.
|
// Now bootstrap the standby with the upgraded info.
|
||||||
int rc = BootstrapStandby.run(
|
int rc = BootstrapStandby.run(
|
||||||
new String[]{"-force"},
|
new String[]{"-force"},
|
||||||
|
@ -649,6 +682,11 @@ public class TestDFSUpgradeWithHA {
|
||||||
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
|
conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
|
||||||
NameNode.doRollback(conf, false);
|
NameNode.doRollback(conf, false);
|
||||||
|
|
||||||
|
final long cidAfterRollback = getCommittedTxnIdValue(qjCluster);
|
||||||
|
assertTrue(cidBeforeUpgrade < cidAfterRollback);
|
||||||
|
// make sure the committedTxnId has been reset correctly after rollback
|
||||||
|
assertTrue(cidDuringUpgrade > cidAfterRollback);
|
||||||
|
|
||||||
// The rollback operation should have rolled back the first NN's local
|
// The rollback operation should have rolled back the first NN's local
|
||||||
// dirs, and the shared dir, but not the other NN's dirs. Those have to be
|
// dirs, and the shared dir, but not the other NN's dirs. Those have to be
|
||||||
// done by bootstrapping the standby.
|
// done by bootstrapping the standby.
|
||||||
|
|
Loading…
Reference in New Issue