HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru.

Arpit Agarwal 2017-08-30 10:29:42 -07:00
parent d04f85f387
commit fd66a243bf
1 changed file with 137 additions and 39 deletions


@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -54,10 +56,11 @@ public class TestJournalNodeSync {
private MiniQJMHACluster qjmhaCluster;
private MiniDFSCluster dfsCluster;
private MiniJournalCluster jCluster;
private FileSystem fs;
private FSNamesystem namesystem;
private int editsPerformed = 0;
private final String jid = "ns1";
private int activeNNindex = 0;
private static final int DFS_HA_TAILEDITS_PERIOD_SECONDS = 1;
@Rule
public TestName testName = new TestName();
@@ -71,13 +74,16 @@ public class TestJournalNodeSync {
"testSyncAfterJNdowntimeWithoutQJournalQueue")) {
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
}
if (testName.getMethodName().equals("testSyncDuringRollingUpgrade")) {
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
DFS_HA_TAILEDITS_PERIOD_SECONDS);
}
qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
.build();
dfsCluster = qjmhaCluster.getDfsCluster();
jCluster = qjmhaCluster.getJournalCluster();
dfsCluster.transitionToActive(0);
fs = dfsCluster.getFileSystem(0);
namesystem = dfsCluster.getNamesystem(0);
}
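As an aside (not part of this patch): the 1-second edit-tailing period set above exists so the standby NameNode can catch up quickly before the failover in testSyncDuringRollingUpgrade. A bounded wait such as the sketch below could serve the same purpose as the fixed Thread.sleep the test uses; the helper name is hypothetical, it assumes FSImage#getLastAppliedTxId, and it uses only classes the test already imports.

  private void waitForStandbyToCatchUp(final int activeIndex,
      final int standbyIndex) throws Exception {
    // Hypothetical helper, not in the patch: poll until the standby NN has
    // applied every transaction the active NN has written.
    final long activeTxId = dfsCluster.getNameNode(activeIndex).getNamesystem()
        .getFSImage().getEditLog().getLastWrittenTxId();
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return dfsCluster.getNameNode(standbyIndex).getNamesystem()
            .getFSImage().getLastAppliedTxId() >= activeTxId;
      }
    }, 500, 30000);
  }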
@@ -192,36 +198,7 @@ public class TestJournalNodeSync {
// the journals.
@Test(timeout=60000)
public void testRandomJournalMissingLogs() throws Exception {
Random randomJournal = new Random();
List<File> journalCurrentDirs = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
jid)).getCurrentDir());
}
int count = 0;
long lastStartTxId;
int journalIndex;
List<File> missingLogs = Lists.newArrayList();
while (count < 5) {
lastStartTxId = generateEditLog();
// Delete the last edit log segment from randomly selected journal node
journalIndex = randomJournal.nextInt(3);
missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
lastStartTxId));
// Delete the last edit log segment from two journals for some logs
if (count % 2 == 0) {
journalIndex = (journalIndex + 1) % 3;
missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
lastStartTxId));
}
count++;
}
List<File> missingLogs = deleteEditLogsFromRandomJN();
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
@@ -277,7 +254,8 @@ public class TestJournalNodeSync {
*/
@Test (timeout=300_000)
public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception {
// Queuing is disabled during the cluster setup {@link #setUpMiniCluster()}
// QJournal Queuing is disabled during the cluster setup
// {@link #setUpMiniCluster()}
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
@@ -376,11 +354,88 @@ public class TestJournalNodeSync {
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
// Test JournalNode Sync during a Rolling Upgrade of NN.
@Test (timeout=300_000)
public void testSyncDuringRollingUpgrade() throws Exception {
DistributedFileSystem dfsActive;
int standbyNNindex;
if (dfsCluster.getNameNode(0).isActiveState()) {
activeNNindex = 0;
standbyNNindex = 1;
} else {
activeNNindex = 1;
standbyNNindex = 0;
}
dfsActive = dfsCluster.getFileSystem(activeNNindex);
// Prepare for rolling upgrade
final RollingUpgradeInfo info = dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.PREPARE);
// Query rolling upgrade
Assert.assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
// Restart the Standby NN with rollingUpgrade option
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
// Do some edits and delete some edit logs
List<File> missingLogs = deleteEditLogsFromRandomJN();
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
// Transition the active NN to standby and standby to active
dfsCluster.transitionToStandby(activeNNindex);
// Let the Standby NN catch up on tailing edit logs before transitioning it
// to active
Thread.sleep(30 * DFS_HA_TAILEDITS_PERIOD_SECONDS * 1000);
dfsCluster.transitionToActive(standbyNNindex);
dfsCluster.waitActive();
activeNNindex = standbyNNindex;
standbyNNindex = (activeNNindex + 1) % 2;
dfsActive = dfsCluster.getFileSystem(activeNNindex);
Assert.assertTrue(dfsCluster.getNameNode(activeNNindex).isActiveState());
Assert.assertFalse(dfsCluster.getNameNode(standbyNNindex).isActiveState());
// Restart the current standby NN (previously active)
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
dfsCluster.waitActive();
// Do some edits and delete some edit logs
missingLogs.addAll(deleteEditLogsFromRandomJN());
// Check that JNSync downloaded the edit logs rolled during rolling upgrade.
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
// Finalize rolling upgrade
final RollingUpgradeInfo finalize = dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.FINALIZE);
Assert.assertTrue(finalize.isFinalized());
// Check the missing edit logs exist after finalizing rolling upgrade
for (File editLog : missingLogs) {
Assert.assertTrue("Edit log missing after finalizing rolling upgrade",
editLog.exists());
}
}
private File deleteEditLog(File currentDir, long startTxId)
throws IOException {
EditLogFile logFile = getLogFile(currentDir, startTxId);
while (logFile.isInProgress()) {
dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
logFile = getLogFile(currentDir, startTxId);
}
File deleteFile = logFile.getFile();
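For reference, the client-side calls that testSyncDuringRollingUpgrade drives are the standard DistributedFileSystem rolling-upgrade actions. A condensed sketch of that sequence against the running HA mini cluster is below; it only restates calls already visible in the patch, and the local variable names are illustrative.

  // Illustrative only: a DistributedFileSystem handle on the active NameNode.
  final DistributedFileSystem dfs = dfsCluster.getFileSystem(activeNNindex);
  // PREPARE starts the rolling upgrade and returns its info record.
  RollingUpgradeInfo info =
      dfs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
  // QUERY reports the same info while the upgrade is still in progress.
  Assert.assertEquals(info,
      dfs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.QUERY));
  // ... restart each NameNode with "-rollingUpgrade started", fail over,
  // delete and re-verify edit log segments, then finalize.
  RollingUpgradeInfo finalized =
      dfs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
  Assert.assertTrue(finalized.isFinalized());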
@@ -389,13 +444,55 @@ public class TestJournalNodeSync {
return deleteFile;
}
private List<File> deleteEditLogsFromRandomJN() throws IOException {
Random random = new Random();
List<File> journalCurrentDirs = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
jid)).getCurrentDir());
}
long[] startTxIds = new long[20];
for (int i = 0; i < 20; i++) {
startTxIds[i] = generateEditLog();
}
int count = 0, startTxIdIndex;
long startTxId;
int journalIndex;
List<File> missingLogs = Lists.newArrayList();
List<Integer> deletedStartTxIds = Lists.newArrayList();
while (count < 5) {
// Select a random edit log to delete
startTxIdIndex = random.nextInt(20);
while (deletedStartTxIds.contains(startTxIdIndex)) {
startTxIdIndex = random.nextInt(20);
}
startTxId = startTxIds[startTxIdIndex];
deletedStartTxIds.add(startTxIdIndex);
// Delete the randomly selected edit log segment from a randomly selected
// journal node
journalIndex = random.nextInt(3);
missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
startTxId));
count++;
}
return missingLogs;
}
/**
* Do a mutative metadata operation on the file system.
*
* @return true if the operation was successful, false otherwise.
*/
private boolean doAnEdit() throws IOException {
return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
return dfsCluster.getFileSystem(activeNNindex).mkdirs(
new Path("/tmp", Integer.toString(editsPerformed++)));
}
/**
@@ -414,12 +511,13 @@ public class TestJournalNodeSync {
* @return the startTxId of next segment after rolling edits.
*/
private long generateEditLog(int numEdits) throws IOException {
long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
long lastWrittenTxId = dfsCluster.getNameNode(activeNNindex).getFSImage()
.getEditLog().getLastWrittenTxId();
for (int i = 1; i <= numEdits; i++) {
Assert.assertTrue("Failed to do an edit", doAnEdit());
}
dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
return startTxId;
dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
return lastWrittenTxId;
}
private Supplier<Boolean> editLogExists(List<File> editLogs) {
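The diff view cuts off here. For context, the assertions above poll GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); the helper's body lies outside this hunk, but a plausible shape for such a supplier (illustrative only, not taken from the patch) is:

  private Supplier<Boolean> editLogExists(final List<File> editLogs) {
    return new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // True only once every previously deleted edit log file exists again.
        for (File editLog : editLogs) {
          if (!editLog.exists()) {
            return false;
          }
        }
        return true;
      }
    };
  }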