HBASE-2059 Break out WAL reader and writer impl from HLog

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@892451 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2009-12-19 08:10:45 +00:00
parent 7b8a829cec
commit 23cbf39343
11 changed files with 344 additions and 230 deletions

View File

@ -242,6 +242,7 @@ Release 0.21.0 - Unreleased
HBASE-2049 Cleanup HLog binary log output (Dave Latham via Stack)
HBASE-2052 Make hbase more 'live' when comes to noticing table creation,
splits, etc., for 0.20.3
HBASE-2059 Break out WAL reader and writer impl from HLog
NEW FEATURES
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write

View File

@ -190,6 +190,16 @@
<value>3600000</value>
<description>Period at which we will roll the commit log.</description>
</property>
<property>
<name>hbase.regionserver.hlog.reader.impl</name>
<value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader</value>
<description>The HLog file reader implementation.</description>
</property>
<property>
<name>hbase.regionserver.hlog.writer.impl</name>
<value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter</value>
<description>The HLog file writer implementation.</description>
</property>
<property>
<name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
<value>20000</value>

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
import org.apache.hadoop.io.SequenceFile;
/**
* Add support for transactional operations to the regionserver's
@ -47,11 +46,6 @@ class THLog extends HLog {
super(fs, dir, conf, listener);
}
@Override
protected SequenceFile.Writer createWriter(Path path) throws IOException {
return super.createWriter(path, THLogKey.class, KeyValue.class);
}
@Override
protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqNum,
long now) {

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
/**
@ -107,12 +106,8 @@ class THLogRecoveryManager {
Set<Long> commitedTransactions = new HashSet<Long>();
Set<Long> abortedTransactions = new HashSet<Long>();
SequenceFile.Reader logReader = HLog.getReader(fileSystem,
reconstructionLog, conf);
HLog.Reader reader = HLog.getReader(fileSystem, reconstructionLog, conf);
try {
THLogKey key = new THLogKey();
KeyValue val = new KeyValue();
long skippedEdits = 0;
long totalEdits = 0;
long startCount = 0;
@ -123,7 +118,10 @@ class THLogRecoveryManager {
int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
2000);
while (logReader.next(key, val)) {
HLog.Entry entry;
while ((entry = reader.next()) != null) {
THLogKey key = (THLogKey)entry.getKey();
KeyValue val = entry.getEdit();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
@ -200,7 +198,7 @@ class THLogRecoveryManager {
+ " aborts, and " + commitCount + " commits.");
}
} finally {
logReader.close();
reader.close();
}
if (pendingTransactionsById.size() > 0) {

View File

@ -108,6 +108,8 @@ public class TransactionalRegionServer extends HRegionServer implements
@Override
protected HLog instantiateHLog(Path logdir) throws IOException {
conf.set("hbase.regionserver.hlog.keyclass",
THLogKey.class.getCanonicalName());
HLog newlog = new THLog(super.getFileSystem(), logdir, conf, super.getLogRoller());
return newlog;
}

View File

@ -58,6 +58,8 @@ public class TestTHLog extends HBaseTestCase implements
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
.getHomeDirectory().toString());
this.conf.set("hbase.regionserver.hlog.keyclass",
THLogKey.class.getCanonicalName());
super.setUp();
this.dir = new Path("/hbase", getName());
if (fs.exists(dir)) {

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@ -317,17 +316,17 @@ public class Store implements HConstants, HeapSize {
// general memory usage accounting.
long maxSeqIdInLog = -1;
long firstSeqIdInLog = -1;
SequenceFile.Reader logReader = HLog.getReader(this.fs, reconstructionLog,
this.conf);
HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf);
try {
HLogKey key = HLog.newKey(conf);
KeyValue val = new KeyValue();
long skippedEdits = 0;
long editsCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval =
this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while (logReader.next(key, val)) {
HLog.Entry entry;
while ((entry = logReader.next()) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}

View File

@ -23,7 +23,6 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@ -46,8 +45,6 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -64,12 +61,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
/**
* HLog stores all the edits to the HStore. Its the hbase write-ahead-log
@ -123,7 +114,30 @@ public class HLog implements HConstants, Syncable {
private final long blocksize;
private final int flushlogentries;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private final short replicationLevel;
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
void close() throws IOException;
Entry next() throws IOException;
Entry next(Entry reuse) throws IOException;
}
public interface Writer {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
void close() throws IOException;
void sync() throws IOException;
void append(Entry entry) throws IOException;
}
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@ -131,10 +145,7 @@ public class HLog implements HConstants, Syncable {
/*
* Current log file.
*/
SequenceFile.Writer writer;
// This is the above writer's output stream. Its private but we use reflection
// to expose it so we can call sync on it.
FSDataOutputStream writer_out;
Writer writer;
/*
* Map of all log files but the current one.
@ -218,8 +229,6 @@ public class HLog implements HConstants, Syncable {
conf.getInt("hbase.regionserver.flushlogentries", 1);
this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
this.replicationLevel = (short) conf.getInt("hbase.regionserver.hlog.replication",
this.fs.getDefaultReplication());
// Roll at 95% of block size.
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
this.logrollsize = (long)(this.blocksize * multi);
@ -249,16 +258,6 @@ public class HLog implements HConstants, Syncable {
return this.filenum;
}
/**
* Get the compression type for the hlog files
* @param c Configuration to use.
* @return the kind of compression to use
*/
static CompressionType getCompressionType(final Configuration c) {
// Compression makes no sense for commit log. Always return NONE.
return CompressionType.NONE;
}
/**
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
@ -318,7 +317,7 @@ public class HLog implements HConstants, Syncable {
Path oldFile = cleanupCurrentWriter(this.filenum);
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
this.writer = createWriter(newPath);
this.writer = createWriter(fs, newPath, new HBaseConfiguration(conf));
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
@ -349,113 +348,54 @@ public class HLog implements HConstants, Syncable {
return regionToFlush;
}
protected SequenceFile.Writer createWriter(Path path) throws IOException {
return createWriter(path, HLogKey.class, KeyValue.class);
}
/**
* Hack just to set the correct file length up in SequenceFile.Reader.
* See HADOOP-6307. The below is all about setting the right length on the
* file we are reading. fs.getFileStatus(file).getLen() is passed down to
* a private SequenceFile.Reader constructor. This won't work. Need to do
* the available on the stream. The below is ugly. It makes getPos, the
* first time its called, return length of the file -- i.e. tell a lie -- just
* so this line up in SF.Reader's constructor ends up with right answer:
*
* this.end = in.getPos() + length;
*/
private static class WALReader extends SequenceFile.Reader {
WALReader(final FileSystem fs, final Path p, final Configuration c)
throws IOException {
super(fs, p, c);
}
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length)
throws IOException {
return new WALReaderFSDataInputStream(super.openFile(fs, file, bufferSize,
length), length);
}
/**
* Override just so can intercept first call to getPos.
*/
static class WALReaderFSDataInputStream extends FSDataInputStream {
private boolean firstGetPosInvocation = true;
private long length;
WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
throws IOException {
super(is);
this.length = l;
}
@Override
public long getPos() throws IOException {
if (this.firstGetPosInvocation) {
this.firstGetPosInvocation = false;
// Tell a lie. We're doing this just so that this line up in
// SequenceFile.Reader constructor comes out with the correct length
// on the file:
// this.end = in.getPos() + length;
//
long available = this.in.available();
// Length gets added up in the SF.Reader constructor so subtract the
// difference. If available < this.length, then return this.length.
// I ain't sure what else to do.
return available >= this.length? available - this.length: this.length;
}
return super.getPos();
}
}
}
/**
* Get a Reader for WAL.
* Reader is a subclass of SequenceFile.Reader. The subclass has amendments
* to make it so we see edits up to the last sync (HDFS-265). Of note, we
* can only see up to the sync that happened before this file was opened.
* Will require us doing up our own WAL Reader if we want to keep up with
* a syncing Writer.
* @param p
* @return A WAL Reader. Close when done with it.
* Get a reader for the WAL.
* @param fs
* @param path
* @param keyClass
* @param valueClass
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
public static SequenceFile.Reader getReader(final FileSystem fs,
final Path p, final Configuration c)
@SuppressWarnings("unchecked")
public static Reader getReader(final FileSystem fs,
final Path path, HBaseConfiguration conf)
throws IOException {
return new WALReader(fs, p, c);
try {
Class c = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class.getCanonicalName()));
HLog.Reader reader = (HLog.Reader) c.newInstance();
reader.init(fs, path, conf);
return reader;
} catch (Exception e) {
IOException ie = new IOException("cannot get log reader");
ie.initCause(e);
throw ie;
}
}
protected SequenceFile.Writer createWriter(Path path,
Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
throws IOException {
SequenceFile.Writer writer =
SequenceFile.createWriter(this.fs, this.conf, path, keyClass,
valueClass, fs.getConf().getInt("io.file.buffer.size", 4096),
this.replicationLevel, this.blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new Metadata());
// Get at the private FSDataOutputStream inside in SequenceFile so we can
// call sync on it. Make it accessible. Stash it aside for call up in
// the sync method above.
final Field fields[] = writer.getClass().getDeclaredFields();
final String fieldName = "out";
for (int i = 0; i < fields.length; ++i) {
if (fieldName.equals(fields[i].getName())) {
/**
* Get a writer for the WAL.
* @param path
* @param keyClass
* @param valueClass
* @return A WAL writer. Close when done with it.
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static Writer createWriter(final FileSystem fs,
final Path path, HBaseConfiguration conf) throws IOException {
try {
fields[i].setAccessible(true);
this.writer_out = (FSDataOutputStream)fields[i].get(writer);
break;
} catch (IllegalAccessException ex) {
throw new IOException("Accessing " + fieldName, ex);
}
}
}
Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
SequenceFileLogWriter.class.getCanonicalName()));
HLog.Writer writer = (HLog.Writer) c.newInstance();
writer.init(fs, path, conf);
return writer;
} catch (Exception e) {
IOException ie = new IOException("cannot get log writer");
ie.initCause(e);
throw ie;
}
}
/*
@ -820,9 +760,6 @@ public class HLog implements HConstants, Syncable {
this.unflushedEntries.get() >= this.flushlogentries) {
try {
this.writer.sync();
if (this.writer_out != null) {
this.writer_out.sync();
}
this.forceSync = false;
this.unflushedEntries.set(0);
} catch (IOException e) {
@ -857,7 +794,7 @@ public class HLog implements HConstants, Syncable {
LOG.debug("edit=" + this.numEntries.get() + ", write=" +
logKey.toString());
}
this.writer.append(logKey, logEdit);
this.writer.append(new HLog.Entry(logKey, logEdit));
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
@ -936,8 +873,9 @@ public class HLog implements HConstants, Syncable {
return;
}
synchronized (updateLock) {
this.writer.append(makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
completeCacheFlushLogEdit());
this.writer.append(new HLog.Entry(
makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
completeCacheFlushLogEdit()));
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
@ -1018,20 +956,20 @@ public class HLog implements HConstants, Syncable {
// Private immutable datastructure to hold Writer and its Path.
private final static class WriterAndPath {
final Path p;
final SequenceFile.Writer w;
WriterAndPath(final Path p, final SequenceFile.Writer w) {
final Writer w;
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
}
@SuppressWarnings("unchecked")
static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>)
conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
}
public static HLogKey newKey(HBaseConfiguration conf) throws IOException {
public static HLogKey newKey(Configuration conf) throws IOException {
Class<? extends HLogKey> keyClass = getKeyClass(conf);
try {
return keyClass.newInstance();
@ -1072,8 +1010,8 @@ public class HLog implements HConstants, Syncable {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
for (int step = 0; step < maxSteps; step++) {
final Map<byte[], LinkedList<HLogEntry>> logEntries =
new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
final Map<byte[], LinkedList<HLog.Entry>> logEntries =
new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
// Stop at logfiles.length when it's the last step
int endIndex = step == maxSteps - 1? logfiles.length:
step * concurrentLogReads + concurrentLogReads;
@ -1086,28 +1024,22 @@ public class HLog implements HConstants, Syncable {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
SequenceFile.Reader in = null;
Reader in = null;
int count = 0;
try {
in = HLog.getReader(fs, logfiles[i].getPath(), conf);
try {
HLogKey key = newKey(conf);
KeyValue val = new KeyValue();
while (in.next(key, val)) {
byte [] regionName = key.getRegionName();
LinkedList<HLogEntry> queue = logEntries.get(regionName);
HLog.Entry entry;
while ((entry = in.next()) != null) {
byte [] regionName = entry.getKey().getRegionName();
LinkedList<HLog.Entry> queue = logEntries.get(regionName);
if (queue == null) {
queue = new LinkedList<HLogEntry>();
queue = new LinkedList<HLog.Entry>();
LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName));
logEntries.put(regionName, queue);
}
HLogEntry hle = new HLogEntry(val, key);
queue.push(hle);
queue.push(entry);
count++;
// Make the key and value new each time; otherwise same instance
// is used over and over.
key = newKey(conf);
val = new KeyValue();
}
LOG.debug("Pushed=" + count + " entries from " +
logfiles[i].getPath());
@ -1148,17 +1080,17 @@ public class HLog implements HConstants, Syncable {
Thread thread = new Thread(Bytes.toStringBinary(key)) {
@Override
public void run() {
LinkedList<HLogEntry> entries = logEntries.get(key);
LinkedList<HLog.Entry> entries = logEntries.get(key);
LOG.debug("Thread got " + entries.size() + " to process");
long threadTime = System.currentTimeMillis();
try {
int count = 0;
// Items were added to the linkedlist oldest first. Pull them
// out in that order.
for (ListIterator<HLogEntry> i =
for (ListIterator<HLog.Entry> i =
entries.listIterator(entries.size());
i.hasPrevious();) {
HLogEntry logEntry = i.previous();
HLog.Entry logEntry = i.previous();
WriterAndPath wap = logWriters.get(key);
if (wap == null) {
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
@ -1166,7 +1098,7 @@ public class HLog implements HConstants, Syncable {
HRegionInfo.encodeRegionName(key)),
HREGION_OLDLOGFILE_NAME);
Path oldlogfile = null;
SequenceFile.Reader old = null;
Reader old = null;
if (fs.exists(logfile)) {
FileStatus stat = fs.getFileStatus(logfile);
if (stat.getLen() <= 0) {
@ -1178,12 +1110,10 @@ public class HLog implements HConstants, Syncable {
"exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
old = new SequenceFile.Reader(fs, oldlogfile, conf);
old = getReader(fs, oldlogfile, conf);
}
}
SequenceFile.Writer w =
SequenceFile.createWriter(fs, conf, logfile,
getKeyClass(conf), KeyValue.class, getCompressionType(conf));
Writer w = createWriter(fs, logfile, conf);
wap = new WriterAndPath(logfile, w);
logWriters.put(key, wap);
if (LOG.isDebugEnabled()) {
@ -1193,20 +1123,19 @@ public class HLog implements HConstants, Syncable {
if (old != null) {
// Copy from existing log file
HLogKey oldkey = newKey(conf);
KeyValue oldval = new KeyValue();
for (; old.next(oldkey, oldval); count++) {
HLog.Entry entry;
for (; (entry = old.next()) != null; count++) {
if (LOG.isDebugEnabled() && count > 0
&& count % 10000 == 0) {
LOG.debug("Copied " + count + " edits");
}
w.append(oldkey, oldval);
w.append(entry);
}
old.close();
fs.delete(oldlogfile, true);
}
}
wap.w.append(logEntry.getKey(), logEntry.getEdit());
wap.w.append(logEntry);
count++;
}
if (LOG.isDebugEnabled()) {
@ -1249,18 +1178,24 @@ public class HLog implements HConstants, Syncable {
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
public static class HLogEntry {
public static class Entry {
private KeyValue edit;
private HLogKey key;
public Entry() {
edit = new KeyValue();
key = new HLogKey();
}
/**
* Constructor for both params
* @param edit log's edit
* @param key log's key
*/
public HLogEntry(KeyValue edit, HLogKey key) {
public Entry(HLogKey key, KeyValue edit) {
super();
this.edit = edit;
this.key = key;
this.edit = edit;
}
/**
* Gets the edit
@ -1360,12 +1295,11 @@ public class HLog implements HConstants, Syncable {
if (!fs.isFile(logPath)) {
throw new IOException(args[i] + " is not a file");
}
Reader log = new SequenceFile.Reader(fs, logPath, conf);
Reader log = getReader(fs, logPath, conf);
try {
HLogKey key = new HLogKey();
KeyValue val = new KeyValue();
while (log.next(key, val)) {
System.out.println(key.toString() + " " + val.toString());
HLog.Entry entry;
while ((entry = log.next()) != null) {
System.out.println(entry.toString());
}
} finally {
log.close();
@ -1383,15 +1317,4 @@ public class HLog implements HConstants, Syncable {
ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
static class HLogWriter extends SequenceFile.Writer {
public HLogWriter(FileSystem arg0, Configuration arg1, Path arg2,
Class<?> arg3, Class<?> arg4, int arg5, short arg6, long arg7,
Progressable arg8, Metadata arg9) throws IOException {
super(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
}
void flush() {
}
}
}

View File

@ -0,0 +1,110 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.io.SequenceFile;
public class SequenceFileLogReader implements HLog.Reader {
/**
* Hack just to set the correct file length up in SequenceFile.Reader.
* See HADOOP-6307. The below is all about setting the right length on the
* file we are reading. fs.getFileStatus(file).getLen() is passed down to
* a private SequenceFile.Reader constructor. This won't work. Need to do
* the available on the stream. The below is ugly. It makes getPos, the
* first time its called, return length of the file -- i.e. tell a lie -- just
* so this line up in SF.Reader's constructor ends up with right answer:
*
* this.end = in.getPos() + length;
*
*/
private static class WALReader extends SequenceFile.Reader {
WALReader(final FileSystem fs, final Path p, final Configuration c)
throws IOException {
super(fs, p, c);
}
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length)
throws IOException {
return new WALReaderFSDataInputStream(super.openFile(fs, file,
bufferSize, length), length);
}
/**
* Override just so can intercept first call to getPos.
*/
static class WALReaderFSDataInputStream extends FSDataInputStream {
private boolean firstGetPosInvocation = true;
private long length;
WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
throws IOException {
super(is);
this.length = l;
}
@Override
public long getPos() throws IOException {
if (this.firstGetPosInvocation) {
this.firstGetPosInvocation = false;
// Tell a lie. We're doing this just so that this line up in
// SequenceFile.Reader constructor comes out with the correct length
// on the file:
// this.end = in.getPos() + length;
long available = this.in.available();
// Length gets added up in the SF.Reader constructor so subtract the
// difference. If available < this.length, then return this.length.
return available >= this.length? available - this.length: this.length;
}
return super.getPos();
}
}
}
Configuration conf;
WALReader reader;
public SequenceFileLogReader() { }
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
this.conf = conf;
reader = new WALReader(fs, path, conf);
}
@Override
public void close() throws IOException {
reader.close();
}
@Override
public HLog.Entry next() throws IOException {
return next(null);
}
@Override
public HLog.Entry next(HLog.Entry reuse) throws IOException {
if (reuse == null) {
HLogKey key = HLog.newKey(conf);
KeyValue val = new KeyValue();
if (reader.next(key, val)) {
return new HLog.Entry(key, val);
}
} else if (reader.next(reuse.getKey(), reuse.getEdit())) {
return reuse;
}
return null;
}
}

