HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru.

Arpit Agarwal committed 2017-08-04 12:51:33 -07:00
parent fe3341786a
commit bbc6d254c8
5 changed files with 197 additions and 9 deletions

Journal.java

@@ -286,8 +286,7 @@ public class Journal implements Closeable {
     fjm.setLastReadableTxId(val);
   }
 
-  @VisibleForTesting
-  JournalMetrics getMetricsForTests() {
+  JournalMetrics getMetrics() {
     return metrics;
   }
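Note on this change: the accessor drops both its test-only name and the @VisibleForTesting annotation because JournalNodeSyncer (below) now reads the journal's metrics through it at runtime; TestJournalNode switches to the new name further down.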

JournalMetrics.java

@@ -46,6 +46,9 @@ class JournalMetrics {
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
 
+  @Metric("Number of edit logs downloaded by JournalNodeSyncer")
+  private MutableCounterLong numEditLogsSynced;
+
   private final int[] QUANTILE_INTERVALS = new int[] {
       1*60, // 1m
       5*60, // 5m
@@ -120,4 +123,12 @@ class JournalMetrics {
       q.add(us);
     }
   }
+
+  public MutableCounterLong getNumEditLogsSynced() {
+    return numEditLogsSynced;
+  }
+
+  public void incrNumEditLogsSynced() {
+    numEditLogsSynced.incr();
+  }
 }
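Since the counter is declared with @Metric and no explicit name, the metrics2 registry publishes it under the capitalized field name, NumEditLogsSynced. A minimal sketch of asserting on it from a test, assuming a running Journal instance named journal, using the same MetricsAsserts pattern TestJournalNode uses below:

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
    import org.apache.hadoop.test.MetricsAsserts;

    // Hypothetical snippet: "journal" is a live Journal instance.
    MetricsRecordBuilder rb = MetricsAsserts.getMetrics(
        journal.getMetrics().getName());
    // metrics2 registers an un-named @Metric field under its capitalized name.
    MetricsAsserts.assertCounter("NumEditLogsSynced", 0L, rb);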

JournalNodeSyncer.java

@@ -77,6 +77,7 @@ public class JournalNodeSyncer {
   private final long journalSyncInterval;
   private final int logSegmentTransferTimeout;
   private final DataTransferThrottler throttler;
+  private final JournalMetrics metrics;
 
   JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
       Configuration conf) {
@@ -93,6 +94,7 @@ public class JournalNodeSyncer {
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
     throttler = getThrottler(conf);
+    metrics = journal.getMetrics();
   }
 
   void stopSync() {
@@ -411,6 +413,8 @@ public class JournalNodeSyncer {
         LOG.warn("Deleting " + tmpEditsFile + " has failed");
       }
       return false;
+    } else {
+      metrics.incrNumEditLogsSynced();
     }
     return true;
   }
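The counter moves only on the success branch, after a downloaded segment has been moved into place; a failed move logs a warning and returns false without incrementing. Reading the value back follows the same chain the new test uses, roughly:

    // Hedged sketch: "jn" is a running JournalNode, "jid" its journal id.
    long synced = jn.getOrCreateJournal(jid)
        .getMetrics().getNumEditLogsSynced().value();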

TestJournalNode.java

@@ -102,7 +102,7 @@ public class TestJournalNode {
   @Test(timeout=100000)
   public void testJournal() throws Exception {
     MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -117,7 +117,7 @@ public class TestJournalNode {
     ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
     metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -130,7 +130,7 @@ public class TestJournalNode {
     ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
     metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);

TestJournalNodeSync.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.qjournal;
+package org.apache.hadoop.hdfs.qjournal.server;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -25,17 +25,21 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
     .getLogFile;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import java.io.File;
 import java.io.IOException;
@@ -46,6 +50,7 @@ import java.util.Random;
  * Unit test for Journal Node formatting upon re-installation and syncing.
  */
 public class TestJournalNodeSync {
+  private Configuration conf;
   private MiniQJMHACluster qjmhaCluster;
   private MiniDFSCluster dfsCluster;
   private MiniJournalCluster jCluster;
@@ -54,11 +59,18 @@ public class TestJournalNodeSync {
   private int editsPerformed = 0;
   private final String jid = "ns1";
 
+  @Rule
+  public TestName testName = new TestName();
+
   @Before
   public void setUpMiniCluster() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+    if (testName.getMethodName().equals(
+        "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
+      conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
+    }
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
         .build();
     dfsCluster = qjmhaCluster.getDfsCluster();
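Aside: the JUnit TestName rule registered above is what lets a single @Before method vary the configuration per test. A self-contained sketch of the pattern, with illustrative names only:

    import org.junit.Before;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TestName;

    public class TestNameRuleExample {
      @Rule
      public TestName name = new TestName();

      @Before
      public void setUp() {
        // Branch on the method about to run, as setUpMiniCluster does above.
        if ("testNeedsSpecialConf".equals(name.getMethodName())) {
          // apply test-specific settings here
        }
      }

      @Test
      public void testNeedsSpecialConf() {
        // runs with the special configuration applied in setUp()
      }
    }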
@@ -214,6 +226,156 @@ public class TestJournalNodeSync {
     GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
   }
 
+  // Test JournalNode Sync when a JN is down while the NN is actively
+  // writing logs and comes back up after some time.
+  @Test(timeout=300_000)
+  public void testSyncAfterJNdowntime() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog();
+    startTxIds[1] = generateEditLog();
+
+    // Stop the first JN
+    jCluster.getJournalNode(0).stop(0);
+
+    // Roll some more edits while the first JN is down
+    for (int i = 2; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // Re-start the first JN
+    jCluster.restartJournalNode(0);
+
+    // Roll an edit to update the committed tx id of the first JN
+    generateEditLog();
+
+    // List the edit logs rolled during the JN downtime.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 2; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that JNSync downloaded the edit logs rolled during the downtime.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
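The lone generateEditLog() after the restart is what unblocks syncing: the restarted JN learns the quorum's committed tx id from the next writer RPC, and (as the comment in the following test spells out) the syncer will not fetch finalized segments whose start tx ids lie beyond the committed tx id it knows. With edit log queueing enabled here (the default), queued edits may also backfill part of the gap, so the test asserts only that the segments appear, not how they arrived.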
+
+  /**
+   * Test JournalNode Sync when a JN is down while the NN is actively writing
+   * logs and comes back up after some time, with no edit log queueing.
+   * Queueing is disabled during the cluster setup {@link #setUpMiniCluster()}.
+   * @throws Exception
+   */
+  @Test(timeout=300_000)
+  public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception {
+    // Queueing is disabled during the cluster setup {@link #setUpMiniCluster()}
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog();
+    startTxIds[1] = generateEditLog(2);
+
+    // Stop the first JN
+    jCluster.getJournalNode(0).stop(0);
+
+    // Roll some more edits while the first JN is down
+    for (int i = 2; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // Re-start the first JN
+    jCluster.restartJournalNode(0);
+
+    // After the JN restart and before rolling another edit, the missing edit
+    // logs will not be synced, as the committed tx id of the JN will be
+    // less than the start tx ids of the missing edit logs and edit log
+    // queueing has been disabled.
+    // Roll an edit to update the committed tx id of the first JN
+    generateEditLog(2);
+
+    // List the edit logs rolled during the JN downtime.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 2; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that JNSync downloaded the edit logs rolled during the downtime.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+    // Check that all the missing edit logs have been downloaded via
+    // JournalNodeSyncer alone (as edit log queueing has been disabled)
+    long numEditLogsSynced = jCluster.getJournalNode(0).getOrCreateJournal(jid)
+        .getMetrics().getNumEditLogsSynced().value();
+    Assert.assertTrue("Edit logs downloaded outside syncer. Expected 8 or " +
+        "more downloads, got " + numEditLogsSynced + " downloads instead",
+        numEditLogsSynced >= 8);
+  }
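The floor of 8 in the assertion matches the segments rolled during the downtime: the loop fills startTxIds[2] through startTxIds[9], and with queueing disabled only JournalNodeSyncer can have fetched those eight segments.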
+
+  // Test JournalNode Sync when a JN is formatted while the NN is actively
+  // writing logs.
+  @Test(timeout=300_000)
+  public void testSyncAfterJNformat() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog(1);
+    startTxIds[1] = generateEditLog(2);
+    startTxIds[2] = generateEditLog(4);
+    startTxIds[3] = generateEditLog(6);
+
+    Journal journal1 = jCluster.getJournalNode(0).getOrCreateJournal(jid);
+    NamespaceInfo nsInfo = journal1.getStorage().getNamespaceInfo();
+
+    // Delete the contents of the current directory of one JN
+    for (File file : firstJournalCurrentDir.listFiles()) {
+      file.delete();
+    }
+
+    // Format the JN
+    journal1.format(nsInfo);
+
+    // Roll some more edits
+    for (int i = 4; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // List the edit logs that the freshly formatted JN is missing.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that the formatted JN has all the edit logs.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
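Unlike the downtime tests, the verification loop here starts at i = 0: format() wipes the journal's current directory, so even the four segments rolled before the format must come back through sync.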
+
   private File deleteEditLog(File currentDir, long startTxId)
       throws IOException {
     EditLogFile logFile = getLogFile(currentDir, startTxId);
@@ -242,8 +404,20 @@
    * @return the startTxId of next segment after rolling edits.
    */
   private long generateEditLog() throws IOException {
+    return generateEditLog(1);
+  }
+
+  /**
+   * Does the specified number of edits and rolls the edit log.
+   *
+   * @param numEdits number of edits to perform
+   * @return the startTxId of next segment after rolling edits.
+   */
+  private long generateEditLog(int numEdits) throws IOException {
     long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
-    Assert.assertTrue("Failed to do an edit", doAnEdit());
+    for (int i = 1; i <= numEdits; i++) {
+      Assert.assertTrue("Failed to do an edit", doAnEdit());
+    }
     dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
     return startTxId;
   }
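With this overload in place, the tests above can size segments explicitly, e.g. generateEditLog(5) rolls a five-edit segment, while the no-arg form keeps its original single-edit behavior by delegating to generateEditLog(1).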