HDFS-1580. Add interface for generic Write Ahead Logging mechanisms. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1210602 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2011-12-05 20:10:27 +00:00
parent 6a358ee140
commit d18e5b3844
15 changed files with 126 additions and 40 deletions

View File

@ -72,6 +72,9 @@ Trunk (unreleased changes)
Move the support for multiple protocols to lower layer so that Writable, Move the support for multiple protocols to lower layer so that Writable,
PB and Avro can all use it (Sanjay) PB and Avro can all use it (Sanjay)
HDFS-1580. Add interface for generic Write Ahead Logging mechanisms.
(Ivan Kelly via jitendra)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong) namenode state. (Tomasz Nykiel via hairong)

View File

@ -161,6 +161,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir"; public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir"; public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base"; public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id"; public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";

View File

@ -103,7 +103,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
} }
@Override @Override
long length() throws IOException { public long length() throws IOException {
// file size + size of both buffers // file size + size of both buffers
return inner.length(); return inner.length();
} }

View File

@ -67,12 +67,12 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
} }
@Override // EditLogOutputStream @Override // EditLogOutputStream
void write(FSEditLogOp op) throws IOException { public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op); doubleBuf.writeOp(op);
} }
@Override @Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException { public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
throw new IOException("Not supported"); throw new IOException("Not supported");
} }
@ -80,7 +80,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
* There is no persistent storage. Just clear the buffers. * There is no persistent storage. Just clear the buffers.
*/ */
@Override // EditLogOutputStream @Override // EditLogOutputStream
void create() throws IOException { public void create() throws IOException {
assert doubleBuf.isFlushed() : "previous data is not flushed yet"; assert doubleBuf.isFlushed() : "previous data is not flushed yet";
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
} }
@ -106,7 +106,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
} }
@Override // EditLogOutputStream @Override // EditLogOutputStream
void setReadyToFlush() throws IOException { public void setReadyToFlush() throws IOException {
doubleBuf.setReadyToFlush(); doubleBuf.setReadyToFlush();
} }

View File

@ -127,7 +127,7 @@ class EditLogFileInputStream extends EditLogInputStream {
} }
@Override @Override
long length() throws IOException { public long length() throws IOException {
// file size + size of both buffers // file size + size of both buffers
return file.length(); return file.length();
} }

View File

