HDFS-4677. Editlog should support synchronous writes. Contributed by Ivan Mitic.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1491096 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ivan Mitic 2013-06-08 23:28:37 +00:00
parent d00d968c49
commit cb11d68f40
16 changed files with 114 additions and 46 deletions

View File

@ -696,6 +696,8 @@ Release 2.1.0-beta - UNRELEASED
HDFS-4610. Use common utils FileUtil#setReadable/Writable/Executable and HDFS-4610. Use common utils FileUtil#setReadable/Writable/Executable and
FileUtil#canRead/Write/Execute. (Ivan Mitic via suresh) FileUtil#canRead/Write/Execute. (Ivan Mitic via suresh)
HDFS-4677. Editlog should support synchronous writes. (ivanmi)
BREAKDOWN OF HDFS-2802 HDFS SNAPSHOT SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-2802 HDFS SNAPSHOT SUBTASKS AND RELATED JIRAS
HDFS-4076. Support snapshot of single files. (szetszwo) HDFS-4076. Support snapshot of single files. (szetszwo)

View File

@ -176,6 +176,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum"; public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1; public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated"; public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -56,16 +57,18 @@ class JNStorage extends Storage {
ImmutableList.of(Pattern.compile("(\\d+)")); ImmutableList.of(Pattern.compile("(\\d+)"));
/** /**
* @param conf Configuration object
* @param logDir the path to the directory in which data will be stored * @param logDir the path to the directory in which data will be stored
* @param errorReporter a callback to report errors * @param errorReporter a callback to report errors
* @throws IOException * @throws IOException
*/ */
protected JNStorage(File logDir, StorageErrorReporter errorReporter) throws IOException { protected JNStorage(Configuration conf, File logDir,
StorageErrorReporter errorReporter) throws IOException {
super(NodeType.JOURNAL_NODE); super(NodeType.JOURNAL_NODE);
sd = new StorageDirectory(logDir); sd = new StorageDirectory(logDir);
this.addStorageDir(sd); this.addStorageDir(sd);
this.fjm = new FileJournalManager(sd, errorReporter); this.fjm = new FileJournalManager(conf, sd, errorReporter);
analyzeStorage(); analyzeStorage();
} }

View File

@ -32,6 +32,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException; import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
@ -133,9 +134,9 @@ class Journal implements Closeable {
*/ */
private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000; private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;
Journal(File logDir, String journalId, Journal(Configuration conf, File logDir, String journalId,
StorageErrorReporter errorReporter) throws IOException { StorageErrorReporter errorReporter) throws IOException {
storage = new JNStorage(logDir, errorReporter); storage = new JNStorage(conf, logDir, errorReporter);
this.journalId = journalId; this.journalId = journalId;
refreshCachedData(); refreshCachedData();

View File

@ -76,7 +76,7 @@ public class JournalNode implements Tool, Configurable {
if (journal == null) { if (journal == null) {
File logDir = getLogDir(jid); File logDir = getLogDir(jid);
LOG.info("Initializing journal in directory " + logDir); LOG.info("Initializing journal in directory " + logDir);
journal = new Journal(logDir, jid, new ErrorReporter()); journal = new Journal(conf, logDir, jid, new ErrorReporter());
journalsById.put(jid, journal); journalsById.put(jid, journal);
} }

View File

@ -29,6 +29,8 @@ import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -48,6 +50,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
private FileChannel fc; // channel of the file stream for sync private FileChannel fc; // channel of the file stream for sync
private EditsDoubleBuffer doubleBuf; private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH); static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
private boolean shouldSyncWritesAndSkipFsync = false;
private static boolean shouldSkipFsyncForTests = false; private static boolean shouldSkipFsyncForTests = false;
@ -61,17 +64,29 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
/** /**
* Creates output buffers and file object. * Creates output buffers and file object.
* *
* @param conf
* Configuration object
* @param name * @param name
* File name to store edit log * File name to store edit log
* @param size * @param size
* Size of flush buffer * Size of flush buffer
* @throws IOException * @throws IOException
*/ */
public EditLogFileOutputStream(File name, int size) throws IOException { public EditLogFileOutputStream(Configuration conf, File name, int size)
throws IOException {
super(); super();
shouldSyncWritesAndSkipFsync = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);
file = name; file = name;
doubleBuf = new EditsDoubleBuffer(size); doubleBuf = new EditsDoubleBuffer(size);
RandomAccessFile rp = new RandomAccessFile(name, "rw"); RandomAccessFile rp;
if (shouldSyncWritesAndSkipFsync) {
rp = new RandomAccessFile(name, "rws");
} else {
rp = new RandomAccessFile(name, "rw");
}
fp = new FileOutputStream(rp.getFD()); // open for append fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel(); fc = rp.getChannel();
fc.position(fc.size()); fc.position(fc.size());
@ -182,9 +197,9 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
LOG.info("Nothing to flush"); LOG.info("Nothing to flush");
return; return;
} }
preallocate(); // preallocate file if necessay preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp); doubleBuf.flushTo(fp);
if (durable && !shouldSkipFsyncForTests) { if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
fc.force(false); // metadata updates not needed fc.force(false); // metadata updates not needed
} }
} }