View File

@ -0,0 +1,74 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;
public class SequenceFileLogWriter implements HLog.Writer {
SequenceFile.Writer writer;
FSDataOutputStream writer_out;
public SequenceFileLogWriter() { }
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), KeyValue.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),
conf.getLong("hbase.regionserver.hlog.blocksize",
fs.getDefaultBlockSize()),
SequenceFile.CompressionType.NONE,
new DefaultCodec(),
null,
new Metadata());
// Get at the private FSDataOutputStream inside in SequenceFile so we can
// call sync on it. Make it accessible. Stash it aside for call up in
// the sync method.
final Field fields[] = writer.getClass().getDeclaredFields();
final String fieldName = "out";
for (int i = 0; i < fields.length; ++i) {
if (fieldName.equals(fields[i].getName())) {
try {
fields[i].setAccessible(true);
this.writer_out = (FSDataOutputStream)fields[i].get(writer);
break;
} catch (IllegalAccessException ex) {
throw new IOException("Accessing " + fieldName, ex);
}
}
}
}
@Override
public void append(HLog.Entry entry) throws IOException {
this.writer.append(entry.getKey(), entry.getEdit());
}
@Override
public void close() throws IOException {
this.writer.close();
}
@Override
public void sync() throws IOException {
this.writer.sync();
if (this.writer_out != null) {
this.writer_out.sync();
}
}
}

