HDFS-13610. [SBN read] Edit Tail Fast Path Part 4: Cleanup. Integration test, documentation, remove unnecessary dummy sync, minor fixups. Contributed by Erik Krogen.

Erik Krogen 2018-05-11 13:23:38 -07:00 committed by Konstantin V Shvachko
parent e27708c2da
commit 1e22f2bfbb
9 changed files with 275 additions and 68 deletions

View File: QuorumJournalManager.java

@@ -77,6 +77,8 @@ public class QuorumJournalManager implements JournalManager {
   // Maximum number of transactions to fetch at a time when using the
   // RPC edit fetch mechanism
   private final int maxTxnsPerRpc;
+  // Whether or not in-progress tailing is enabled in the configuration
+  private final boolean inProgressTailingEnabled;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -139,6 +141,9 @@ public class QuorumJournalManager implements JournalManager {
         conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
     Preconditions.checkArgument(maxTxnsPerRpc > 0,
         "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
+    this.inProgressTailingEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -420,11 +425,8 @@ public class QuorumJournalManager implements JournalManager {
         layoutVersion);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    boolean updateCommittedTxId = conf.getBoolean(
-        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
-        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
     return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
-        writeTxnsTimeoutMs, updateCommittedTxId);
+        writeTxnsTimeoutMs);
   }

   @Override
@@ -492,7 +494,10 @@ public class QuorumJournalManager implements JournalManager {
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk,
       boolean onlyDurableTxns) throws IOException {
-    if (inProgressOk) {
+    // Some calls will use inProgressOK to get in-progress edits even if
+    // the cache used for RPC calls is not enabled; fall back to using the
+    // streaming mechanism to serve such requests
+    if (inProgressOk && inProgressTailingEnabled) {
       LOG.info("Tailing edits starting from txn ID " + fromTxnId +
           " via RPC mechanism");
       try {
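The hunk above cuts off inside `selectInputStreams`. As a minimal sketch of the control flow this change implies (the helpers `selectRpcInputStreams` and `selectStreamingInputStreams` are assumed names, not shown in this diff):

```java
// Sketch only, not the committed code: prefer the RPC/cache fast path when
// in-progress tailing is enabled, and fall back to streaming otherwise.
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxnId, boolean inProgressOk,
    boolean onlyDurableTxns) throws IOException {
  // Use the fast path only when the caller asked for in-progress edits
  // AND the feature is enabled in configuration.
  if (inProgressOk && inProgressTailingEnabled) {
    LOG.info("Tailing edits starting from txn ID " + fromTxnId +
        " via RPC mechanism");
    try {
      selectRpcInputStreams(streams, fromTxnId, onlyDurableTxns); // assumed helper
      return;
    } catch (IOException ioe) {
      // The JournalNode cache may be empty or may not reach back far
      // enough; fall back to streaming from edit log segment files.
      LOG.warn("Edit tailing via RPC failed; falling back to streaming", ioe);
      streams.clear();
    }
  }
  selectStreamingInputStreams(streams, fromTxnId, inProgressOk, // assumed helper
      onlyDurableTxns);
}
```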

View File: QuorumOutputStream.java

@@ -33,17 +33,15 @@ class QuorumOutputStream extends EditLogOutputStream {
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
   private final int writeTimeoutMs;
-  private final boolean updateCommittedTxId;

   public QuorumOutputStream(AsyncLoggerSet loggers,
       long txId, int outputBufferCapacity,
-      int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
+      int writeTimeoutMs) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
     this.writeTimeoutMs = writeTimeoutMs;
-    this.updateCommittedTxId = updateCommittedTxId;
   }

   @Override
@@ -112,15 +110,6 @@ class QuorumOutputStream extends EditLogOutputStream {
       // RPCs will thus let the loggers know of the most recent transaction, even
       // if a logger has fallen behind.
       loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
-
-      // If we don't have this dummy send, committed TxId might be one-batch
-      // stale on the Journal Nodes
-      if (updateCommittedTxId) {
-        QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
-            segmentTxId, firstTxToFlush,
-            0, new byte[0]);
-        loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
-      }
     }
   }
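The deleted block existed because the committed transaction ID travels piggybacked on subsequent `sendEdits` RPCs: a JournalNode learns that a batch is committed only when the following RPC arrives, so without the extra empty send its committed ID lagged one batch behind the writer. This commit removes the dummy sync and instead tolerates that staleness (see the extra `writeTxns` added to TestQuorumJournalManager below). A toy simulation of the lag, using illustrative names rather than Hadoop internals:

```java
// Toy model of the piggyback behavior described in the deleted comment;
// names and mechanics here are illustrative assumptions, not Hadoop code.
public class CommittedTxIdLagDemo {
  private long writerCommittedTxId = -1;  // writer's view after quorum ack
  private long journalCommittedTxId = -1; // a JournalNode's view

  // Each sendEdits RPC carries the committed ID known *before* this batch.
  void sendEdits(long firstTxnId, int numTxns) {
    journalCommittedTxId = writerCommittedTxId;     // piggybacked on the RPC
    writerCommittedTxId = firstTxnId + numTxns - 1; // quorum ack => committed
  }

  public static void main(String[] args) {
    CommittedTxIdLagDemo demo = new CommittedTxIdLagDemo();
    demo.sendEdits(1, 10); // txns 1-10
    demo.sendEdits(11, 5); // txns 11-15
    // The JN still believes only txn 10 is committed, although the writer
    // has committed through 15 -- the "one-batch stale" problem:
    System.out.println(demo.journalCommittedTxId + " vs "
        + demo.writerCommittedTxId); // prints "10 vs 15"
    demo.sendEdits(16, 0); // the removed dummy (empty) send closed the gap
    System.out.println(demo.journalCommittedTxId); // prints "15"
  }
}
```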

View File: Journal.java

@@ -1209,4 +1209,10 @@ public class Journal implements Closeable {
   public Long getJournalCTime() throws IOException {
     return storage.getJournalManager().getJournalCTime();
   }
+
+  @VisibleForTesting
+  JournaledEditsCache getJournaledEditsCache() {
+    return cache;
+  }
 }

View File: JournaledEditsCache.java

@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.util.AutoCloseableLock;

-
 /**
  * An in-memory cache of edits in their serialized form. This is used to serve
  * the {@link Journal#getJournaledEdits(long, int)} call, used by the
@@ -70,6 +69,9 @@ import org.apache.hadoop.util.AutoCloseableLock;
  */
 class JournaledEditsCache {

+  private static final int INVALID_LAYOUT_VERSION = 0;
+  private static final long INVALID_TXN_ID = -1;
+
   /** The capacity, in bytes, of this cache. */
   private final int capacity;
@@ -91,13 +93,13 @@ class JournaledEditsCache {
    */
   private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
   /** Stores the layout version currently present in the cache. */
-  private int layoutVersion = Integer.MAX_VALUE;
+  private int layoutVersion = INVALID_LAYOUT_VERSION;
   /** Stores the serialized version of the header for the current version. */
   private ByteBuffer layoutHeader;
   /**
-   * The lowest/highest transaction IDs present in the cache. -1 if there are no
-   * transactions in the cache.
+   * The lowest/highest transaction IDs present in the cache.
+   * {@value INVALID_TXN_ID} if there are no transactions in the cache.
    */
   private long lowestTxnId;
   private long highestTxnId;
@@ -127,7 +129,7 @@ class JournaledEditsCache {
     ReadWriteLock lock = new ReentrantReadWriteLock(true);
     readLock = new AutoCloseableLock(lock.readLock());
     writeLock = new AutoCloseableLock(lock.writeLock());
-    initialize(-1);
+    initialize(INVALID_TXN_ID);
   }
@@ -144,6 +146,7 @@ class JournaledEditsCache {
    * transaction count of 0 will be returned. If {@code requestedStartTxn} is
    * lower than the lowest transaction currently contained in this cache, or no
    * transactions have yet been added to the cache, an exception will be thrown.
+   *
    * @param requestedStartTxn The ID of the first transaction to return. If any
    *                          transactions are returned, it is guaranteed that
    *                          the first one will have this ID.
@@ -160,7 +163,7 @@ class JournaledEditsCache {
     int txnCount = 0;

     try (AutoCloseableLock l = readLock.acquire()) {
-      if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) {
+      if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
         throw getCacheMissException(requestedStartTxn);
       } else if (requestedStartTxn > highestTxnId) {
         return 0;
@@ -222,6 +225,7 @@ class JournaledEditsCache {
    * This attempts to always handle malformed inputs gracefully rather than
    * throwing an exception, to allow the rest of the Journal's operations
    * to proceed normally.
+   *
    * @param inputData A buffer containing edits in serialized form
    * @param newStartTxn The txn ID of the first edit in {@code inputData}
    * @param newEndTxn The txn ID of the last edit in {@code inputData}
@@ -246,15 +250,16 @@ class JournaledEditsCache {
             newStartTxn, newEndTxn, newLayoutVersion), ioe);
         return;
       }
-    }
-    if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) {
-      // Cache initialization step
-      if (lowestTxnId >= 0) {
-        // Cache is out of sync; clear to avoid storing noncontiguous regions
-        Journal.LOG.error(String.format("Edits cache is out of sync; " +
-            "looked for next txn id at %d but got start txn id for " +
-            "cache put request at %d", highestTxnId + 1, newStartTxn));
-      }
+    } else if (lowestTxnId == INVALID_TXN_ID) {
+      Journal.LOG.info("Initializing edits cache starting from txn ID " +
+          newStartTxn);
+      initialize(newStartTxn);
+    } else if (highestTxnId + 1 != newStartTxn) {
+      // Cache is out of sync; clear to avoid storing noncontiguous regions
+      Journal.LOG.error(String.format("Edits cache is out of sync; " +
+          "looked for next txn id at %d but got start txn id for " +
+          "cache put request at %d. Reinitializing at new request.",
+          highestTxnId + 1, newStartTxn));
       initialize(newStartTxn);
     }
@@ -264,11 +269,12 @@ class JournaledEditsCache {
       totalSize -= lowest.getValue().length;
     }
     if (inputData.length > capacity) {
-      initialize(-1);
+      initialize(INVALID_TXN_ID);
       Journal.LOG.warn(String.format("A single batch of edits was too " +
           "large to fit into the cache: startTxn = %d, endTxn = %d, " +
           "input length = %d. The capacity of the cache (%s) must be " +
-          "increased for it to work properly (current capacity %d)",
+          "increased for it to work properly (current capacity %d)." +
+          "Cache is now empty.",
           newStartTxn, newEndTxn, inputData.length,
           DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
       return;
@@ -289,6 +295,7 @@ class JournaledEditsCache {
    * Skip through a given stream of edits until the given transaction ID is
    * found. Return the number of bytes that appear prior to the given
    * transaction.
+   *
    * @param buf A buffer containing a stream of serialized edits
    * @param txnId The transaction ID to search for
    * @return The number of bytes appearing in {@code buf} <i>before</i>
@@ -312,13 +319,22 @@ class JournaledEditsCache {
   /**
    * Update the layout version of the cache. This clears out all existing
    * entries, and populates the new layout version and header for that version.
+   *
    * @param newLayoutVersion The new layout version to be stored in the cache
    * @param newStartTxn The new lowest transaction in the cache
    */
   private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
       throws IOException {
-    Journal.LOG.info("Updating edits cache to use layout version " +
-        newLayoutVersion + "; previously was " + layoutVersion);
+    StringBuilder logMsg = new StringBuilder()
+        .append("Updating edits cache to use layout version ")
+        .append(newLayoutVersion)
+        .append(" starting from txn ID ")
+        .append(newStartTxn);
+    if (layoutVersion != INVALID_LAYOUT_VERSION) {
+      logMsg.append("; previous version was ").append(layoutVersion)
+          .append("; old entries will be cleared.");
+    }
+    Journal.LOG.info(logMsg.toString());
     initialize(newStartTxn);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     EditLogFileOutputStream.writeHeader(newLayoutVersion,
@@ -329,20 +345,23 @@ class JournaledEditsCache {
   /**
    * Initialize the cache back to a clear state.
+   *
    * @param newInitialTxnId The new lowest transaction ID stored in the cache.
-   *                        -1 if the cache is to remain empty at this time.
+   *                        This should be {@value INVALID_TXN_ID} if the cache
+   *                        is to remain empty at this time.
    */
   private void initialize(long newInitialTxnId) {
     dataMap.clear();
     totalSize = 0;
     initialTxnId = newInitialTxnId;
     lowestTxnId = initialTxnId;
-    highestTxnId = -1;
+    highestTxnId = INVALID_TXN_ID; // this will be set later
   }

   /**
    * Return the underlying data buffer used to store information about the
    * given transaction ID.
+   *
    * @param txnId Transaction ID whose containing buffer should be fetched.
    * @return The data buffer for the transaction
    */
@@ -354,7 +373,7 @@ class JournaledEditsCache {
   }

   private CacheMissException getCacheMissException(long requestedTxnId) {
-    if (lowestTxnId < 0) {
+    if (lowestTxnId == INVALID_TXN_ID) {
       return new CacheMissException(0, "Cache is empty; either it was never " +
           "written to or the last write overflowed the cache capacity.");
     } else if (requestedTxnId < initialTxnId) {
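Taken together, the hunks above enforce three invariants: cached entries stay contiguous, an out-of-sync put reinitializes the cache at the new request, and old buffers are evicted once the byte capacity is exceeded. A self-contained, simplified model of just those invariants (the real class also tracks layout versions, serialized headers, and read/write locking):

```java
// Simplified sketch of the JournaledEditsCache invariants; illustrative only.
import java.util.NavigableMap;
import java.util.TreeMap;

class SimpleEditsCache {
  static final long INVALID_TXN_ID = -1;

  private final int capacity; // max total bytes retained
  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
  private long lowestTxnId = INVALID_TXN_ID;
  private long highestTxnId = INVALID_TXN_ID;
  private int totalSize = 0;

  SimpleEditsCache(int capacity) {
    this.capacity = capacity;
  }

  void storeEdits(byte[] data, long startTxn, long endTxn) {
    if (lowestTxnId == INVALID_TXN_ID || highestTxnId + 1 != startTxn) {
      // First put, or a noncontiguous put: reinitialize at the new request
      // so the cache never stores gaps.
      initialize(startTxn);
    }
    if (data.length > capacity) {
      initialize(INVALID_TXN_ID); // single batch too large; cache goes empty
      return;
    }
    // Evict the oldest buffers until the new batch fits.
    while (totalSize + data.length > capacity) {
      totalSize -= dataMap.pollFirstEntry().getValue().length;
      lowestTxnId = dataMap.isEmpty() ? startTxn : dataMap.firstKey();
    }
    dataMap.put(startTxn, data);
    totalSize += data.length;
    highestTxnId = endTxn;
  }

  private void initialize(long newInitialTxnId) {
    dataMap.clear();
    totalSize = 0;
    lowestTxnId = newInitialTxnId;
    highestTxnId = INVALID_TXN_ID;
  }
}
```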

View File: HDFSHighAvailabilityWithQJM.md

@@ -431,6 +431,34 @@ http://NN_HOSTNAME/isActive will return a 200 status code response if the NN is
+### In-Progress Edit Log Tailing
+
+Under the default settings, the Standby NameNode will only apply edits that are present in edit
+log segments which have been finalized. If it is desirable to have a Standby NameNode with more
+up-to-date namespace information, it is possible to enable tailing of in-progress edit segments.
+This setting will attempt to fetch edits from an in-memory cache on the JournalNodes and can reduce
+the lag time before a transaction is applied on the Standby NameNode to the order of milliseconds.
+If an edit cannot be served from the cache, the Standby will still be able to retrieve it, but the
+lag time will be much longer. The relevant configurations are:
+
+* **dfs.ha.tail-edits.in-progress** - Whether or not to enable tailing of in-progress edit logs.
+  This also enables the in-memory edit cache on the JournalNodes. Disabled by default.
+
+* **dfs.journalnode.edit-cache-size.bytes** - The size of the in-memory cache of edits on the
+  JournalNode. Edits take around 200 bytes each in a typical environment, so, for example, the
+  default of 1048576 (1MB) can hold around 5000 transactions. It is recommended to monitor the
+  JournalNode metrics RpcRequestCacheMissAmountNumMisses and RpcRequestCacheMissAmountAvgTxns,
+  which respectively count the number of requests unable to be served by the cache, and the
+  number of extra transactions which would have needed to be in the cache for a request to
+  succeed. For example, if a request attempted to fetch edits starting at transaction ID 10,
+  but the oldest data in the cache was at transaction ID 20, a value of 10 would be added to
+  the average.
+
+This feature is primarily useful in conjunction with the Standby/Observer Read feature: it lets
+read requests be serviced from non-active NameNodes, and tailing in-progress edits gives these
+nodes the ability to serve requests with much fresher data. See the Apache JIRA ticket HDFS-12943
+for more information on this feature.

 Automatic Failover
 ------------------
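For illustration, the two settings described above can also be applied programmatically through the same DFSConfigKeys constants this commit uses; the class and method names below are hypothetical:

```java
// Illustrative only: mirrors the hdfs-site.xml keys described above using
// the DFSConfigKeys constants that appear elsewhere in this commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class InProgressTailingExample {
  public static Configuration enableInProgressTailing() {
    Configuration conf = new Configuration();
    // dfs.ha.tail-edits.in-progress: enables tailing of in-progress
    // segments and the JournalNode in-memory edit cache.
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    // dfs.journalnode.edit-cache-size.bytes: at ~200 bytes per edit, 4MB
    // holds roughly 20000 transactions (4x the default 1MB sizing).
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        4 * 1024 * 1024);
    return conf;
  }
}
```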

View File: TestQuorumJournalManager.java

@@ -1045,6 +1045,10 @@ public class TestQuorumJournalManager {
     qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
     writeTxns(stm, 1, 10);
     writeTxns(stm, 11, 1);
+    // One last sync whose transactions are not expected to be seen in the
+    // input streams, because the JournalNodes have not updated their concept
+    // of the committed transaction ID yet
+    writeTxns(stm, 12, 1);

     futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
         QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

View File: TestQuorumJournalManagerUnit.java

@@ -32,6 +32,7 @@ import java.util.List;
 import org.junit.Assert;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@@ -79,6 +80,7 @@ public class TestQuorumJournalManagerUnit {
         mockLogger(),
         mockLogger());
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
       @Override
       protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {

View File: JournalTestUtil.java (new file)

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
/**
* Utilities for testing {@link Journal} instances.
*/
public class JournalTestUtil {
/**
* Corrupt the cache of a {@link Journal} to simulate some corrupt entries
* being present.
*
* @param txid The transaction ID whose containing buffer in the cache
* should be corrupted.
* @param journal The journal whose cache should be corrupted.
*/
public static void corruptJournaledEditsCache(long txid, Journal journal) {
JournaledEditsCache cache = journal.getJournaledEditsCache();
byte[] buf = cache.getRawDataForTests(txid);
// Change a few arbitrary bytes in the buffer
for (int i = 0; i < buf.length; i += 9) {
buf[i] = 0;
}
for (int i = 3; i < buf.length; i += 9) {
buf[i] += 10;
}
for (int i = 6; i < buf.length; i += 9) {
buf[i] -= 10;
}
}
}

View File: TestStandbyInProgressTail.java

@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;

 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Iterator;
 import java.util.List;

 import org.slf4j.Logger;
@@ -30,9 +33,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -43,6 +48,7 @@ import org.junit.Before;
 import org.junit.Test;

 import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;

 /**
@@ -64,6 +70,8 @@ public class TestStandbyInProgressTail {
     // Set period of tail edits to a large value (20 mins) for test purposes
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
     conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+        500);
     HAUtil.setAllowStandbyReads(conf, true);
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
     cluster = qjmhaCluster.getDfsCluster();
@@ -179,12 +187,7 @@ public class TestStandbyInProgressTail {
     cluster.getNameNode(0).getRpcServer().mkdirs("/test",
         FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
-    // After waiting for 5 seconds, StandbyNameNode should finish tailing
-    // in-progress logs
-    assertNotNull(getFileInfo(cluster.getNameNode(1),
-        "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");

     // Restarting the standby should not finalize any edits files
     // in the shared directory when it starts up!
@@ -227,10 +230,9 @@ public class TestStandbyInProgressTail {
     cluster.getNameNode(0).getRpcServer().mkdirs("/test",
         FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();

     // StandbyNameNode should tail the in-progress edit
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");

     // Create a new edit and finalized it
     cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
@@ -238,17 +240,14 @@ public class TestStandbyInProgressTail {
     nn0.getRpcServer().rollEditLog();

     // StandbyNameNode shouldn't tail the edit since we do not call the method
-    assertNull(getFileInfo(nn1, "/test2", true, false, false));
+    waitForFileInfo(nn1, "/test2");

     // Create a new in-progress edit and let SBNN do the tail
     cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
         FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();

     // StandbyNameNode should tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }

   @Test
@@ -275,12 +274,8 @@ public class TestStandbyInProgressTail {
     assertNull(getFileInfo(nn1, "/test2", true, false, false));
     assertNull(getFileInfo(nn1, "/test3", true, false, false));

-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
-    // StandbyNameNode shoudl tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }

   @Test
@@ -295,19 +290,14 @@ public class TestStandbyInProgressTail {
         FsPermission.createImmutable((short) 0755), true);
     cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
         FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+    waitForFileInfo(nn1, "/test", "/test2");
     nn0.getRpcServer().rollEditLog();
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));

     cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
         FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();

-    // StandbyNameNode shoudl tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }

   @Test
@@ -325,8 +315,85 @@ public class TestStandbyInProgressTail {
         FsPermission.createImmutable((short) 0755), true);
     cluster.getNameNode(0).getRpcServer().rollEdits();

-    cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");
   }

+  @Test
+  public void testEditsServedViaCache() throws Exception {
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+
+    mkdirs(nn0, "/test", "/test2");
+    nn0.getRpcServer().rollEditLog();
+    for (int idx = 0; idx < qjmhaCluster.getJournalCluster().getNumNodes();
+        idx++) {
+      File[] startingEditFile = qjmhaCluster.getJournalCluster()
+          .getCurrentDir(idx, DFSUtil.getNamenodeNameServiceId(conf))
+          .listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+              return name.matches("edits_0+1-[0-9]+");
+            }
+          });
+      assertNotNull(startingEditFile);
+      assertEquals(1, startingEditFile.length);
+      // Delete this edit file to ensure that edits can't be served via the
+      // streaming mechanism - RPC/cache-based only
+      startingEditFile[0].delete();
+    }
+    // Ensure edits were not tailed before the edit files were deleted;
+    // quick spot check of a single dir
+    assertNull(getFileInfo(nn1, "/tmp0", false, false, false));
+
+    waitForFileInfo(nn1, "/test", "/test2");
+  }
+
+  @Test
+  public void testCorruptJournalCache() throws Exception {
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+
+    // Shut down one JN so there is only a quorum remaining to make it easier
+    // to manage the remaining two
+    qjmhaCluster.getJournalCluster().getJournalNode(0).stopAndJoin(0);
+
+    mkdirs(nn0, "/test", "/test2");
+    JournalTestUtil.corruptJournaledEditsCache(1,
+        qjmhaCluster.getJournalCluster().getJournalNode(1)
+            .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+    nn0.getRpcServer().rollEditLog();
+
+    waitForFileInfo(nn1, "/test", "/test2");
+
+    mkdirs(nn0, "/test3", "/test4");
+    JournalTestUtil.corruptJournaledEditsCache(3,
+        qjmhaCluster.getJournalCluster().getJournalNode(2)
+            .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+    waitForFileInfo(nn1, "/test3", "/test4");
+  }
+
+  @Test
+  public void testTailWithoutCache() throws Exception {
+    qjmhaCluster.shutdown();
+    // Effectively disable the cache by setting its size too small to be used
+    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+
+    mkdirs(nn0, "/test", "/test2");
+    nn0.getRpcServer().rollEditLog();
+
+    mkdirs(nn0, "/test3", "/test4");
+
+    // Skip the last directory; the JournalNodes' idea of the committed
+    // txn ID may not have been updated to include it yet
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }

   /**
@@ -356,4 +423,43 @@ public class TestStandbyInProgressTail {
       GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
     }
   }
+
+  /**
+   * Create the given directories on the provided NameNode.
+   */
+  private static void mkdirs(NameNode nameNode, String... dirNames)
+      throws Exception {
+    for (String dirName : dirNames) {
+      nameNode.getRpcServer().mkdirs(dirName,
+          FsPermission.createImmutable((short) 0755), true);
+    }
+  }
+
+  /**
+   * Wait up to 1 second until the given NameNode is aware of the existence
+   * of all of the provided fileNames.
+   */
+  private static void waitForFileInfo(NameNode standbyNN, String... fileNames)
+      throws Exception {
+    List<String> remainingFiles = Lists.newArrayList(fileNames);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          standbyNN.getNamesystem().getEditLogTailer().doTailEdits();
+          for (Iterator<String> it = remainingFiles.iterator();
+              it.hasNext();) {
+            if (getFileInfo(standbyNN, it.next(), true, false, false)
+                == null) {
+              return false;
+            } else {
+              it.remove();
+            }
+          }
+          return true;
+        } catch (IOException|InterruptedException e) {
+          throw new AssertionError("Exception while waiting: " + e);
+        }
+      }
+    }, 10, 1000);
+  }
 }
} }