HDFS-1580. Merging change r1210749, r1210602 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297864 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2012-03-07 06:48:27 +00:00
parent d6f9460eba
commit aec70c23aa
16 changed files with 302 additions and 40 deletions

CHANGES.txt

@@ -102,6 +102,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
 
+    HDFS-1580. Add interface for generic Write Ahead Logging mechanisms.
+    (Ivan Kelly via jitendra)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)

DFSConfigKeys.java

@@ -163,6 +163,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
   public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
+  public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
   public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
   public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
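The new DFS_NAMENODE_EDITS_PLUGIN_PREFIX key binds an edits-directory URI scheme to a JournalManager implementation class. A minimal wiring sketch, mirroring what TestGenericJournalConf does with its "dummy" scheme further down; the scheme "myscheme" and the MyJournalManager class are hypothetical placeholders, not part of this patch:

  // Map the "myscheme" URI scheme to a JournalManager implementation
  // (MyJournalManager is a placeholder class name).
  Configuration conf = new Configuration();
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".myscheme",
           MyJournalManager.class.getName());
  // Any edits dir whose URI uses that scheme is then handled by the plugin.
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "myscheme://node1/journal");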

EditLogBackupInputStream.java

@@ -103,7 +103,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
   }
 
   @Override
-  long length() throws IOException {
+  public long length() throws IOException {
     // file size + size of both buffers
     return inner.length();
   }

EditLogBackupOutputStream.java

@@ -66,12 +66,12 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   }
 
   @Override // EditLogOutputStream
-  void write(FSEditLogOp op) throws IOException {
+  public void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
   }
 
   @Override
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     throw new IOException("Not supported");
   }
@@ -79,7 +79,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
    * There is no persistent storage. Just clear the buffers.
    */
   @Override // EditLogOutputStream
-  void create() throws IOException {
+  public void create() throws IOException {
     assert doubleBuf.isFlushed() : "previous data is not flushed yet";
     this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
   }
@@ -105,7 +105,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   }
 
   @Override // EditLogOutputStream
-  void setReadyToFlush() throws IOException {
+  public void setReadyToFlush() throws IOException {
     doubleBuf.setReadyToFlush();
   }

EditLogFileInputStream.java

@@ -129,7 +129,7 @@ class EditLogFileInputStream extends EditLogInputStream {
   }
 
   @Override
-  long length() throws IOException {
+  public long length() throws IOException {
     // file size + size of both buffers
     return file.length();
   }

EditLogFileOutputStream.java

@@ -72,7 +72,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   }
 
   @Override
-  void write(FSEditLogOp op) throws IOException {
+  public void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
   }
@@ -85,7 +85,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    * </ul>
    * */
   @Override
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     doubleBuf.writeRaw(bytes, offset, length);
   }
@@ -93,7 +93,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    * Create empty edits logs file.
    */
   @Override
-  void create() throws IOException {
+  public void create() throws IOException {
     fc.truncate(0);
     fc.position(0);
     doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION);
@@ -149,7 +149,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    * data can be still written to the stream while flushing is performed.
    */
   @Override
-  void setReadyToFlush() throws IOException {
+  public void setReadyToFlush() throws IOException {
     doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
     doubleBuf.setReadyToFlush();
   }

EditLogInputStream.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,7 +29,9 @@ import java.io.IOException;
  * It should stream bytes from the storage exactly as they were written
  * into the #{@link EditLogOutputStream}.
  */
-abstract class EditLogInputStream implements JournalStream, Closeable {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class EditLogInputStream implements JournalStream, Closeable {
   /**
    * @return the first transaction which will be found in this stream
    */
@@ -74,5 +78,5 @@ abstract class EditLogInputStream implements JournalStream, Closeable {
   /**
    * Return the size of the current edits log.
    */
-  abstract long length() throws IOException;
+  public abstract long length() throws IOException;
 }

EditLogOutputStream.java

@@ -21,17 +21,21 @@ import java.io.IOException;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * A generic abstract class to support journaling of edits logs into
  * a persistent storage.
  */
-abstract class EditLogOutputStream {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class EditLogOutputStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
 
-  EditLogOutputStream() {
+  public EditLogOutputStream() throws IOException {
     numSync = totalTimeSync = 0;
   }
@@ -41,7 +45,7 @@ abstract class EditLogOutputStream {
    * @param op operation
    * @throws IOException
    */
-  abstract void write(FSEditLogOp op) throws IOException;
+  abstract public void write(FSEditLogOp op) throws IOException;
 
   /**
    * Write raw data to an edit log. This data should already have
@@ -54,7 +58,7 @@ abstract class EditLogOutputStream {
    * @param length number of bytes to write
    * @throws IOException
    */
-  abstract void writeRaw(byte[] bytes, int offset, int length)
+  abstract public void writeRaw(byte[] bytes, int offset, int length)
     throws IOException;
 
   /**
@@ -62,7 +66,7 @@ abstract class EditLogOutputStream {
    *
    * @throws IOException
    */
-  abstract void create() throws IOException;
+  abstract public void create() throws IOException;
 
   /**
    * Close the journal.
@@ -81,7 +85,7 @@ abstract class EditLogOutputStream {
    * All data that has been written to the stream so far will be flushed.
    * New data can be still written to the stream while flushing is performed.
    */
-  abstract void setReadyToFlush() throws IOException;
+  abstract public void setReadyToFlush() throws IOException;
 
   /**
    * Flush and sync all data that is ready to be flush

FSEditLog.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -109,6 +111,7 @@ public class FSEditLog {
   private NameNodeMetrics metrics;
 
   private NNStorage storage;
+  private Configuration conf;
 
   private static class TransactionId {
     public long txid;
@@ -145,6 +148,7 @@ public class FSEditLog {
   * @param editsDirs List of journals to use
   */
  FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
+    this.conf = conf;
     isSyncRunning = false;
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
@@ -167,9 +171,13 @@
     this.journalSet = new JournalSet();
     for (URI u : this.editsDirs) {
-      StorageDirectory sd = storage.getStorageDirectory(u);
-      if (sd != null) {
-        journalSet.add(new FileJournalManager(sd));
+      if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
+        StorageDirectory sd = storage.getStorageDirectory(u);
+        if (sd != null) {
+          journalSet.add(new FileJournalManager(sd));
+        }
+      } else {
+        journalSet.add(createJournal(u));
       }
     }
@@ -1003,4 +1011,53 @@ public class FSEditLog {
       IOUtils.closeStream(s);
     }
   }
+
+  /**
+   * Retrieve the implementation class for a Journal scheme.
+   * @param conf The configuration to retrieve the information from
+   * @param uriScheme The uri scheme to look up.
+   * @return the class of the journal implementation
+   * @throws IllegalArgumentException if no class is configured for uri
+   */
+  static Class<? extends JournalManager> getJournalClass(Configuration conf,
+                                                         String uriScheme) {
+    String key
+      = DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uriScheme;
+    Class<? extends JournalManager> clazz = null;
+    try {
+      clazz = conf.getClass(key, null, JournalManager.class);
+    } catch (RuntimeException re) {
+      throw new IllegalArgumentException(
+          "Invalid class specified for " + uriScheme, re);
+    }
+
+    if (clazz == null) {
+      LOG.warn("No class configured for " + uriScheme
+               + ", " + key + " is empty");
+      throw new IllegalArgumentException(
+          "No class configured for " + uriScheme);
+    }
+    return clazz;
+  }
+
+  /**
+   * Construct a custom journal manager.
+   * The class to construct is taken from the configuration.
+   * @param uri Uri to construct
+   * @return The constructed journal manager
+   * @throws IllegalArgumentException if no class is configured for uri
+   */
+  private JournalManager createJournal(URI uri) {
+    Class<? extends JournalManager> clazz
+      = getJournalClass(conf, uri.getScheme());
+
+    try {
+      Constructor<? extends JournalManager> cons
+        = clazz.getConstructor(Configuration.class, URI.class);
+      return cons.newInstance(conf, uri);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unable to construct journal, "
+                                         + uri, e);
+    }
+  }
 }
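createJournal() above resolves the plugin class from the URI scheme and instantiates it reflectively, which is why a pluggable JournalManager must expose a (Configuration, URI) constructor (see testBadConstructor in the new test below). A compressed sketch of that resolution path, using the dummy://test URI from the test; resolveJournal is a hypothetical helper written only for illustration, not part of the patch:

  // Roughly what createJournal() does for an edits-dir URI such as dummy://test.
  static JournalManager resolveJournal(Configuration conf, URI uri) throws Exception {
    // Key becomes e.g. "dfs.namenode.edits.journal-plugin.dummy".
    String key = DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uri.getScheme();
    Class<? extends JournalManager> clazz =
        conf.getClass(key, null, JournalManager.class);
    // Reflective construction: every plugin needs a public (Configuration, URI) constructor.
    return clazz.getConstructor(Configuration.class, URI.class).newInstance(conf, uri);
  }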

FSEditLogLoader.java

@@ -26,6 +26,8 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.EnumMap;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -61,6 +63,8 @@ import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Joiner;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class FSEditLogLoader {
 
   private final FSNamesystem fsNamesys;
@@ -550,7 +554,7 @@ public class FSEditLogLoader {
   /**
    * Stream wrapper that keeps track of the current stream position.
    */
-  static class PositionTrackingInputStream extends FilterInputStream {
+  public static class PositionTrackingInputStream extends FilterInputStream {
     private long curPos = 0;
     private long markPos = -1;

FSEditLogOp.java

@@ -113,6 +113,10 @@ public abstract class FSEditLogOp {
     this.txid = 0;
   }
 
+  public long getTransactionId() {
+    return txid;
+  }
+
   public void setTransactionId(long txid) {
     this.txid = txid;
   }

JournalManager.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * A JournalManager is responsible for managing a single place of storing
@@ -28,7 +30,9 @@ import java.io.IOException;
  * each conceptual place of storage corresponds to exactly one instance of
  * this class, which is created when the EditLog is first opened.
  */
-interface JournalManager extends Closeable {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface JournalManager extends Closeable {
   /**
    * Begin writing to a new segment of the log stream, which starts at
    * the given transaction ID.
@@ -71,7 +75,6 @@ interface JournalManager extends Closeable {
    *
    * @param minTxIdToKeep the earliest txid that must be retained after purging
    * old logs
-   * @param purger the purging implementation to use
    * @throws IOException if purging fails
    */
   void purgeLogsOlderThan(long minTxIdToKeep)

JournalSet.java

@@ -309,7 +309,7 @@ public class JournalSet implements JournalManager {
     }
 
     @Override
-    void write(final FSEditLogOp op)
+    public void write(final FSEditLogOp op)
         throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
@@ -322,7 +322,7 @@
     }
 
     @Override
-    void writeRaw(final byte[] data, final int offset, final int length)
+    public void writeRaw(final byte[] data, final int offset, final int length)
         throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
@@ -335,7 +335,7 @@
     }
 
     @Override
-    void create() throws IOException {
+    public void create() throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
         public void apply(JournalAndStream jas) throws IOException {
@@ -367,7 +367,7 @@
     }
 
     @Override
-    void setReadyToFlush() throws IOException {
+    public void setReadyToFlush() throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
         public void apply(JournalAndStream jas) throws IOException {

NNStorage.java

@@ -68,7 +68,8 @@ public class NNStorage extends Storage implements Closeable {
   private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
 
   static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
+  static final String LOCAL_URI_SCHEME = "file";
 
   //
   // The filenames used for storing the images
   //
@@ -325,22 +326,14 @@
   /**
    * Checks the consistency of a URI, in particular if the scheme
-   * is specified and is supported by a concrete implementation
+   * is specified
    * @param u URI whose consistency is being checked.
    */
   private static void checkSchemeConsistency(URI u) throws IOException {
     String scheme = u.getScheme();
     // the URI should have a proper scheme
-    if(scheme == null)
+    if(scheme == null) {
       throw new IOException("Undefined scheme for " + u);
-    else {
-      try {
-        // the scheme should be enumerated as JournalType
-        JournalType.valueOf(scheme.toUpperCase());
-      } catch (IllegalArgumentException iae){
-        throw new IOException("Unknown scheme " + scheme +
-            ". It should correspond to a JournalType enumeration value");
-      }
     }
   }

NameNodeResourceChecker.java

@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.Util;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Collections2;
+import com.google.common.base.Predicate;
 
 /**
  *
@@ -69,7 +71,18 @@ public class NameNodeResourceChecker {
         .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
 
     addDirsToCheck(FSNamesystem.getNamespaceDirs(conf));
-    addDirsToCheck(FSNamesystem.getNamespaceEditsDirs(conf));
+
+    Collection<URI> localEditDirs = Collections2.filter(
+        FSNamesystem.getNamespaceEditsDirs(conf),
+        new Predicate<URI>() {
+          public boolean apply(URI input) {
+            if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
+              return true;
+            }
+            return false;
+          }
+        });
+    addDirsToCheck(localEditDirs);
     addDirsToCheck(extraCheckedVolumes);
   }

TestGenericJournalConf.java

@@ -0,0 +1,176 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Writable;

import java.net.URI;
import java.io.IOException;

public class TestGenericJournalConf {
  /**
   * Test that an exception is thrown if a journal class doesn't exist
   * in the configuration
   */
  @Test(expected=IllegalArgumentException.class)
  public void testNotConfigured() throws Exception {
    MiniDFSCluster cluster = null;
    Configuration conf = new Configuration();

    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
             "dummy://test");
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      cluster.waitActive();
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Test that an exception is thrown if a journal class doesn't
   * exist in the classloader.
   */
  @Test(expected=IllegalArgumentException.class)
  public void testClassDoesntExist() throws Exception {
    MiniDFSCluster cluster = null;
    Configuration conf = new Configuration();

    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
             "org.apache.hadoop.nonexistent");
    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
             "dummy://test");
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      cluster.waitActive();
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Test that an implementation of JournalManager without a
   * (Configuration,URI) constructor throws an exception
   */
  @Test
  public void testBadConstructor() throws Exception {
    MiniDFSCluster cluster = null;
    Configuration conf = new Configuration();

    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
             BadConstructorJournalManager.class.getName());
    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
             "dummy://test");
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      cluster.waitActive();
      fail("Should have failed before this point");
    } catch (IllegalArgumentException iae) {
      if (!iae.getMessage().contains("Unable to construct journal")) {
        fail("Should have failed with unable to construct exception");
      }
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Test that a dummy implementation of JournalManager can
   * be initialized on startup
   */
  @Test
  public void testDummyJournalManager() throws Exception {
    MiniDFSCluster cluster = null;
    Configuration conf = new Configuration();

    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
             DummyJournalManager.class.getName());
    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
             "dummy://test");
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
      cluster.waitActive();
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  public static class DummyJournalManager implements JournalManager {
    public DummyJournalManager(Configuration conf, URI u) {}

    @Override
    public EditLogOutputStream startLogSegment(long txId) throws IOException {
      return mock(EditLogOutputStream.class);
    }

    @Override
    public void finalizeLogSegment(long firstTxId, long lastTxId)
        throws IOException {
      // noop
    }

    @Override
    public EditLogInputStream getInputStream(long fromTxnId)
        throws IOException {
      return null;
    }

    @Override
    public long getNumberOfTransactions(long fromTxnId)
        throws IOException {
      return 0;
    }

    @Override
    public void setOutputBufferCapacity(int size) {}

    @Override
    public void purgeLogsOlderThan(long minTxIdToKeep)
        throws IOException {}

    @Override
    public void recoverUnfinalizedSegments() throws IOException {}

    @Override
    public void close() throws IOException {}
  }

  public static class BadConstructorJournalManager extends DummyJournalManager {
    public BadConstructorJournalManager() {
      super(null, null);
    }
  }
}