View File

@ -33,9 +33,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
@ -139,10 +136,10 @@ public class TestHLog extends HBaseTestCase implements HConstants {
wal.sync();
// Open a Reader.
Path walPath = wal.computeFilename(wal.getFilenum());
SequenceFile.Reader reader = HLog.getReader(this.fs, walPath, this.conf);
HLog.Reader reader = HLog.getReader(fs, walPath, conf);
int count = 0;
HLogKey key = new HLogKey();
while(reader.next(key)) count++;
HLog.Entry entry = new HLog.Entry();
while ((entry = reader.next(entry)) != null) count++;
assertEquals(total, count);
reader.close();
// Add test that checks to see that an open of a Reader works on a file
@ -152,16 +149,16 @@ public class TestHLog extends HBaseTestCase implements HConstants {
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(bytes, bytes, kvs, System.currentTimeMillis());
}
reader = HLog.getReader(this.fs, walPath, this.conf);
reader = HLog.getReader(fs, walPath, conf);
count = 0;
while(reader.next(key)) count++;
while((entry = reader.next(entry)) != null) count++;
assertTrue(count >= total);
reader.close();
// If I sync, should see double the edits.
wal.sync();
reader = HLog.getReader(this.fs, walPath, this.conf);
reader = HLog.getReader(fs, walPath, conf);
count = 0;
while(reader.next(key)) count++;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 2, count);
// Now do a test that ensures stuff works when we go over block boundary,
// especially that we return good length on file.
@ -173,16 +170,16 @@ public class TestHLog extends HBaseTestCase implements HConstants {
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
reader = HLog.getReader(this.fs, walPath, this.conf);
reader = HLog.getReader(fs, walPath, conf);
count = 0;
while(reader.next(key)) count++;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
// Close it and ensure that closed, Reader gets right length also.
wal.close();
reader = HLog.getReader(this.fs, walPath, this.conf);
reader = HLog.getReader(fs, walPath, conf);
count = 0;
while(reader.next(key)) count++;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
}
@ -191,14 +188,15 @@ public class TestHLog extends HBaseTestCase implements HConstants {
throws IOException {
assertEquals(howmany, splits.size());
for (int i = 0; i < splits.size(); i++) {
SequenceFile.Reader r = HLog.getReader(this.fs, splits.get(i), this.conf);
HLog.Reader reader = HLog.getReader(this.fs, splits.get(i), conf);
try {
HLogKey key = new HLogKey();
KeyValue kv = new KeyValue();
int count = 0;
String previousRegion = null;
long seqno = -1;
while(r.next(key, kv)) {
HLog.Entry entry = new HLog.Entry();
while((entry = reader.next(entry)) != null) {
HLogKey key = entry.getKey();
KeyValue kv = entry.getEdit();
String region = Bytes.toString(key.getRegionName());
// Assert that all edits are for same region.
if (previousRegion != null) {
@ -212,7 +210,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
}
assertEquals(howmany * howmany, count);
} finally {
r.close();
reader.close();
}
}
}
@ -226,7 +224,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
final byte [] regionName = Bytes.toBytes("regionname");
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
Reader reader = null;
HLog.Reader reader = null;
HLog log = new HLog(fs, dir, this.conf, null);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
@ -246,17 +244,20 @@ public class TestHLog extends HBaseTestCase implements HConstants {
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
HLogKey key = new HLogKey();
KeyValue val = new KeyValue();
HLog.Entry entry = new HLog.Entry();
for (int i = 0; i < COL_COUNT; i++) {
reader.next(key, val);
reader.next(entry);
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(i + '0'), val.getValue()[0]);
System.out.println(key + " " + val);
}
while (reader.next(key, val)) {
while ((entry = reader.next(null)) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
// Assert only one more row... the meta flushed row.
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));