diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index 80a6273426c..26590d5f6ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -77,6 +77,8 @@ public class QuorumJournalManager implements JournalManager { // Maximum number of transactions to fetch at a time when using the // RPC edit fetch mechanism private final int maxTxnsPerRpc; + // Whether or not in-progress tailing is enabled in the configuration + private final boolean inProgressTailingEnabled; // Timeouts for which the QJM will wait for each of the following actions. private final int startSegmentTimeoutMs; private final int prepareRecoveryTimeoutMs; @@ -139,6 +141,9 @@ public class QuorumJournalManager implements JournalManager { conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT); Preconditions.checkArgument(maxTxnsPerRpc > 0, "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY); + this.inProgressTailingEnabled = conf.getBoolean( + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); // Configure timeouts. this.startSegmentTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY, @@ -420,11 +425,8 @@ public class QuorumJournalManager implements JournalManager { layoutVersion); loggers.waitForWriteQuorum(q, startSegmentTimeoutMs, "startLogSegment(" + txId + ")"); - boolean updateCommittedTxId = conf.getBoolean( - DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, - DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); return new QuorumOutputStream(loggers, txId, outputBufferCapacity, - writeTxnsTimeoutMs, updateCommittedTxId); + writeTxnsTimeoutMs); } @Override @@ -492,7 +494,10 @@ public class QuorumJournalManager implements JournalManager { public void selectInputStreams(Collection streams, long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) throws IOException { - if (inProgressOk) { + // Some calls will use inProgressOK to get in-progress edits even if + // the cache used for RPC calls is not enabled; fall back to using the + // streaming mechanism to serve such requests + if (inProgressOk && inProgressTailingEnabled) { LOG.info("Tailing edits starting from txn ID " + fromTxnId + " via RPC mechanism"); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java index 3ffcd3e6dcf..e094b21a632 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java @@ -33,17 +33,15 @@ class QuorumOutputStream extends EditLogOutputStream { private EditsDoubleBuffer buf; private final long segmentTxId; private final int writeTimeoutMs; - private final boolean updateCommittedTxId; public QuorumOutputStream(AsyncLoggerSet loggers, long txId, int outputBufferCapacity, - int writeTimeoutMs, boolean updateCommittedTxId) throws IOException { + int writeTimeoutMs) throws IOException { super(); this.buf = new EditsDoubleBuffer(outputBufferCapacity); this.loggers = loggers; this.segmentTxId = txId; this.writeTimeoutMs = writeTimeoutMs; - this.updateCommittedTxId = updateCommittedTxId; } @Override @@ -112,15 +110,6 @@ class QuorumOutputStream extends EditLogOutputStream { // RPCs will thus let the loggers know of the most recent transaction, even // if a logger has fallen behind. loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1); - - // If we don't have this dummy send, committed TxId might be one-batch - // stale on the Journal Nodes - if (updateCommittedTxId) { - QuorumCall fakeCall = loggers.sendEdits( - segmentTxId, firstTxToFlush, - 0, new byte[0]); - loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits"); - } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index ffb7e4992d3..9e204cb6e30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -1209,4 +1209,10 @@ public class Journal implements Closeable { public Long getJournalCTime() throws IOException { return storage.getJournalManager().getJournalCTime(); } + + @VisibleForTesting + JournaledEditsCache getJournaledEditsCache() { + return cache; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java index 11510695052..387caa18373 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.util.AutoCloseableLock; - /** * An in-memory cache of edits in their serialized form. This is used to serve * the {@link Journal#getJournaledEdits(long, int)} call, used by the @@ -70,6 +69,9 @@ import org.apache.hadoop.util.AutoCloseableLock; */ class JournaledEditsCache { + private static final int INVALID_LAYOUT_VERSION = 0; + private static final long INVALID_TXN_ID = -1; + /** The capacity, in bytes, of this cache. */ private final int capacity; @@ -91,13 +93,13 @@ class JournaledEditsCache { */ private final NavigableMap dataMap = new TreeMap<>(); /** Stores the layout version currently present in the cache. */ - private int layoutVersion = Integer.MAX_VALUE; + private int layoutVersion = INVALID_LAYOUT_VERSION; /** Stores the serialized version of the header for the current version. */ private ByteBuffer layoutHeader; /** - * The lowest/highest transaction IDs present in the cache. -1 if there are no - * transactions in the cache. + * The lowest/highest transaction IDs present in the cache. + * {@value INVALID_TXN_ID} if there are no transactions in the cache. */ private long lowestTxnId; private long highestTxnId; @@ -127,7 +129,7 @@ class JournaledEditsCache { ReadWriteLock lock = new ReentrantReadWriteLock(true); readLock = new AutoCloseableLock(lock.readLock()); writeLock = new AutoCloseableLock(lock.writeLock()); - initialize(-1); + initialize(INVALID_TXN_ID); } /** @@ -144,6 +146,7 @@ class JournaledEditsCache { * transaction count of 0 will be returned. If {@code requestedStartTxn} is * lower than the lowest transaction currently contained in this cache, or no * transactions have yet been added to the cache, an exception will be thrown. + * * @param requestedStartTxn The ID of the first transaction to return. If any * transactions are returned, it is guaranteed that * the first one will have this ID. @@ -160,7 +163,7 @@ class JournaledEditsCache { int txnCount = 0; try (AutoCloseableLock l = readLock.acquire()) { - if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) { + if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) { throw getCacheMissException(requestedStartTxn); } else if (requestedStartTxn > highestTxnId) { return 0; @@ -222,6 +225,7 @@ class JournaledEditsCache { * This attempts to always handle malformed inputs gracefully rather than * throwing an exception, to allow the rest of the Journal's operations * to proceed normally. + * * @param inputData A buffer containing edits in serialized form * @param newStartTxn The txn ID of the first edit in {@code inputData} * @param newEndTxn The txn ID of the last edit in {@code inputData} @@ -246,15 +250,16 @@ class JournaledEditsCache { newStartTxn, newEndTxn, newLayoutVersion), ioe); return; } - } - if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) { - // Cache initialization step - if (lowestTxnId >= 0) { - // Cache is out of sync; clear to avoid storing noncontiguous regions - Journal.LOG.error(String.format("Edits cache is out of sync; " + - "looked for next txn id at %d but got start txn id for " + - "cache put request at %d", highestTxnId + 1, newStartTxn)); - } + } else if (lowestTxnId == INVALID_TXN_ID) { + Journal.LOG.info("Initializing edits cache starting from txn ID " + + newStartTxn); + initialize(newStartTxn); + } else if (highestTxnId + 1 != newStartTxn) { + // Cache is out of sync; clear to avoid storing noncontiguous regions + Journal.LOG.error(String.format("Edits cache is out of sync; " + + "looked for next txn id at %d but got start txn id for " + + "cache put request at %d. Reinitializing at new request.", + highestTxnId + 1, newStartTxn)); initialize(newStartTxn); } @@ -264,11 +269,12 @@ class JournaledEditsCache { totalSize -= lowest.getValue().length; } if (inputData.length > capacity) { - initialize(-1); + initialize(INVALID_TXN_ID); Journal.LOG.warn(String.format("A single batch of edits was too " + "large to fit into the cache: startTxn = %d, endTxn = %d, " + "input length = %d. The capacity of the cache (%s) must be " + - "increased for it to work properly (current capacity %d)", + "increased for it to work properly (current capacity %d)." + + "Cache is now empty.", newStartTxn, newEndTxn, inputData.length, DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity)); return; @@ -289,6 +295,7 @@ class JournaledEditsCache { * Skip through a given stream of edits until the given transaction ID is * found. Return the number of bytes that appear prior to the given * transaction. + * * @param buf A buffer containing a stream of serialized edits * @param txnId The transaction ID to search for * @return The number of bytes appearing in {@code buf} before @@ -312,13 +319,22 @@ class JournaledEditsCache { /** * Update the layout version of the cache. This clears out all existing * entries, and populates the new layout version and header for that version. + * * @param newLayoutVersion The new layout version to be stored in the cache * @param newStartTxn The new lowest transaction in the cache */ private void updateLayoutVersion(int newLayoutVersion, long newStartTxn) throws IOException { - Journal.LOG.info("Updating edits cache to use layout version " + - newLayoutVersion + "; previously was " + layoutVersion); + StringBuilder logMsg = new StringBuilder() + .append("Updating edits cache to use layout version ") + .append(newLayoutVersion) + .append(" starting from txn ID ") + .append(newStartTxn); + if (layoutVersion != INVALID_LAYOUT_VERSION) { + logMsg.append("; previous version was ").append(layoutVersion) + .append("; old entries will be cleared."); + } + Journal.LOG.info(logMsg.toString()); initialize(newStartTxn); ByteArrayOutputStream baos = new ByteArrayOutputStream(); EditLogFileOutputStream.writeHeader(newLayoutVersion, @@ -329,20 +345,23 @@ class JournaledEditsCache { /** * Initialize the cache back to a clear state. + * * @param newInitialTxnId The new lowest transaction ID stored in the cache. - * -1 if the cache is to remain empty at this time. + * This should be {@value INVALID_TXN_ID} if the cache + * is to remain empty at this time. */ private void initialize(long newInitialTxnId) { dataMap.clear(); totalSize = 0; initialTxnId = newInitialTxnId; lowestTxnId = initialTxnId; - highestTxnId = -1; + highestTxnId = INVALID_TXN_ID; // this will be set later } /** * Return the underlying data buffer used to store information about the * given transaction ID. + * * @param txnId Transaction ID whose containing buffer should be fetched. * @return The data buffer for the transaction */ @@ -354,7 +373,7 @@ class JournaledEditsCache { } private CacheMissException getCacheMissException(long requestedTxnId) { - if (lowestTxnId < 0) { + if (lowestTxnId == INVALID_TXN_ID) { return new CacheMissException(0, "Cache is empty; either it was never " + "written to or the last write overflowed the cache capacity."); } else if (requestedTxnId < initialTxnId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md index e4363fbec73..76a9837835a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md @@ -431,6 +431,34 @@ http://NN_HOSTNAME/isActive will return a 200 status code response if the NN is +### In-Progress Edit Log Tailing + +Under the default settings, the Standby NameNode will only apply edits that are present in an edit +log segments which has been finalized. If it is desirable to have a Standby NameNode which has more +up-to-date namespace information, it is possible to enable tailing of in-progress edit segments. +This setting will attempt to fetch edits from an in-memory cache on the JournalNodes and can reduce +the lag time before a transaction is applied on the Standby NameNode to the order of milliseconds. +If an edit cannot be served from the cache, the Standby will still be able to retrieve it, but the +lag time will be much longer. The relevant configurations are: + +* **dfs.ha.tail-edits.in-progress** - Whether or not to enable tailing on in-progress edits logs. + This will also enable the in-memory edit cache on the JournalNodes. Disabled by default. + +* **dfs.journalnode.edit-cache-size.bytes** - The size of the in-memory cache of edits on the + JournalNode. Edits take around 200 bytes each in a typical environment, so, for example, the + default of 1048576 (1MB) can hold around 5000 transactions. It is recommended to monitor the + JournalNode metrics RpcRequestCacheMissAmountNumMisses and RpcRequestCacheMissAmountAvgTxns, + which respectively count the number of requests unable to be served by the cache, and the extra + number of transactions which would have needed to have been in the cache for the request to + succeed. For example, if a request attempted to fetch edits starting at transaction ID 10, but + the oldest data in the cache was at transaction ID 20, a value of 10 would be added to the + average. + +This feature is primarily useful in conjunction with the Standby/Observer Read feature. Using this +feature, read requests can be serviced from non-active NameNodes; thus tailing in-progress edits +provides these nodes with the ability to serve requests with data which is much more fresh. See the +Apache JIRA ticket HDFS-12943 for more information on this feature. + Automatic Failover ------------------ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index 9f089c9b16d..f3bb954dab8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -1045,6 +1045,10 @@ public class TestQuorumJournalManager { qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); writeTxns(stm, 1, 10); writeTxns(stm, 11, 1); + // One last sync whose transactions are not expected to be seen in the + // input streams because the JournalNodes have not updated their concept + // of the committed transaction ID yet + writeTxns(stm, 12, 1); futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java index 30ef21b6378..837c7d9e7b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java @@ -32,6 +32,7 @@ import java.util.List; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; @@ -79,6 +80,7 @@ public class TestQuorumJournalManagerUnit { mockLogger(), mockLogger()); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) { @Override protected List createLoggers(AsyncLogger.Factory factory) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java new file mode 100644 index 00000000000..de03b2c9cea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +/** + * Utilities for testing {@link Journal} instances. + */ +public class JournalTestUtil { + + /** + * Corrupt the cache of a {@link Journal} to simulate some corrupt entries + * being present. + * + * @param txid The transaction ID whose containing buffer in the cache + * should be corrupted. + * @param journal The journal whose cache should be corrupted. + */ + public static void corruptJournaledEditsCache(long txid, Journal journal) { + JournaledEditsCache cache = journal.getJournaledEditsCache(); + byte[] buf = cache.getRawDataForTests(txid); + // Change a few arbitrary bytes in the buffer + for (int i = 0; i < buf.length; i += 9) { + buf[i] = 0; + } + for (int i = 3; i < buf.length; i += 9) { + buf[i] += 10; + } + for (int i = 6; i < buf.length; i += 9) { + buf[i] -= 10; + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java index 2bdada45fdf..839407389ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.net.URI; +import java.util.Iterator; import java.util.List; import org.slf4j.Logger; @@ -30,9 +33,11 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.qjournal.server.JournalTestUtil; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; @@ -43,6 +48,7 @@ import org.junit.Before; import org.junit.Test; import com.google.common.base.Joiner; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; /** @@ -64,6 +70,8 @@ public class TestStandbyInProgressTail { // Set period of tail edits to a large value (20 mins) for test purposes conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60); conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, + 500); HAUtil.setAllowStandbyReads(conf, true); qjmhaCluster = new MiniQJMHACluster.Builder(conf).build(); cluster = qjmhaCluster.getDfsCluster(); @@ -179,12 +187,7 @@ public class TestStandbyInProgressTail { cluster.getNameNode(0).getRpcServer().mkdirs("/test", FsPermission.createImmutable((short) 0755), true); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); - - // After waiting for 5 seconds, StandbyNameNode should finish tailing - // in-progress logs - assertNotNull(getFileInfo(cluster.getNameNode(1), - "/test", true, false, false)); + waitForFileInfo(nn1, "/test"); // Restarting the standby should not finalize any edits files // in the shared directory when it starts up! @@ -227,10 +230,9 @@ public class TestStandbyInProgressTail { cluster.getNameNode(0).getRpcServer().mkdirs("/test", FsPermission.createImmutable((short) 0755), true); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); // StandbyNameNode should tail the in-progress edit - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); + waitForFileInfo(nn1, "/test"); // Create a new edit and finalized it cluster.getNameNode(0).getRpcServer().mkdirs("/test2", @@ -238,17 +240,14 @@ public class TestStandbyInProgressTail { nn0.getRpcServer().rollEditLog(); // StandbyNameNode shouldn't tail the edit since we do not call the method - assertNull(getFileInfo(nn1, "/test2", true, false, false)); + waitForFileInfo(nn1, "/test2"); // Create a new in-progress edit and let SBNN do the tail cluster.getNameNode(0).getRpcServer().mkdirs("/test3", FsPermission.createImmutable((short) 0755), true); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); // StandbyNameNode should tail the finalized edit and the new in-progress - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test2", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test3", true, false, false)); + waitForFileInfo(nn1, "/test", "/test2", "/test3"); } @Test @@ -275,12 +274,8 @@ public class TestStandbyInProgressTail { assertNull(getFileInfo(nn1, "/test2", true, false, false)); assertNull(getFileInfo(nn1, "/test3", true, false, false)); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); - - // StandbyNameNode shoudl tail the finalized edit and the new in-progress - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test2", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test3", true, false, false)); + // StandbyNameNode should tail the finalized edit and the new in-progress + waitForFileInfo(nn1, "/test", "/test2", "/test3"); } @Test @@ -295,19 +290,14 @@ public class TestStandbyInProgressTail { FsPermission.createImmutable((short) 0755), true); cluster.getNameNode(0).getRpcServer().mkdirs("/test2", FsPermission.createImmutable((short) 0755), true); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); + waitForFileInfo(nn1, "/test", "/test2"); nn0.getRpcServer().rollEditLog(); - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test2", true, false, false)); cluster.getNameNode(0).getRpcServer().mkdirs("/test3", FsPermission.createImmutable((short) 0755), true); - nn1.getNamesystem().getEditLogTailer().doTailEdits(); - // StandbyNameNode shoudl tail the finalized edit and the new in-progress - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test2", true, false, false)); - assertNotNull(getFileInfo(nn1, "/test3", true, false, false)); + // StandbyNameNode should tail the finalized edit and the new in-progress + waitForFileInfo(nn1, "/test", "/test2", "/test3"); } @Test @@ -325,8 +315,85 @@ public class TestStandbyInProgressTail { FsPermission.createImmutable((short) 0755), true); cluster.getNameNode(0).getRpcServer().rollEdits(); - cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits(); - assertNotNull(getFileInfo(nn1, "/test", true, false, false)); + waitForFileInfo(nn1, "/test"); + } + + @Test + public void testEditsServedViaCache() throws Exception { + cluster.transitionToActive(0); + cluster.waitActive(0); + + mkdirs(nn0, "/test", "/test2"); + nn0.getRpcServer().rollEditLog(); + for (int idx = 0; idx < qjmhaCluster.getJournalCluster().getNumNodes(); + idx++) { + File[] startingEditFile = qjmhaCluster.getJournalCluster() + .getCurrentDir(idx, DFSUtil.getNamenodeNameServiceId(conf)) + .listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.matches("edits_0+1-[0-9]+"); + } + }); + assertNotNull(startingEditFile); + assertEquals(1, startingEditFile.length); + // Delete this edit file to ensure that edits can't be served via the + // streaming mechanism - RPC/cache-based only + startingEditFile[0].delete(); + } + // Ensure edits were not tailed before the edit files were deleted; + // quick spot check of a single dir + assertNull(getFileInfo(nn1, "/tmp0", false, false, false)); + + waitForFileInfo(nn1, "/test", "/test2"); + } + + @Test + public void testCorruptJournalCache() throws Exception { + cluster.transitionToActive(0); + cluster.waitActive(0); + + // Shut down one JN so there is only a quorum remaining to make it easier + // to manage the remaining two + qjmhaCluster.getJournalCluster().getJournalNode(0).stopAndJoin(0); + + mkdirs(nn0, "/test", "/test2"); + JournalTestUtil.corruptJournaledEditsCache(1, + qjmhaCluster.getJournalCluster().getJournalNode(1) + .getJournal(DFSUtil.getNamenodeNameServiceId(conf))); + + nn0.getRpcServer().rollEditLog(); + + waitForFileInfo(nn1, "/test", "/test2"); + + mkdirs(nn0, "/test3", "/test4"); + JournalTestUtil.corruptJournaledEditsCache(3, + qjmhaCluster.getJournalCluster().getJournalNode(2) + .getJournal(DFSUtil.getNamenodeNameServiceId(conf))); + + waitForFileInfo(nn1, "/test3", "/test4"); + } + + @Test + public void testTailWithoutCache() throws Exception { + qjmhaCluster.shutdown(); + // Effectively disable the cache by setting its size too small to be used + conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1); + qjmhaCluster = new MiniQJMHACluster.Builder(conf).build(); + cluster = qjmhaCluster.getDfsCluster(); + cluster.transitionToActive(0); + cluster.waitActive(0); + nn0 = cluster.getNameNode(0); + nn1 = cluster.getNameNode(1); + + mkdirs(nn0, "/test", "/test2"); + nn0.getRpcServer().rollEditLog(); + + mkdirs(nn0, "/test3", "/test4"); + + // Skip the last directory; the JournalNodes' idea of the committed + // txn ID may not have been updated to include it yet + waitForFileInfo(nn1, "/test", "/test2", "/test3"); } /** @@ -356,4 +423,43 @@ public class TestStandbyInProgressTail { GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files); } } + + /** + * Create the given directories on the provided NameNode. + */ + private static void mkdirs(NameNode nameNode, String... dirNames) + throws Exception { + for (String dirName : dirNames) { + nameNode.getRpcServer().mkdirs(dirName, + FsPermission.createImmutable((short) 0755), true); + } + } + + /** + * Wait up to 1 second until the given NameNode is aware of the existing of + * all of the provided fileNames. + */ + private static void waitForFileInfo(NameNode standbyNN, String... fileNames) + throws Exception { + List remainingFiles = Lists.newArrayList(fileNames); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + standbyNN.getNamesystem().getEditLogTailer().doTailEdits(); + for (Iterator it = remainingFiles.iterator(); it.hasNext();) { + if (getFileInfo(standbyNN, it.next(), true, false, false) == null) { + return false; + } else { + it.remove(); + } + } + return true; + } catch (IOException|InterruptedException e) { + throw new AssertionError("Exception while waiting: " + e); + } + } + }, 10, 1000); + } + }