HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.
(cherry picked from commit2151716832
) (cherry picked from commit3cb7ae11a8
)
This commit is contained in:
parent
fece3c881b
commit
e327325a37
|
@ -24,6 +24,9 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
||||
|
@ -56,11 +59,14 @@ import org.apache.commons.logging.LogFactory;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Integration test to ensure that the BookKeeper JournalManager
|
||||
* works for HDFS Namenode HA
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestBookKeeperAsHASharedDir {
|
||||
static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
|
||||
|
||||
|
@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir {
|
|||
|
||||
private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestBookKeeperAsHASharedDir(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
private static Configuration getConf() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBookkeeper() throws Exception {
|
||||
bkutil = new BKJMUtil(numBookies);
|
||||
|
@ -92,8 +119,7 @@ public class TestBookKeeperAsHASharedDir {
|
|||
public void testFailoverWithBK() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
BKJMUtil.createJournalURI("/hotfailover").toString());
|
||||
BKJMUtil.addJournalManagerDefinition(conf);
|
||||
|
@ -144,8 +170,7 @@ public class TestBookKeeperAsHASharedDir {
|
|||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
|
||||
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
||||
|
@ -221,8 +246,7 @@ public class TestBookKeeperAsHASharedDir {
|
|||
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
|
||||
BKJMUtil.addJournalManagerDefinition(conf);
|
||||
|
@ -245,7 +269,9 @@ public class TestBookKeeperAsHASharedDir {
|
|||
fs = cluster.getFileSystem(0); // get the older active server.
|
||||
|
||||
try {
|
||||
fs.delete(p1, true);
|
||||
System.out.println("DMS: > *************");
|
||||
boolean foo = fs.delete(p1, true);
|
||||
System.out.println("DMS: < ************* "+foo);
|
||||
fail("Log update on older active should cause it to exit");
|
||||
} catch (RemoteException re) {
|
||||
assertTrue(re.getClassName().contains("ExitException"));
|
||||
|
@ -267,9 +293,8 @@ public class TestBookKeeperAsHASharedDir {
|
|||
public void testInitializeBKSharedEdits() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
Configuration conf = getConf();
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
|
||||
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
|
||||
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
|
||||
|
@ -358,8 +383,7 @@ public class TestBookKeeperAsHASharedDir {
|
|||
public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
|
||||
.createJournalURI("/correctEditLogSelection").toString());
|
||||
BKJMUtil.addJournalManagerDefinition(conf);
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
# Format is "<default threshold> (, <appender>)+
|
||||
|
||||
# DEFAULT: console appender only
|
||||
log4j.rootLogger=OFF, CONSOLE
|
||||
log4j.rootLogger=DEBUG, CONSOLE
|
||||
|
||||
# Example with rolling log file
|
||||
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
|
||||
|
|
|
@ -273,7 +273,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
|
||||
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
|
||||
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
|
||||
|
||||
|
||||
public static final String DFS_NAMENODE_EDITS_ASYNC_LOGGING =
|
||||
"dfs.namenode.edits.asynclogging";
|
||||
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
|
||||
|
||||
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
|
||||
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
|
||||
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
|
||||
|
|
|
@ -143,6 +143,10 @@ public class BackupNode extends NameNode {
|
|||
|
||||
@Override // NameNode
|
||||
protected void initialize(Configuration conf) throws IOException {
|
||||
// async edit logs are incompatible with backup node due to race
|
||||
// conditions resulting from laxer synchronization
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false);
|
||||
|
||||
// Trash is disabled in BackupNameNode,
|
||||
// but should be turned back on if it ever becomes active.
|
||||
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
|
||||
|
|
|
@ -79,7 +79,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
|
||||
|
@ -116,7 +118,7 @@ import com.google.common.collect.Lists;
|
|||
@InterfaceStability.Evolving
|
||||
public class FSEditLog implements LogsPurgeable {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(FSEditLog.class);
|
||||
public static final Log LOG = LogFactory.getLog(FSEditLog.class);
|
||||
|
||||
/**
|
||||
* State machine for edit log.
|
||||
|
@ -179,17 +181,11 @@ public class FSEditLog implements LogsPurgeable {
|
|||
|
||||
private final NNStorage storage;
|
||||
private final Configuration conf;
|
||||
|
||||
|
||||
private final List<URI> editsDirs;
|
||||
|
||||
private final ThreadLocal<OpInstanceCache> cache =
|
||||
new ThreadLocal<OpInstanceCache>() {
|
||||
@Override
|
||||
protected OpInstanceCache initialValue() {
|
||||
return new OpInstanceCache();
|
||||
}
|
||||
};
|
||||
|
||||
protected final OpInstanceCache cache = new OpInstanceCache();
|
||||
|
||||
/**
|
||||
* The edit directories that are shared between primary and secondary.
|
||||
*/
|
||||
|
@ -218,6 +214,17 @@ public class FSEditLog implements LogsPurgeable {
|
|||
}
|
||||
};
|
||||
|
||||
static FSEditLog newInstance(Configuration conf, NNStorage storage,
|
||||
List<URI> editsDirs) {
|
||||
boolean asyncEditLogging = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
|
||||
LOG.info("Edit logging is async:" + asyncEditLogging);
|
||||
return asyncEditLogging
|
||||
? new FSEditLogAsync(conf, storage, editsDirs)
|
||||
: new FSEditLog(conf, storage, editsDirs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for FSEditLog. Underlying journals are constructed, but
|
||||
* no streams are opened until open() is called.
|
||||
|
@ -424,33 +431,35 @@ public class FSEditLog implements LogsPurgeable {
|
|||
|
||||
// wait if an automatic sync is scheduled
|
||||
waitIfAutoSyncScheduled();
|
||||
|
||||
long start = beginTransaction();
|
||||
op.setTransactionId(txid);
|
||||
|
||||
try {
|
||||
editLogStream.write(op);
|
||||
} catch (IOException ex) {
|
||||
// All journals failed, it is handled in logSync.
|
||||
} finally {
|
||||
op.reset();
|
||||
}
|
||||
|
||||
endTransaction(start);
|
||||
|
||||
// check if it is time to schedule an automatic sync
|
||||
needsSync = shouldForceSync();
|
||||
needsSync = doEditTransaction(op);
|
||||
if (needsSync) {
|
||||
isAutoSyncScheduled = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Sync the log if an automatic sync is required.
|
||||
if (needsSync) {
|
||||
logSync();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized boolean doEditTransaction(final FSEditLogOp op) {
|
||||
long start = beginTransaction();
|
||||
op.setTransactionId(txid);
|
||||
|
||||
try {
|
||||
editLogStream.write(op);
|
||||
} catch (IOException ex) {
|
||||
// All journals failed, it is handled in logSync.
|
||||
} finally {
|
||||
op.reset();
|
||||
}
|
||||
endTransaction(start);
|
||||
return shouldForceSync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait if an automatic sync is scheduled
|
||||
*/
|
||||
|
@ -545,15 +554,10 @@ public class FSEditLog implements LogsPurgeable {
|
|||
* else more operations can start writing while this is in progress.
|
||||
*/
|
||||
void logSyncAll() {
|
||||
// Record the most recent transaction ID as our own id
|
||||
synchronized (this) {
|
||||
TransactionId id = myTransactionId.get();
|
||||
id.txid = txid;
|
||||
}
|
||||
// Then make sure we're synced up to this point
|
||||
logSync();
|
||||
// Make sure we're synced up to the most recent transaction ID.
|
||||
logSync(getLastWrittenTxId());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sync all modifications done by this thread.
|
||||
*
|
||||
|
@ -583,12 +587,14 @@ public class FSEditLog implements LogsPurgeable {
|
|||
* waitForSyncToFinish() before assuming they are running alone.
|
||||
*/
|
||||
public void logSync() {
|
||||
long syncStart = 0;
|
||||
// Fetch the transactionId of this thread.
|
||||
logSync(myTransactionId.get().txid);
|
||||
}
|
||||
|
||||
// Fetch the transactionId of this thread.
|
||||
long mytxid = myTransactionId.get().txid;
|
||||
|
||||
protected void logSync(long mytxid) {
|
||||
long syncStart = 0;
|
||||
boolean sync = false;
|
||||
long editsBatchedInSync = 0;
|
||||
try {
|
||||
EditLogOutputStream logStream = null;
|
||||
synchronized (this) {
|
||||
|
@ -607,19 +613,17 @@ public class FSEditLog implements LogsPurgeable {
|
|||
// If this transaction was already flushed, then nothing to do
|
||||
//
|
||||
if (mytxid <= synctxid) {
|
||||
numTransactionsBatchedInSync++;
|
||||
if (metrics != null) {
|
||||
// Metrics is non-null only when used inside name node
|
||||
metrics.incrTransactionsBatchedInSync();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// now, this thread will do the sync
|
||||
|
||||
// now, this thread will do the sync. track if other edits were
|
||||
// included in the sync - ie. batched. if this is the only edit
|
||||
// synced then the batched count is 0
|
||||
editsBatchedInSync = txid - synctxid - 1;
|
||||
syncStart = txid;
|
||||
isSyncRunning = true;
|
||||
sync = true;
|
||||
|
||||
|
||||
// swap buffers
|
||||
try {
|
||||
if (journalSet.isEmpty()) {
|
||||
|
@ -668,6 +672,8 @@ public class FSEditLog implements LogsPurgeable {
|
|||
|
||||
if (metrics != null) { // Metrics non-null only when used inside name node
|
||||
metrics.addSync(elapsed);
|
||||
metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
|
||||
numTransactionsBatchedInSync += editsBatchedInSync;
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
@ -1139,13 +1145,13 @@ public class FSEditLog implements LogsPurgeable {
|
|||
}
|
||||
|
||||
void logStartRollingUpgrade(long startTime) {
|
||||
RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
|
||||
RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get());
|
||||
op.setTime(startTime);
|
||||
logEdit(op);
|
||||
}
|
||||
|
||||
void logFinalizeRollingUpgrade(long finalizeTime) {
|
||||
RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
|
||||
RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get());
|
||||
op.setTime(finalizeTime);
|
||||
logEdit(op);
|
||||
}
|
||||
|
@ -1280,8 +1286,9 @@ public class FSEditLog implements LogsPurgeable {
|
|||
if (writeEndTxn) {
|
||||
logEdit(LogSegmentOp.getInstance(cache.get(),
|
||||
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
|
||||
logSync();
|
||||
}
|
||||
// always sync to ensure all edits are flushed.
|
||||
logSyncAll();
|
||||
|
||||
printStatistics(true);
|
||||
|
||||
|
@ -1657,6 +1664,12 @@ public class FSEditLog implements LogsPurgeable {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
// needed by async impl to restart thread when edit log is replaced by a
|
||||
// spy because a spy is a shallow copy
|
||||
public void restart() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return total number of syncs happened on this edit log.
|
||||
* @return long - count
|
||||
|
|
|
@ -0,0 +1,322 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
class FSEditLogAsync extends FSEditLog implements Runnable {
|
||||
static final Log LOG = LogFactory.getLog(FSEditLog.class);
|
||||
|
||||
// use separate mutex to avoid possible deadlock when stopping the thread.
|
||||
private final Object syncThreadLock = new Object();
|
||||
private Thread syncThread;
|
||||
private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>();
|
||||
|
||||
// requires concurrent access from caller threads and syncing thread.
|
||||
private final BlockingQueue<Edit> editPendingQ =
|
||||
new ArrayBlockingQueue<Edit>(4096);
|
||||
|
||||
// only accessed by syncing thread so no synchronization required.
|
||||
// queue is unbounded because it's effectively limited by the size
|
||||
// of the edit log buffer - ie. a sync will eventually be forced.
|
||||
private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>();
|
||||
|
||||
FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
|
||||
super(conf, storage, editsDirs);
|
||||
// op instances cannot be shared due to queuing for background thread.
|
||||
cache.disableCache();
|
||||
}
|
||||
|
||||
private boolean isSyncThreadAlive() {
|
||||
synchronized(syncThreadLock) {
|
||||
return syncThread != null && syncThread.isAlive();
|
||||
}
|
||||
}
|
||||
|
||||
private void startSyncThread() {
|
||||
synchronized(syncThreadLock) {
|
||||
if (!isSyncThreadAlive()) {
|
||||
syncThread = new Thread(this, this.getClass().getSimpleName());
|
||||
syncThread.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void stopSyncThread() {
|
||||
synchronized(syncThreadLock) {
|
||||
if (syncThread != null) {
|
||||
try {
|
||||
syncThread.interrupt();
|
||||
syncThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
// we're quitting anyway.
|
||||
} finally {
|
||||
syncThread = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
public void restart() {
|
||||
stopSyncThread();
|
||||
startSyncThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
void openForWrite(int layoutVersion) throws IOException {
|
||||
try {
|
||||
startSyncThread();
|
||||
super.openForWrite(layoutVersion);
|
||||
} catch (IOException ioe) {
|
||||
stopSyncThread();
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
stopSyncThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
void logEdit(final FSEditLogOp op) {
|
||||
Edit edit = getEditInstance(op);
|
||||
threadEdit.set(edit);
|
||||
enqueueEdit(edit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSync() {
|
||||
Edit edit = threadEdit.get();
|
||||
if (edit != null) {
|
||||
// do NOT remove to avoid expunge & rehash penalties.
|
||||
threadEdit.set(null);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("logSync " + edit);
|
||||
}
|
||||
edit.logSyncWait();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSyncAll() {
|
||||
// doesn't actually log anything, just ensures that the queues are
|
||||
// drained when it returns.
|
||||
Edit edit = new SyncEdit(this, null){
|
||||
@Override
|
||||
public boolean logEdit() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
enqueueEdit(edit);
|
||||
edit.logSyncWait();
|
||||
}
|
||||
|
||||
private void enqueueEdit(Edit edit) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("logEdit " + edit);
|
||||
}
|
||||
try {
|
||||
if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
|
||||
Preconditions.checkState(
|
||||
isSyncThreadAlive(), "sync thread is not alive");
|
||||
editPendingQ.put(edit);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// should never happen! failure to enqueue an edit is fatal
|
||||
terminate(t);
|
||||
}
|
||||
}
|
||||
|
||||
private Edit dequeueEdit() throws InterruptedException {
|
||||
// only block for next edit if no pending syncs.
|
||||
return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
boolean doSync;
|
||||
Edit edit = dequeueEdit();
|
||||
if (edit != null) {
|
||||
// sync if requested by edit log.
|
||||
doSync = edit.logEdit();
|
||||
syncWaitQ.add(edit);
|
||||
} else {
|
||||
// sync when editq runs dry, but have edits pending a sync.
|
||||
doSync = !syncWaitQ.isEmpty();
|
||||
}
|
||||
if (doSync) {
|
||||
// normally edit log exceptions cause the NN to terminate, but tests
|
||||
// relying on ExitUtil.terminate need to see the exception.
|
||||
RuntimeException syncEx = null;
|
||||
try {
|
||||
logSync(getLastWrittenTxId());
|
||||
} catch (RuntimeException ex) {
|
||||
syncEx = ex;
|
||||
}
|
||||
while ((edit = syncWaitQ.poll()) != null) {
|
||||
edit.logSyncNotify(syncEx);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info(Thread.currentThread().getName() + " was interrupted, exiting");
|
||||
} catch (Throwable t) {
|
||||
terminate(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void terminate(Throwable t) {
|
||||
String message = "Exception while edit logging: "+t.getMessage();
|
||||
LOG.fatal(message, t);
|
||||
ExitUtil.terminate(1, message);
|
||||
}
|
||||
|
||||
private Edit getEditInstance(FSEditLogOp op) {
|
||||
final Edit edit;
|
||||
final Server.Call rpcCall = Server.getCurCall().get();
|
||||
// only rpc calls not explicitly sync'ed on the log will be async.
|
||||
if (rpcCall != null && !Thread.holdsLock(this)) {
|
||||
edit = new RpcEdit(this, op, rpcCall);
|
||||
} else {
|
||||
edit = new SyncEdit(this, op);
|
||||
}
|
||||
return edit;
|
||||
}
|
||||
|
||||
private abstract static class Edit {
|
||||
final FSEditLog log;
|
||||
final FSEditLogOp op;
|
||||
|
||||
Edit(FSEditLog log, FSEditLogOp op) {
|
||||
this.log = log;
|
||||
this.op = op;
|
||||
}
|
||||
|
||||
// return whether edit log wants to sync.
|
||||
boolean logEdit() {
|
||||
return log.doEditTransaction(op);
|
||||
}
|
||||
|
||||
// wait for background thread to finish syncing.
|
||||
abstract void logSyncWait();
|
||||
// wake up the thread in logSyncWait.
|
||||
abstract void logSyncNotify(RuntimeException ex);
|
||||
}
|
||||
|
||||
// the calling thread is synchronously waiting for the edit to complete.
|
||||
private static class SyncEdit extends Edit {
|
||||
private final Object lock;
|
||||
private boolean done = false;
|
||||
private RuntimeException syncEx;
|
||||
|
||||
SyncEdit(FSEditLog log, FSEditLogOp op) {
|
||||
super(log, op);
|
||||
// if the log is already sync'ed (ex. log rolling), must wait on it to
|
||||
// avoid deadlock with sync thread. the fsn lock protects against
|
||||
// logging during a roll. else lock on this object to avoid sync
|
||||
// contention on edit log.
|
||||
lock = Thread.holdsLock(log) ? log : this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSyncWait() {
|
||||
synchronized(lock) {
|
||||
while (!done) {
|
||||
try {
|
||||
lock.wait(10);
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
// only needed by tests that rely on ExitUtil.terminate() since
|
||||
// normally exceptions terminate the NN.
|
||||
if (syncEx != null) {
|
||||
syncEx.fillInStackTrace();
|
||||
throw syncEx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSyncNotify(RuntimeException ex) {
|
||||
synchronized(lock) {
|
||||
done = true;
|
||||
syncEx = ex;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "["+getClass().getSimpleName()+" op:"+op+"]";
|
||||
}
|
||||
}
|
||||
|
||||
// the calling rpc thread will return immediately from logSync but the
|
||||
// rpc response will not be sent until the edit is durable.
|
||||
private static class RpcEdit extends Edit {
|
||||
private final Server.Call call;
|
||||
|
||||
RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) {
|
||||
super(log, op);
|
||||
this.call = call;
|
||||
call.postponeResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSyncWait() {
|
||||
// logSync is a no-op to immediately free up rpc handlers. the
|
||||
// response is sent when the sync thread calls syncNotify.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logSyncNotify(RuntimeException syncEx) {
|
||||
try {
|
||||
if (syncEx == null) {
|
||||
call.sendResponse();
|
||||
} else {
|
||||
call.abortResponse(syncEx);
|
||||
}
|
||||
} catch (Exception e) {} // don't care if not sent.
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -147,6 +147,55 @@ public abstract class FSEditLogOp {
|
|||
byte[] rpcClientId;
|
||||
int rpcCallId;
|
||||
|
||||
public static class OpInstanceCache {
|
||||
private static ThreadLocal<OpInstanceCacheMap> cache =
|
||||
new ThreadLocal<OpInstanceCacheMap>() {
|
||||
@Override
|
||||
protected OpInstanceCacheMap initialValue() {
|
||||
return new OpInstanceCacheMap();
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
static final class OpInstanceCacheMap extends
|
||||
EnumMap<FSEditLogOpCodes, FSEditLogOp> {
|
||||
OpInstanceCacheMap() {
|
||||
super(FSEditLogOpCodes.class);
|
||||
for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
|
||||
put(opCode, newInstance(opCode));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean useCache = true;
|
||||
|
||||
void disableCache() {
|
||||
useCache = false;
|
||||
}
|
||||
|
||||
public OpInstanceCache get() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
|
||||
return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
|
||||
}
|
||||
|
||||
private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
|
||||
FSEditLogOp instance = null;
|
||||
Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
|
||||
if (clazz != null) {
|
||||
try {
|
||||
instance = clazz.newInstance();
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException("Failed to instantiate "+opCode, ex);
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
|
||||
final void reset() {
|
||||
txid = HdfsServerConstants.INVALID_TXID;
|
||||
rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
|
||||
|
@ -156,72 +205,6 @@ public abstract class FSEditLogOp {
|
|||
|
||||
abstract void resetSubFields();
|
||||
|
||||
final public static class OpInstanceCache {
|
||||
private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
|
||||
new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
|
||||
|
||||
public OpInstanceCache() {
|
||||
inst.put(OP_ADD, new AddOp());
|
||||
inst.put(OP_CLOSE, new CloseOp());
|
||||
inst.put(OP_SET_REPLICATION, new SetReplicationOp());
|
||||
inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
|
||||
inst.put(OP_RENAME_OLD, new RenameOldOp());
|
||||
inst.put(OP_DELETE, new DeleteOp());
|
||||
inst.put(OP_MKDIR, new MkdirOp());
|
||||
inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
|
||||
inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
|
||||
inst.put(OP_SET_OWNER, new SetOwnerOp());
|
||||
inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
|
||||
inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
|
||||
inst.put(OP_SET_QUOTA, new SetQuotaOp());
|
||||
inst.put(OP_TIMES, new TimesOp());
|
||||
inst.put(OP_SYMLINK, new SymlinkOp());
|
||||
inst.put(OP_RENAME, new RenameOp());
|
||||
inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
|
||||
inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
|
||||
inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
|
||||
inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
|
||||
inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
|
||||
inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
|
||||
inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
|
||||
inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
|
||||
inst.put(OP_TRUNCATE, new TruncateOp());
|
||||
|
||||
inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
|
||||
inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
|
||||
inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
|
||||
inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
|
||||
inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
|
||||
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
|
||||
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
|
||||
inst.put(OP_ADD_BLOCK, new AddBlockOp());
|
||||
inst.put(OP_ADD_CACHE_DIRECTIVE,
|
||||
new AddCacheDirectiveInfoOp());
|
||||
inst.put(OP_MODIFY_CACHE_DIRECTIVE,
|
||||
new ModifyCacheDirectiveInfoOp());
|
||||
inst.put(OP_REMOVE_CACHE_DIRECTIVE,
|
||||
new RemoveCacheDirectiveInfoOp());
|
||||
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
|
||||
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
|
||||
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
|
||||
|
||||
inst.put(OP_SET_ACL, new SetAclOp());
|
||||
inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
|
||||
OP_ROLLING_UPGRADE_START, "start"));
|
||||
inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
|
||||
OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
|
||||
inst.put(OP_SET_XATTR, new SetXAttrOp());
|
||||
inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
|
||||
inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
|
||||
inst.put(OP_APPEND, new AppendOp());
|
||||
inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp());
|
||||
}
|
||||
|
||||
public FSEditLogOp get(FSEditLogOpCodes opcode) {
|
||||
return inst.get(opcode);
|
||||
}
|
||||
}
|
||||
|
||||
private static ImmutableMap<String, FsAction> fsActionMap() {
|
||||
ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
|
||||
for (FsAction v : FsAction.values())
|
||||
|
@ -776,7 +759,7 @@ public abstract class FSEditLogOp {
|
|||
* {@link ClientProtocol#append}
|
||||
*/
|
||||
static class AddOp extends AddCloseOp {
|
||||
private AddOp() {
|
||||
AddOp() {
|
||||
super(OP_ADD);
|
||||
}
|
||||
|
||||
|
@ -804,7 +787,7 @@ public abstract class FSEditLogOp {
|
|||
* finally log an AddOp.
|
||||
*/
|
||||
static class CloseOp extends AddCloseOp {
|
||||
private CloseOp() {
|
||||
CloseOp() {
|
||||
super(OP_CLOSE);
|
||||
}
|
||||
|
||||
|
@ -832,7 +815,7 @@ public abstract class FSEditLogOp {
|
|||
String clientMachine;
|
||||
boolean newBlock;
|
||||
|
||||
private AppendOp() {
|
||||
AppendOp() {
|
||||
super(OP_APPEND);
|
||||
}
|
||||
|
||||
|
@ -922,7 +905,7 @@ public abstract class FSEditLogOp {
|
|||
private Block penultimateBlock;
|
||||
private Block lastBlock;
|
||||
|
||||
private AddBlockOp() {
|
||||
AddBlockOp() {
|
||||
super(OP_ADD_BLOCK);
|
||||
}
|
||||
|
||||
|
@ -1034,7 +1017,7 @@ public abstract class FSEditLogOp {
|
|||
String path;
|
||||
Block[] blocks;
|
||||
|
||||
private UpdateBlocksOp() {
|
||||
UpdateBlocksOp() {
|
||||
super(OP_UPDATE_BLOCKS);
|
||||
}
|
||||
|
||||
|
@ -1128,7 +1111,7 @@ public abstract class FSEditLogOp {
|
|||
String path;
|
||||
short replication;
|
||||
|
||||
private SetReplicationOp() {
|
||||
SetReplicationOp() {
|
||||
super(OP_SET_REPLICATION);
|
||||
}
|
||||
|
||||
|
@ -1207,7 +1190,7 @@ public abstract class FSEditLogOp {
|
|||
long timestamp;
|
||||
final static public int MAX_CONCAT_SRC = 1024 * 1024;
|
||||
|
||||
private ConcatDeleteOp() {
|
||||
ConcatDeleteOp() {
|
||||
super(OP_CONCAT_DELETE);
|
||||
}
|
||||
|
||||
|
@ -1365,7 +1348,7 @@ public abstract class FSEditLogOp {
|
|||
String dst;
|
||||
long timestamp;
|
||||
|
||||
private RenameOldOp() {
|
||||
RenameOldOp() {
|
||||
super(OP_RENAME_OLD);
|
||||
}
|
||||
|
||||
|
@ -1477,7 +1460,7 @@ public abstract class FSEditLogOp {
|
|||
String path;
|
||||
long timestamp;
|
||||
|
||||
private DeleteOp() {
|
||||
DeleteOp() {
|
||||
super(OP_DELETE);
|
||||
}
|
||||
|
||||
|
@ -1578,7 +1561,7 @@ public abstract class FSEditLogOp {
|
|||
List<AclEntry> aclEntries;
|
||||
List<XAttr> xAttrs;
|
||||
|
||||
private MkdirOp() {
|
||||
MkdirOp() {
|
||||
super(OP_MKDIR);
|
||||
}
|
||||
|
||||
|
@ -1751,7 +1734,7 @@ public abstract class FSEditLogOp {
|
|||
static class SetGenstampV1Op extends FSEditLogOp {
|
||||
long genStampV1;
|
||||
|
||||
private SetGenstampV1Op() {
|
||||
SetGenstampV1Op() {
|
||||
super(OP_SET_GENSTAMP_V1);
|
||||
}
|
||||
|
||||
|
@ -1809,7 +1792,7 @@ public abstract class FSEditLogOp {
|
|||
static class SetGenstampV2Op extends FSEditLogOp {
|
||||
long genStampV2;
|
||||
|
||||
private SetGenstampV2Op() {
|
||||
SetGenstampV2Op() {
|
||||
super(OP_SET_GENSTAMP_V2);
|
||||
}
|
||||
|
||||
|
@ -1867,7 +1850,7 @@ public abstract class FSEditLogOp {
|
|||
static class AllocateBlockIdOp extends FSEditLogOp {
|
||||
long blockId;
|
||||
|
||||
private AllocateBlockIdOp() {
|
||||
AllocateBlockIdOp() {
|
||||
super(OP_ALLOCATE_BLOCK_ID);
|
||||
}
|
||||
|
||||
|
@ -1926,7 +1909,7 @@ public abstract class FSEditLogOp {
|
|||
String src;
|
||||
FsPermission permissions;
|
||||
|
||||
private SetPermissionsOp() {
|
||||
SetPermissionsOp() {
|
||||
super(OP_SET_PERMISSIONS);
|
||||
}
|
||||
|
||||
|
@ -1999,7 +1982,7 @@ public abstract class FSEditLogOp {
|
|||
String username;
|
||||
String groupname;
|
||||
|
||||
private SetOwnerOp() {
|
||||
SetOwnerOp() {
|
||||
super(OP_SET_OWNER);
|
||||
}
|
||||
|
||||
|
@ -2086,7 +2069,7 @@ public abstract class FSEditLogOp {
|
|||
String src;
|
||||
long nsQuota;
|
||||
|
||||
private SetNSQuotaOp() {
|
||||
SetNSQuotaOp() {
|
||||
super(OP_SET_NS_QUOTA);
|
||||
}
|
||||
|
||||
|
@ -2144,7 +2127,7 @@ public abstract class FSEditLogOp {
|
|||
static class ClearNSQuotaOp extends FSEditLogOp {
|
||||
String src;
|
||||
|
||||
private ClearNSQuotaOp() {
|
||||
ClearNSQuotaOp() {
|
||||
super(OP_CLEAR_NS_QUOTA);
|
||||
}
|
||||
|
||||
|
@ -2198,7 +2181,7 @@ public abstract class FSEditLogOp {
|
|||
long nsQuota;
|
||||
long dsQuota;
|
||||
|
||||
private SetQuotaOp() {
|
||||
SetQuotaOp() {
|
||||
super(OP_SET_QUOTA);
|
||||
}
|
||||
|
||||
|
@ -2283,7 +2266,7 @@ public abstract class FSEditLogOp {
|
|||
long dsQuota;
|
||||
StorageType type;
|
||||
|
||||
private SetQuotaByStorageTypeOp() {
|
||||
SetQuotaByStorageTypeOp() {
|
||||
super(OP_SET_QUOTA_BY_STORAGETYPE);
|
||||
}
|
||||
|
||||
|
@ -2366,7 +2349,7 @@ public abstract class FSEditLogOp {
|
|||
long mtime;
|
||||
long atime;
|
||||
|
||||
private TimesOp() {
|
||||
TimesOp() {
|
||||
super(OP_TIMES);
|
||||
}
|
||||
|
||||
|
@ -2475,7 +2458,7 @@ public abstract class FSEditLogOp {
|
|||
long atime;
|
||||
PermissionStatus permissionStatus;
|
||||
|
||||
private SymlinkOp() {
|
||||
SymlinkOp() {
|
||||
super(OP_SYMLINK);
|
||||
}
|
||||
|
||||
|
@ -2634,7 +2617,7 @@ public abstract class FSEditLogOp {
|
|||
long timestamp;
|
||||
Rename[] options;
|
||||
|
||||
private RenameOp() {
|
||||
RenameOp() {
|
||||
super(OP_RENAME);
|
||||
}
|
||||
|
||||
|
@ -2799,7 +2782,7 @@ public abstract class FSEditLogOp {
|
|||
long timestamp;
|
||||
Block truncateBlock;
|
||||
|
||||
private TruncateOp() {
|
||||
TruncateOp() {
|
||||
super(OP_TRUNCATE);
|
||||
}
|
||||
|
||||
|
@ -2932,7 +2915,7 @@ public abstract class FSEditLogOp {
|
|||
String path;
|
||||
String newHolder;
|
||||
|
||||
private ReassignLeaseOp() {
|
||||
ReassignLeaseOp() {
|
||||
super(OP_REASSIGN_LEASE);
|
||||
}
|
||||
|
||||
|
@ -3014,7 +2997,7 @@ public abstract class FSEditLogOp {
|
|||
DelegationTokenIdentifier token;
|
||||
long expiryTime;
|
||||
|
||||
private GetDelegationTokenOp() {
|
||||
GetDelegationTokenOp() {
|
||||
super(OP_GET_DELEGATION_TOKEN);
|
||||
}
|
||||
|
||||
|
@ -3093,7 +3076,7 @@ public abstract class FSEditLogOp {
|
|||
DelegationTokenIdentifier token;
|
||||
long expiryTime;
|
||||
|
||||
private RenewDelegationTokenOp() {
|
||||
RenewDelegationTokenOp() {
|
||||
super(OP_RENEW_DELEGATION_TOKEN);
|
||||
}
|
||||
|
||||
|
@ -3171,7 +3154,7 @@ public abstract class FSEditLogOp {
|
|||
static class CancelDelegationTokenOp extends FSEditLogOp {
|
||||
DelegationTokenIdentifier token;
|
||||
|
||||
private CancelDelegationTokenOp() {
|
||||
CancelDelegationTokenOp() {
|
||||
super(OP_CANCEL_DELEGATION_TOKEN);
|
||||
}
|
||||
|
||||
|
@ -3230,7 +3213,7 @@ public abstract class FSEditLogOp {
|
|||
static class UpdateMasterKeyOp extends FSEditLogOp {
|
||||
DelegationKey key;
|
||||
|
||||
private UpdateMasterKeyOp() {
|
||||
UpdateMasterKeyOp() {
|
||||
super(OP_UPDATE_MASTER_KEY);
|
||||
}
|
||||
|
||||
|
@ -3335,8 +3318,20 @@ public abstract class FSEditLogOp {
|
|||
}
|
||||
}
|
||||
|
||||
static class StartLogSegmentOp extends LogSegmentOp {
|
||||
StartLogSegmentOp() {
|
||||
super(OP_START_LOG_SEGMENT);
|
||||
}
|
||||
}
|
||||
|
||||
static class EndLogSegmentOp extends LogSegmentOp {
|
||||
EndLogSegmentOp() {
|
||||
super(OP_END_LOG_SEGMENT);
|
||||
}
|
||||
}
|
||||
|
||||
static class InvalidOp extends FSEditLogOp {
|
||||
private InvalidOp() {
|
||||
InvalidOp() {
|
||||
super(OP_INVALID);
|
||||
}
|
||||
|
||||
|
@ -4147,7 +4142,7 @@ public abstract class FSEditLogOp {
|
|||
List<XAttr> xAttrs;
|
||||
String src;
|
||||
|
||||
private RemoveXAttrOp() {
|
||||
RemoveXAttrOp() {
|
||||
super(OP_REMOVE_XATTR);
|
||||
}
|
||||
|
||||
|
@ -4200,7 +4195,7 @@ public abstract class FSEditLogOp {
|
|||
List<XAttr> xAttrs;
|
||||
String src;
|
||||
|
||||
private SetXAttrOp() {
|
||||
SetXAttrOp() {
|
||||
super(OP_SET_XATTR);
|
||||
}
|
||||
|
||||
|
@ -4253,7 +4248,7 @@ public abstract class FSEditLogOp {
|
|||
List<AclEntry> aclEntries = Lists.newArrayList();
|
||||
String src;
|
||||
|
||||
private SetAclOp() {
|
||||
SetAclOp() {
|
||||
super(OP_SET_ACL);
|
||||
}
|
||||
|
||||
|
@ -4350,7 +4345,7 @@ public abstract class FSEditLogOp {
|
|||
/**
|
||||
* Operation corresponding to upgrade
|
||||
*/
|
||||
static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
|
||||
abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
|
||||
private final String name;
|
||||
private long time;
|
||||
|
||||
|
@ -4417,7 +4412,7 @@ public abstract class FSEditLogOp {
|
|||
String path;
|
||||
byte policyId;
|
||||
|
||||
private SetStoragePolicyOp() {
|
||||
SetStoragePolicyOp() {
|
||||
super(OP_SET_STORAGE_POLICY);
|
||||
}
|
||||
|
||||
|
@ -4483,6 +4478,26 @@ public abstract class FSEditLogOp {
|
|||
}
|
||||
}
|
||||
|
||||
static class RollingUpgradeStartOp extends RollingUpgradeOp {
|
||||
RollingUpgradeStartOp() {
|
||||
super(OP_ROLLING_UPGRADE_START, "start");
|
||||
}
|
||||
|
||||
static RollingUpgradeStartOp getInstance(OpInstanceCache cache) {
|
||||
return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START);
|
||||
}
|
||||
}
|
||||
|
||||
static class RollingUpgradeFinalizeOp extends RollingUpgradeOp {
|
||||
RollingUpgradeFinalizeOp() {
|
||||
super(OP_ROLLING_UPGRADE_FINALIZE, "finalize");
|
||||
}
|
||||
|
||||
static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
|
||||
return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Class for writing editlog ops
|
||||
*/
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
|
||||
|
||||
/**
|
||||
* Op codes for edits file
|
||||
|
@ -27,60 +28,64 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Unstable
|
||||
public enum FSEditLogOpCodes {
|
||||
// last op code in file
|
||||
OP_ADD ((byte) 0),
|
||||
OP_RENAME_OLD ((byte) 1), // deprecated operation
|
||||
OP_DELETE ((byte) 2),
|
||||
OP_MKDIR ((byte) 3),
|
||||
OP_SET_REPLICATION ((byte) 4),
|
||||
OP_ADD ((byte) 0, AddOp.class),
|
||||
// deprecated operation
|
||||
OP_RENAME_OLD ((byte) 1, RenameOldOp.class),
|
||||
OP_DELETE ((byte) 2, DeleteOp.class),
|
||||
OP_MKDIR ((byte) 3, MkdirOp.class),
|
||||
OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class),
|
||||
@Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete
|
||||
@Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete
|
||||
OP_SET_PERMISSIONS ((byte) 7),
|
||||
OP_SET_OWNER ((byte) 8),
|
||||
OP_CLOSE ((byte) 9),
|
||||
OP_SET_GENSTAMP_V1 ((byte) 10),
|
||||
OP_SET_NS_QUOTA ((byte) 11), // obsolete
|
||||
OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete
|
||||
OP_TIMES ((byte) 13), // set atime, mtime
|
||||
OP_SET_QUOTA ((byte) 14),
|
||||
OP_RENAME ((byte) 15), // filecontext rename
|
||||
OP_CONCAT_DELETE ((byte) 16), // concat files
|
||||
OP_SYMLINK ((byte) 17),
|
||||
OP_GET_DELEGATION_TOKEN ((byte) 18),
|
||||
OP_RENEW_DELEGATION_TOKEN ((byte) 19),
|
||||
OP_CANCEL_DELEGATION_TOKEN ((byte) 20),
|
||||
OP_UPDATE_MASTER_KEY ((byte) 21),
|
||||
OP_REASSIGN_LEASE ((byte) 22),
|
||||
OP_END_LOG_SEGMENT ((byte) 23),
|
||||
OP_START_LOG_SEGMENT ((byte) 24),
|
||||
OP_UPDATE_BLOCKS ((byte) 25),
|
||||
OP_CREATE_SNAPSHOT ((byte) 26),
|
||||
OP_DELETE_SNAPSHOT ((byte) 27),
|
||||
OP_RENAME_SNAPSHOT ((byte) 28),
|
||||
OP_ALLOW_SNAPSHOT ((byte) 29),
|
||||
OP_DISALLOW_SNAPSHOT ((byte) 30),
|
||||
OP_SET_GENSTAMP_V2 ((byte) 31),
|
||||
OP_ALLOCATE_BLOCK_ID ((byte) 32),
|
||||
OP_ADD_BLOCK ((byte) 33),
|
||||
OP_ADD_CACHE_DIRECTIVE ((byte) 34),
|
||||
OP_REMOVE_CACHE_DIRECTIVE ((byte) 35),
|
||||
OP_ADD_CACHE_POOL ((byte) 36),
|
||||
OP_MODIFY_CACHE_POOL ((byte) 37),
|
||||
OP_REMOVE_CACHE_POOL ((byte) 38),
|
||||
OP_MODIFY_CACHE_DIRECTIVE ((byte) 39),
|
||||
OP_SET_ACL ((byte) 40),
|
||||
OP_ROLLING_UPGRADE_START ((byte) 41),
|
||||
OP_ROLLING_UPGRADE_FINALIZE ((byte) 42),
|
||||
OP_SET_XATTR ((byte) 43),
|
||||
OP_REMOVE_XATTR ((byte) 44),
|
||||
OP_SET_STORAGE_POLICY ((byte) 45),
|
||||
OP_TRUNCATE ((byte) 46),
|
||||
OP_APPEND ((byte) 47),
|
||||
OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48),
|
||||
OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class),
|
||||
OP_SET_OWNER ((byte) 8, SetOwnerOp.class),
|
||||
OP_CLOSE ((byte) 9, CloseOp.class),
|
||||
OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class),
|
||||
OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete
|
||||
OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete
|
||||
OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime
|
||||
OP_SET_QUOTA ((byte) 14, SetQuotaOp.class),
|
||||
// filecontext rename
|
||||
OP_RENAME ((byte) 15, RenameOp.class),
|
||||
// concat files
|
||||
OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class),
|
||||
OP_SYMLINK ((byte) 17, SymlinkOp.class),
|
||||
OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class),
|
||||
OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class),
|
||||
OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class),
|
||||
OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class),
|
||||
OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class),
|
||||
OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class),
|
||||
OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class),
|
||||
OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class),
|
||||
OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class),
|
||||
OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class),
|
||||
OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class),
|
||||
OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class),
|
||||
OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class),
|
||||
OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class),
|
||||
OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class),
|
||||
OP_ADD_BLOCK ((byte) 33, AddBlockOp.class),
|
||||
OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class),
|
||||
OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class),
|
||||
OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class),
|
||||
OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class),
|
||||
OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class),
|
||||
OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class),
|
||||
OP_SET_ACL ((byte) 40, SetAclOp.class),
|
||||
OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class),
|
||||
OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class),
|
||||
OP_SET_XATTR ((byte) 43, SetXAttrOp.class),
|
||||
OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class),
|
||||
OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class),
|
||||
OP_TRUNCATE ((byte) 46, TruncateOp.class),
|
||||
OP_APPEND ((byte) 47, AppendOp.class),
|
||||
OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),
|
||||
|
||||
// Note that the current range of the valid OP code is 0~127
|
||||
OP_INVALID ((byte) -1);
|
||||
|
||||
private final byte opCode;
|
||||
private final Class<? extends FSEditLogOp> opClass;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -88,7 +93,12 @@ public enum FSEditLogOpCodes {
|
|||
* @param opCode byte value of constructed enum
|
||||
*/
|
||||
FSEditLogOpCodes(byte opCode) {
|
||||
this(opCode, null);
|
||||
}
|
||||
|
||||
FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
|
||||
this.opCode = opCode;
|
||||
this.opClass = opClass;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -100,6 +110,10 @@ public enum FSEditLogOpCodes {
|
|||
return opCode;
|
||||
}
|
||||
|
||||
public Class<? extends FSEditLogOp> getOpClass() {
|
||||
return opClass;
|
||||
}
|
||||
|
||||
private static final FSEditLogOpCodes[] VALUES;
|
||||
|
||||
static {
|
||||
|
|
|
@ -140,7 +140,7 @@ public class FSImage implements Closeable {
|
|||
storage.setRestoreFailedStorage(true);
|
||||
}
|
||||
|
||||
this.editLog = new FSEditLog(conf, storage, editsDirs);
|
||||
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
|
||||
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
||||
}
|
||||
|
||||
|
|
|
@ -1275,7 +1275,6 @@ public class NameNode implements NameNodeStatusMXBean {
|
|||
newSharedEditLog.logEdit(op);
|
||||
|
||||
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
|
||||
newSharedEditLog.logSync();
|
||||
newSharedEditLog.endCurrentLogSegment(false);
|
||||
LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
|
||||
+ stream);
|
||||
|
|
|
@ -295,8 +295,8 @@ public class NameNodeMetrics {
|
|||
transactions.add(latency);
|
||||
}
|
||||
|
||||
public void incrTransactionsBatchedInSync() {
|
||||
transactionsBatchedInSync.incr();
|
||||
public void incrTransactionsBatchedInSync(long count) {
|
||||
transactionsBatchedInSync.incr(count);
|
||||
}
|
||||
|
||||
public void addSync(long elapsed) {
|
||||
|
|
|
@ -267,6 +267,9 @@ public class DFSTestUtil {
|
|||
}
|
||||
|
||||
public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
|
||||
// spies are shallow copies, must allow async log to restart its thread
|
||||
// so it has the new copy
|
||||
newLog.restart();
|
||||
Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
|
||||
Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
|
||||
}
|
||||
|
|
|
@ -71,17 +71,21 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
public class TestAuditLogs {
|
||||
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
|
||||
final boolean useAsyncLog;
|
||||
|
||||
final boolean useAsyncEdits;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{new Boolean(false)});
|
||||
params.add(new Object[]{new Boolean(true)});
|
||||
params.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
|
||||
params.add(new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
params.add(new Object[]{Boolean.FALSE, Boolean.TRUE});
|
||||
params.add(new Object[]{Boolean.TRUE, Boolean.TRUE});
|
||||
return params;
|
||||
}
|
||||
|
||||
public TestAuditLogs(boolean useAsyncLog) {
|
||||
|
||||
public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
|
||||
this.useAsyncLog = useAsyncLog;
|
||||
this.useAsyncEdits = useAsyncEdits;
|
||||
}
|
||||
|
||||
// Pattern for:
|
||||
|
@ -119,6 +123,7 @@ public class TestAuditLogs {
|
|||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
|
||||
conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits);
|
||||
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
|
||||
setNumFiles(20).build();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
||||
|
|
|
@ -88,6 +88,9 @@ import org.apache.log4j.AppenderSkeleton;
|
|||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.Mockito;
|
||||
import org.xml.sax.ContentHandler;
|
||||
import org.xml.sax.SAXException;
|
||||
|
@ -98,12 +101,33 @@ import com.google.common.collect.Lists;
|
|||
/**
|
||||
* This class tests the creation and validation of a checkpoint.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEditLog {
|
||||
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestEditLog(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
public static Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* A garbage mkdir op which is used for testing
|
||||
* {@link EditLogFileInputStream#scanEditLog(File)}
|
||||
|
@ -225,11 +249,12 @@ public class TestEditLog {
|
|||
* @param storage Storage object used by namenode
|
||||
*/
|
||||
private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
Configuration conf = getConf();
|
||||
// Make sure the edits dirs are set in the provided configuration object.
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
|
||||
StringUtils.join(",", storage.getEditsDirectories()));
|
||||
FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
FSEditLog log = FSEditLog.newInstance(
|
||||
conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
return log;
|
||||
}
|
||||
|
||||
|
@ -252,7 +277,7 @@ public class TestEditLog {
|
|||
*/
|
||||
@Test
|
||||
public void testPreTxidEditLogWithEdits() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
|
@ -282,7 +307,7 @@ public class TestEditLog {
|
|||
@Test
|
||||
public void testSimpleEditLog() throws IOException {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
try {
|
||||
|
@ -351,7 +376,7 @@ public class TestEditLog {
|
|||
private void testEditLog(int initialSize) throws IOException {
|
||||
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
|
||||
|
@ -482,8 +507,12 @@ public class TestEditLog {
|
|||
|
||||
@Test
|
||||
public void testSyncBatching() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
if (useAsyncEditLog) {
|
||||
// semantics are completely differently since edits will be auto-synced
|
||||
return;
|
||||
}
|
||||
// start a cluster
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
ExecutorService threadA = Executors.newSingleThreadExecutor();
|
||||
|
@ -546,7 +575,7 @@ public class TestEditLog {
|
|||
@Test
|
||||
public void testBatchedSyncWithClosedLogs() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
ExecutorService threadA = Executors.newSingleThreadExecutor();
|
||||
|
@ -585,7 +614,7 @@ public class TestEditLog {
|
|||
@Test
|
||||
public void testEditChecksum() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
||||
|
@ -657,7 +686,7 @@ public class TestEditLog {
|
|||
*/
|
||||
private void testCrashRecovery(int numTransactions) throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
|
||||
CHECKPOINT_ON_STARTUP_MIN_TXNS);
|
||||
|
||||
|
@ -802,7 +831,7 @@ public class TestEditLog {
|
|||
boolean updateTransactionIdFile, boolean shouldSucceed)
|
||||
throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(NUM_DATA_NODES).build();
|
||||
|
@ -1133,7 +1162,7 @@ public class TestEditLog {
|
|||
public static NNStorage setupEdits(List<URI> editUris, int numrolls,
|
||||
boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException {
|
||||
List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
|
||||
NNStorage storage = new NNStorage(new Configuration(),
|
||||
NNStorage storage = new NNStorage(getConf(),
|
||||
Collections.<URI>emptyList(),
|
||||
editUris);
|
||||
storage.format(new NamespaceInfo());
|
||||
|
@ -1295,7 +1324,7 @@ public class TestEditLog {
|
|||
EditLogFileOutputStream elfos = null;
|
||||
EditLogFileInputStream elfis = null;
|
||||
try {
|
||||
elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
|
||||
elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
|
||||
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||
elfos.writeRaw(garbage, 0, garbage.length);
|
||||
elfos.setReadyToFlush();
|
||||
|
@ -1471,7 +1500,7 @@ public class TestEditLog {
|
|||
public void testManyEditLogSegments() throws IOException {
|
||||
final int NUM_EDIT_LOG_ROLLS = 1000;
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
try {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.net.BindException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
|
||||
|
@ -30,18 +32,40 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEditLogAutoroll {
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestEditLogAutoroll(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
|
@ -61,6 +85,8 @@ public class TestEditLogAutoroll {
|
|||
// Make it autoroll after 10 edits
|
||||
conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f);
|
||||
conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
|
||||
int retryCount = 0;
|
||||
while (true) {
|
||||
|
|
|
@ -21,12 +21,13 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -43,13 +44,37 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEditLogJournalFailures {
|
||||
|
||||
private int editsPerformed = 0;
|
||||
private MiniDFSCluster cluster;
|
||||
private FileSystem fs;
|
||||
private boolean useAsyncEdits;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{Boolean.FALSE});
|
||||
params.add(new Object[]{Boolean.TRUE});
|
||||
return params;
|
||||
}
|
||||
|
||||
public TestEditLogJournalFailures(boolean useAsyncEdits) {
|
||||
this.useAsyncEdits = useAsyncEdits;
|
||||
}
|
||||
|
||||
private Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEdits);
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the mini cluster for testing and sub in a custom runtime so that
|
||||
|
@ -57,9 +82,9 @@ public class TestEditLogJournalFailures {
|
|||
*/
|
||||
@Before
|
||||
public void setUpMiniCluster() throws IOException {
|
||||
setUpMiniCluster(new HdfsConfiguration(), true);
|
||||
setUpMiniCluster(getConf(), true);
|
||||
}
|
||||
|
||||
|
||||
public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs)
|
||||
throws IOException {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||
|
@ -153,7 +178,7 @@ public class TestEditLogJournalFailures {
|
|||
String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings(
|
||||
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
||||
shutDownMiniCluster();
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
|
||||
|
@ -193,7 +218,7 @@ public class TestEditLogJournalFailures {
|
|||
throws IOException {
|
||||
// Set up 4 name/edits dirs.
|
||||
shutDownMiniCluster();
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
String[] nameDirs = new String[4];
|
||||
for (int i = 0; i < nameDirs.length; i++) {
|
||||
File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i);
|
||||
|
|
|
@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
|
@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer;
|
|||
* This class tests various synchronization bugs in FSEditLog rolling
|
||||
* and namespace saving.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEditLogRace {
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ false });
|
||||
params.add(new Object[]{ true });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static final String NAME_DIR =
|
||||
MiniDFSCluster.getBaseDirectory() + "name1";
|
||||
private static boolean useAsyncEditLog;
|
||||
|
||||
public TestEditLogRace(boolean useAsyncEditLog) {
|
||||
TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
|
||||
|
||||
// This test creates NUM_THREADS threads and each thread continuously writes
|
||||
// transactions
|
||||
|
@ -94,21 +113,29 @@ public class TestEditLogRace {
|
|||
* This value needs to be significantly longer than the average
|
||||
* time for an fsync() or enterSafeMode().
|
||||
*/
|
||||
private static final int BLOCK_TIME = 10;
|
||||
|
||||
private static final int BLOCK_TIME = 4; // 4 sec pretty generous
|
||||
|
||||
//
|
||||
// an object that does a bunch of transactions
|
||||
//
|
||||
static class Transactions implements Runnable {
|
||||
final NamenodeProtocols nn;
|
||||
final MiniDFSCluster cluster;
|
||||
FileSystem fs;
|
||||
short replication = 3;
|
||||
long blockSize = 64;
|
||||
volatile boolean stopped = false;
|
||||
volatile Thread thr;
|
||||
final AtomicReference<Throwable> caught;
|
||||
|
||||
Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
|
||||
nn = ns;
|
||||
Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) {
|
||||
this.cluster = cluster;
|
||||
this.nn = cluster.getNameNodeRpc();
|
||||
try {
|
||||
this.fs = cluster.getFileSystem();
|
||||
} catch (IOException e) {
|
||||
caught.set(e);
|
||||
}
|
||||
this.caught = caught;
|
||||
}
|
||||
|
||||
|
@ -122,11 +149,23 @@ public class TestEditLogRace {
|
|||
while (!stopped) {
|
||||
try {
|
||||
String dirname = "/thr-" + thr.getId() + "-dir-" + i;
|
||||
nn.mkdirs(dirname, p, true);
|
||||
nn.delete(dirname, true);
|
||||
if (i % 2 == 0) {
|
||||
Path dirnamePath = new Path(dirname);
|
||||
fs.mkdirs(dirnamePath);
|
||||
fs.delete(dirnamePath, true);
|
||||
} else {
|
||||
nn.mkdirs(dirname, p, true);
|
||||
nn.delete(dirname, true);
|
||||
}
|
||||
} catch (SafeModeException sme) {
|
||||
// This is OK - the tests will bring NN in and out of safemode
|
||||
} catch (Throwable e) {
|
||||
// This is OK - the tests will bring NN in and out of safemode
|
||||
if (e instanceof RemoteException &&
|
||||
((RemoteException)e).getClassName()
|
||||
.contains("SafeModeException")) {
|
||||
return;
|
||||
}
|
||||
LOG.warn("Got error in transaction thread", e);
|
||||
caught.compareAndSet(null, e);
|
||||
break;
|
||||
|
@ -144,11 +183,11 @@ public class TestEditLogRace {
|
|||
}
|
||||
}
|
||||
|
||||
private void startTransactionWorkers(NamenodeProtocols namesystem,
|
||||
private void startTransactionWorkers(MiniDFSCluster cluster,
|
||||
AtomicReference<Throwable> caughtErr) {
|
||||
// Create threads and make them run transactions concurrently.
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
Transactions trans = new Transactions(namesystem, caughtErr);
|
||||
Transactions trans = new Transactions(cluster, caughtErr);
|
||||
new Thread(trans, "TransactionThread-" + i).start();
|
||||
workers.add(trans);
|
||||
}
|
||||
|
@ -174,21 +213,21 @@ public class TestEditLogRace {
|
|||
@Test
|
||||
public void testEditLogRolling() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
Configuration conf = getConf();
|
||||
final MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
||||
FileSystem fileSys = null;
|
||||
|
||||
|
||||
AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
||||
cluster.waitActive();
|
||||
fileSys = cluster.getFileSystem();
|
||||
final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
|
||||
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
||||
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
||||
|
||||
startTransactionWorkers(nn, caughtErr);
|
||||
startTransactionWorkers(cluster, caughtErr);
|
||||
|
||||
long previousLogTxId = 1;
|
||||
|
||||
|
@ -256,7 +295,7 @@ public class TestEditLogRace {
|
|||
@Test
|
||||
public void testSaveNamespace() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
|
||||
|
@ -266,12 +305,11 @@ public class TestEditLogRace {
|
|||
cluster.waitActive();
|
||||
fileSys = cluster.getFileSystem();
|
||||
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||
final NamenodeProtocols nn = cluster.getNameNodeRpc();
|
||||
|
||||
FSImage fsimage = namesystem.getFSImage();
|
||||
FSEditLog editLog = fsimage.getEditLog();
|
||||
|
||||
startTransactionWorkers(nn, caughtErr);
|
||||
startTransactionWorkers(cluster, caughtErr);
|
||||
|
||||
for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
|
||||
try {
|
||||
|
@ -321,11 +359,13 @@ public class TestEditLogRace {
|
|||
|
||||
private Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||
//conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
|
||||
//conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -389,7 +429,7 @@ public class TestEditLogRace {
|
|||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
LOG.info("Flush called");
|
||||
if (Thread.currentThread() == doAnEditThread) {
|
||||
if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
|
||||
LOG.info("edit thread: Telling main thread we made it to flush section...");
|
||||
// Signal to main thread that the edit thread is in the racy section
|
||||
waitToEnterFlush.countDown();
|
||||
|
@ -457,62 +497,52 @@ public class TestEditLogRace {
|
|||
|
||||
try {
|
||||
FSImage fsimage = namesystem.getFSImage();
|
||||
FSEditLog editLog = spy(fsimage.getEditLog());
|
||||
DFSTestUtil.setEditLogForTesting(namesystem, editLog);
|
||||
final FSEditLog editLog = fsimage.getEditLog();
|
||||
|
||||
final AtomicReference<Throwable> deferredException =
|
||||
new AtomicReference<Throwable>();
|
||||
final CountDownLatch waitToEnterSync = new CountDownLatch(1);
|
||||
|
||||
final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
|
||||
|
||||
final Thread doAnEditThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting mkdirs");
|
||||
namesystem.mkdirs("/test",
|
||||
new PermissionStatus("test","test", new FsPermission((short)00755)),
|
||||
true);
|
||||
LOG.info("mkdirs complete");
|
||||
LOG.info("Starting setOwner");
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
editLog.logSetOwner("/","test","test");
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
sleepingBeforeSync.countDown();
|
||||
LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
|
||||
Thread.sleep(BLOCK_TIME*1000);
|
||||
editLog.logSync();
|
||||
LOG.info("edit thread: logSync complete");
|
||||
} catch (Throwable ioe) {
|
||||
LOG.fatal("Got exception", ioe);
|
||||
deferredException.set(ioe);
|
||||
waitToEnterSync.countDown();
|
||||
sleepingBeforeSync.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Answer<Void> blockingSync = new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
LOG.info("logSync called");
|
||||
if (Thread.currentThread() == doAnEditThread) {
|
||||
LOG.info("edit thread: Telling main thread we made it just before logSync...");
|
||||
waitToEnterSync.countDown();
|
||||
LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
|
||||
Thread.sleep(BLOCK_TIME*1000);
|
||||
LOG.info("Going through to logSync. This will allow the main thread to continue.");
|
||||
}
|
||||
invocation.callRealMethod();
|
||||
LOG.info("logSync complete");
|
||||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(blockingSync).when(editLog).logSync();
|
||||
|
||||
doAnEditThread.setDaemon(true);
|
||||
doAnEditThread.start();
|
||||
LOG.info("Main thread: waiting to just before logSync...");
|
||||
waitToEnterSync.await();
|
||||
sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
|
||||
assertNull(deferredException.get());
|
||||
LOG.info("Main thread: detected that logSync about to be called.");
|
||||
LOG.info("Trying to enter safe mode.");
|
||||
LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
|
||||
|
||||
|
||||
long st = Time.now();
|
||||
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
long et = Time.now();
|
||||
LOG.info("Entered safe mode");
|
||||
// Make sure we really waited for the flush to complete!
|
||||
assertTrue(et - st > (BLOCK_TIME - 1)*1000);
|
||||
LOG.info("Entered safe mode after "+(et-st)+"ms");
|
||||
|
||||
// Make sure we didn't wait for the thread that did a logEdit but
|
||||
// not logSync. Going into safemode does a logSyncAll that will flush
|
||||
// its edit.
|
||||
assertTrue(et - st < (BLOCK_TIME/2)*1000);
|
||||
|
||||
// Once we're in safe mode, save namespace.
|
||||
namesystem.saveNamespace();
|
||||
|
|
|
@ -31,6 +31,8 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
|
@ -51,25 +53,48 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.Files;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestFSEditLogLoader {
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestFSEditLogLoader(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
private static Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
return conf;
|
||||
}
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
|
||||
GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
|
||||
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
|
||||
|
||||
private static final int NUM_DATA_NODES = 0;
|
||||
|
||||
|
||||
@Test
|
||||
public void testDisplayRecentEditLogOpCodes() throws IOException {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
// start a cluster
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
|
||||
|
@ -119,7 +144,7 @@ public class TestFSEditLogLoader {
|
|||
@Test
|
||||
public void testReplicationAdjusted() throws Exception {
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
// Replicate and heartbeat fast to shave a few seconds off test
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
|
|
|
@ -27,6 +27,8 @@ import static org.mockito.Mockito.spy;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -50,13 +52,38 @@ import org.apache.hadoop.io.IOUtils;
|
|||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* This tests data recovery mode for the NameNode.
|
||||
*/
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestNameNodeRecovery {
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestNameNodeRecovery(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
private static Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
return conf;
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
|
||||
private static final StartupOption recoverStartOpt = StartupOption.RECOVER;
|
||||
private static final File TEST_DIR = PathUtils.getTestDir(TestNameNodeRecovery.class);
|
||||
|
@ -73,7 +100,7 @@ public class TestNameNodeRecovery {
|
|||
EditLogFileOutputStream elfos = null;
|
||||
EditLogFileInputStream elfis = null;
|
||||
try {
|
||||
elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
|
||||
elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
|
||||
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||
|
||||
elts.addTransactionsToLog(elfos, cache);
|
||||
|
@ -519,7 +546,7 @@ public class TestNameNodeRecovery {
|
|||
final boolean needRecovery = corruptor.needRecovery(finalize);
|
||||
|
||||
// start a cluster
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
setupRecoveryTestConf(conf);
|
||||
MiniDFSCluster cluster = null;
|
||||
FileSystem fileSys = null;
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.BindException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.HAUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
|
@ -43,11 +46,31 @@ import org.apache.hadoop.net.ServerSocketUtil;
|
|||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestEditLogTailer {
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{ Boolean.FALSE });
|
||||
params.add(new Object[]{ Boolean.TRUE });
|
||||
return params;
|
||||
}
|
||||
|
||||
private static boolean useAsyncEditLog;
|
||||
public TestEditLogTailer(Boolean async) {
|
||||
useAsyncEditLog = async;
|
||||
}
|
||||
|
||||
private static final String DIR_PREFIX = "/dir";
|
||||
private static final int DIRS_TO_MAKE = 20;
|
||||
static final long SLEEP_TIME = 1000;
|
||||
|
@ -55,13 +78,21 @@ public class TestEditLogTailer {
|
|||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
|
||||
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
|
||||
GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.ALL);
|
||||
}
|
||||
|
||||
|
||||
private static Configuration getConf() {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
|
||||
useAsyncEditLog);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTailer() throws IOException, InterruptedException,
|
||||
ServiceFailedException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
|
@ -119,7 +150,7 @@ public class TestEditLogTailer {
|
|||
|
||||
private static void testStandbyTriggersLogRolls(int activeIndex)
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
Configuration conf = getConf();
|
||||
// Roll every 1s
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
|
|
Loading…
Reference in New Issue