HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.

(cherry picked from commit 2151716832ad14932dd65b1a4e47e64d8d6cd767)
Committed by Jing Zhao on 2016-02-29 15:34:43 -08:00
parent c5db4ab0b4
commit 3cb7ae11a8
21 changed files with 904 additions and 306 deletions

CHANGES.txt

@@ -70,6 +70,8 @@ Release 2.9.0 - UNRELEASED
HDFS-9754. Avoid unnecessary getBlockCollection calls in BlockManager.
(jing9)

+ HDFS-7964. Add support for async edit logging. (Daryn Sharp)

OPTIMIZATIONS

BUG FIXES

TestBookKeeperAsHASharedDir.java

@@ -24,6 +24,9 @@
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
@@ -56,11 +59,14 @@
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;

/**
 * Integration test to ensure that the BookKeeper JournalManager
 * works for HDFS Namenode HA
 */
@RunWith(Parameterized.class)
public class TestBookKeeperAsHASharedDir {
static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
@@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir {
private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[]{ Boolean.FALSE });
params.add(new Object[]{ Boolean.TRUE });
return params;
}
private static boolean useAsyncEditLog;
public TestBookKeeperAsHASharedDir(Boolean async) {
useAsyncEditLog = async;
}
private static Configuration getConf() {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
useAsyncEditLog);
return conf;
}
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(numBookies);
@@ -92,8 +119,7 @@ public static void teardownBookkeeper() throws Exception {
public void testFailoverWithBK() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailover").toString());
BKJMUtil.addJournalManagerDefinition(conf);
@@ -144,8 +170,7 @@ public void testFailoverWithFailingBKCluster() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
@@ -221,8 +246,7 @@ public void testMultiplePrimariesStarted() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
BKJMUtil.addJournalManagerDefinition(conf);
@@ -245,7 +269,9 @@ public void testMultiplePrimariesStarted() throws Exception {
fs = cluster.getFileSystem(0); // get the older active server.
try {
- fs.delete(p1, true);
+ System.out.println("DMS: > *************");
+ boolean foo = fs.delete(p1, true);
+ System.out.println("DMS: < ************* "+foo);
fail("Log update on older active should cause it to exit");
} catch (RemoteException re) {
assertTrue(re.getClassName().contains("ExitException"));
@@ -267,9 +293,8 @@ public void testMultiplePrimariesStarted() throws Exception {
public void testInitializeBKSharedEdits() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
HAUtil.setAllowStandbyReads(conf, true);
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);

MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
@@ -358,8 +383,7 @@ private void assertCanStartHANameNodes(MiniDFSCluster cluster,
public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/correctEditLogSelection").toString());
BKJMUtil.addJournalManagerDefinition(conf);

log4j.properties (bkjournal test resources)

@@ -26,7 +26,7 @@
# Format is "<default threshold> (, <appender>)+
# DEFAULT: console appender only
- log4j.rootLogger=OFF, CONSOLE
+ log4j.rootLogger=DEBUG, CONSOLE

# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

DFSConfigKeys.java

@@ -272,7 +272,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
public static final String DFS_NAMENODE_EDITS_ASYNC_LOGGING =
"dfs.namenode.edits.asynclogging";
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
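The new key defaults to off. For reference, a minimal sketch (not part of the patch) of enabling it programmatically; setting dfs.namenode.edits.asynclogging to true in hdfs-site.xml has the same effect:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class AsyncEditLogConfSketch {
  // Sketch only: build a conf with async edit logging switched on.
  static Configuration asyncEditLogConf() {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, true);
    return conf;
  }
}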

BackupNode.java

@@ -143,6 +143,10 @@ protected void loadNamesystem(Configuration conf) throws IOException {
@Override // NameNode
protected void initialize(Configuration conf) throws IOException {
// async edit logs are incompatible with backup node due to race
// conditions resulting from laxer synchronization
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false);
// Trash is disabled in BackupNameNode,
// but should be turned back on if it ever becomes active.
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,

FSEditLog.java

@@ -79,7 +79,9 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -116,7 +118,7 @@
@InterfaceStability.Evolving
public class FSEditLog implements LogsPurgeable {
- static final Log LOG = LogFactory.getLog(FSEditLog.class);
+ public static final Log LOG = LogFactory.getLog(FSEditLog.class);

/**
 * State machine for edit log.
@@ -179,17 +181,11 @@ private enum State {
private final NNStorage storage;
private final Configuration conf;
private final List<URI> editsDirs;
- private final ThreadLocal<OpInstanceCache> cache =
-     new ThreadLocal<OpInstanceCache>() {
-       @Override
-       protected OpInstanceCache initialValue() {
-         return new OpInstanceCache();
-       }
-     };
+ protected final OpInstanceCache cache = new OpInstanceCache();

/**
 * The edit directories that are shared between primary and secondary.
 */
@@ -218,6 +214,17 @@ protected synchronized TransactionId initialValue() {
}
};
static FSEditLog newInstance(Configuration conf, NNStorage storage,
List<URI> editsDirs) {
boolean asyncEditLogging = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
LOG.info("Edit logging is async:" + asyncEditLogging);
return asyncEditLogging
? new FSEditLogAsync(conf, storage, editsDirs)
: new FSEditLog(conf, storage, editsDirs);
}
/**
 * Constructor for FSEditLog. Underlying journals are constructed, but
 * no streams are opened until open() is called.
@@ -424,33 +431,35 @@ assert isOpenForWrite() :
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();

- long start = beginTransaction();
- op.setTransactionId(txid);
- try {
-   editLogStream.write(op);
- } catch (IOException ex) {
-   // All journals failed, it is handled in logSync.
- } finally {
-   op.reset();
- }
- endTransaction(start);

// check if it is time to schedule an automatic sync
- needsSync = shouldForceSync();
+ needsSync = doEditTransaction(op);
if (needsSync) {
isAutoSyncScheduled = true;
}
}

// Sync the log if an automatic sync is required.
if (needsSync) {
logSync();
}
}
synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction();
op.setTransactionId(txid);
try {
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
endTransaction(start);
return shouldForceSync();
}
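For orientation, callers keep the same two-step contract after this refactor; a hedged sketch of the pattern (method and names illustrative, not from the patch):

// Illustrative caller pattern: append an edit, then block until it is durable.
void exampleOperation(FSEditLog editLog, FSEditLogOp op) {
  editLog.logEdit(op);  // runs doEditTransaction() under the log's monitor
  editLog.logSync();    // waits until this thread's txid has been flushed
}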
/**
 * Wait if an automatic sync is scheduled
 */
@@ -545,15 +554,10 @@ synchronized void setNextTxId(long nextTxId) {
 * else more operations can start writing while this is in progress.
 */
void logSyncAll() {
- // Record the most recent transaction ID as our own id
- synchronized (this) {
-   TransactionId id = myTransactionId.get();
-   id.txid = txid;
- }
- // Then make sure we're synced up to this point
- logSync();
+ // Make sure we're synced up to the most recent transaction ID.
+ logSync(getLastWrittenTxId());
}
/**
 * Sync all modifications done by this thread.
 *
@@ -583,12 +587,14 @@ void logSyncAll() {
 * waitForSyncToFinish() before assuming they are running alone.
 */
public void logSync() {
- long syncStart = 0;
-
- // Fetch the transactionId of this thread.
- long mytxid = myTransactionId.get().txid;
+ // Fetch the transactionId of this thread.
+ logSync(myTransactionId.get().txid);
+ }
+
+ protected void logSync(long mytxid) {
+ long syncStart = 0;
boolean sync = false;
+ long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
@@ -607,19 +613,17 @@ public void logSync() {
// If this transaction was already flushed, then nothing to do
//
if (mytxid <= synctxid) {
- numTransactionsBatchedInSync++;
- if (metrics != null) {
-   // Metrics is non-null only when used inside name node
-   metrics.incrTransactionsBatchedInSync();
- }
return;
}

- // now, this thread will do the sync
+ // now, this thread will do the sync. track if other edits were
+ // included in the sync - ie. batched. if this is the only edit
+ // synced then the batched count is 0
+ editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;

// swap buffers
try {
if (journalSet.isEmpty()) {
@@ -668,6 +672,8 @@ public void logSync() {
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
+ metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
+ numTransactionsBatchedInSync += editsBatchedInSync;
}
} finally {
@@ -1139,13 +1145,13 @@ void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
}

void logStartRollingUpgrade(long startTime) {
- RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
+ RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get());
op.setTime(startTime);
logEdit(op);
}

void logFinalizeRollingUpgrade(long finalizeTime) {
- RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
+ RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get());
op.setTime(finalizeTime);
logEdit(op);
}
@@ -1280,8 +1286,9 @@ public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
if (writeEndTxn) {
logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
- logSync();
}
+ // always sync to ensure all edits are flushed.
+ logSyncAll();

printStatistics(true);
@@ -1657,6 +1664,12 @@ private JournalManager createJournal(URI uri) {
}
}
@VisibleForTesting
// needed by async impl to restart thread when edit log is replaced by a
// spy because a spy is a shallow copy
public void restart() {
}
/**
 * Return total number of syncs happened on this edit log.
 * @return long - count

FSEditLogAsync.java (new file)

@@ -0,0 +1,322 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.ExitUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
class FSEditLogAsync extends FSEditLog implements Runnable {
static final Log LOG = LogFactory.getLog(FSEditLog.class);
// use separate mutex to avoid possible deadlock when stopping the thread.
private final Object syncThreadLock = new Object();
private Thread syncThread;
private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>();
// requires concurrent access from caller threads and syncing thread.
private final BlockingQueue<Edit> editPendingQ =
new ArrayBlockingQueue<Edit>(4096);
// only accessed by syncing thread so no synchronization required.
// queue is unbounded because it's effectively limited by the size
// of the edit log buffer - ie. a sync will eventually be forced.
private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>();
FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
super(conf, storage, editsDirs);
// op instances cannot be shared due to queuing for background thread.
cache.disableCache();
}
private boolean isSyncThreadAlive() {
synchronized(syncThreadLock) {
return syncThread != null && syncThread.isAlive();
}
}
private void startSyncThread() {
synchronized(syncThreadLock) {
if (!isSyncThreadAlive()) {
syncThread = new Thread(this, this.getClass().getSimpleName());
syncThread.start();
}
}
}
private void stopSyncThread() {
synchronized(syncThreadLock) {
if (syncThread != null) {
try {
syncThread.interrupt();
syncThread.join();
} catch (InterruptedException e) {
// we're quitting anyway.
} finally {
syncThread = null;
}
}
}
}
@VisibleForTesting
@Override
public void restart() {
stopSyncThread();
startSyncThread();
}
@Override
void openForWrite(int layoutVersion) throws IOException {
try {
startSyncThread();
super.openForWrite(layoutVersion);
} catch (IOException ioe) {
stopSyncThread();
throw ioe;
}
}
@Override
public void close() {
super.close();
stopSyncThread();
}
@Override
void logEdit(final FSEditLogOp op) {
Edit edit = getEditInstance(op);
threadEdit.set(edit);
enqueueEdit(edit);
}
@Override
public void logSync() {
Edit edit = threadEdit.get();
if (edit != null) {
// do NOT remove to avoid expunge & rehash penalties.
threadEdit.set(null);
if (LOG.isDebugEnabled()) {
LOG.debug("logSync " + edit);
}
edit.logSyncWait();
}
}
@Override
public void logSyncAll() {
// doesn't actually log anything, just ensures that the queues are
// drained when it returns.
Edit edit = new SyncEdit(this, null){
@Override
public boolean logEdit() {
return true;
}
};
enqueueEdit(edit);
edit.logSyncWait();
}
private void enqueueEdit(Edit edit) {
if (LOG.isDebugEnabled()) {
LOG.debug("logEdit " + edit);
}
try {
if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
Preconditions.checkState(
isSyncThreadAlive(), "sync thread is not alive");
editPendingQ.put(edit);
}
} catch (Throwable t) {
// should never happen! failure to enqueue an edit is fatal
terminate(t);
}
}
private Edit dequeueEdit() throws InterruptedException {
// only block for next edit if no pending syncs.
return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll();
}
@Override
public void run() {
try {
while (true) {
boolean doSync;
Edit edit = dequeueEdit();
if (edit != null) {
// sync if requested by edit log.
doSync = edit.logEdit();
syncWaitQ.add(edit);
} else {
// sync when editq runs dry, but have edits pending a sync.
doSync = !syncWaitQ.isEmpty();
}
if (doSync) {
// normally edit log exceptions cause the NN to terminate, but tests
// relying on ExitUtil.terminate need to see the exception.
RuntimeException syncEx = null;
try {
logSync(getLastWrittenTxId());
} catch (RuntimeException ex) {
syncEx = ex;
}
while ((edit = syncWaitQ.poll()) != null) {
edit.logSyncNotify(syncEx);
}
}
}
} catch (InterruptedException ie) {
LOG.info(Thread.currentThread().getName() + " was interrupted, exiting");
} catch (Throwable t) {
terminate(t);
}
}
private void terminate(Throwable t) {
String message = "Exception while edit logging: "+t.getMessage();
LOG.fatal(message, t);
ExitUtil.terminate(1, message);
}
private Edit getEditInstance(FSEditLogOp op) {
final Edit edit;
final Server.Call rpcCall = Server.getCurCall().get();
// only rpc calls not explicitly sync'ed on the log will be async.
if (rpcCall != null && !Thread.holdsLock(this)) {
edit = new RpcEdit(this, op, rpcCall);
} else {
edit = new SyncEdit(this, op);
}
return edit;
}
private abstract static class Edit {
final FSEditLog log;
final FSEditLogOp op;
Edit(FSEditLog log, FSEditLogOp op) {
this.log = log;
this.op = op;
}
// return whether edit log wants to sync.
boolean logEdit() {
return log.doEditTransaction(op);
}
// wait for background thread to finish syncing.
abstract void logSyncWait();
// wake up the thread in logSyncWait.
abstract void logSyncNotify(RuntimeException ex);
}
// the calling thread is synchronously waiting for the edit to complete.
private static class SyncEdit extends Edit {
private final Object lock;
private boolean done = false;
private RuntimeException syncEx;
SyncEdit(FSEditLog log, FSEditLogOp op) {
super(log, op);
// if the log is already sync'ed (ex. log rolling), must wait on it to
// avoid deadlock with sync thread. the fsn lock protects against
// logging during a roll. else lock on this object to avoid sync
// contention on edit log.
lock = Thread.holdsLock(log) ? log : this;
}
@Override
public void logSyncWait() {
synchronized(lock) {
while (!done) {
try {
lock.wait(10);
} catch (InterruptedException e) {}
}
// only needed by tests that rely on ExitUtil.terminate() since
// normally exceptions terminate the NN.
if (syncEx != null) {
syncEx.fillInStackTrace();
throw syncEx;
}
}
}
@Override
public void logSyncNotify(RuntimeException ex) {
synchronized(lock) {
done = true;
syncEx = ex;
lock.notifyAll();
}
}
@Override
public String toString() {
return "["+getClass().getSimpleName()+" op:"+op+"]";
}
}
// the calling rpc thread will return immediately from logSync but the
// rpc response will not be sent until the edit is durable.
private static class RpcEdit extends Edit {
private final Server.Call call;
RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) {
super(log, op);
this.call = call;
call.postponeResponse();
}
@Override
public void logSyncWait() {
// logSync is a no-op to immediately free up rpc handlers. the
// response is sent when the sync thread calls syncNotify.
}
@Override
public void logSyncNotify(RuntimeException syncEx) {
try {
if (syncEx == null) {
call.sendResponse();
} else {
call.abortResponse(syncEx);
}
} catch (Exception e) {} // don't care if not sent.
}
@Override
public String toString() {
return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]";
}
}
}
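To make the threading model above concrete, here is a self-contained, hedged analogue (all names illustrative; nothing below is patch code): producers enqueue work, one consumer drains the queue, and waiters are completed in batches once the queue runs dry, mirroring editPendingQ and syncWaitQ.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AsyncBatcherSketch implements Runnable {
  private final BlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(4096);
  private final Deque<Runnable> awaitingSync = new ArrayDeque<>();

  void submit(Runnable edit) throws InterruptedException {
    pending.put(edit); // callers feel back-pressure when the queue fills
  }

  @Override
  public void run() {
    try {
      while (true) {
        // block only when no edits are awaiting a sync, like dequeueEdit()
        Runnable edit = awaitingSync.isEmpty() ? pending.take() : pending.poll();
        if (edit != null) {
          awaitingSync.add(edit);          // "write" the edit, defer its sync
        } else {
          while ((edit = awaitingSync.poll()) != null) {
            edit.run();                    // "sync": complete the whole batch
          }
        }
      }
    } catch (InterruptedException ie) {
      // exit quietly, as the real sync thread does
    }
  }
}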

FSEditLogOp.java

@@ -147,6 +147,55 @@ public abstract class FSEditLogOp {
byte[] rpcClientId;
int rpcCallId;
public static class OpInstanceCache {
private static ThreadLocal<OpInstanceCacheMap> cache =
new ThreadLocal<OpInstanceCacheMap>() {
@Override
protected OpInstanceCacheMap initialValue() {
return new OpInstanceCacheMap();
}
};
@SuppressWarnings("serial")
static final class OpInstanceCacheMap extends
EnumMap<FSEditLogOpCodes, FSEditLogOp> {
OpInstanceCacheMap() {
super(FSEditLogOpCodes.class);
for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
put(opCode, newInstance(opCode));
}
}
}
private boolean useCache = true;
void disableCache() {
useCache = false;
}
public OpInstanceCache get() {
return this;
}
@SuppressWarnings("unchecked")
public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
}
private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
FSEditLogOp instance = null;
Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
if (clazz != null) {
try {
instance = clazz.newInstance();
} catch (Exception ex) {
throw new RuntimeException("Failed to instantiate "+opCode, ex);
}
}
return instance;
}
}
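A hedged usage sketch of the reworked cache (package-private API, so imagine this inside org.apache.hadoop.hdfs.server.namenode): with caching on, each thread reuses one pre-built op per opcode; FSEditLogAsync's constructor calls disableCache() so queued ops are no longer shared mutable singletons.

// Illustrative only; assumes same-package access to package-private members.
OpInstanceCache opCache = new OpInstanceCache();
MkdirOp reused = opCache.get(FSEditLogOpCodes.OP_MKDIR); // per-thread cached instance
MkdirOp again  = opCache.get(FSEditLogOpCodes.OP_MKDIR); // same object as 'reused'
opCache.disableCache();                                  // as FSEditLogAsync's ctor does
MkdirOp fresh  = opCache.get(FSEditLogOpCodes.OP_MKDIR); // newly allocated instance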
final void reset() {
txid = HdfsServerConstants.INVALID_TXID;
rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
@@ -156,72 +205,6 @@ final void reset() {
abstract void resetSubFields();

- final public static class OpInstanceCache {
-   private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
-       new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-
-   public OpInstanceCache() {
-     inst.put(OP_ADD, new AddOp());
-     inst.put(OP_CLOSE, new CloseOp());
-     inst.put(OP_SET_REPLICATION, new SetReplicationOp());
-     inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-     inst.put(OP_RENAME_OLD, new RenameOldOp());
-     inst.put(OP_DELETE, new DeleteOp());
-     inst.put(OP_MKDIR, new MkdirOp());
-     inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
-     inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-     inst.put(OP_SET_OWNER, new SetOwnerOp());
-     inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-     inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-     inst.put(OP_SET_QUOTA, new SetQuotaOp());
-     inst.put(OP_TIMES, new TimesOp());
-     inst.put(OP_SYMLINK, new SymlinkOp());
-     inst.put(OP_RENAME, new RenameOp());
-     inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-     inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-     inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-     inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
-     inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-     inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
-     inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
-     inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
-     inst.put(OP_TRUNCATE, new TruncateOp());
-     inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
-     inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
-     inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
-     inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
-     inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
-     inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
-     inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
-     inst.put(OP_ADD_BLOCK, new AddBlockOp());
-     inst.put(OP_ADD_CACHE_DIRECTIVE,
-         new AddCacheDirectiveInfoOp());
-     inst.put(OP_MODIFY_CACHE_DIRECTIVE,
-         new ModifyCacheDirectiveInfoOp());
-     inst.put(OP_REMOVE_CACHE_DIRECTIVE,
-         new RemoveCacheDirectiveInfoOp());
-     inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
-     inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
-     inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
-     inst.put(OP_SET_ACL, new SetAclOp());
-     inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
-         OP_ROLLING_UPGRADE_START, "start"));
-     inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
-         OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
-     inst.put(OP_SET_XATTR, new SetXAttrOp());
-     inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
-     inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
-     inst.put(OP_APPEND, new AppendOp());
-     inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp());
-   }
-
-   public FSEditLogOp get(FSEditLogOpCodes opcode) {
-     return inst.get(opcode);
-   }
- }

private static ImmutableMap<String, FsAction> fsActionMap() {
ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
for (FsAction v : FsAction.values())
@@ -776,7 +759,7 @@ void fromXml(Stanza st) throws InvalidXmlException {
 * {@link ClientProtocol#append}
 */
static class AddOp extends AddCloseOp {
- private AddOp() {
+ AddOp() {
super(OP_ADD);
}
@@ -804,7 +787,7 @@ public String toString() {
 * finally log an AddOp.
 */
static class CloseOp extends AddCloseOp {
- private CloseOp() {
+ CloseOp() {
super(OP_CLOSE);
}
@@ -832,7 +815,7 @@ static class AppendOp extends FSEditLogOp {
String clientMachine;
boolean newBlock;

- private AppendOp() {
+ AppendOp() {
super(OP_APPEND);
}
@@ -922,7 +905,7 @@ static class AddBlockOp extends FSEditLogOp {
private Block penultimateBlock;
private Block lastBlock;

- private AddBlockOp() {
+ AddBlockOp() {
super(OP_ADD_BLOCK);
}
@@ -1034,7 +1017,7 @@ static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
String path;
Block[] blocks;

- private UpdateBlocksOp() {
+ UpdateBlocksOp() {
super(OP_UPDATE_BLOCKS);
}
@@ -1127,7 +1110,7 @@ static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;

- private SetReplicationOp() {
+ SetReplicationOp() {
super(OP_SET_REPLICATION);
}
@@ -1206,7 +1189,7 @@ static class ConcatDeleteOp extends FSEditLogOp {
long timestamp;
final static public int MAX_CONCAT_SRC = 1024 * 1024;

- private ConcatDeleteOp() {
+ ConcatDeleteOp() {
super(OP_CONCAT_DELETE);
}
@@ -1364,7 +1347,7 @@ static class RenameOldOp extends FSEditLogOp {
String dst;
long timestamp;

- private RenameOldOp() {
+ RenameOldOp() {
super(OP_RENAME_OLD);
}
@@ -1476,7 +1459,7 @@ static class DeleteOp extends FSEditLogOp {
String path;
long timestamp;

- private DeleteOp() {
+ DeleteOp() {
super(OP_DELETE);
}
@@ -1577,7 +1560,7 @@ static class MkdirOp extends FSEditLogOp {
List<AclEntry> aclEntries;
List<XAttr> xAttrs;

- private MkdirOp() {
+ MkdirOp() {
super(OP_MKDIR);
}
@@ -1750,7 +1733,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;

- private SetGenstampV1Op() {
+ SetGenstampV1Op() {
super(OP_SET_GENSTAMP_V1);
}
@@ -1808,7 +1791,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;

- private SetGenstampV2Op() {
+ SetGenstampV2Op() {
super(OP_SET_GENSTAMP_V2);
}
@@ -1866,7 +1849,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;

- private AllocateBlockIdOp() {
+ AllocateBlockIdOp() {
super(OP_ALLOCATE_BLOCK_ID);
}
@@ -1925,7 +1908,7 @@ static class SetPermissionsOp extends FSEditLogOp {
String src;
FsPermission permissions;

- private SetPermissionsOp() {
+ SetPermissionsOp() {
super(OP_SET_PERMISSIONS);
}
@@ -1998,7 +1981,7 @@ static class SetOwnerOp extends FSEditLogOp {
String username;
String groupname;

- private SetOwnerOp() {
+ SetOwnerOp() {
super(OP_SET_OWNER);
}
@@ -2085,7 +2068,7 @@ static class SetNSQuotaOp extends FSEditLogOp {
String src;
long nsQuota;

- private SetNSQuotaOp() {
+ SetNSQuotaOp() {
super(OP_SET_NS_QUOTA);
}
@@ -2143,7 +2126,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class ClearNSQuotaOp extends FSEditLogOp {
String src;

- private ClearNSQuotaOp() {
+ ClearNSQuotaOp() {
super(OP_CLEAR_NS_QUOTA);
}
@@ -2197,7 +2180,7 @@ static class SetQuotaOp extends FSEditLogOp {
long nsQuota;
long dsQuota;

- private SetQuotaOp() {
+ SetQuotaOp() {
super(OP_SET_QUOTA);
}
@@ -2282,7 +2265,7 @@ static class SetQuotaByStorageTypeOp extends FSEditLogOp {
long dsQuota;
StorageType type;

- private SetQuotaByStorageTypeOp() {
+ SetQuotaByStorageTypeOp() {
super(OP_SET_QUOTA_BY_STORAGETYPE);
}
@@ -2365,7 +2348,7 @@ static class TimesOp extends FSEditLogOp {
long mtime;
long atime;

- private TimesOp() {
+ TimesOp() {
super(OP_TIMES);
}
@@ -2474,7 +2457,7 @@ static class SymlinkOp extends FSEditLogOp {
long atime;
PermissionStatus permissionStatus;

- private SymlinkOp() {
+ SymlinkOp() {
super(OP_SYMLINK);
}
@@ -2633,7 +2616,7 @@ static class RenameOp extends FSEditLogOp {
long timestamp;
Rename[] options;

- private RenameOp() {
+ RenameOp() {
super(OP_RENAME);
}
@@ -2798,7 +2781,7 @@ static class TruncateOp extends FSEditLogOp {
long timestamp;
Block truncateBlock;

- private TruncateOp() {
+ TruncateOp() {
super(OP_TRUNCATE);
}
@@ -2931,7 +2914,7 @@ static class ReassignLeaseOp extends FSEditLogOp {
String path;
String newHolder;

- private ReassignLeaseOp() {
+ ReassignLeaseOp() {
super(OP_REASSIGN_LEASE);
}
@@ -3013,7 +2996,7 @@ static class GetDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;

- private GetDelegationTokenOp() {
+ GetDelegationTokenOp() {
super(OP_GET_DELEGATION_TOKEN);
}
@@ -3092,7 +3075,7 @@ static class RenewDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;

- private RenewDelegationTokenOp() {
+ RenewDelegationTokenOp() {
super(OP_RENEW_DELEGATION_TOKEN);
}
@@ -3170,7 +3153,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class CancelDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;

- private CancelDelegationTokenOp() {
+ CancelDelegationTokenOp() {
super(OP_CANCEL_DELEGATION_TOKEN);
}
@@ -3229,7 +3212,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
static class UpdateMasterKeyOp extends FSEditLogOp {
DelegationKey key;

- private UpdateMasterKeyOp() {
+ UpdateMasterKeyOp() {
super(OP_UPDATE_MASTER_KEY);
}
@@ -3334,8 +3317,20 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
static class StartLogSegmentOp extends LogSegmentOp {
StartLogSegmentOp() {
super(OP_START_LOG_SEGMENT);
}
}
static class EndLogSegmentOp extends LogSegmentOp {
EndLogSegmentOp() {
super(OP_END_LOG_SEGMENT);
}
}
static class InvalidOp extends FSEditLogOp {
- private InvalidOp() {
+ InvalidOp() {
super(OP_INVALID);
}
@@ -4146,7 +4141,7 @@ static class RemoveXAttrOp extends FSEditLogOp {
List<XAttr> xAttrs;
String src;

- private RemoveXAttrOp() {
+ RemoveXAttrOp() {
super(OP_REMOVE_XATTR);
}
@@ -4199,7 +4194,7 @@ static class SetXAttrOp extends FSEditLogOp {
List<XAttr> xAttrs;
String src;

- private SetXAttrOp() {
+ SetXAttrOp() {
super(OP_SET_XATTR);
}
@@ -4252,7 +4247,7 @@ static class SetAclOp extends FSEditLogOp {
List<AclEntry> aclEntries = Lists.newArrayList();
String src;

- private SetAclOp() {
+ SetAclOp() {
super(OP_SET_ACL);
}
@@ -4349,7 +4344,7 @@ public void readFields(DataInput in) throws IOException {
/**
 * Operation corresponding to upgrade
 */
- static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
+ abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
private final String name;
private long time;
@@ -4416,7 +4411,7 @@ static class SetStoragePolicyOp extends FSEditLogOp {
String path;
byte policyId;

- private SetStoragePolicyOp() {
+ SetStoragePolicyOp() {
super(OP_SET_STORAGE_POLICY);
}
@@ -4482,6 +4477,26 @@ void fromXml(Stanza st) throws InvalidXmlException {
}
}
static class RollingUpgradeStartOp extends RollingUpgradeOp {
RollingUpgradeStartOp() {
super(OP_ROLLING_UPGRADE_START, "start");
}
static RollingUpgradeStartOp getInstance(OpInstanceCache cache) {
return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START);
}
}
static class RollingUpgradeFinalizeOp extends RollingUpgradeOp {
RollingUpgradeFinalizeOp() {
super(OP_ROLLING_UPGRADE_FINALIZE, "finalize");
}
static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
}
}
/**
 * Class for writing editlog ops
 */

FSEditLogOpCodes.java

@@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
 * Op codes for edits file
@@ -27,60 +28,64 @@
@InterfaceStability.Unstable
public enum FSEditLogOpCodes {
// last op code in file
- OP_ADD ((byte) 0),
- OP_RENAME_OLD ((byte) 1), // deprecated operation
- OP_DELETE ((byte) 2),
- OP_MKDIR ((byte) 3),
- OP_SET_REPLICATION ((byte) 4),
+ OP_ADD ((byte) 0, AddOp.class),
+ // deprecated operation
+ OP_RENAME_OLD ((byte) 1, RenameOldOp.class),
+ OP_DELETE ((byte) 2, DeleteOp.class),
+ OP_MKDIR ((byte) 3, MkdirOp.class),
+ OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class),
@Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete
@Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete
- OP_SET_PERMISSIONS ((byte) 7),
- OP_SET_OWNER ((byte) 8),
- OP_CLOSE ((byte) 9),
- OP_SET_GENSTAMP_V1 ((byte) 10),
- OP_SET_NS_QUOTA ((byte) 11), // obsolete
- OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete
- OP_TIMES ((byte) 13), // set atime, mtime
- OP_SET_QUOTA ((byte) 14),
- OP_RENAME ((byte) 15), // filecontext rename
- OP_CONCAT_DELETE ((byte) 16), // concat files
- OP_SYMLINK ((byte) 17),
- OP_GET_DELEGATION_TOKEN ((byte) 18),
- OP_RENEW_DELEGATION_TOKEN ((byte) 19),
- OP_CANCEL_DELEGATION_TOKEN ((byte) 20),
- OP_UPDATE_MASTER_KEY ((byte) 21),
- OP_REASSIGN_LEASE ((byte) 22),
- OP_END_LOG_SEGMENT ((byte) 23),
- OP_START_LOG_SEGMENT ((byte) 24),
- OP_UPDATE_BLOCKS ((byte) 25),
- OP_CREATE_SNAPSHOT ((byte) 26),
- OP_DELETE_SNAPSHOT ((byte) 27),
- OP_RENAME_SNAPSHOT ((byte) 28),
- OP_ALLOW_SNAPSHOT ((byte) 29),
- OP_DISALLOW_SNAPSHOT ((byte) 30),
- OP_SET_GENSTAMP_V2 ((byte) 31),
- OP_ALLOCATE_BLOCK_ID ((byte) 32),
- OP_ADD_BLOCK ((byte) 33),
- OP_ADD_CACHE_DIRECTIVE ((byte) 34),
- OP_REMOVE_CACHE_DIRECTIVE ((byte) 35),
- OP_ADD_CACHE_POOL ((byte) 36),
- OP_MODIFY_CACHE_POOL ((byte) 37),
- OP_REMOVE_CACHE_POOL ((byte) 38),
- OP_MODIFY_CACHE_DIRECTIVE ((byte) 39),
- OP_SET_ACL ((byte) 40),
- OP_ROLLING_UPGRADE_START ((byte) 41),
- OP_ROLLING_UPGRADE_FINALIZE ((byte) 42),
- OP_SET_XATTR ((byte) 43),
- OP_REMOVE_XATTR ((byte) 44),
- OP_SET_STORAGE_POLICY ((byte) 45),
- OP_TRUNCATE ((byte) 46),
- OP_APPEND ((byte) 47),
- OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48),
+ OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class),
+ OP_SET_OWNER ((byte) 8, SetOwnerOp.class),
+ OP_CLOSE ((byte) 9, CloseOp.class),
+ OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class),
+ OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete
+ OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete
+ OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime
+ OP_SET_QUOTA ((byte) 14, SetQuotaOp.class),
+ // filecontext rename
+ OP_RENAME ((byte) 15, RenameOp.class),
+ // concat files
+ OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class),
+ OP_SYMLINK ((byte) 17, SymlinkOp.class),
+ OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class),
+ OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class),
+ OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class),
+ OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class),
+ OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class),
+ OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class),
+ OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class),
+ OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class),
+ OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class),
+ OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class),
+ OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class),
+ OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class),
+ OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class),
+ OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class),
+ OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class),
+ OP_ADD_BLOCK ((byte) 33, AddBlockOp.class),
+ OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class),
+ OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class),
+ OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class),
+ OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class),
+ OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class),
+ OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class),
+ OP_SET_ACL ((byte) 40, SetAclOp.class),
+ OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class),
+ OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class),
+ OP_SET_XATTR ((byte) 43, SetXAttrOp.class),
+ OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class),
+ OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class),
+ OP_TRUNCATE ((byte) 46, TruncateOp.class),
+ OP_APPEND ((byte) 47, AppendOp.class),
+ OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),

// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);

private final byte opCode;
private final Class<? extends FSEditLogOp> opClass;
/**
 * Constructor
@@ -88,7 +93,12 @@ public enum FSEditLogOpCodes {
 * @param opCode byte value of constructed enum
 */
FSEditLogOpCodes(byte opCode) {
this(opCode, null);
}

FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
this.opCode = opCode;
this.opClass = opClass;
}
/**
@@ -100,6 +110,10 @@ public byte getOpCode() {
return opCode;
}
public Class<? extends FSEditLogOp> getOpClass() {
return opClass;
}
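A hedged sketch of what the new mapping enables: generic code can instantiate an op reflectively from its opcode, which is what OpInstanceCache.newInstance(opCode) in FSEditLogOp.java does with this accessor. The helper below is illustrative, not from the patch:

// Assumes same-package access, since the op constructors were widened
// only to package-private. getOpClass() is null for obsolete opcodes.
static FSEditLogOp instantiate(FSEditLogOpCodes opCode) {
  Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
  if (clazz == null) {
    return null; // obsolete/invalid opcodes carry no op class
  }
  try {
    return clazz.newInstance();
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException("Failed to instantiate " + opCode, e);
  }
}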
private static final FSEditLogOpCodes[] VALUES;

static {

FSImage.java

@@ -140,7 +140,7 @@ protected FSImage(Configuration conf,
storage.setRestoreFailedStorage(true);
}

- this.editLog = new FSEditLog(conf, storage, editsDirs);
+ this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);

archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}

NameNode.java

@@ -1265,7 +1265,6 @@ private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
newSharedEditLog.logEdit(op);

if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
- newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
+ stream);

NameNodeMetrics.java

@@ -283,8 +283,8 @@ public void addTransaction(long latency) {
transactions.add(latency);
}

- public void incrTransactionsBatchedInSync() {
-   transactionsBatchedInSync.incr();
+ public void incrTransactionsBatchedInSync(long count) {
+   transactionsBatchedInSync.incr(count);
}

public void addSync(long elapsed) {

DFSTestUtil.java

@@ -263,6 +263,9 @@ public static void setFakeHttpAddresses(Configuration conf,
}

public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
+ // spies are shallow copies, must allow async log to restart its thread
+ // so it has the new copy
+ newLog.restart();
Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
}

TestAuditLogs.java

@@ -71,17 +71,21 @@
public class TestAuditLogs {
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";

final boolean useAsyncLog;
final boolean useAsyncEdits;

@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
- params.add(new Object[]{new Boolean(false)});
- params.add(new Object[]{new Boolean(true)});
+ params.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
+ params.add(new Object[]{Boolean.TRUE, Boolean.FALSE});
+ params.add(new Object[]{Boolean.FALSE, Boolean.TRUE});
+ params.add(new Object[]{Boolean.TRUE, Boolean.TRUE});
return params;
}

- public TestAuditLogs(boolean useAsyncLog) {
+ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
this.useAsyncLog = useAsyncLog;
this.useAsyncEdits = useAsyncEdits;
}
// Pattern for:
@@ -119,6 +123,7 @@ public void setupCluster() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits);
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
setNumFiles(20).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();

TestEditLog.java

@@ -88,6 +88,9 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
@@ -98,12 +101,33 @@
/**
 * This class tests the creation and validation of a checkpoint.
 */
@RunWith(Parameterized.class)
public class TestEditLog {

static {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
}
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[]{ Boolean.FALSE });
params.add(new Object[]{ Boolean.TRUE });
return params;
}
private static boolean useAsyncEditLog;
public TestEditLog(Boolean async) {
useAsyncEditLog = async;
}
public static Configuration getConf() {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
useAsyncEditLog);
return conf;
}
/**
 * A garbage mkdir op which is used for testing
 * {@link EditLogFileInputStream#scanEditLog(File)}
@@ -225,11 +249,12 @@ public void run() {
 * @param storage Storage object used by namenode
 */
private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
// Make sure the edits dirs are set in the provided configuration object.
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
StringUtils.join(",", storage.getEditsDirectories()));
- FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
+ FSEditLog log = FSEditLog.newInstance(
+     conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
return log;
}
@@ -252,7 +277,7 @@ public void testPreTxIdEditLogNoEdits() throws Exception {
 */
@Test
public void testPreTxidEditLogWithEdits() throws Exception {
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
try {
@@ -282,7 +307,7 @@ private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
@Test
public void testSimpleEditLog() throws IOException {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
try {
@@ -351,7 +376,7 @@ private void assertExistsInStorageDirs(MiniDFSCluster cluster,
private void testEditLog(int initialSize) throws IOException {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
@@ -482,8 +507,12 @@ public Void call() throws Exception {
@Test
public void testSyncBatching() throws Exception {
- // start a cluster
- Configuration conf = new HdfsConfiguration();
+ if (useAsyncEditLog) {
+   // semantics are completely differently since edits will be auto-synced
+   return;
+ }
+ // start a cluster
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -546,7 +575,7 @@ public void testSyncBatching() throws Exception {
@Test
public void testBatchedSyncWithClosedLogs() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -586,7 +615,7 @@ public void testBatchedSyncWithClosedLogs() throws Exception {
@Test
public void testEditChecksum() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
@@ -658,7 +687,7 @@ public void testCrashRecoveryWithTransactions() throws Exception {
 */
private void testCrashRecovery(int numTransactions) throws Exception {
MiniDFSCluster cluster = null;
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
CHECKPOINT_ON_STARTUP_MIN_TXNS);
@@ -803,7 +832,7 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs,
boolean updateTransactionIdFile, boolean shouldSucceed)
throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATA_NODES).build();
@ -1134,7 +1163,7 @@ static class AbortSpec {
public static NNStorage setupEdits(List<URI> editUris, int numrolls, public static NNStorage setupEdits(List<URI> editUris, int numrolls,
boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException { boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException {
List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls)); List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
NNStorage storage = new NNStorage(new Configuration(), NNStorage storage = new NNStorage(getConf(),
Collections.<URI>emptyList(), Collections.<URI>emptyList(),
editUris); editUris);
storage.format(new NamespaceInfo()); storage.format(new NamespaceInfo());
@ -1296,7 +1325,7 @@ static void validateNoCrash(byte garbage[]) throws IOException {
EditLogFileOutputStream elfos = null; EditLogFileOutputStream elfos = null;
EditLogFileInputStream elfis = null; EditLogFileInputStream elfis = null;
try { try {
elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0); elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
elfos.writeRaw(garbage, 0, garbage.length); elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush(); elfos.setReadyToFlush();
@ -1472,7 +1501,7 @@ public boolean accept(File dir, String name) {
public void testManyEditLogSegments() throws IOException { public void testManyEditLogSegments() throws IOException {
final int NUM_EDIT_LOG_ROLLS = 1000; final int NUM_EDIT_LOG_ROLLS = 1000;
// start a cluster // start a cluster
Configuration conf = new HdfsConfiguration(); Configuration conf = getConf();
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FileSystem fileSys = null; FileSystem fileSys = null;
try { try {
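
A note on the testSyncBatching hunk above: with DFS_NAMENODE_EDITS_ASYNC_LOGGING enabled, callers enqueue edits and a background thread both writes and syncs them, so assertions about how many transactions each explicit sync batches no longer describe anything the caller controls, and the async case returns early. A minimal sketch of that auto-sync pattern, assuming nothing beyond a queue-draining thread (an illustration only, not the actual async edit log class added by this change):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: callers enqueue edits and return immediately; one background
// thread writes every queued edit and chooses the sync boundaries itself.
class AutoSyncingLogSketch implements Runnable {
  private final BlockingQueue<byte[]> pending =
      new LinkedBlockingQueue<byte[]>();

  void logEdit(byte[] op) throws InterruptedException {
    pending.put(op);  // no explicit logSync() required of the caller
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        byte[] op = pending.take();          // block until an edit arrives
        write(op);
        while ((op = pending.poll()) != null) {
          write(op);                         // drain whatever batched up
        }
        sync();                              // one sync covers the batch
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }

  private void write(byte[] op) { /* append to the edit stream */ }
  private void sync() { /* flush the stream to durable storage */ }
}

Since the draining thread, not the caller, picks the batch boundaries, the per-sync transaction counts that testSyncBatching verifies are only meaningful in the synchronous configuration.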

View File: TestEditLogAutoroll.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Random;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
@@ -30,18 +32,40 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
 
+@RunWith(Parameterized.class)
 public class TestEditLogAutoroll {
+  static {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLogAutoroll(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
   private Configuration conf;
   private MiniDFSCluster cluster;
@@ -61,6 +85,8 @@ public void setUp() throws Exception {
     // Make it autoroll after 10 edits
     conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f);
     conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
 
     int retryCount = 0;
     while (true) {
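
Every test class in this commit applies the same JUnit 4 parameterization shown above, so each @Test method runs twice: once against the legacy synchronous edit log and once with async logging enabled. A condensed, self-contained rendering of the pattern (the class and test names are hypothetical; the configuration key is the one this change introduces):

import java.util.ArrayList;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestAsyncEditLogToggle {
  @Parameters
  public static Collection<Object[]> data() {
    Collection<Object[]> params = new ArrayList<Object[]>();
    params.add(new Object[]{ Boolean.FALSE });
    params.add(new Object[]{ Boolean.TRUE });
    return params;
  }

  private final boolean useAsyncEditLog;

  public TestAsyncEditLogToggle(Boolean async) {
    useAsyncEditLog = async;
  }

  private Configuration getConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
        useAsyncEditLog);
    return conf;
  }

  // Runs once with the flag false and once with it true.
  @Test
  public void testFlagReachesConf() {
    assertEquals(useAsyncEditLog, getConf().getBoolean(
        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false));
  }
}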

View File: TestEditLogJournalFailures.java

@@ -21,12 +21,13 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -43,13 +44,37 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 
+@RunWith(Parameterized.class)
 public class TestEditLogJournalFailures {
 
   private int editsPerformed = 0;
   private MiniDFSCluster cluster;
   private FileSystem fs;
+  private boolean useAsyncEdits;
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{Boolean.FALSE});
+    params.add(new Object[]{Boolean.TRUE});
+    return params;
+  }
+
+  public TestEditLogJournalFailures(boolean useAsyncEdits) {
+    this.useAsyncEdits = useAsyncEdits;
+  }
+
+  private Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEdits);
+    return conf;
+  }
 
   /**
    * Create the mini cluster for testing and sub in a custom runtime so that
@@ -57,9 +82,9 @@ public class TestEditLogJournalFailures {
    */
   @Before
   public void setUpMiniCluster() throws IOException {
-    setUpMiniCluster(new HdfsConfiguration(), true);
+    setUpMiniCluster(getConf(), true);
   }
 
   public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs)
       throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
@@ -153,7 +178,7 @@ public void testSingleRequiredFailedEditsDirOnSetReadyToFlush()
     String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings(
         DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
     shutDownMiniCluster();
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
@@ -193,7 +218,7 @@ public void testMultipleRedundantFailedEditsDirOnSetReadyToFlush()
       throws IOException {
     // Set up 4 name/edits dirs.
     shutDownMiniCluster();
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     String[] nameDirs = new String[4];
     for (int i = 0; i < nameDirs.length; i++) {
       File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i);
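
One detail worth flagging: TestEditLogJournalFailures captures the parameter in an instance field, while most of the other classes here assign a static field from the constructor. Both behave the same under the stock Parameterized runner, which constructs a fresh instance immediately before invoking each test and runs parameter sets serially, so a constructor-assigned static always holds the current run's value; the instance-field form is simply the more robust choice if tests were ever run in parallel. A hypothetical sketch of why the static form works serially:

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class StaticCaptureSketch {
  @Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][]{ { false }, { true } });
  }

  private static boolean flag;  // shared across all instances

  public StaticCaptureSketch(boolean f) {
    flag = f;                   // reassigned on every instantiation
  }

  @Test
  public void usesFlag() {
    // Safe serially: the runner effectively does
    //   for each parameter P: for each @Test m: new StaticCaptureSketch(P).m()
    // so 'flag' always reflects the parameter of the running test.
    System.out.println("useAsyncEditLog = " + flag);
  }
}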

View File: TestEditLogRace.java

@@ -26,14 +26,17 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,10 +49,14 @@
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -57,15 +64,27 @@
  * This class tests various synchronization bugs in FSEditLog rolling
  * and namespace saving.
  */
+@RunWith(Parameterized.class)
 public class TestEditLogRace {
   static {
     GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
   }
 
-  private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ false });
+    params.add(new Object[]{ true });
+    return params;
+  }
 
-  private static final String NAME_DIR =
-    MiniDFSCluster.getBaseDirectory() + "name1";
+  private static boolean useAsyncEditLog;
+
+  public TestEditLogRace(boolean useAsyncEditLog) {
+    TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
 
   // This test creates NUM_THREADS threads and each thread continuously writes
   // transactions
@@ -94,21 +113,29 @@ public class TestEditLogRace {
    * This value needs to be significantly longer than the average
    * time for an fsync() or enterSafeMode().
    */
-  private static final int BLOCK_TIME = 10;
+  private static final int BLOCK_TIME = 4; // 4 sec pretty generous
 
   //
   // an object that does a bunch of transactions
   //
   static class Transactions implements Runnable {
     final NamenodeProtocols nn;
+    final MiniDFSCluster cluster;
+    FileSystem fs;
     short replication = 3;
     long blockSize = 64;
     volatile boolean stopped = false;
     volatile Thread thr;
     final AtomicReference<Throwable> caught;
 
-    Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
-      nn = ns;
+    Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) {
+      this.cluster = cluster;
+      this.nn = cluster.getNameNodeRpc();
+      try {
+        this.fs = cluster.getFileSystem();
+      } catch (IOException e) {
+        caught.set(e);
+      }
       this.caught = caught;
     }
@@ -122,11 +149,23 @@ public void run() {
       while (!stopped) {
         try {
           String dirname = "/thr-" + thr.getId() + "-dir-" + i;
-          nn.mkdirs(dirname, p, true);
-          nn.delete(dirname, true);
+          if (i % 2 == 0) {
+            Path dirnamePath = new Path(dirname);
+            fs.mkdirs(dirnamePath);
+            fs.delete(dirnamePath, true);
+          } else {
+            nn.mkdirs(dirname, p, true);
+            nn.delete(dirname, true);
+          }
         } catch (SafeModeException sme) {
           // This is OK - the tests will bring NN in and out of safemode
         } catch (Throwable e) {
+          // This is OK - the tests will bring NN in and out of safemode
+          if (e instanceof RemoteException &&
+              ((RemoteException)e).getClassName()
+              .contains("SafeModeException")) {
+            return;
+          }
           LOG.warn("Got error in transaction thread", e);
           caught.compareAndSet(null, e);
           break;
@@ -144,11 +183,11 @@ public Thread getThread() {
     }
   }
 
-  private void startTransactionWorkers(NamenodeProtocols namesystem,
+  private void startTransactionWorkers(MiniDFSCluster cluster,
                                        AtomicReference<Throwable> caughtErr) {
     // Create threads and make them run transactions concurrently.
     for (int i = 0; i < NUM_THREADS; i++) {
-      Transactions trans = new Transactions(namesystem, caughtErr);
+      Transactions trans = new Transactions(cluster, caughtErr);
       new Thread(trans, "TransactionThread-" + i).start();
       workers.add(trans);
     }
@@ -174,21 +213,21 @@ private void stopTransactionWorkers() {
   @Test
   public void testEditLogRolling() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = null;
+    Configuration conf = getConf();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
     FileSystem fileSys = null;
     AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
       FSImage fsimage = cluster.getNamesystem().getFSImage();
       StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
 
-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);
 
       long previousLogTxId = 1;
@@ -256,7 +295,7 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage,
   @Test
   public void testSaveNamespace() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
@@ -266,12 +305,11 @@ public void testSaveNamespace() throws Exception {
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
-      final NamenodeProtocols nn = cluster.getNameNodeRpc();
 
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);
 
       for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
         try {
@@ -321,11 +359,13 @@ public void testSaveNamespace() throws Exception {
 
   private Configuration getConf() {
     Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
+    //conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
+    //conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     return conf;
   }
@@ -389,7 +429,7 @@ public void run() {
     @Override
     public Void answer(InvocationOnMock invocation) throws Throwable {
       LOG.info("Flush called");
-      if (Thread.currentThread() == doAnEditThread) {
+      if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
         LOG.info("edit thread: Telling main thread we made it to flush section...");
         // Signal to main thread that the edit thread is in the racy section
         waitToEnterFlush.countDown();
@@ -457,62 +497,52 @@ public void testSaveRightBeforeSync() throws Exception {
     try {
       FSImage fsimage = namesystem.getFSImage();
-      FSEditLog editLog = spy(fsimage.getEditLog());
-      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
+      final FSEditLog editLog = fsimage.getEditLog();
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();
-      final CountDownLatch waitToEnterSync = new CountDownLatch(1);
+      final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
 
       final Thread doAnEditThread = new Thread() {
         @Override
         public void run() {
           try {
-            LOG.info("Starting mkdirs");
-            namesystem.mkdirs("/test",
-                new PermissionStatus("test","test", new FsPermission((short)00755)),
-                true);
-            LOG.info("mkdirs complete");
+            LOG.info("Starting setOwner");
+            namesystem.writeLock();
+            try {
+              editLog.logSetOwner("/","test","test");
+            } finally {
+              namesystem.writeUnlock();
+            }
+            sleepingBeforeSync.countDown();
+            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+            Thread.sleep(BLOCK_TIME*1000);
+            editLog.logSync();
+            LOG.info("edit thread: logSync complete");
           } catch (Throwable ioe) {
            LOG.fatal("Got exception", ioe);
            deferredException.set(ioe);
-            waitToEnterSync.countDown();
+            sleepingBeforeSync.countDown();
          }
        }
      };
+      doAnEditThread.setDaemon(true);
 
-      Answer<Void> blockingSync = new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          LOG.info("logSync called");
-          if (Thread.currentThread() == doAnEditThread) {
-            LOG.info("edit thread: Telling main thread we made it just before logSync...");
-            waitToEnterSync.countDown();
-            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
-            Thread.sleep(BLOCK_TIME*1000);
-            LOG.info("Going through to logSync. This will allow the main thread to continue.");
-          }
-          invocation.callRealMethod();
-          LOG.info("logSync complete");
-          return null;
-        }
-      };
-      doAnswer(blockingSync).when(editLog).logSync();
 
       doAnEditThread.start();
       LOG.info("Main thread: waiting to just before logSync...");
-      waitToEnterSync.await();
+      sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
       assertNull(deferredException.get());
       LOG.info("Main thread: detected that logSync about to be called.");
       LOG.info("Trying to enter safe mode.");
-      LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
 
       long st = Time.now();
       namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       long et = Time.now();
-      LOG.info("Entered safe mode");
-      // Make sure we really waited for the flush to complete!
-      assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+      LOG.info("Entered safe mode after "+(et-st)+"ms");
+
+      // Make sure we didn't wait for the thread that did a logEdit but
+      // not logSync. Going into safemode does a logSyncAll that will flush
+      // its edit.
+      assertTrue(et - st < (BLOCK_TIME/2)*1000);
 
       // Once we're in safe mode, save namespace.
       namesystem.saveNamespace();
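
The rewritten testSaveRightBeforeSync also inverts its timing assertion, and the arithmetic deserves a gloss. The edit thread now logs a setOwner edit under the namesystem write lock, trips the latch, sleeps BLOCK_TIME seconds (reduced from 10 to 4 by this change), and only then calls logSync(). Entering safe mode performs a logSyncAll() of its own, so the main thread flushes the pending edit itself rather than waiting out the sleeper: the old test proved safe-mode entry blocked on the in-flight sync, the new one proves it does not. A worked restatement of the two conditions, using the diff's constants and a hypothetical elapsed time:

public class SyncTimingCheck {
  public static void main(String[] args) {
    final int BLOCK_TIME = 4;  // seconds, as set in the test above
    long elapsedMs = 150;      // hypothetical safe-mode entry duration

    // Old-style assertion: entry had to wait for the edit thread's sync.
    System.out.println(elapsedMs > (BLOCK_TIME - 1) * 1000);  // 150 > 3000: false

    // New assertion: the logSyncAll inside safe-mode entry flushes the
    // pending edit, so entry finishes long before the sleeper wakes.
    System.out.println(elapsedMs < (BLOCK_TIME / 2) * 1000);  // 150 < 2000: true
  }
}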

View File: TestFSEditLogLoader.java

@@ -31,6 +31,8 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
@@ -51,25 +53,48 @@
 import org.apache.hadoop.test.PathUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
+@RunWith(Parameterized.class)
 public class TestFSEditLogLoader {
 
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestFSEditLogLoader(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   static {
     GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
   }
 
   private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
   private static final int NUM_DATA_NODES = 0;
 
   @Test
   public void testDisplayRecentEditLogOpCodes() throws IOException {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
@@ -119,7 +144,7 @@ public void testDisplayRecentEditLogOpCodes() throws IOException {
   @Test
   public void testReplicationAdjusted() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     // Replicate and heartbeat fast to shave a few seconds off test
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);

View File: TestNameNodeRecovery.java

@@ -27,6 +27,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -50,13 +52,38 @@
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Sets;
 
 /**
  * This tests data recovery mode for the NameNode.
  */
+@RunWith(Parameterized.class)
 public class TestNameNodeRecovery {
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestNameNodeRecovery(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
   private static final StartupOption recoverStartOpt = StartupOption.RECOVER;
   private static final File TEST_DIR = PathUtils.getTestDir(TestNameNodeRecovery.class);
@@ -73,7 +100,7 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
     EditLogFileOutputStream elfos = null;
     EditLogFileInputStream elfis = null;
     try {
-      elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
+      elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
       elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
 
       elts.addTransactionsToLog(elfos, cache);
@@ -519,7 +546,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
     final boolean needRecovery = corruptor.needRecovery(finalize);
 
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     setupRecoveryTestConf(conf);
 
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;

View File: TestEditLogTailer.java

@@ -22,6 +22,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -40,11 +43,31 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
 
+@RunWith(Parameterized.class)
 public class TestEditLogTailer {
+  static {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLogTailer(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
   private static final String DIR_PREFIX = "/dir";
   private static final int DIRS_TO_MAKE = 20;
   static final long SLEEP_TIME = 1000;
@@ -52,13 +75,21 @@ public class TestEditLogTailer {
   static {
     GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.ALL);
   }
 
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   @Test
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     HAUtil.setAllowStandbyReads(conf, true);
@@ -116,7 +147,7 @@ public void testNN1TriggersLogRolls() throws Exception {
   private static void testStandbyTriggersLogRolls(int activeIndex)
       throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     // Roll every 1s
     conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
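
The tailer tests change only their configuration plumbing, which fits the design: the async flag alters how the active NameNode queues and syncs edits, not the on-disk segment format the standby tails. For completeness, a sketch of flipping the same knob around a MiniDFSCluster outside these tests (hypothetical demo class; the key presumably defaults to the synchronous log, given both values are exercised above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class AsyncEditLogClusterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // The knob introduced by HDFS-7964.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, true);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    try {
      cluster.waitActive();
      // Any namespace mutation now flows through the async edit log.
      cluster.getFileSystem().mkdirs(new Path("/demo"));
    } finally {
      cluster.shutdown();
    }
  }
}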