View File

@ -245,7 +245,7 @@ public class FSEditLog implements LogsPurgeable {
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u); StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) { if (sd != null) {
journalSet.add(new FileJournalManager(sd, storage), required); journalSet.add(new FileJournalManager(conf, sd, storage), required);
} }
} else { } else {
journalSet.add(createJournal(u), required); journalSet.add(createJournal(u), required);

View File

@ -30,6 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -57,6 +58,7 @@ import com.google.common.collect.ComparisonChain;
public class FileJournalManager implements JournalManager { public class FileJournalManager implements JournalManager {
private static final Log LOG = LogFactory.getLog(FileJournalManager.class); private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
private final Configuration conf;
private final StorageDirectory sd; private final StorageDirectory sd;
private final StorageErrorReporter errorReporter; private final StorageErrorReporter errorReporter;
private int outputBufferCapacity = 512*1024; private int outputBufferCapacity = 512*1024;
@ -72,8 +74,9 @@ public class FileJournalManager implements JournalManager {
StoragePurger purger StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger(); = new NNStorageRetentionManager.DeletionStoragePurger();
public FileJournalManager(StorageDirectory sd, public FileJournalManager(Configuration conf, StorageDirectory sd,
StorageErrorReporter errorReporter) { StorageErrorReporter errorReporter) {
this.conf = conf;
this.sd = sd; this.sd = sd;
this.errorReporter = errorReporter; this.errorReporter = errorReporter;
} }
@ -102,8 +105,8 @@ public class FileJournalManager implements JournalManager {
throws IOException { throws IOException {
try { try {
currentInProgress = NNStorage.getInProgressEditsFile(sd, txid); currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress, EditLogOutputStream stm = new EditLogFileOutputStream(conf,
outputBufferCapacity); currentInProgress, outputBufferCapacity);
stm.create(); stm.create();
return stm; return stm;
} catch (IOException e) { } catch (IOException e) {

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
@ -39,7 +40,8 @@ public class BinaryEditsVisitor implements OfflineEditsVisitor {
* @param filename Name of file to write output to * @param filename Name of file to write output to
*/ */
public BinaryEditsVisitor(String outputName) throws IOException { public BinaryEditsVisitor(String outputName) throws IOException {
this.elfos = new EditLogFileOutputStream(new File(outputName), 0); this.elfos = new EditLogFileOutputStream(new Configuration(),
new File(outputName), 0);
elfos.create(); elfos.create();
} }

View File

@ -1290,4 +1290,21 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>false</value>
<description>
Specifies whether to skip flushing the edit log file channel. When set
to true, expensive FileChannel#force calls are skipped and synchronous
disk writes are enabled instead by opening the edit log file with
RandomAccessFile("rws") flags. This can significantly improve the
performance of edit log writes on the Windows platform.
Note that the behavior of the "rws" flags is platform- and hardware-specific
and might not provide the same level of guarantees as FileChannel#force.
For example, the write will skip the disk cache on SAS and SCSI devices
while it might not on SATA devices. This is an expert-level setting;
change it with caution.
</description>
</property>
</configuration> </configuration>

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
@ -53,13 +54,16 @@ public class TestJournal {
private StorageErrorReporter mockErrorReporter = Mockito.mock( private StorageErrorReporter mockErrorReporter = Mockito.mock(
StorageErrorReporter.class); StorageErrorReporter.class);
private Configuration conf;
private Journal journal; private Journal journal;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
FileUtil.fullyDelete(TEST_LOG_DIR); FileUtil.fullyDelete(TEST_LOG_DIR);
journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter); conf = new Configuration();
journal = new Journal(conf, TEST_LOG_DIR, JID,
mockErrorReporter);
journal.format(FAKE_NSINFO); journal.format(FAKE_NSINFO);
} }
@ -135,7 +139,7 @@ public class TestJournal {
journal.close(); // close to unlock the storage dir journal.close(); // close to unlock the storage dir
// Now re-instantiate, make sure history is still there // Now re-instantiate, make sure history is still there
journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter); journal = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
// The storage info should be read, even if no writer has taken over. // The storage info should be read, even if no writer has taken over.
assertEquals(storageString, assertEquals(storageString,
@ -192,7 +196,7 @@ public class TestJournal {
journal.newEpoch(FAKE_NSINFO, 1); journal.newEpoch(FAKE_NSINFO, 1);
try { try {
new Journal(TEST_LOG_DIR, JID, mockErrorReporter); new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
fail("Did not fail to create another journal in same dir"); fail("Did not fail to create another journal in same dir");
} catch (IOException ioe) { } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
@ -203,7 +207,7 @@ public class TestJournal {
// Journal should no longer be locked after the close() call. // Journal should no longer be locked after the close() call.
// Hence, should be able to create a new Journal in the same dir. // Hence, should be able to create a new Journal in the same dir.
Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter); Journal journal2 = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
journal2.newEpoch(FAKE_NSINFO, 2); journal2.newEpoch(FAKE_NSINFO, 2);
journal2.close(); journal2.close();
} }
@ -231,7 +235,7 @@ public class TestJournal {
// Check that, even if we re-construct the journal by scanning the // Check that, even if we re-construct the journal by scanning the
// disk, we don't allow finalizing incorrectly. // disk, we don't allow finalizing incorrectly.
journal.close(); journal.close();
journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter); journal = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
try { try {
journal.finalizeLogSegment(makeRI(4), 1, 6); journal.finalizeLogSegment(makeRI(4), 1, 6);

View File

@ -762,7 +762,7 @@ public class TestEditLog {
File log = new File(currentDir, File log = new File(currentDir,
NNStorage.getInProgressEditsFileName(3)); NNStorage.getInProgressEditsFileName(3));
EditLogFileOutputStream stream = new EditLogFileOutputStream(log, 1024); EditLogFileOutputStream stream = new EditLogFileOutputStream(conf, log, 1024);
try { try {
stream.create(); stream.create();
if (!inBothDirs) { if (!inBothDirs) {
@ -1233,7 +1233,7 @@ public class TestEditLog {
EditLogFileOutputStream elfos = null; EditLogFileOutputStream elfos = null;
EditLogFileInputStream elfis = null; EditLogFileInputStream elfis = null;
try { try {
elfos = new EditLogFileOutputStream(TEST_LOG_NAME, 0); elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
elfos.create(); elfos.create();
elfos.writeRaw(garbage, 0, garbage.length); elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush(); elfos.setReadyToFlush();

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -40,6 +41,8 @@ public class TestEditLogFileOutputStream {
final static int MIN_PREALLOCATION_LENGTH = final static int MIN_PREALLOCATION_LENGTH =
EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH; EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH;
private Configuration conf;
static { static {
// No need to fsync for the purposes of tests. This makes // No need to fsync for the purposes of tests. This makes
// the tests run much faster. // the tests run much faster.
@ -52,6 +55,11 @@ public class TestEditLogFileOutputStream {
if (TEST_EDITS.exists()) TEST_EDITS.delete(); if (TEST_EDITS.exists()) TEST_EDITS.delete();
} }
@Before
public void setUp() {
conf = new Configuration();
}
static void flushAndCheckLength(EditLogFileOutputStream elos, static void flushAndCheckLength(EditLogFileOutputStream elos,
long expectedLength) throws IOException { long expectedLength) throws IOException {
elos.setReadyToFlush(); elos.setReadyToFlush();
@ -65,7 +73,8 @@ public class TestEditLogFileOutputStream {
*/ */
@Test @Test
public void testRawWrites() throws IOException { public void testRawWrites() throws IOException {
EditLogFileOutputStream elos = new EditLogFileOutputStream(TEST_EDITS, 0); EditLogFileOutputStream elos = new EditLogFileOutputStream(conf, TEST_EDITS,
0);
try { try {
byte[] small = new byte[] {1,2,3,4,5,8,7}; byte[] small = new byte[] {1,2,3,4,5,8,7};
elos.create(); elos.create();
@ -104,7 +113,7 @@ public class TestEditLogFileOutputStream {
public void testEditLogFileOutputStreamCloseAbort() throws IOException { public void testEditLogFileOutputStreamCloseAbort() throws IOException {
// abort after a close should just ignore // abort after a close should just ignore
EditLogFileOutputStream editLogStream = EditLogFileOutputStream editLogStream =
new EditLogFileOutputStream(TEST_EDITS, 0); new EditLogFileOutputStream(conf, TEST_EDITS, 0);
editLogStream.close(); editLogStream.close();
editLogStream.abort(); editLogStream.abort();
} }
@ -117,7 +126,7 @@ public class TestEditLogFileOutputStream {
public void testEditLogFileOutputStreamCloseClose() throws IOException { public void testEditLogFileOutputStreamCloseClose() throws IOException {
// close after a close should result in an IOE // close after a close should result in an IOE
EditLogFileOutputStream editLogStream = EditLogFileOutputStream editLogStream =
new EditLogFileOutputStream(TEST_EDITS, 0); new EditLogFileOutputStream(conf, TEST_EDITS, 0);
editLogStream.close(); editLogStream.close();
try { try {
editLogStream.close(); editLogStream.close();
@ -135,7 +144,7 @@ public class TestEditLogFileOutputStream {
public void testEditLogFileOutputStreamAbortAbort() throws IOException { public void testEditLogFileOutputStreamAbortAbort() throws IOException {
// abort after a close should just ignore // abort after a close should just ignore
EditLogFileOutputStream editLogStream = EditLogFileOutputStream editLogStream =
new EditLogFileOutputStream(TEST_EDITS, 0); new EditLogFileOutputStream(conf, TEST_EDITS, 0);
editLogStream.abort(); editLogStream.abort();
editLogStream.abort(); editLogStream.abort();
} }

View File

@ -36,6 +36,7 @@ import java.util.PriorityQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec; import org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -51,12 +53,19 @@ import com.google.common.collect.ImmutableList;
public class TestFileJournalManager { public class TestFileJournalManager {
static final Log LOG = LogFactory.getLog(TestFileJournalManager.class); static final Log LOG = LogFactory.getLog(TestFileJournalManager.class);
private Configuration conf;
static { static {
// No need to fsync for the purposes of tests. This makes // No need to fsync for the purposes of tests. This makes
// the tests run much faster. // the tests run much faster.
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
} }
@Before
public void setUp() {
conf = new Configuration();
}
/** /**
* Find out how many transactions we can read from a * Find out how many transactions we can read from a
* FileJournalManager, starting at a given transaction ID. * FileJournalManager, starting at a given transaction ID.
@ -115,7 +124,7 @@ public class TestFileJournalManager {
long numJournals = 0; long numJournals = 0;
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
numJournals++; numJournals++;
} }
@ -135,7 +144,7 @@ public class TestFileJournalManager {
5, new AbortSpec(5, 0)); 5, new AbortSpec(5, 0));
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
getNumberOfTransactions(jm, 1, true, false)); getNumberOfTransactions(jm, 1, true, false));
} }
@ -158,16 +167,16 @@ public class TestFileJournalManager {
5, new AbortSpec(5, 1)); 5, new AbortSpec(5, 1));
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(conf, sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true, false)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(conf, sd, storage);
assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
} }
@ -191,17 +200,17 @@ public class TestFileJournalManager {
new AbortSpec(5, 2)); new AbortSpec(5, 2));
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true, false)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(conf, sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true, false)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(conf, sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true, false)); true, false));
} }
@ -226,7 +235,7 @@ public class TestFileJournalManager {
10, new AbortSpec(10, 0)); 10, new AbortSpec(10, 0));
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
String sdRootPath = sd.getRoot().getAbsolutePath(); String sdRootPath = sd.getRoot().getAbsolutePath();
FileUtil.chmod(sdRootPath, "-w", true); FileUtil.chmod(sdRootPath, "-w", true);
try { try {
@ -251,7 +260,7 @@ public class TestFileJournalManager {
10, new AbortSpec(10, 0)); 10, new AbortSpec(10, 0));
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
assertEquals(expectedTotalTxnCount, getNumberOfTransactions(jm, 1, assertEquals(expectedTotalTxnCount, getNumberOfTransactions(jm, 1,
true, false)); true, false));
@ -277,7 +286,7 @@ public class TestFileJournalManager {
10); 10);
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
// 10 rolls, so 11 rolled files, 110 txids total. // 10 rolls, so 11 rolled files, 110 txids total.
final int TOTAL_TXIDS = 10 * 11; final int TOTAL_TXIDS = 10 * 11;
@ -315,7 +324,7 @@ public class TestFileJournalManager {
assertEquals(1, files.length); assertEquals(1, files.length);
assertTrue(files[0].delete()); assertTrue(files[0].delete());
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(startGapTxId-1, getNumberOfTransactions(jm, 1, true, true)); assertEquals(startGapTxId-1, getNumberOfTransactions(jm, 1, true, true));
assertEquals(0, getNumberOfTransactions(jm, startGapTxId, true, true)); assertEquals(0, getNumberOfTransactions(jm, startGapTxId, true, true));
@ -348,7 +357,7 @@ public class TestFileJournalManager {
corruptAfterStartSegment(files[0]); corruptAfterStartSegment(files[0]);
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
assertEquals(10*TXNS_PER_ROLL+1, assertEquals(10*TXNS_PER_ROLL+1,
getNumberOfTransactions(jm, 1, true, false)); getNumberOfTransactions(jm, 1, true, false));
} }
@ -363,7 +372,7 @@ public class TestFileJournalManager {
NNStorage.getFinalizedEditsFileName(1001, 1100)); NNStorage.getFinalizedEditsFileName(1001, 1100));
// passing null for NNStorage because this unit test will not use it // passing null for NNStorage because this unit test will not use it
FileJournalManager fjm = new FileJournalManager(sd, null); FileJournalManager fjm = new FileJournalManager(conf, sd, null);
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1)); assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101)); assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
assertEquals("[1001,1100]", getLogsAsString(fjm, 201)); assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
@ -427,7 +436,7 @@ public class TestFileJournalManager {
10); 10);
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
EditLogInputStream elis = getJournalInputStream(jm, 5, true); EditLogInputStream elis = getJournalInputStream(jm, 5, true);
FSEditLogOp op = elis.readOp(); FSEditLogOp op = elis.readOp();
@ -448,7 +457,7 @@ public class TestFileJournalManager {
10, false); 10, false);
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(conf, sd, storage);
// If we exclude the in-progess stream, we should only have 100 tx. // If we exclude the in-progess stream, we should only have 100 tx.
assertEquals(100, getNumberOfTransactions(jm, 1, false, false)); assertEquals(100, getNumberOfTransactions(jm, 1, false, false));

View File

@ -266,12 +266,12 @@ public class TestNNStorageRetentionManager {
Joiner.on(",").join(purgedPaths)); Joiner.on(",").join(purgedPaths));
} }
private static class TestCaseDescription { private class TestCaseDescription {
private Map<File, FakeRoot> dirRoots = Maps.newHashMap(); private Map<File, FakeRoot> dirRoots = Maps.newHashMap();
private Set<File> expectedPurgedLogs = Sets.newLinkedHashSet(); private Set<File> expectedPurgedLogs = Sets.newLinkedHashSet();
private Set<File> expectedPurgedImages = Sets.newLinkedHashSet(); private Set<File> expectedPurgedImages = Sets.newLinkedHashSet();
private static class FakeRoot { private class FakeRoot {
NameNodeDirType type; NameNodeDirType type;
List<File> files; List<File> files;
@ -331,7 +331,7 @@ public class TestNNStorageRetentionManager {
if (!root.type.isOfType(NameNodeDirType.EDITS)) continue; if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
// passing null NNStorage for unit test because it does not use it // passing null NNStorage for unit test because it does not use it
FileJournalManager fjm = new FileJournalManager( FileJournalManager fjm = new FileJournalManager(conf,
root.mockStorageDir(), null); root.mockStorageDir(), null);
fjm.purger = purger; fjm.purger = purger;
jms.add(fjm); jms.add(fjm);

View File

@ -74,7 +74,7 @@ public class TestNameNodeRecovery {
EditLogFileOutputStream elfos = null; EditLogFileOutputStream elfos = null;
EditLogFileInputStream elfis = null; EditLogFileInputStream elfis = null;
try { try {
elfos = new EditLogFileOutputStream(TEST_LOG_NAME, 0); elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
elfos.create(); elfos.create();
elts.addTransactionsToLog(elfos, cache); elts.addTransactionsToLog(elfos, cache);