@ -73,7 +73,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
void write(FSEditLogOp op) throws IOException { public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op); doubleBuf.writeOp(op);
} }
@ -86,7 +86,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
* </ul> * </ul>
* */ * */
@Override @Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException { public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
doubleBuf.writeRaw(bytes, offset, length); doubleBuf.writeRaw(bytes, offset, length);
} }
@ -94,7 +94,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
* Create empty edits logs file. * Create empty edits logs file.
*/ */
@Override @Override
void create() throws IOException { public void create() throws IOException {
fc.truncate(0); fc.truncate(0);
fc.position(0); fc.position(0);
doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION); doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION);
@ -150,7 +150,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
* data can be still written to the stream while flushing is performed. * data can be still written to the stream while flushing is performed.
*/ */
@Override @Override
void setReadyToFlush() throws IOException { public void setReadyToFlush() throws IOException {
doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
doubleBuf.setReadyToFlush(); doubleBuf.setReadyToFlush();
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -27,7 +29,9 @@ import java.io.IOException;
* It should stream bytes from the storage exactly as they were written * It should stream bytes from the storage exactly as they were written
* into the #{@link EditLogOutputStream}. * into the #{@link EditLogOutputStream}.
*/ */
abstract class EditLogInputStream implements JournalStream, Closeable { @InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class EditLogInputStream implements JournalStream, Closeable {
/** /**
* @return the first transaction which will be found in this stream * @return the first transaction which will be found in this stream
*/ */
@ -74,5 +78,5 @@ abstract class EditLogInputStream implements JournalStream, Closeable {
/** /**
* Return the size of the current edits log. * Return the size of the current edits log.
*/ */
abstract long length() throws IOException; public abstract long length() throws IOException;
} }

View File

@ -21,17 +21,21 @@ import java.io.IOException;
import static org.apache.hadoop.hdfs.server.common.Util.now; import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** /**
* A generic abstract class to support journaling of edits logs into * A generic abstract class to support journaling of edits logs into
* a persistent storage. * a persistent storage.
*/ */
abstract class EditLogOutputStream { @InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class EditLogOutputStream {
// these are statistics counters // these are statistics counters
private long numSync; // number of sync(s) to disk private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync private long totalTimeSync; // total time to sync
EditLogOutputStream() { public EditLogOutputStream() throws IOException {
numSync = totalTimeSync = 0; numSync = totalTimeSync = 0;
} }
@ -41,7 +45,7 @@ abstract class EditLogOutputStream {
* @param op operation * @param op operation
* @throws IOException * @throws IOException
*/ */
abstract void write(FSEditLogOp op) throws IOException; abstract public void write(FSEditLogOp op) throws IOException;
/** /**
* Write raw data to an edit log. This data should already have * Write raw data to an edit log. This data should already have
@ -54,7 +58,7 @@ abstract class EditLogOutputStream {
* @param length number of bytes to write * @param length number of bytes to write
* @throws IOException * @throws IOException
*/ */
abstract void writeRaw(byte[] bytes, int offset, int length) abstract public void writeRaw(byte[] bytes, int offset, int length)
throws IOException; throws IOException;
/** /**
@ -62,7 +66,7 @@ abstract class EditLogOutputStream {
* *
* @throws IOException * @throws IOException
*/ */
abstract void create() throws IOException; abstract public void create() throws IOException;
/** /**
* Close the journal. * Close the journal.
@ -81,7 +85,7 @@ abstract class EditLogOutputStream {
* All data that has been written to the stream so far will be flushed. * All data that has been written to the stream so far will be flushed.
* New data can be still written to the stream while flushing is performed. * New data can be still written to the stream while flushing is performed.
*/ */
abstract void setReadyToFlush() throws IOException; abstract public void setReadyToFlush() throws IOException;
/** /**
* Flush and sync all data that is ready to be flush * Flush and sync all data that is ready to be flush

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@ -108,6 +110,7 @@ public class FSEditLog {
private NameNodeMetrics metrics; private NameNodeMetrics metrics;
private NNStorage storage; private NNStorage storage;
private Configuration conf;
private static class TransactionId { private static class TransactionId {
public long txid; public long txid;
@ -144,6 +147,7 @@ public class FSEditLog {
* @param editsDirs List of journals to use * @param editsDirs List of journals to use
*/ */
FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) { FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
this.conf = conf;
isSyncRunning = false; isSyncRunning = false;
this.storage = storage; this.storage = storage;
metrics = NameNode.getNameNodeMetrics(); metrics = NameNode.getNameNodeMetrics();
@ -166,9 +170,13 @@ public class FSEditLog {
this.journalSet = new JournalSet(); this.journalSet = new JournalSet();
for (URI u : this.editsDirs) { for (URI u : this.editsDirs) {
StorageDirectory sd = storage.getStorageDirectory(u); if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
if (sd != null) { StorageDirectory sd = storage.getStorageDirectory(u);
journalSet.add(new FileJournalManager(sd)); if (sd != null) {
journalSet.add(new FileJournalManager(sd));
}
} else {
journalSet.add(createJournal(u));
} }
} }
@ -994,4 +1002,53 @@ public class FSEditLog {
IOUtils.closeStream(s); IOUtils.closeStream(s);
} }
} }
/**
* Retrieve the implementation class for a Journal scheme.
* @param conf The configuration to retrieve the information from
* @param uriScheme The uri scheme to look up.
* @return the class of the journal implementation
* @throws IllegalArgumentException if no class is configured for uri
*/
static Class<? extends JournalManager> getJournalClass(Configuration conf,
String uriScheme) {
String key
= DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uriScheme;
Class <? extends JournalManager> clazz = null;
try {
clazz = conf.getClass(key, null, JournalManager.class);
} catch (RuntimeException re) {
throw new IllegalArgumentException(
"Invalid class specified for " + uriScheme, re);
}
if (clazz == null) {
LOG.warn("No class configured for " +uriScheme
+ ", " + key + " is empty");
throw new IllegalArgumentException(
"No class configured for " + uriScheme);
}
return clazz;
}
/**
* Construct a custom journal manager.
* The class to construct is taken from the configuration.
* @param uri Uri to construct
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
*/
private JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class);
return cons.newInstance(conf, uri);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
}
} }

View File

