From e327325a3713233dc40c91c1ae6f13af08d6104d Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 29 Feb 2016 15:34:43 -0800 Subject: [PATCH] HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp. (cherry picked from commit 2151716832ad14932dd65b1a4e47e64d8d6cd767) (cherry picked from commit 3cb7ae11a839c01b8be629774874c1873f51b747) --- .../TestBookKeeperAsHASharedDir.java | 46 ++- .../src/test/resources/log4j.properties | 2 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +- .../hdfs/server/namenode/BackupNode.java | 4 + .../hdfs/server/namenode/FSEditLog.java | 109 +++--- .../hdfs/server/namenode/FSEditLogAsync.java | 322 ++++++++++++++++++ .../hdfs/server/namenode/FSEditLogOp.java | 215 ++++++------ .../server/namenode/FSEditLogOpCodes.java | 108 +++--- .../hadoop/hdfs/server/namenode/FSImage.java | 2 +- .../hadoop/hdfs/server/namenode/NameNode.java | 1 - .../namenode/metrics/NameNodeMetrics.java | 4 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 + .../hdfs/server/namenode/TestAuditLogs.java | 15 +- .../hdfs/server/namenode/TestEditLog.java | 59 +++- .../server/namenode/TestEditLogAutoroll.java | 26 ++ .../namenode/TestEditLogJournalFailures.java | 35 +- .../hdfs/server/namenode/TestEditLogRace.java | 144 ++++---- .../server/namenode/TestFSEditLogLoader.java | 37 +- .../server/namenode/TestNameNodeRecovery.java | 31 +- .../server/namenode/ha/TestEditLogTailer.java | 39 ++- 20 files changed, 902 insertions(+), 306 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java index 5611bb88a26..ff8c00df039 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java @@ -24,6 +24,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.AfterClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; @@ -56,11 +59,14 @@ import org.apache.commons.logging.LogFactory; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; /** * Integration test to ensure that the BookKeeper JournalManager * works for HDFS Namenode HA */ +@RunWith(Parameterized.class) public class TestBookKeeperAsHASharedDir { static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class); @@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir { private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager"; + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestBookKeeperAsHASharedDir(Boolean async) { + useAsyncEditLog = async; + } + + private static 
Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
     bkutil = new BKJMUtil(numBookies);
@@ -92,8 +119,7 @@ public class TestBookKeeperAsHASharedDir {
   public void testFailoverWithBK() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
                BKJMUtil.createJournalURI("/hotfailover").toString());
       BKJMUtil.addJournalManagerDefinition(conf);
@@ -144,8 +170,7 @@ public class TestBookKeeperAsHASharedDir {
 
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
                BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
       conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
@@ -221,8 +246,7 @@ public class TestBookKeeperAsHASharedDir {
 
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
               BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
       BKJMUtil.addJournalManagerDefinition(conf);
@@ -267,9 +293,8 @@ public class TestBookKeeperAsHASharedDir {
   public void testInitializeBKSharedEdits() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = getConf();
       HAUtil.setAllowStandbyReads(conf, true);
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
 
       MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
       cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
@@ -358,8 +383,7 @@ public class TestBookKeeperAsHASharedDir {
   public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
           .createJournalURI("/correctEditLogSelection").toString());
       BKJMUtil.addJournalManagerDefinition(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
index 93c22f71496..52aac432644 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
@@ -26,7 +26,7 @@
 # Format is "<default threshold> (, <appender>)+
 
 # DEFAULT: console appender only
-log4j.rootLogger=OFF, CONSOLE
+log4j.rootLogger=DEBUG, CONSOLE # Example with rolling log file #log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fd42222bf46..fb0854f234c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -273,7 +273,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush"; public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false; - + + public static final String DFS_NAMENODE_EDITS_ASYNC_LOGGING = + "dfs.namenode.edits.asynclogging"; + public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false; + public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 3933c660143..1d582c59d89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -143,6 +143,10 @@ public class BackupNode extends NameNode { @Override // NameNode protected void initialize(Configuration conf) throws IOException { + // async edit logs are incompatible with backup node due to race + // conditions resulting from laxer synchronization + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false); + // Trash is disabled in BackupNameNode, // but should be turned back on if it ever becomes active. 
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index f8d92c8a85a..92476424211 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -79,7 +79,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -116,7 +118,7 @@ import com.google.common.collect.Lists;
 @InterfaceStability.Evolving
 public class FSEditLog implements LogsPurgeable {
 
-  static final Log LOG = LogFactory.getLog(FSEditLog.class);
+  public static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
    * State machine for edit log.
@@ -179,17 +181,11 @@ public class FSEditLog implements LogsPurgeable {
   private final NNStorage storage;
   private final Configuration conf;
-  
+
   private final List<URI> editsDirs;
 
-  private final ThreadLocal<OpInstanceCache> cache =
-      new ThreadLocal<OpInstanceCache>() {
-    @Override
-    protected OpInstanceCache initialValue() {
-      return new OpInstanceCache();
-    }
-  };
-  
+  protected final OpInstanceCache cache = new OpInstanceCache();
+
   /**
    * The edit directories that are shared between primary and secondary.
    */
@@ -218,6 +214,17 @@ public class FSEditLog implements LogsPurgeable {
     }
   };
 
+  static FSEditLog newInstance(Configuration conf, NNStorage storage,
+      List<URI> editsDirs) {
+    boolean asyncEditLogging = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
+    LOG.info("Edit logging is async:" + asyncEditLogging);
+    return asyncEditLogging
+        ? new FSEditLogAsync(conf, storage, editsDirs)
+        : new FSEditLog(conf, storage, editsDirs);
+  }
+
   /**
    * Constructor for FSEditLog. Underlying journals are constructed, but
    * no streams are opened until open() is called.
@@ -424,33 +431,35 @@ public class FSEditLog implements LogsPurgeable {
 
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
-      
-      long start = beginTransaction();
-      op.setTransactionId(txid);
-
-      try {
-        editLogStream.write(op);
-      } catch (IOException ex) {
-        // All journals failed, it is handled in logSync.
-      } finally {
-        op.reset();
-      }
-
-      endTransaction(start);
-      
-      // check if it is time to schedule an automatic sync
-      needsSync = shouldForceSync();
+      needsSync = doEditTransaction(op);
       if (needsSync) {
         isAutoSyncScheduled = true;
       }
     }
-    
+
     // Sync the log if an automatic sync is required.
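+    // needsSync is doEditTransaction's return value, i.e. shouldForceSync().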
if (needsSync) { logSync(); } } + synchronized boolean doEditTransaction(final FSEditLogOp op) { + long start = beginTransaction(); + op.setTransactionId(txid); + + try { + editLogStream.write(op); + } catch (IOException ex) { + // All journals failed, it is handled in logSync. + } finally { + op.reset(); + } + endTransaction(start); + return shouldForceSync(); + } + /** * Wait if an automatic sync is scheduled */ @@ -545,15 +554,10 @@ public class FSEditLog implements LogsPurgeable { * else more operations can start writing while this is in progress. */ void logSyncAll() { - // Record the most recent transaction ID as our own id - synchronized (this) { - TransactionId id = myTransactionId.get(); - id.txid = txid; - } - // Then make sure we're synced up to this point - logSync(); + // Make sure we're synced up to the most recent transaction ID. + logSync(getLastWrittenTxId()); } - + /** * Sync all modifications done by this thread. * @@ -583,12 +587,14 @@ public class FSEditLog implements LogsPurgeable { * waitForSyncToFinish() before assuming they are running alone. */ public void logSync() { - long syncStart = 0; + // Fetch the transactionId of this thread. + logSync(myTransactionId.get().txid); + } - // Fetch the transactionId of this thread. - long mytxid = myTransactionId.get().txid; - + protected void logSync(long mytxid) { + long syncStart = 0; boolean sync = false; + long editsBatchedInSync = 0; try { EditLogOutputStream logStream = null; synchronized (this) { @@ -607,19 +613,17 @@ public class FSEditLog implements LogsPurgeable { // If this transaction was already flushed, then nothing to do // if (mytxid <= synctxid) { - numTransactionsBatchedInSync++; - if (metrics != null) { - // Metrics is non-null only when used inside name node - metrics.incrTransactionsBatchedInSync(); - } return; } - - // now, this thread will do the sync + + // now, this thread will do the sync. track if other edits were + // included in the sync - ie. batched. if this is the only edit + // synced then the batched count is 0 + editsBatchedInSync = txid - synctxid - 1; syncStart = txid; isSyncRunning = true; sync = true; - + // swap buffers try { if (journalSet.isEmpty()) { @@ -668,6 +672,8 @@ public class FSEditLog implements LogsPurgeable { if (metrics != null) { // Metrics non-null only when used inside name node metrics.addSync(elapsed); + metrics.incrTransactionsBatchedInSync(editsBatchedInSync); + numTransactionsBatchedInSync += editsBatchedInSync; } } finally { @@ -1139,13 +1145,13 @@ public class FSEditLog implements LogsPurgeable { } void logStartRollingUpgrade(long startTime) { - RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get()); + RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get()); op.setTime(startTime); logEdit(op); } void logFinalizeRollingUpgrade(long finalizeTime) { - RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get()); + RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get()); op.setTime(finalizeTime); logEdit(op); } @@ -1280,8 +1286,9 @@ public class FSEditLog implements LogsPurgeable { if (writeEndTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); - logSync(); } + // always sync to ensure all edits are flushed. 
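+    // for the async log, logSyncAll() also waits for the background sync
+    // thread to drain its pending-edit queues before returning.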
+    logSyncAll();
 
     printStatistics(true);
 
@@ -1657,6 +1664,12 @@ public class FSEditLog implements LogsPurgeable {
     }
   }
 
+  @VisibleForTesting
+  // needed by async impl to restart thread when edit log is replaced by a
+  // spy because a spy is a shallow copy
+  public void restart() {
+  }
+
   /**
    * Return total number of syncs happened on this edit log.
    * @return long - count
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
new file mode 100644
index 00000000000..c14a31021ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.util.ExitUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+class FSEditLogAsync extends FSEditLog implements Runnable {
+  static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+  // use separate mutex to avoid possible deadlock when stopping the thread.
+  private final Object syncThreadLock = new Object();
+  private Thread syncThread;
+  private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>();
+
+  // requires concurrent access from caller threads and syncing thread.
+  private final BlockingQueue<Edit> editPendingQ =
+      new ArrayBlockingQueue<Edit>(4096);
+
+  // only accessed by syncing thread so no synchronization required.
+  // queue is unbounded because it's effectively limited by the size
+  // of the edit log buffer - ie. a sync will eventually be forced.
+  private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>();
+
+  FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
+    super(conf, storage, editsDirs);
+    // op instances cannot be shared due to queuing for background thread.
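+    // each logEdit must get a private op instance, since a cached instance
+    // could be mutated by a later edit before the sync thread writes it out.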
+ cache.disableCache(); + } + + private boolean isSyncThreadAlive() { + synchronized(syncThreadLock) { + return syncThread != null && syncThread.isAlive(); + } + } + + private void startSyncThread() { + synchronized(syncThreadLock) { + if (!isSyncThreadAlive()) { + syncThread = new Thread(this, this.getClass().getSimpleName()); + syncThread.start(); + } + } + } + + private void stopSyncThread() { + synchronized(syncThreadLock) { + if (syncThread != null) { + try { + syncThread.interrupt(); + syncThread.join(); + } catch (InterruptedException e) { + // we're quitting anyway. + } finally { + syncThread = null; + } + } + } + } + + @VisibleForTesting + @Override + public void restart() { + stopSyncThread(); + startSyncThread(); + } + + @Override + void openForWrite(int layoutVersion) throws IOException { + try { + startSyncThread(); + super.openForWrite(layoutVersion); + } catch (IOException ioe) { + stopSyncThread(); + throw ioe; + } + } + + @Override + public void close() { + super.close(); + stopSyncThread(); + } + + @Override + void logEdit(final FSEditLogOp op) { + Edit edit = getEditInstance(op); + threadEdit.set(edit); + enqueueEdit(edit); + } + + @Override + public void logSync() { + Edit edit = threadEdit.get(); + if (edit != null) { + // do NOT remove to avoid expunge & rehash penalties. + threadEdit.set(null); + if (LOG.isDebugEnabled()) { + LOG.debug("logSync " + edit); + } + edit.logSyncWait(); + } + } + + @Override + public void logSyncAll() { + // doesn't actually log anything, just ensures that the queues are + // drained when it returns. + Edit edit = new SyncEdit(this, null){ + @Override + public boolean logEdit() { + return true; + } + }; + enqueueEdit(edit); + edit.logSyncWait(); + } + + private void enqueueEdit(Edit edit) { + if (LOG.isDebugEnabled()) { + LOG.debug("logEdit " + edit); + } + try { + if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) { + Preconditions.checkState( + isSyncThreadAlive(), "sync thread is not alive"); + editPendingQ.put(edit); + } + } catch (Throwable t) { + // should never happen! failure to enqueue an edit is fatal + terminate(t); + } + } + + private Edit dequeueEdit() throws InterruptedException { + // only block for next edit if no pending syncs. + return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll(); + } + + @Override + public void run() { + try { + while (true) { + boolean doSync; + Edit edit = dequeueEdit(); + if (edit != null) { + // sync if requested by edit log. + doSync = edit.logEdit(); + syncWaitQ.add(edit); + } else { + // sync when editq runs dry, but have edits pending a sync. + doSync = !syncWaitQ.isEmpty(); + } + if (doSync) { + // normally edit log exceptions cause the NN to terminate, but tests + // relying on ExitUtil.terminate need to see the exception. + RuntimeException syncEx = null; + try { + logSync(getLastWrittenTxId()); + } catch (RuntimeException ex) { + syncEx = ex; + } + while ((edit = syncWaitQ.poll()) != null) { + edit.logSyncNotify(syncEx); + } + } + } + } catch (InterruptedException ie) { + LOG.info(Thread.currentThread().getName() + " was interrupted, exiting"); + } catch (Throwable t) { + terminate(t); + } + } + + private void terminate(Throwable t) { + String message = "Exception while edit logging: "+t.getMessage(); + LOG.fatal(message, t); + ExitUtil.terminate(1, message); + } + + private Edit getEditInstance(FSEditLogOp op) { + final Edit edit; + final Server.Call rpcCall = Server.getCurCall().get(); + // only rpc calls not explicitly sync'ed on the log will be async. 
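+    // callers already holding the log's monitor (e.g. while rolling) must
+    // wait synchronously to avoid deadlocking with the sync thread.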
+ if (rpcCall != null && !Thread.holdsLock(this)) { + edit = new RpcEdit(this, op, rpcCall); + } else { + edit = new SyncEdit(this, op); + } + return edit; + } + + private abstract static class Edit { + final FSEditLog log; + final FSEditLogOp op; + + Edit(FSEditLog log, FSEditLogOp op) { + this.log = log; + this.op = op; + } + + // return whether edit log wants to sync. + boolean logEdit() { + return log.doEditTransaction(op); + } + + // wait for background thread to finish syncing. + abstract void logSyncWait(); + // wake up the thread in logSyncWait. + abstract void logSyncNotify(RuntimeException ex); + } + + // the calling thread is synchronously waiting for the edit to complete. + private static class SyncEdit extends Edit { + private final Object lock; + private boolean done = false; + private RuntimeException syncEx; + + SyncEdit(FSEditLog log, FSEditLogOp op) { + super(log, op); + // if the log is already sync'ed (ex. log rolling), must wait on it to + // avoid deadlock with sync thread. the fsn lock protects against + // logging during a roll. else lock on this object to avoid sync + // contention on edit log. + lock = Thread.holdsLock(log) ? log : this; + } + + @Override + public void logSyncWait() { + synchronized(lock) { + while (!done) { + try { + lock.wait(10); + } catch (InterruptedException e) {} + } + // only needed by tests that rely on ExitUtil.terminate() since + // normally exceptions terminate the NN. + if (syncEx != null) { + syncEx.fillInStackTrace(); + throw syncEx; + } + } + } + + @Override + public void logSyncNotify(RuntimeException ex) { + synchronized(lock) { + done = true; + syncEx = ex; + lock.notifyAll(); + } + } + + @Override + public String toString() { + return "["+getClass().getSimpleName()+" op:"+op+"]"; + } + } + + // the calling rpc thread will return immediately from logSync but the + // rpc response will not be sent until the edit is durable. + private static class RpcEdit extends Edit { + private final Server.Call call; + + RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) { + super(log, op); + this.call = call; + call.postponeResponse(); + } + + @Override + public void logSyncWait() { + // logSync is a no-op to immediately free up rpc handlers. the + // response is sent when the sync thread calls syncNotify. + } + + @Override + public void logSyncNotify(RuntimeException syncEx) { + try { + if (syncEx == null) { + call.sendResponse(); + } else { + call.abortResponse(syncEx); + } + } catch (Exception e) {} // don't care if not sent. 
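+      // the edit is already durable here; only the response delivery failed.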
+    }
+
+    @Override
+    public String toString() {
+      return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]";
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 41be4405985..ae9bfe15ad8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -147,6 +147,55 @@ public abstract class FSEditLogOp {
   byte[] rpcClientId;
   int rpcCallId;
 
+  public static class OpInstanceCache {
+    private static ThreadLocal<OpInstanceCacheMap> cache =
+        new ThreadLocal<OpInstanceCacheMap>() {
+          @Override
+          protected OpInstanceCacheMap initialValue() {
+            return new OpInstanceCacheMap();
+          }
+        };
+
+    @SuppressWarnings("serial")
+    static final class OpInstanceCacheMap extends
+        EnumMap<FSEditLogOpCodes, FSEditLogOp> {
+      OpInstanceCacheMap() {
+        super(FSEditLogOpCodes.class);
+        for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+          put(opCode, newInstance(opCode));
+        }
+      }
+    }
+
+    private boolean useCache = true;
+
+    void disableCache() {
+      useCache = false;
+    }
+
+    public OpInstanceCache get() {
+      return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
+      return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
+    }
+
+    private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
+      FSEditLogOp instance = null;
+      Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
+      if (clazz != null) {
+        try {
+          instance = clazz.newInstance();
+        } catch (Exception ex) {
+          throw new RuntimeException("Failed to instantiate "+opCode, ex);
+        }
+      }
+      return instance;
+    }
+  }
+
   final void reset() {
     txid = HdfsServerConstants.INVALID_TXID;
     rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
@@ -156,72 +205,6 @@ public abstract class FSEditLogOp {
 
   abstract void resetSubFields();
 
-  final public static class OpInstanceCache {
-    private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
-        new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-
-    public OpInstanceCache() {
-      inst.put(OP_ADD, new AddOp());
-      inst.put(OP_CLOSE, new CloseOp());
-      inst.put(OP_SET_REPLICATION, new SetReplicationOp());
-      inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-      inst.put(OP_RENAME_OLD, new RenameOldOp());
-      inst.put(OP_DELETE, new DeleteOp());
-      inst.put(OP_MKDIR, new MkdirOp());
-      inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
-      inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-      inst.put(OP_SET_OWNER, new SetOwnerOp());
-      inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-      inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-      inst.put(OP_SET_QUOTA, new SetQuotaOp());
-      inst.put(OP_TIMES, new TimesOp());
-      inst.put(OP_SYMLINK, new SymlinkOp());
-      inst.put(OP_RENAME, new RenameOp());
-      inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-      inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-      inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-      inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
-      inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-      inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
-      inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
-      inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
-      inst.put(OP_TRUNCATE, new TruncateOp());
-
-      inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
-      inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
-      inst.put(OP_CREATE_SNAPSHOT, new 
CreateSnapshotOp()); - inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp()); - inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp()); - inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op()); - inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp()); - inst.put(OP_ADD_BLOCK, new AddBlockOp()); - inst.put(OP_ADD_CACHE_DIRECTIVE, - new AddCacheDirectiveInfoOp()); - inst.put(OP_MODIFY_CACHE_DIRECTIVE, - new ModifyCacheDirectiveInfoOp()); - inst.put(OP_REMOVE_CACHE_DIRECTIVE, - new RemoveCacheDirectiveInfoOp()); - inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp()); - inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp()); - inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp()); - - inst.put(OP_SET_ACL, new SetAclOp()); - inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp( - OP_ROLLING_UPGRADE_START, "start")); - inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp( - OP_ROLLING_UPGRADE_FINALIZE, "finalize")); - inst.put(OP_SET_XATTR, new SetXAttrOp()); - inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp()); - inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp()); - inst.put(OP_APPEND, new AppendOp()); - inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp()); - } - - public FSEditLogOp get(FSEditLogOpCodes opcode) { - return inst.get(opcode); - } - } - private static ImmutableMap fsActionMap() { ImmutableMap.Builder b = ImmutableMap.builder(); for (FsAction v : FsAction.values()) @@ -776,7 +759,7 @@ public abstract class FSEditLogOp { * {@link ClientProtocol#append} */ static class AddOp extends AddCloseOp { - private AddOp() { + AddOp() { super(OP_ADD); } @@ -804,7 +787,7 @@ public abstract class FSEditLogOp { * finally log an AddOp. */ static class CloseOp extends AddCloseOp { - private CloseOp() { + CloseOp() { super(OP_CLOSE); } @@ -832,7 +815,7 @@ public abstract class FSEditLogOp { String clientMachine; boolean newBlock; - private AppendOp() { + AppendOp() { super(OP_APPEND); } @@ -922,7 +905,7 @@ public abstract class FSEditLogOp { private Block penultimateBlock; private Block lastBlock; - private AddBlockOp() { + AddBlockOp() { super(OP_ADD_BLOCK); } @@ -1034,7 +1017,7 @@ public abstract class FSEditLogOp { String path; Block[] blocks; - private UpdateBlocksOp() { + UpdateBlocksOp() { super(OP_UPDATE_BLOCKS); } @@ -1128,7 +1111,7 @@ public abstract class FSEditLogOp { String path; short replication; - private SetReplicationOp() { + SetReplicationOp() { super(OP_SET_REPLICATION); } @@ -1207,7 +1190,7 @@ public abstract class FSEditLogOp { long timestamp; final static public int MAX_CONCAT_SRC = 1024 * 1024; - private ConcatDeleteOp() { + ConcatDeleteOp() { super(OP_CONCAT_DELETE); } @@ -1365,7 +1348,7 @@ public abstract class FSEditLogOp { String dst; long timestamp; - private RenameOldOp() { + RenameOldOp() { super(OP_RENAME_OLD); } @@ -1477,7 +1460,7 @@ public abstract class FSEditLogOp { String path; long timestamp; - private DeleteOp() { + DeleteOp() { super(OP_DELETE); } @@ -1578,7 +1561,7 @@ public abstract class FSEditLogOp { List aclEntries; List xAttrs; - private MkdirOp() { + MkdirOp() { super(OP_MKDIR); } @@ -1751,7 +1734,7 @@ public abstract class FSEditLogOp { static class SetGenstampV1Op extends FSEditLogOp { long genStampV1; - private SetGenstampV1Op() { + SetGenstampV1Op() { super(OP_SET_GENSTAMP_V1); } @@ -1809,7 +1792,7 @@ public abstract class FSEditLogOp { static class SetGenstampV2Op extends FSEditLogOp { long genStampV2; - private SetGenstampV2Op() { + SetGenstampV2Op() { super(OP_SET_GENSTAMP_V2); } @@ -1867,7 +1850,7 @@ public abstract class 
FSEditLogOp { static class AllocateBlockIdOp extends FSEditLogOp { long blockId; - private AllocateBlockIdOp() { + AllocateBlockIdOp() { super(OP_ALLOCATE_BLOCK_ID); } @@ -1926,7 +1909,7 @@ public abstract class FSEditLogOp { String src; FsPermission permissions; - private SetPermissionsOp() { + SetPermissionsOp() { super(OP_SET_PERMISSIONS); } @@ -1999,7 +1982,7 @@ public abstract class FSEditLogOp { String username; String groupname; - private SetOwnerOp() { + SetOwnerOp() { super(OP_SET_OWNER); } @@ -2086,7 +2069,7 @@ public abstract class FSEditLogOp { String src; long nsQuota; - private SetNSQuotaOp() { + SetNSQuotaOp() { super(OP_SET_NS_QUOTA); } @@ -2144,7 +2127,7 @@ public abstract class FSEditLogOp { static class ClearNSQuotaOp extends FSEditLogOp { String src; - private ClearNSQuotaOp() { + ClearNSQuotaOp() { super(OP_CLEAR_NS_QUOTA); } @@ -2198,7 +2181,7 @@ public abstract class FSEditLogOp { long nsQuota; long dsQuota; - private SetQuotaOp() { + SetQuotaOp() { super(OP_SET_QUOTA); } @@ -2283,7 +2266,7 @@ public abstract class FSEditLogOp { long dsQuota; StorageType type; - private SetQuotaByStorageTypeOp() { + SetQuotaByStorageTypeOp() { super(OP_SET_QUOTA_BY_STORAGETYPE); } @@ -2366,7 +2349,7 @@ public abstract class FSEditLogOp { long mtime; long atime; - private TimesOp() { + TimesOp() { super(OP_TIMES); } @@ -2475,7 +2458,7 @@ public abstract class FSEditLogOp { long atime; PermissionStatus permissionStatus; - private SymlinkOp() { + SymlinkOp() { super(OP_SYMLINK); } @@ -2634,7 +2617,7 @@ public abstract class FSEditLogOp { long timestamp; Rename[] options; - private RenameOp() { + RenameOp() { super(OP_RENAME); } @@ -2799,7 +2782,7 @@ public abstract class FSEditLogOp { long timestamp; Block truncateBlock; - private TruncateOp() { + TruncateOp() { super(OP_TRUNCATE); } @@ -2932,7 +2915,7 @@ public abstract class FSEditLogOp { String path; String newHolder; - private ReassignLeaseOp() { + ReassignLeaseOp() { super(OP_REASSIGN_LEASE); } @@ -3014,7 +2997,7 @@ public abstract class FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; - private GetDelegationTokenOp() { + GetDelegationTokenOp() { super(OP_GET_DELEGATION_TOKEN); } @@ -3093,7 +3076,7 @@ public abstract class FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; - private RenewDelegationTokenOp() { + RenewDelegationTokenOp() { super(OP_RENEW_DELEGATION_TOKEN); } @@ -3171,7 +3154,7 @@ public abstract class FSEditLogOp { static class CancelDelegationTokenOp extends FSEditLogOp { DelegationTokenIdentifier token; - private CancelDelegationTokenOp() { + CancelDelegationTokenOp() { super(OP_CANCEL_DELEGATION_TOKEN); } @@ -3230,7 +3213,7 @@ public abstract class FSEditLogOp { static class UpdateMasterKeyOp extends FSEditLogOp { DelegationKey key; - private UpdateMasterKeyOp() { + UpdateMasterKeyOp() { super(OP_UPDATE_MASTER_KEY); } @@ -3335,8 +3318,20 @@ public abstract class FSEditLogOp { } } + static class StartLogSegmentOp extends LogSegmentOp { + StartLogSegmentOp() { + super(OP_START_LOG_SEGMENT); + } + } + + static class EndLogSegmentOp extends LogSegmentOp { + EndLogSegmentOp() { + super(OP_END_LOG_SEGMENT); + } + } + static class InvalidOp extends FSEditLogOp { - private InvalidOp() { + InvalidOp() { super(OP_INVALID); } @@ -4147,7 +4142,7 @@ public abstract class FSEditLogOp { List xAttrs; String src; - private RemoveXAttrOp() { + RemoveXAttrOp() { super(OP_REMOVE_XATTR); } @@ -4200,7 +4195,7 @@ public abstract class FSEditLogOp { List xAttrs; String src; - private SetXAttrOp() { + 
SetXAttrOp() { super(OP_SET_XATTR); } @@ -4253,7 +4248,7 @@ public abstract class FSEditLogOp { List aclEntries = Lists.newArrayList(); String src; - private SetAclOp() { + SetAclOp() { super(OP_SET_ACL); } @@ -4350,7 +4345,7 @@ public abstract class FSEditLogOp { /** * Operation corresponding to upgrade */ - static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent + abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent private final String name; private long time; @@ -4417,7 +4412,7 @@ public abstract class FSEditLogOp { String path; byte policyId; - private SetStoragePolicyOp() { + SetStoragePolicyOp() { super(OP_SET_STORAGE_POLICY); } @@ -4483,6 +4478,26 @@ public abstract class FSEditLogOp { } } + static class RollingUpgradeStartOp extends RollingUpgradeOp { + RollingUpgradeStartOp() { + super(OP_ROLLING_UPGRADE_START, "start"); + } + + static RollingUpgradeStartOp getInstance(OpInstanceCache cache) { + return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START); + } + } + + static class RollingUpgradeFinalizeOp extends RollingUpgradeOp { + RollingUpgradeFinalizeOp() { + super(OP_ROLLING_UPGRADE_FINALIZE, "finalize"); + } + + static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) { + return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE); + } + } + /** * Class for writing editlog ops */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java index 1a0a296f8c9..3f8febac35a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*; /** * Op codes for edits file @@ -27,60 +28,64 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum FSEditLogOpCodes { // last op code in file - OP_ADD ((byte) 0), - OP_RENAME_OLD ((byte) 1), // deprecated operation - OP_DELETE ((byte) 2), - OP_MKDIR ((byte) 3), - OP_SET_REPLICATION ((byte) 4), + OP_ADD ((byte) 0, AddOp.class), + // deprecated operation + OP_RENAME_OLD ((byte) 1, RenameOldOp.class), + OP_DELETE ((byte) 2, DeleteOp.class), + OP_MKDIR ((byte) 3, MkdirOp.class), + OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class), @Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete @Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete - OP_SET_PERMISSIONS ((byte) 7), - OP_SET_OWNER ((byte) 8), - OP_CLOSE ((byte) 9), - OP_SET_GENSTAMP_V1 ((byte) 10), - OP_SET_NS_QUOTA ((byte) 11), // obsolete - OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete - OP_TIMES ((byte) 13), // set atime, mtime - OP_SET_QUOTA ((byte) 14), - OP_RENAME ((byte) 15), // filecontext rename - OP_CONCAT_DELETE ((byte) 16), // concat files - OP_SYMLINK ((byte) 17), - OP_GET_DELEGATION_TOKEN ((byte) 18), - OP_RENEW_DELEGATION_TOKEN ((byte) 19), - OP_CANCEL_DELEGATION_TOKEN ((byte) 20), - OP_UPDATE_MASTER_KEY ((byte) 21), - OP_REASSIGN_LEASE ((byte) 22), - OP_END_LOG_SEGMENT ((byte) 23), - OP_START_LOG_SEGMENT ((byte) 24), - OP_UPDATE_BLOCKS ((byte) 25), - OP_CREATE_SNAPSHOT ((byte) 26), - 
OP_DELETE_SNAPSHOT ((byte) 27), - OP_RENAME_SNAPSHOT ((byte) 28), - OP_ALLOW_SNAPSHOT ((byte) 29), - OP_DISALLOW_SNAPSHOT ((byte) 30), - OP_SET_GENSTAMP_V2 ((byte) 31), - OP_ALLOCATE_BLOCK_ID ((byte) 32), - OP_ADD_BLOCK ((byte) 33), - OP_ADD_CACHE_DIRECTIVE ((byte) 34), - OP_REMOVE_CACHE_DIRECTIVE ((byte) 35), - OP_ADD_CACHE_POOL ((byte) 36), - OP_MODIFY_CACHE_POOL ((byte) 37), - OP_REMOVE_CACHE_POOL ((byte) 38), - OP_MODIFY_CACHE_DIRECTIVE ((byte) 39), - OP_SET_ACL ((byte) 40), - OP_ROLLING_UPGRADE_START ((byte) 41), - OP_ROLLING_UPGRADE_FINALIZE ((byte) 42), - OP_SET_XATTR ((byte) 43), - OP_REMOVE_XATTR ((byte) 44), - OP_SET_STORAGE_POLICY ((byte) 45), - OP_TRUNCATE ((byte) 46), - OP_APPEND ((byte) 47), - OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48), + OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class), + OP_SET_OWNER ((byte) 8, SetOwnerOp.class), + OP_CLOSE ((byte) 9, CloseOp.class), + OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class), + OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete + OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete + OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime + OP_SET_QUOTA ((byte) 14, SetQuotaOp.class), + // filecontext rename + OP_RENAME ((byte) 15, RenameOp.class), + // concat files + OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class), + OP_SYMLINK ((byte) 17, SymlinkOp.class), + OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class), + OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class), + OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class), + OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class), + OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class), + OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class), + OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class), + OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class), + OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class), + OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class), + OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class), + OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class), + OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class), + OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class), + OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class), + OP_ADD_BLOCK ((byte) 33, AddBlockOp.class), + OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class), + OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class), + OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class), + OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class), + OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class), + OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class), + OP_SET_ACL ((byte) 40, SetAclOp.class), + OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class), + OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class), + OP_SET_XATTR ((byte) 43, SetXAttrOp.class), + OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class), + OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class), + OP_TRUNCATE ((byte) 46, TruncateOp.class), + OP_APPEND ((byte) 47, AppendOp.class), + OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class), // Note that the current range of the valid OP code is 0~127 OP_INVALID ((byte) -1); private final byte opCode; + private final Class opClass; /** * Constructor @@ -88,7 +93,12 @@ public enum FSEditLogOpCodes { * @param opCode byte value of constructed enum */ FSEditLogOpCodes(byte opCode) { + this(opCode, null); + } + + 
FSEditLogOpCodes(byte opCode, Class opClass) { this.opCode = opCode; + this.opClass = opClass; } /** @@ -100,6 +110,10 @@ public enum FSEditLogOpCodes { return opCode; } + public Class getOpClass() { + return opClass; + } + private static final FSEditLogOpCodes[] VALUES; static { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index e0f174ec067..778d0538128 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -140,7 +140,7 @@ public class FSImage implements Closeable { storage.setRestoreFailedStorage(true); } - this.editLog = new FSEditLog(conf, storage, editsDirs); + this.editLog = FSEditLog.newInstance(conf, storage, editsDirs); archivalManager = new NNStorageRetentionManager(conf, storage, editLog); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 563440e4c2a..8dd519bd474 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -1275,7 +1275,6 @@ public class NameNode implements NameNodeStatusMXBean { newSharedEditLog.logEdit(op); if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) { - newSharedEditLog.logSync(); newSharedEditLog.endCurrentLogSegment(false); LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index e214451a4bf..8341c7a5684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -295,8 +295,8 @@ public class NameNodeMetrics { transactions.add(latency); } - public void incrTransactionsBatchedInSync() { - transactionsBatchedInSync.incr(); + public void incrTransactionsBatchedInSync(long count) { + transactionsBatchedInSync.incr(count); } public void addSync(long elapsed) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index b9aa1e79cd8..3eda0086ee7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -267,6 +267,9 @@ public class DFSTestUtil { } public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) { + // spies are shallow copies, must allow async log to restart its thread + // so it has the new copy + newLog.restart(); Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog); Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java index 8c53ad61316..7faa66828f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java @@ -71,17 +71,21 @@ import org.junit.runners.Parameterized.Parameters; public class TestAuditLogs { static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log"; final boolean useAsyncLog; - + final boolean useAsyncEdits; + @Parameters public static Collection data() { Collection params = new ArrayList(); - params.add(new Object[]{new Boolean(false)}); - params.add(new Object[]{new Boolean(true)}); + params.add(new Object[]{Boolean.FALSE, Boolean.FALSE}); + params.add(new Object[]{Boolean.TRUE, Boolean.FALSE}); + params.add(new Object[]{Boolean.FALSE, Boolean.TRUE}); + params.add(new Object[]{Boolean.TRUE, Boolean.TRUE}); return params; } - - public TestAuditLogs(boolean useAsyncLog) { + + public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) { this.useAsyncLog = useAsyncLog; + this.useAsyncEdits = useAsyncEdits; } // Pattern for: @@ -119,6 +123,7 @@ public class TestAuditLogs { conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits); util = new DFSTestUtil.Builder().setName("TestAuditAllowed"). setNumFiles(20).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 226f395e704..140c9c17354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -88,6 +88,9 @@ import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.LogManager; import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -98,12 +101,33 @@ import com.google.common.collect.Lists; /** * This class tests the creation and validation of a checkpoint. 
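+ * Parameterized to exercise both the sync and async edit log implementations.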
 */
+@RunWith(Parameterized.class)
 public class TestEditLog {
-  
+
   static {
     GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
   }
 
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLog(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  public static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   /**
    * A garbage mkdir op which is used for testing
    * {@link EditLogFileInputStream#scanEditLog(File)}
@@ -225,11 +249,12 @@ public class TestEditLog {
    * @param storage Storage object used by namenode
    */
   private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     // Make sure the edits dirs are set in the provided configuration object.
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         StringUtils.join(",", storage.getEditsDirectories()));
-    FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
+    FSEditLog log = FSEditLog.newInstance(
+        conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
     return log;
   }
 
@@ -252,7 +277,7 @@ public class TestEditLog {
    */
   @Test
   public void testPreTxidEditLogWithEdits() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
 
     try {
@@ -282,7 +307,7 @@ public class TestEditLog {
   @Test
   public void testSimpleEditLog() throws IOException {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     try {
@@ -351,7 +376,7 @@ public class TestEditLog {
 
   private void testEditLog(int initialSize) throws IOException {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
 
@@ -482,8 +507,12 @@ public class TestEditLog {
 
   @Test
   public void testSyncBatching() throws Exception {
-    // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    if (useAsyncEditLog) {
+      // semantics are completely different since edits will be auto-synced
+      return;
+    }
+    // start a cluster
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -546,7 +575,7 @@ public class TestEditLog {
   @Test
   public void testBatchedSyncWithClosedLogs() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -585,7 +614,7 @@ public class TestEditLog {
   @Test
   public void testEditChecksum() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
@@ -657,7 +686,7 @@ public class TestEditLog {
    */
   private void testCrashRecovery(int numTransactions) throws Exception {
     MiniDFSCluster cluster = null;
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
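+    // getConf() carries the async edit logging flag for this run.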
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, CHECKPOINT_ON_STARTUP_MIN_TXNS); @@ -802,7 +831,7 @@ public class TestEditLog { boolean updateTransactionIdFile, boolean shouldSucceed) throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(NUM_DATA_NODES).build(); @@ -1133,7 +1162,7 @@ public class TestEditLog { public static NNStorage setupEdits(List editUris, int numrolls, boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException { List aborts = new ArrayList(Arrays.asList(abortAtRolls)); - NNStorage storage = new NNStorage(new Configuration(), + NNStorage storage = new NNStorage(getConf(), Collections.emptyList(), editUris); storage.format(new NamespaceInfo()); @@ -1295,7 +1324,7 @@ public class TestEditLog { EditLogFileOutputStream elfos = null; EditLogFileInputStream elfis = null; try { - elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0); + elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0); elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); elfos.writeRaw(garbage, 0, garbage.length); elfos.setReadyToFlush(); @@ -1471,7 +1500,7 @@ public class TestEditLog { public void testManyEditLogSegments() throws IOException { final int NUM_EDIT_LOG_ROLLS = 1000; // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java index f22ee2fcc24..c60d79f6513 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.namenode; import java.net.BindException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Random; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY; @@ -30,18 +32,40 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Supplier; +@RunWith(Parameterized.class) public class TestEditLogAutoroll { + static { + GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); + } + + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestEditLogAutoroll(Boolean async) { 
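+    // stored statically so the @Before setup can apply it to the cluster conf.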
+ useAsyncEditLog = async; + } private Configuration conf; private MiniDFSCluster cluster; @@ -61,6 +85,8 @@ public class TestEditLogAutoroll { // Make it autoroll after 10 edits conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f); conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); int retryCount = 0; while (true) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java index 51dfc3e993d..28169bbf160 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java @@ -21,12 +21,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -43,13 +44,37 @@ import org.apache.hadoop.util.ExitUtil.ExitException; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class TestEditLogJournalFailures { private int editsPerformed = 0; private MiniDFSCluster cluster; private FileSystem fs; + private boolean useAsyncEdits; + + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{Boolean.FALSE}); + params.add(new Object[]{Boolean.TRUE}); + return params; + } + + public TestEditLogJournalFailures(boolean useAsyncEdits) { + this.useAsyncEdits = useAsyncEdits; + } + + private Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEdits); + return conf; + } /** * Create the mini cluster for testing and sub in a custom runtime so that @@ -57,9 +82,9 @@ public class TestEditLogJournalFailures { */ @Before public void setUpMiniCluster() throws IOException { - setUpMiniCluster(new HdfsConfiguration(), true); + setUpMiniCluster(getConf(), true); } - + public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs) throws IOException { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) @@ -153,7 +178,7 @@ public class TestEditLogJournalFailures { String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings( DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY); shutDownMiniCluster(); - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]); conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0); @@ -193,7 +218,7 @@ public class TestEditLogJournalFailures { throws IOException { // Set up 4 name/edits dirs. 
shutDownMiniCluster(); - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); String[] nameDirs = new String[4]; for (int i = 0; i < nameDirs.length; i++) { File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java index 9d54a27acac..24ae17a4a86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java @@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer; * This class tests various synchronization bugs in FSEditLog rolling * and namespace saving. */ +@RunWith(Parameterized.class) public class TestEditLogRace { static { GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); } - private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{ false }); + params.add(new Object[]{ true }); + return params; + } - private static final String NAME_DIR = - MiniDFSCluster.getBaseDirectory() + "name1"; + private static boolean useAsyncEditLog; + + public TestEditLogRace(boolean useAsyncEditLog) { + TestEditLogRace.useAsyncEditLog = useAsyncEditLog; + } + + private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); // This test creates NUM_THREADS threads and each thread continuously writes // transactions @@ -94,21 +113,29 @@ public class TestEditLogRace { * This value needs to be significantly longer than the average * time for an fsync() or enterSafeMode(). 
   */
-  private static final int BLOCK_TIME = 10;
-  
+  private static final int BLOCK_TIME = 4; // 4 sec is fairly generous
+
   //
   // an object that does a bunch of transactions
   //
   static class Transactions implements Runnable {
     final NamenodeProtocols nn;
+    final MiniDFSCluster cluster;
+    FileSystem fs;
     short replication = 3;
     long blockSize = 64;
     volatile boolean stopped = false;
     volatile Thread thr;
     final AtomicReference caught;

-    Transactions(NamenodeProtocols ns, AtomicReference caught) {
-      nn = ns;
+    Transactions(MiniDFSCluster cluster, AtomicReference caught) {
+      this.cluster = cluster;
+      this.nn = cluster.getNameNodeRpc();
+      try {
+        this.fs = cluster.getFileSystem();
+      } catch (IOException e) {
+        caught.set(e);
+      }
       this.caught = caught;
     }

@@ -122,11 +149,23 @@ public class TestEditLogRace {
       while (!stopped) {
         try {
           String dirname = "/thr-" + thr.getId() + "-dir-" + i;
-          nn.mkdirs(dirname, p, true);
-          nn.delete(dirname, true);
+          if (i % 2 == 0) {
+            Path dirnamePath = new Path(dirname);
+            fs.mkdirs(dirnamePath);
+            fs.delete(dirnamePath, true);
+          } else {
+            nn.mkdirs(dirname, p, true);
+            nn.delete(dirname, true);
+          }
         } catch (SafeModeException sme) {
           // This is OK - the tests will bring NN in and out of safemode
         } catch (Throwable e) {
+          // a SafeModeException may also arrive wrapped in a RemoteException
+          if (e instanceof RemoteException &&
+              ((RemoteException)e).getClassName()
+              .contains("SafeModeException")) {
+            return;
+          }
           LOG.warn("Got error in transaction thread", e);
           caught.compareAndSet(null, e);
           break;
@@ -144,11 +183,11 @@
     }
   }

-  private void startTransactionWorkers(NamenodeProtocols namesystem,
+  private void startTransactionWorkers(MiniDFSCluster cluster,
       AtomicReference caughtErr) {
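// The Transactions worker above deliberately alternates between the
// FileSystem client, whose server-side failures surface as a RemoteException
// carrying the remote class name, and the raw NamenodeProtocols RPC object,
// which can throw SafeModeException directly; that is why run() checks both
// forms. A condensed sketch of the unwrap logic it relies on:
//
//   } catch (Throwable e) {
//     if (e instanceof RemoteException &&
//         ((RemoteException) e).getClassName().contains("SafeModeException")) {
//       return;  // benign: the test cycles the NN through safe mode
//     }
//     caught.compareAndSet(null, e);  // anything else fails the test
//   }
     // Create threads and make them run transactions concurrently.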
     for (int i = 0; i < NUM_THREADS; i++) {
-      Transactions trans = new Transactions(namesystem, caughtErr);
+      Transactions trans = new Transactions(cluster, caughtErr);
       new Thread(trans, "TransactionThread-" + i).start();
       workers.add(trans);
     }
@@ -174,21 +213,21 @@
   @Test
   public void testEditLogRolling() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = null;
+    Configuration conf = getConf();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
     FileSystem fileSys = null;
     AtomicReference caughtErr = new AtomicReference();
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();

       FSImage fsimage = cluster.getNamesystem().getFSImage();
       StorageDirectory sd = fsimage.getStorage().getStorageDir(0);

-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);

       long previousLogTxId = 1;

@@ -256,7 +295,7 @@
   @Test
   public void testSaveNamespace() throws Exception {
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;

@@ -266,12 +305,11 @@
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
-      final NamenodeProtocols nn = cluster.getNameNodeRpc();

       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();

-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);

       for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
         try {
@@ -321,11 +359,11 @@
   private Configuration getConf() {
     Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     return conf;
   }
@@ -389,7 +429,7 @@
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
         LOG.info("Flush called");
-        if (Thread.currentThread() == doAnEditThread) {
+        if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
           LOG.info("edit thread: Telling main thread we made it to flush section...");
           // Signal to main thread that the edit thread is in the racy section
           waitToEnterFlush.countDown();
@@ -457,62 +497,52 @@
     try {
       FSImage fsimage = namesystem.getFSImage();
-      FSEditLog editLog = spy(fsimage.getEditLog());
-      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
+      final FSEditLog editLog = fsimage.getEditLog();

       final AtomicReference deferredException =
           new AtomicReference();
-      final CountDownLatch waitToEnterSync = new CountDownLatch(1);
-
+      final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
+
+      final Thread doAnEditThread = new Thread() {
        @Override
        public void run() {
          try {
-            LOG.info("Starting mkdirs");
-            namesystem.mkdirs("/test",
-                new PermissionStatus("test","test", new FsPermission((short)00755)),
-                true);
-            LOG.info("mkdirs complete");
+            LOG.info("Starting setOwner");
+            namesystem.writeLock();
+            try {
+              editLog.logSetOwner("/","test","test");
+            } finally {
+              namesystem.writeUnlock();
+            }
+            sleepingBeforeSync.countDown();
+            LOG.info("edit thread: sleeping for " + BLOCK_TIME + " secs");
+            Thread.sleep(BLOCK_TIME*1000);
+            editLog.logSync();
+            LOG.info("edit thread: logSync complete");
          } catch (Throwable ioe) {
            LOG.fatal("Got exception", ioe);
            deferredException.set(ioe);
-            waitToEnterSync.countDown();
+            sleepingBeforeSync.countDown();
          }
        }
      };
-
-      Answer blockingSync = new Answer() {
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          LOG.info("logSync called");
-          if (Thread.currentThread() == doAnEditThread) {
-            LOG.info("edit thread: Telling main thread we made it just before logSync...");
-            waitToEnterSync.countDown();
-            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
-            Thread.sleep(BLOCK_TIME*1000);
-            LOG.info("Going through to logSync. This will allow the main thread to continue.");
-          }
-          invocation.callRealMethod();
-          LOG.info("logSync complete");
-          return null;
-        }
-      };
-      doAnswer(blockingSync).when(editLog).logSync();
-
+      doAnEditThread.setDaemon(true);
       doAnEditThread.start();

       LOG.info("Main thread: waiting to just before logSync...");
-      waitToEnterSync.await();
+      sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
       assertNull(deferredException.get());
       LOG.info("Main thread: detected that logSync about to be called.");
       LOG.info("Trying to enter safe mode.");
-      LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
-
+
       long st = Time.now();
       namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       long et = Time.now();
-      LOG.info("Entered safe mode");
-      // Make sure we really waited for the flush to complete!
-      assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+      LOG.info("Entered safe mode after " + (et - st) + "ms");
+
+      // Make sure we didn't wait for the thread that did a logEdit but
+      // not logSync. Going into safemode does a logSyncAll that will flush
+      // its edit.
+      assertTrue(et - st < (BLOCK_TIME/2)*1000);
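// Why the assertion above flipped from ">" to "<": the edit thread logs an
// edit and then sleeps before calling logSync(), and entering safe mode now
// flushes that pending edit itself instead of blocking on the edit thread.
// Conceptually (a sketch of the behavior being asserted, drawn from the
// patch's own comment about logSyncAll, not the patch's exact code):
//
//   void enterSafeMode() {
//     getEditLog().logSyncAll();  // flush through the last logged txid, even
//                                 // though the logging thread has not called
//                                 // logSync() yet
//     ...
//   }

       // Once we're in safe mode, save namespace.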
namesystem.saveNamespace(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index df07a62d021..97b9225cfeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -31,6 +31,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; @@ -51,25 +53,48 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Maps; import com.google.common.io.Files; +@RunWith(Parameterized.class) public class TestFSEditLogLoader { - + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestFSEditLogLoader(Boolean async) { + useAsyncEditLog = async; + } + + private static Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); + return conf; + } + static { GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL); GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL); } - + private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class); private static final int NUM_DATA_NODES = 0; - + @Test public void testDisplayRecentEditLogOpCodes() throws IOException { - // start a cluster - Configuration conf = new HdfsConfiguration(); + // start a cluster + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) @@ -119,7 +144,7 @@ public class TestFSEditLogLoader { @Test public void testReplicationAdjusted() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); // Replicate and heartbeat fast to shave a few seconds off test conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java index 885abc3b231..cb6a09dd1c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -27,6 +27,8 @@ import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -50,13 +52,38 @@ import 
org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.StringUtils; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Sets; /** * This tests data recovery mode for the NameNode. */ + +@RunWith(Parameterized.class) public class TestNameNodeRecovery { + @Parameters + public static Collection data() { + Collection params = new ArrayList(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestNameNodeRecovery(Boolean async) { + useAsyncEditLog = async; + } + + private static Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); + return conf; + } + private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class); private static final StartupOption recoverStartOpt = StartupOption.RECOVER; private static final File TEST_DIR = PathUtils.getTestDir(TestNameNodeRecovery.class); @@ -73,7 +100,7 @@ public class TestNameNodeRecovery { EditLogFileOutputStream elfos = null; EditLogFileInputStream elfis = null; try { - elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0); + elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0); elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); elts.addTransactionsToLog(elfos, cache); @@ -519,7 +546,7 @@ public class TestNameNodeRecovery { final boolean needRecovery = corruptor.needRecovery(finalize); // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); setupRecoveryTestConf(conf); MiniDFSCluster cluster = null; FileSystem fileSys = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index c981c3adeb8..8238b73936d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; import java.net.BindException; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; @@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -43,11 +46,31 @@ import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Supplier; +@RunWith(Parameterized.class) public class TestEditLogTailer { - + static { + GenericTestUtils.setLogLevel(FSEditLog.LOG, 
Level.ALL);
+  }
+
+  @Parameters
+  public static Collection data() {
+    Collection params = new ArrayList();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLogTailer(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
   private static final String DIR_PREFIX = "/dir";
   private static final int DIRS_TO_MAKE = 20;
   static final long SLEEP_TIME = 1000;
@@ -55,13 +78,20 @@ public class TestEditLogTailer {

   static {
     GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.ALL);
   }
-
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   @Test
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     HAUtil.setAllowStandbyReads(conf, true);

@@ -119,7 +150,7 @@
   private static void testStandbyTriggersLogRolls(int activeIndex)
       throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     // Roll every 1s
     conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
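Taken together, the test-side changes are mechanical: each class gains the Parameterized scaffold, swaps new Configuration()/new HdfsConfiguration() for a local getConf() that sets DFS_NAMENODE_EDITS_ASYNC_LOGGING, and otherwise runs unchanged. As a rough sketch of how testStandbyTriggersLogRolls comes together after this conversion (the constants mirror the hunk above; the builder chain is assumed from the MiniDFSCluster usage elsewhere in this patch and may not match the test verbatim):

    Configuration conf = getConf();  // HdfsConfiguration plus the async edit-log flag
    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);    // roll edits every 1s
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);  // tail edits every 1s
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())       // two-NN HA pair
        .numDataNodes(0)
        .build();

Whichever FSEditLog implementation the flag selects, the standby must still observe the active's rolls and tailed edits, which is exactly what these parameterized runs verify.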