HBASE-2544 Forward port branch 0.20 WAL to TRUNK

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944063 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-05-13 23:59:05 +00:00
parent eb11dd6c82
commit 635768d28a
5 changed files with 527 additions and 173 deletions

View File

@ -308,6 +308,7 @@ Release 0.21.0 - Unreleased
HBASE-2431 Master does not respect generation stamps, may result in meta
getting permanently offlined
HBASE-2515 ChangeTableState considers split&&offline regions as being served
HBASE-2544 Forward port branch 0.20 WAL to TRUNK
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -1,5 +1,5 @@
/**
* Copyright 2007 The Apache Software Foundation
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -19,34 +19,14 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@ -66,6 +46,32 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
/**
* HLog stores all the edits to the HStore. Its the hbase write-ahead-log
* implementation.
@ -120,20 +126,21 @@ public class HLog implements HConstants, Syncable {
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private final Path oldLogDir;
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
private int initialReplication; // initial replication factor of SequenceFile.writer
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
void close() throws IOException;
Entry next() throws IOException;
Entry next(Entry reuse) throws IOException;
void seek(long pos) throws IOException;
long getPosition() throws IOException;
}
public interface Writer {
@ -144,9 +151,6 @@ public class HLog implements HConstants, Syncable {
long getLength() throws IOException;
}
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
/*
* Current log file.
*/
@ -168,11 +172,14 @@ public class HLog implements HConstants, Syncable {
private final AtomicLong logSeqNum = new AtomicLong(0);
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
//number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
// If > than this size, roll the log.
// If > than this size, roll the log. This is typically 0.95 times the size
// of the default Hdfs block size.
private final long logrollsize;
// This lock prevents starting a log roll during a cache flush.
@ -272,7 +279,7 @@ public class HLog implements HConstants, Syncable {
}
fs.mkdirs(dir);
this.oldLogDir = oldLogDir;
if(!fs.exists(oldLogDir)) {
if (!fs.exists(oldLogDir)) {
fs.mkdirs(this.oldLogDir);
}
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
@ -282,7 +289,29 @@ public class HLog implements HConstants, Syncable {
", enabled=" + this.enabled +
", flushlogentries=" + this.flushlogentries +
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
// rollWriter sets this.hdfs_out if it can.
rollWriter();
// handle the reflection necessary to call getNumCurrentReplicas()
this.getNumCurrentReplicas = null;
if(this.hdfs_out != null) {
try {
this.getNumCurrentReplicas = this.hdfs_out.getClass().
getMethod("getNumCurrentReplicas", new Class<?> []{});
this.getNumCurrentReplicas.setAccessible(true);
} catch (NoSuchMethodException e) {
// Thrown if getNumCurrentReplicas() function isn't available
} catch (SecurityException e) {
// Thrown if we can't get access to getNumCurrentReplicas()
this.getNumCurrentReplicas = null; // could happen on setAccessible()
}
}
if(this.getNumCurrentReplicas != null) {
LOG.info("Using getNumCurrentReplicas--HDFS-826");
} else {
LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
}
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
Threads.setDaemonThreadRunning(logSyncerThread,
Thread.currentThread().getName() + ".logSyncer");
@ -355,6 +384,17 @@ public class HLog implements HConstants, Syncable {
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
this.initialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
this.hdfs_out = null;
if (this.writer instanceof SequenceFileLogWriter) {
this.hdfs_out =
((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
}
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
@ -628,7 +668,7 @@ public class HLog implements HConstants, Syncable {
throws IOException {
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
}
/**
@ -650,8 +690,7 @@ public class HLog implements HConstants, Syncable {
* @param logKey
* @throws IOException
*/
public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
final boolean isMetaRegion)
public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@ -672,7 +711,7 @@ public class HLog implements HConstants, Syncable {
}
// sync txn to file system
this.sync(isMetaRegion);
this.sync(regionInfo.isMetaRegion());
}
/**
@ -705,15 +744,14 @@ public class HLog implements HConstants, Syncable {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
long seqNum = obtainSeqNum();
synchronized (this.updateLock) {
long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region (i.e. the first edit added to the particular
// memstore). . When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, seqNum);
int counter = 0;
HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
doWrite(info, logKey, edits);
this.numEntries.incrementAndGet();
@ -799,10 +837,6 @@ public class HLog implements HConstants, Syncable {
}
lock.lock();
try {
if (syncerShuttingDown) {
LOG.warn(getName() + " was shut down while waiting for sync");
return;
}
if (syncerShuttingDown) {
LOG.warn(getName() + " was shut down while waiting for sync");
return;
@ -852,7 +886,24 @@ public class HLog implements HConstants, Syncable {
syncOps++;
this.forceSync = false;
this.unflushedEntries.set(0);
// TODO: HBASE-2401
// if the number of replicas in HDFS has fallen below the initial
// value, then roll logs.
try {
int numCurrentReplicas = getLogReplication();
if (numCurrentReplicas != 0 &&
numCurrentReplicas < this.initialReplication) {
LOG.warn("HDFS pipeline error detected. " +
"Found " + numCurrentReplicas + " replicas but expecting " +
this.initialReplication + " replicas. " +
" Requesting close of hlog.");
requestLogRoll();
logRollRequested = true;
}
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
" still proceeding ahead...");
}
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
@ -866,6 +917,29 @@ public class HLog implements HConstants, Syncable {
}
}
/**
* This method gets the datanode replication count for the current HLog.
*
* If the pipeline isn't started yet or is empty, you will get the default
* replication factor. Therefore, if this function returns 0, it means you
* are not properly running with the HDFS-826 patch.
*
* @throws Exception
*/
int getLogReplication() throws Exception {
if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
if (repl instanceof Integer) {
return ((Integer)repl).intValue();
}
}
return 0;
}
boolean canGetCurReplicas() {
return this.getNumCurrentReplicas != null;
}
public void hsync() throws IOException {
// Not yet implemented up in hdfs so just call hflush.
hflush();
@ -916,20 +990,6 @@ public class HLog implements HConstants, Syncable {
return outputfiles.size();
}
/*
* Obtain a specified number of sequence numbers
*
* @param num number of sequence numbers to obtain
* @return array of sequence numbers
*/
private long [] obtainSeqNum(int num) {
long [] results = new long[num];
for (int i = 0; i < num; i++) {
results[i] = this.logSeqNum.incrementAndGet();
}
return results;
}
/**
* By acquiring a log sequence ID, we can allow log messages to continue while
* we flush the cache.
@ -968,10 +1028,10 @@ public class HLog implements HConstants, Syncable {
}
synchronized (updateLock) {
long now = System.currentTimeMillis();
WALEdit edits = completeCacheFlushLogEdit();
this.writer.append(new HLog.Entry(
makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
edits));
WALEdit edit = completeCacheFlushLogEdit();
HLogKey key = makeKey(regionName, tableName, logSeqId,
System.currentTimeMillis());
this.writer.append(new Entry(key, edit));
writeTime += System.currentTimeMillis() - now;
writeOps++;
this.numEntries.incrementAndGet();
@ -1024,11 +1084,12 @@ public class HLog implements HConstants, Syncable {
* <code>${ROOTDIR}/log_HOST_PORT</code>
* @param oldLogDir
* @param fs FileSystem
* @param conf HBaseConfiguration
* @param conf Configuration
* @throws IOException
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException {
Path oldLogDir, final FileSystem fs, final Configuration conf)
throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
@ -1104,7 +1165,8 @@ public class HLog implements HConstants, Syncable {
*/
private static List<Path> splitLog(final Path rootDir,
Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
final Configuration conf) throws IOException {
final Configuration conf)
throws IOException {
final Map<byte [], WriterAndPath> logWriters =
Collections.synchronizedMap(
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
@ -1115,35 +1177,51 @@ public class HLog implements HConstants, Syncable {
int logWriterThreads =
conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
// Number of logs to read concurrently when log splitting.
// More means faster but bigger mem consumption */
int concurrentLogReads =
// Number of logs to read into memory before writing to their appropriate
// regions when log splitting. More means faster but bigger mem consumption
int logFilesPerStep =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
// Is append supported?
// append support = we can avoid data loss (yay)
// we open for append, then close to recover the correct file length
final boolean appendSupport = isAppend(conf);
// store corrupt logs for post-mortem analysis (empty string = discard)
final String corruptDir =
conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
List<Path> finishedFiles = new LinkedList<Path>();
List<Path> corruptFiles = new LinkedList<Path>();
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
logFilesPerStep)).intValue();
for (int step = 0; step < maxSteps; step++) {
// Step 1: read N log files into memory
final Map<byte[], LinkedList<HLog.Entry>> logEntries =
new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
// Stop at logfiles.length when it's the last step
int endIndex = step == maxSteps - 1? logfiles.length:
step * concurrentLogReads + concurrentLogReads;
for (int i = (step * concurrentLogReads); i < endIndex; i++) {
// Check for possibly empty file. With appends, currently Hadoop
// reports a zero length even if the file has been sync'd. Revisit if
// HADOOP-4751 is committed.
long length = logfiles[i].getLen();
step * logFilesPerStep + logFilesPerStep;
for (int i = (step * logFilesPerStep); i < endIndex; i++) {
Path curLogFile = logfiles[i].getPath();
// make sure we get the right file length before opening for read
recoverLog(fs, curLogFile, appendSupport);
long length = fs.getFileStatus(curLogFile).getLen();
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
": " + curLogFile + ", length=" + length);
}
Reader in = null;
boolean cleanRead = false;
int count = 0;
try {
in = HLog.getReader(fs, logfiles[i].getPath(), conf);
in = HLog.getReader(fs, curLogFile, conf);
try {
HLog.Entry entry;
Entry entry;
while ((entry = in.next()) != null) {
byte [] regionName = entry.getKey().getRegionName();
LinkedList<HLog.Entry> queue = logEntries.get(regionName);
@ -1155,20 +1233,24 @@ public class HLog implements HConstants, Syncable {
queue.push(entry);
count++;
}
LOG.debug("Pushed=" + count + " entries from " +
logfiles[i].getPath());
LOG.debug("Pushed=" + count + " entries from " + curLogFile);
cleanRead = true;
} catch (IOException e) {
LOG.debug("IOE Pushed=" + count + " entries from " +
logfiles[i].getPath());
LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile);
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
LOG.warn("Exception processing " + logfiles[i].getPath() +
" -- continuing. Possible DATA LOSS!", e);
String msg = "Exception processing " + curLogFile +
" -- continuing. Possible DATA LOSS!";
if (corruptDir.length() > 0) {
msg += " Storing in hlog corruption directory.";
}
LOG.warn(msg, e);
}
}
} catch (IOException e) {
if (length <= 0) {
LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
cleanRead = true;
continue;
}
throw e;
@ -1178,8 +1260,16 @@ public class HLog implements HConstants, Syncable {
in.close();
}
} catch (IOException e) {
LOG.warn("Close in finally threw exception -- continuing", e);
LOG.warn("File.close() threw exception -- continuing, "
+ "but marking file as corrupt.", e);
cleanRead = false;
}
if (cleanRead) {
finishedFiles.add(curLogFile);
} else {
corruptFiles.add(curLogFile);
}
/* TODO FOR J-D REVIEW
// Archive the input file now so we do not replay edits. We could
// have gotten here because of an exception. If so, probably
// nothing we can do about it. Replaying it, it could work but we
@ -1187,85 +1277,89 @@ public class HLog implements HConstants, Syncable {
// could have lost some edits.
fs.rename(logfiles[i].getPath(),
getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
*/
}
}
// Step 2: Some regionserver log files have been read into memory.
// Assign them to the appropriate region directory.
class ThreadWithException extends Thread {
ThreadWithException(String name) { super(name); }
public IOException exception = null;
}
List<ThreadWithException> threadList =
new ArrayList<ThreadWithException>(logEntries.size());
ExecutorService threadPool =
Executors.newFixedThreadPool(logWriterThreads);
for (final byte[] key : logEntries.keySet()) {
Thread thread = new Thread(Bytes.toStringBinary(key)) {
for (final byte [] region: logEntries.keySet()) {
ThreadWithException thread =
new ThreadWithException(Bytes.toStringBinary(region)) {
@Override
public void run() {
LinkedList<HLog.Entry> entries = logEntries.get(key);
LinkedList<HLog.Entry> entries = logEntries.get(region);
LOG.debug("Thread got " + entries.size() + " to process");
if(entries.size() <= 0) {
LOG.warn("Got a region with no entries to process.");
return;
}
long threadTime = System.currentTimeMillis();
try {
int count = 0;
// get the logfile associated with this region. 2 logs often
// write to the same region, so persist this info across logs
WriterAndPath wap = logWriters.get(region);
if (wap == null) {
// first write to this region, make new logfile
assert entries.size() > 0;
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootDir,
entries.getFirst().getKey().getTablename()),
HRegionInfo.encodeRegionName(region)),
HREGION_OLDLOGFILE_NAME);
// If splitLog() was running when the user restarted his
// cluster, then we could already have a 'logfile'.
// Since we don't delete logs until everything is written to
// their respective regions, we can safely remove this tmp.
if (fs.exists(logfile)) {
LOG.warn("Deleting old hlog file: " + logfile);
// TODO: Archive?
fs.delete(logfile, true);
}
// associate an OutputStream with this logfile
Writer w = createWriter(fs, logfile, conf);
wap = new WriterAndPath(logfile, w);
logWriters.put(region, wap);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toStringBinary(region));
}
}
// Items were added to the linkedlist oldest first. Pull them
// out in that order.
for (ListIterator<HLog.Entry> i =
entries.listIterator(entries.size());
for (ListIterator<HLog.Entry> i = entries.listIterator(entries.size());
i.hasPrevious();) {
HLog.Entry logEntry = i.previous();
WriterAndPath wap = logWriters.get(key);
if (wap == null) {
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootDir, logEntry.getKey().getTablename()),
HRegionInfo.encodeRegionName(key)),
HREGION_OLDLOGFILE_NAME);
Path oldlogfile = null;
Reader old = null;
if (fs.exists(logfile)) {
FileStatus stat = fs.getFileStatus(logfile);
if (stat.getLen() <= 0) {
LOG.warn("Old hlog file " + logfile + " is zero " +
"length. Deleting existing file");
fs.delete(logfile, false);
} else {
LOG.warn("Old hlog file " + logfile + " already " +
"exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
old = getReader(fs, oldlogfile, conf);
}
}
Writer w = createWriter(fs, logfile, conf);
wap = new WriterAndPath(logfile, w);
logWriters.put(key, wap);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toStringBinary(key));
}
if (old != null) {
// Copy from existing log file
HLog.Entry entry;
for (; (entry = old.next()) != null; count++) {
if (LOG.isDebugEnabled() && count > 0
&& count % 10000 == 0) {
LOG.debug("Copied " + count + " edits");
}
w.append(entry);
}
old.close();
fs.delete(oldlogfile, true);
}
}
wap.w.append(logEntry);
wap.w.append(i.previous());
count++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + count + " total edits to "
+ Bytes.toStringBinary(key) + " in "
+ Bytes.toStringBinary(region) + " in "
+ (System.currentTimeMillis() - threadTime) + "ms");
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Got while writing region " + Bytes.toStringBinary(key)
+ " log " + e);
LOG.warn("Got while writing region "
+ Bytes.toStringBinary(region) + " log " + e);
e.printStackTrace();
exception = e;
}
}
};
threadList.add(thread);
threadPool.execute(thread);
}
threadPool.shutdown();
@ -1274,9 +1368,19 @@ public class HLog implements HConstants, Syncable {
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
}catch(InterruptedException ex) {
LOG.warn("Hlog writers were interrupted, possible data loss!");
} catch (InterruptedException ex) {
LOG.warn("Hlog writers were interrupted during splitLog(). "
+"Retaining log files to avoid data loss.");
throw new IOException(ex.getMessage(), ex.getCause());
}
// throw an exception if one of the threads reported one
for (ThreadWithException t : threadList) {
if (t.exception != null) {
throw t.exception;
}
}
// End of for loop. Rinse and repeat
}
} finally {
splits = new ArrayList<Path>(logWriters.size());
@ -1286,9 +1390,79 @@ public class HLog implements HConstants, Syncable {
splits.add(wap.p);
}
}
// Step 3: All writes succeeded! Get rid of the now-unnecessary logs
for(Path p : finishedFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully split Hlog file. Deleting " + p);
}
try {
if (!fs.delete(p, true) && LOG.isDebugEnabled()) {
LOG.debug("Delete of split Hlog (" + p + ") failed.");
}
} catch (IOException e) {
// don't throw error here. worst case = double-read
LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e);
}
}
for (Path p : corruptFiles) {
if (corruptDir.length() > 0) {
// store any corrupt logs for later analysis
Path cp = new Path(conf.get(HBASE_DIR), corruptDir);
if(!fs.exists(cp)) {
fs.mkdirs(cp);
}
Path newp = new Path(cp, p.getName());
if (!fs.exists(newp)) {
if (!fs.rename(p, newp)) {
LOG.warn("Rename of " + p + " to " + newp + " failed.");
} else {
LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp);
}
} else {
LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp +
". Ignoring");
}
} else {
// data loss is less important than disk space, delete
try {
if (!fs.delete(p, true) ) {
if (LOG.isDebugEnabled()) {
LOG.debug("Delete of split Hlog " + p + " failed.");
}
} else {
LOG.warn("Corrupt Hlog (" + p + ") deleted!");
}
} catch (IOException e) {
LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e);
}
}
}
return splits;
}
/*
* @param conf
* @return True if append enabled and we have the syncFs in our path.
*/
static boolean isAppend(final Configuration conf) {
boolean append = conf.getBoolean("dfs.support.append", false);
if (append) {
try {
// TODO: The implementation that comes back when we do a createWriter
// may not be using SequenceFile so the below is not a definitive test.
// Will do for now (hdfs-200).
SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
append = true;
} catch (SecurityException e) {
} catch (NoSuchMethodException e) {
append = false;
}
}
return append;
}
/**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
@ -1345,6 +1519,64 @@ public class HLog implements HConstants, Syncable {
}
}
/*
* Recover log.
* Try and open log in append mode.
* Doing this, we get a hold of the file that crashed writer
* was writing to. Once we have it, close it. This will
* allow subsequent reader to see up to last sync.
* @param fs
* @param p
* @param append
*/
public static void recoverLog(final FileSystem fs, final Path p,
final boolean append) throws IOException {
if (!append) {
return;
}
// lease recovery not needed for local file system case.
// currently, local file system doesn't implement append either.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
LOG.debug("Recovering DFS lease for path " + p);
long startWaiting = System.currentTimeMillis();
// Trying recovery
boolean recovered = false;
while (!recovered) {
try {
FSDataOutputStream out = fs.append(p);
out.close();
recovered = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof AlreadyBeingCreatedException) {
// We expect that we'll get this message while the lease is still
// within its soft limit, but if we get it past that, it means
// that the RS is holding onto the file even though it lost its
// znode. We could potentially abort after some time here.
long waitedFor = System.currentTimeMillis() - startWaiting;
if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p
+ ":" + e.getMessage());
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore it and try again
}
} else {
throw new IOException("Failed to open " + p + " for append", e);
}
}
}
LOG.info("Past out lease recovery");
}
/**
* Construct the HLog directory name
*

View File

@ -21,29 +21,42 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;
/**
* Implementation of {@link HLog.Writer} that delegates to
* {@link SequenceFile.Writer}.
*/
public class SequenceFileLogWriter implements HLog.Writer {
private final Log LOG = LogFactory.getLog(this.getClass());
// The sequence file we delegate to.
private SequenceFile.Writer writer;
// The dfsclient out stream gotten made accessible or null if not available.
private OutputStream dfsClient_out;
// The syncFs method from hdfs-200 or null if not available.
private Method syncFs;
SequenceFile.Writer writer;
FSDataOutputStream writer_out;
public SequenceFileLogWriter() { }
public SequenceFileLogWriter() {
super();
}
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
writer = SequenceFile.createWriter(fs, conf, path,
// Create a SF.Writer instance.
this.writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
@ -58,19 +71,39 @@ public class SequenceFileLogWriter implements HLog.Writer {
// Get at the private FSDataOutputStream inside in SequenceFile so we can
// call sync on it. Make it accessible. Stash it aside for call up in
// the sync method.
final Field fields[] = writer.getClass().getDeclaredFields();
final Field fields [] = this.writer.getClass().getDeclaredFields();
final String fieldName = "out";
for (int i = 0; i < fields.length; ++i) {
if (fieldName.equals(fields[i].getName())) {
try {
// Make the 'out' field up in SF.Writer accessible.
fields[i].setAccessible(true);
this.writer_out = (FSDataOutputStream)fields[i].get(writer);
FSDataOutputStream out =
(FSDataOutputStream)fields[i].get(this.writer);
this.dfsClient_out = out.getWrappedStream();
break;
} catch (IllegalAccessException ex) {
throw new IOException("Accessing " + fieldName, ex);
}
}
}
// Now do dirty work to see if syncFs is available.
// Test if syncfs is available.
Method m = null;
if (conf.getBoolean("dfs.support.append", false)) {
try {
// function pointer to writer.syncFs()
m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
} catch (SecurityException e) {
throw new IOException("Failed test for syncfs", e);
} catch (NoSuchMethodException e) {
// Not available
}
}
this.syncFs = m;
LOG.info((this.syncFs != null)?
"Using syncFs -- HDFS-200": "syncFs -- HDFS-200 -- not available");
}
@Override
@ -86,12 +119,25 @@ public class SequenceFileLogWriter implements HLog.Writer {
@Override
public void sync() throws IOException {
this.writer.sync();
this.writer.syncFs();
if (this.syncFs != null) {
try {
this.syncFs.invoke(this.writer, HLog.NO_ARGS);
} catch (Exception e) {
throw new IOException("Reflection", e);
}
}
}
@Override
public long getLength() throws IOException {
return this.writer.getLength();
}
/**
* @return The dfsclient out stream up inside SF.Writer made accessible, or
* null if not available.
*/
public OutputStream getDFSCOutputStream() {
return this.dfsClient_out;
}
}

View File

@ -28,15 +28,14 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.security.UnixUserGroupInformation;
@ -129,6 +128,7 @@ public class MiniHBaseCluster implements HConstants {
*/
public static class MiniHBaseClusterRegionServer extends HRegionServer {
private static int index = 0;
private Thread shutdownThread = null;
public MiniHBaseClusterRegionServer(Configuration conf)
throws IOException {
@ -159,11 +159,20 @@ public class MiniHBaseCluster implements HConstants {
@Override
protected void init(MapWritable c) throws IOException {
super.init(c);
// Change shutdown hook to only shutdown the FileSystem added above by
// {@link #getFileSystem(HBaseConfiguration)
if (getFileSystem() instanceof DistributedFileSystem) {
Thread t = new SingleFileSystemShutdownThread(getFileSystem());
Runtime.getRuntime().addShutdownHook(t);
// Run this thread to shutdown our filesystem on way out.
this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
}
@Override
public void run() {
try {
super.run();
} finally {
// Run this on the way out.
if (this.shutdownThread != null) {
this.shutdownThread.start();
Threads.shutdown(this.shutdownThread, 30000);
}
}
}

View File

@ -19,6 +19,11 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@ -27,15 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
private Path dir;
@ -322,4 +322,70 @@ public class TestHLog extends HBaseTestCase implements HConstants {
}
}
}
/**
* @throws IOException
*/
public void testAppend() throws IOException {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
this.conf.setBoolean("dfs.support.append", true);
Reader reader = null;
HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
for (int i = 0; i < COL_COUNT; i++) {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
log.append(hri, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
log.close();
Path filename = log.computeFilename(log.getFilenum());
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
HLog.Entry entry = reader.next();
assertEquals(COL_COUNT, entry.getEdit().size());
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(hri.getRegionName(),
entry.getKey().getRegionName()));
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(idx + '0'), val.getValue()[0]);
System.out.println(entry.getKey() + " " + val);
idx++;
}
// Get next row... the meta flushed row.
entry = reader.next();
assertEquals(1, entry.getEdit().size());
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(hri.getRegionName(),
entry.getKey().getRegionName()));
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
val.getValue()));
System.out.println(entry.getKey() + " " + val);
}
} finally {
if (log != null) {
log.closeAndDelete();
}
if (reader != null) {
reader.close();
}
}
}
}