@ -25,6 +25,8 @@ import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumMap; import java.util.EnumMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder; import org.apache.hadoop.hdfs.util.Holder;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader { public class FSEditLogLoader {
private final FSNamesystem fsNamesys; private final FSNamesystem fsNamesys;
@ -514,7 +518,7 @@ public class FSEditLogLoader {
/** /**
* Stream wrapper that keeps track of the current stream position. * Stream wrapper that keeps track of the current stream position.
*/ */
static class PositionTrackingInputStream extends FilterInputStream { public static class PositionTrackingInputStream extends FilterInputStream {
private long curPos = 0; private long curPos = 0;
private long markPos = -1; private long markPos = -1;

View File

@ -113,6 +113,10 @@ public abstract class FSEditLogOp {
this.txid = 0; this.txid = 0;
} }
public long getTransactionId() {
return txid;
}
public void setTransactionId(long txid) { public void setTransactionId(long txid) {
this.txid = txid; this.txid = txid;
} }

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** /**
* A JournalManager is responsible for managing a single place of storing * A JournalManager is responsible for managing a single place of storing
@ -28,7 +30,9 @@ import java.io.IOException;
* each conceptual place of storage corresponds to exactly one instance of * each conceptual place of storage corresponds to exactly one instance of
* this class, which is created when the EditLog is first opened. * this class, which is created when the EditLog is first opened.
*/ */
interface JournalManager extends Closeable { @InterfaceAudience.Private
@InterfaceStability.Evolving
public interface JournalManager extends Closeable {
/** /**
* Begin writing to a new segment of the log stream, which starts at * Begin writing to a new segment of the log stream, which starts at
* the given transaction ID. * the given transaction ID.
@ -71,7 +75,6 @@ interface JournalManager extends Closeable {
* *
* @param minTxIdToKeep the earliest txid that must be retained after purging * @param minTxIdToKeep the earliest txid that must be retained after purging
* old logs * old logs
* @param purger the purging implementation to use
* @throws IOException if purging fails * @throws IOException if purging fails
*/ */
void purgeLogsOlderThan(long minTxIdToKeep) void purgeLogsOlderThan(long minTxIdToKeep)

View File

@ -309,7 +309,7 @@ public class JournalSet implements JournalManager {
} }
@Override @Override
void write(final FSEditLogOp op) public void write(final FSEditLogOp op)
throws IOException { throws IOException {
mapJournalsAndReportErrors(new JournalClosure() { mapJournalsAndReportErrors(new JournalClosure() {
@Override @Override
@ -322,7 +322,7 @@ public class JournalSet implements JournalManager {
} }
@Override @Override
void writeRaw(final byte[] data, final int offset, final int length) public void writeRaw(final byte[] data, final int offset, final int length)
throws IOException { throws IOException {
mapJournalsAndReportErrors(new JournalClosure() { mapJournalsAndReportErrors(new JournalClosure() {
@Override @Override
@ -335,7 +335,7 @@ public class JournalSet implements JournalManager {
} }
@Override @Override
void create() throws IOException { public void create() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() { mapJournalsAndReportErrors(new JournalClosure() {
@Override @Override
public void apply(JournalAndStream jas) throws IOException { public void apply(JournalAndStream jas) throws IOException {
@ -367,7 +367,7 @@ public class JournalSet implements JournalManager {
} }
@Override @Override
void setReadyToFlush() throws IOException { public void setReadyToFlush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() { mapJournalsAndReportErrors(new JournalClosure() {
@Override @Override
public void apply(JournalAndStream jas) throws IOException { public void apply(JournalAndStream jas) throws IOException {

View File

@ -70,7 +70,8 @@ public class NNStorage extends Storage implements Closeable {
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName()); private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest"; static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
static final String LOCAL_URI_SCHEME = "file";
// //
// The filenames used for storing the images // The filenames used for storing the images
// //
@ -324,22 +325,14 @@ public class NNStorage extends Storage implements Closeable {
/** /**
* Checks the consistency of a URI, in particular if the scheme * Checks the consistency of a URI, in particular if the scheme
* is specified and is supported by a concrete implementation * is specified
* @param u URI whose consistency is being checked. * @param u URI whose consistency is being checked.
*/ */
private static void checkSchemeConsistency(URI u) throws IOException { private static void checkSchemeConsistency(URI u) throws IOException {
String scheme = u.getScheme(); String scheme = u.getScheme();
// the URI should have a proper scheme // the URI should have a proper scheme
if(scheme == null) if(scheme == null) {
throw new IOException("Undefined scheme for " + u); throw new IOException("Undefined scheme for " + u);
else {
try {
// the scheme should be enumerated as JournalType
JournalType.valueOf(scheme.toUpperCase());
} catch (IllegalArgumentException iae){
throw new IOException("Unknown scheme " + scheme +
". It should correspond to a JournalType enumeration value");
}
} }
} }

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Collections2;
import com.google.common.base.Predicate;
/** /**
* *
@ -69,7 +71,18 @@ public class NameNodeResourceChecker {
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY)); .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
addDirsToCheck(FSNamesystem.getNamespaceDirs(conf)); addDirsToCheck(FSNamesystem.getNamespaceDirs(conf));
addDirsToCheck(FSNamesystem.getNamespaceEditsDirs(conf));
Collection<URI> localEditDirs = Collections2.filter(
FSNamesystem.getNamespaceEditsDirs(conf),
new Predicate<URI>() {
public boolean apply(URI input) {
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
return true;
}
return false;
}
});
addDirsToCheck(localEditDirs);
addDirsToCheck(extraCheckedVolumes); addDirsToCheck(extraCheckedVolumes);
} }