diff --git a/CHANGES.txt b/CHANGES.txt
index 4e30a1e3b89..05f422def50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -308,6 +308,7 @@ Release 0.21.0 - Unreleased
HBASE-2431 Master does not respect generation stamps, may result in meta
getting permanently offlined
HBASE-2515 ChangeTableState considers split&&offline regions as being served
+ HBASE-2544 Forward port branch 0.20 WAL to TRUNK
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index a0132116d85..6f46bab19ed 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,34 +19,14 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.Writable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -66,6 +46,32 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
/**
* HLog stores all the edits to the HStore. It's the HBase write-ahead-log
* implementation.
@@ -120,20 +126,21 @@ public class HLog implements HConstants, Syncable {
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private final Path oldLogDir;
+ private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
+ private int initialReplication; // initial replication factor of SequenceFile.writer
+ private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
+ final static Object [] NO_ARGS = new Object []{};
+
+ // used to indirectly tell syncFs to force the sync
+ private boolean forceSync = false;
+
public interface Reader {
-
void init(FileSystem fs, Path path, Configuration c) throws IOException;
-
void close() throws IOException;
-
Entry next() throws IOException;
-
Entry next(Entry reuse) throws IOException;
-
void seek(long pos) throws IOException;
-
long getPosition() throws IOException;
-
}
public interface Writer {
@@ -144,9 +151,6 @@ public class HLog implements HConstants, Syncable {
long getLength() throws IOException;
}
- // used to indirectly tell syncFs to force the sync
- private boolean forceSync = false;
-
/*
* Current log file.
*/
@@ -168,11 +172,14 @@ public class HLog implements HConstants, Syncable {
private final AtomicLong logSeqNum = new AtomicLong(0);
+ // The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
+ //number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
- // If > than this size, roll the log.
+ // If > this size, roll the log. This is typically 0.95 times the
+ // default HDFS block size.
private final long logrollsize;
// This lock prevents starting a log roll during a cache flush.
@@ -272,7 +279,7 @@ public class HLog implements HConstants, Syncable {
}
fs.mkdirs(dir);
this.oldLogDir = oldLogDir;
- if(!fs.exists(oldLogDir)) {
+ if (!fs.exists(oldLogDir)) {
fs.mkdirs(this.oldLogDir);
}
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
@@ -282,7 +289,29 @@ public class HLog implements HConstants, Syncable {
", enabled=" + this.enabled +
", flushlogentries=" + this.flushlogentries +
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+ // rollWriter sets this.hdfs_out if it can.
rollWriter();
+
+ // handle the reflection necessary to call getNumCurrentReplicas()
+ this.getNumCurrentReplicas = null;
+ if(this.hdfs_out != null) {
+ try {
+ this.getNumCurrentReplicas = this.hdfs_out.getClass().
+ getMethod("getNumCurrentReplicas", new Class> []{});
+ this.getNumCurrentReplicas.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ // Thrown if getNumCurrentReplicas() function isn't available
+ } catch (SecurityException e) {
+ // Thrown if we can't get access to getNumCurrentReplicas()
+ this.getNumCurrentReplicas = null; // could happen on setAccessible()
+ }
+ }
+ if(this.getNumCurrentReplicas != null) {
+ LOG.info("Using getNumCurrentReplicas--HDFS-826");
+ } else {
+ LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
+ }
+
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
Threads.setDaemonThreadRunning(logSyncerThread,
Thread.currentThread().getName() + ".logSyncer");
@@ -355,6 +384,17 @@ public class HLog implements HConstants, Syncable {
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+ this.initialReplication = fs.getFileStatus(newPath).getReplication();
+
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected 'out' field.
+ this.hdfs_out = null;
+ if (this.writer instanceof SequenceFileLogWriter) {
+ this.hdfs_out =
+ ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
+ }
+
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
@@ -628,7 +668,7 @@ public class HLog implements HConstants, Syncable {
throws IOException {
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
- this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
+ this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
}
/**
@@ -650,8 +690,7 @@ public class HLog implements HConstants, Syncable {
* @param logKey
* @throws IOException
*/
- public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
- final boolean isMetaRegion)
+ public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@@ -672,7 +711,7 @@ public class HLog implements HConstants, Syncable {
}
// sync txn to file system
- this.sync(isMetaRegion);
+ this.sync(regionInfo.isMetaRegion());
}
/**
@@ -705,15 +744,14 @@ public class HLog implements HConstants, Syncable {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
- long seqNum = obtainSeqNum();
synchronized (this.updateLock) {
+ long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region (i.e. the first edit added to the particular
// memstore). . When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, seqNum);
- int counter = 0;
HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
doWrite(info, logKey, edits);
this.numEntries.incrementAndGet();
@@ -799,10 +837,6 @@ public class HLog implements HConstants, Syncable {
}
lock.lock();
try {
- if (syncerShuttingDown) {
- LOG.warn(getName() + " was shut down while waiting for sync");
- return;
- }
if (syncerShuttingDown) {
LOG.warn(getName() + " was shut down while waiting for sync");
return;
@@ -852,7 +886,24 @@ public class HLog implements HConstants, Syncable {
syncOps++;
this.forceSync = false;
this.unflushedEntries.set(0);
- // TODO: HBASE-2401
+
+ // if the number of replicas in HDFS has fallen below the initial
+ // value, then roll logs.
+ try {
+ int numCurrentReplicas = getLogReplication();
+ if (numCurrentReplicas != 0 &&
+ numCurrentReplicas < this.initialReplication) {
+ LOG.warn("HDFS pipeline error detected. " +
+ "Found " + numCurrentReplicas + " replicas but expecting " +
+ this.initialReplication + " replicas. " +
+ " Requesting close of hlog.");
+ requestLogRoll();
+ logRollRequested = true;
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+ " still proceeding ahead...");
+ }
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
@@ -866,6 +917,29 @@ public class HLog implements HConstants, Syncable {
}
}
+ /**
+ * This method gets the datanode replication count for the current HLog.
+ *
+ * If the pipeline isn't started yet or is empty, you will get the default
+ * replication factor. Therefore, if this function returns 0, it means you
+ * are not properly running with the HDFS-826 patch.
+ *
+ * @throws Exception
+ */
+ int getLogReplication() throws Exception {
+ if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
+ Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
+ if (repl instanceof Integer) {
+ return ((Integer)repl).intValue();
+ }
+ }
+ return 0;
+ }
+
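+ // True when the reflective getNumCurrentReplicas handle was found, i.e. the HDFS-826 API is available.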
+ boolean canGetCurReplicas() {
+ return this.getNumCurrentReplicas != null;
+ }
+
public void hsync() throws IOException {
// Not yet implemented up in hdfs so just call hflush.
hflush();
@@ -916,20 +990,6 @@ public class HLog implements HConstants, Syncable {
return outputfiles.size();
}
- /*
- * Obtain a specified number of sequence numbers
- *
- * @param num number of sequence numbers to obtain
- * @return array of sequence numbers
- */
- private long [] obtainSeqNum(int num) {
- long [] results = new long[num];
- for (int i = 0; i < num; i++) {
- results[i] = this.logSeqNum.incrementAndGet();
- }
- return results;
- }
-
/**
* By acquiring a log sequence ID, we can allow log messages to continue while
* we flush the cache.
@@ -968,10 +1028,10 @@ public class HLog implements HConstants, Syncable {
}
synchronized (updateLock) {
long now = System.currentTimeMillis();
- WALEdit edits = completeCacheFlushLogEdit();
- this.writer.append(new HLog.Entry(
- makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
- edits));
+ WALEdit edit = completeCacheFlushLogEdit();
+ HLogKey key = makeKey(regionName, tableName, logSeqId,
+ System.currentTimeMillis());
+ this.writer.append(new Entry(key, edit));
writeTime += System.currentTimeMillis() - now;
writeOps++;
this.numEntries.incrementAndGet();
@@ -1024,11 +1084,12 @@ public class HLog implements HConstants, Syncable {
* ${ROOTDIR}/log_HOST_PORT
* @param oldLogDir
* @param fs FileSystem
- * @param conf HBaseConfiguration
+ * @param conf Configuration
* @throws IOException
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException {
+ Path oldLogDir, final FileSystem fs, final Configuration conf)
+ throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
@@ -1104,7 +1165,8 @@ public class HLog implements HConstants, Syncable {
*/
private static List<Path> splitLog(final Path rootDir,
Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
- final Configuration conf) throws IOException {
+ final Configuration conf)
+ throws IOException {
final Map<byte [], WriterAndPath> logWriters =
Collections.synchronizedMap(
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
@@ -1115,35 +1177,51 @@ public class HLog implements HConstants, Syncable {
int logWriterThreads =
conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
- // Number of logs to read concurrently when log splitting.
- // More means faster but bigger mem consumption */
- int concurrentLogReads =
+ // Number of logs to read into memory before writing to their appropriate
+ // regions when log splitting. More means faster but bigger mem consumption
+ int logFilesPerStep =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
- // Is append supported?
+
+ // append support = we can avoid data loss (yay)
+ // we open for append, then close to recover the correct file length
+ final boolean appendSupport = isAppend(conf);
+
+ // store corrupt logs for post-mortem analysis (empty string = discard)
+ final String corruptDir =
+ conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
+
+ List<Path> finishedFiles = new LinkedList<Path>();
+ List<Path> corruptFiles = new LinkedList<Path>();
+
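+ // The split proceeds in three steps below: (1) read a batch of log files
+ // into memory, (2) write each region's edits with a pool of writer threads,
+ // (3) delete cleanly read logs and set aside corrupt ones.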
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
- concurrentLogReads)).intValue();
+ logFilesPerStep)).intValue();
for (int step = 0; step < maxSteps; step++) {
+
+ // Step 1: read N log files into memory
final Map<byte [], LinkedList<HLog.Entry>> logEntries =
new TreeMap<byte [], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
- // Stop at logfiles.length when it's the last step
int endIndex = step == maxSteps - 1? logfiles.length:
- step * concurrentLogReads + concurrentLogReads;
- for (int i = (step * concurrentLogReads); i < endIndex; i++) {
- // Check for possibly empty file. With appends, currently Hadoop
- // reports a zero length even if the file has been sync'd. Revisit if
- // HADOOP-4751 is committed.
- long length = logfiles[i].getLen();
+ step * logFilesPerStep + logFilesPerStep;
+ for (int i = (step * logFilesPerStep); i < endIndex; i++) {
+ Path curLogFile = logfiles[i].getPath();
+
+ // make sure we get the right file length before opening for read
+ recoverLog(fs, curLogFile, appendSupport);
+
+ long length = fs.getFileStatus(curLogFile).getLen();
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
- ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ ": " + curLogFile + ", length=" + length);
}
+
Reader in = null;
+ boolean cleanRead = false;
int count = 0;
try {
- in = HLog.getReader(fs, logfiles[i].getPath(), conf);
+ in = HLog.getReader(fs, curLogFile, conf);
try {
- HLog.Entry entry;
+ Entry entry;
while ((entry = in.next()) != null) {
byte [] regionName = entry.getKey().getRegionName();
LinkedList<HLog.Entry> queue = logEntries.get(regionName);
@@ -1155,20 +1233,24 @@ public class HLog implements HConstants, Syncable {
queue.push(entry);
count++;
}
- LOG.debug("Pushed=" + count + " entries from " +
- logfiles[i].getPath());
+ LOG.debug("Pushed=" + count + " entries from " + curLogFile);
+ cleanRead = true;
} catch (IOException e) {
- LOG.debug("IOE Pushed=" + count + " entries from " +
- logfiles[i].getPath());
+ LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile);
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
- LOG.warn("Exception processing " + logfiles[i].getPath() +
- " -- continuing. Possible DATA LOSS!", e);
+ String msg = "Exception processing " + curLogFile +
+ " -- continuing. Possible DATA LOSS!";
+ if (corruptDir.length() > 0) {
+ msg += " Storing in hlog corruption directory.";
+ }
+ LOG.warn(msg, e);
}
}
} catch (IOException e) {
if (length <= 0) {
- LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
+ LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+ cleanRead = true;
continue;
}
throw e;
@@ -1178,8 +1260,16 @@ public class HLog implements HConstants, Syncable {
in.close();
}
} catch (IOException e) {
- LOG.warn("Close in finally threw exception -- continuing", e);
+ LOG.warn("File.close() threw exception -- continuing, "
+ + "but marking file as corrupt.", e);
+ cleanRead = false;
}
+ if (cleanRead) {
+ finishedFiles.add(curLogFile);
+ } else {
+ corruptFiles.add(curLogFile);
+ }
+ /* TODO FOR J-D REVIEW
// Archive the input file now so we do not replay edits. We could
// have gotten here because of an exception. If so, probably
// nothing we can do about it. Replaying it, it could work but we
@@ -1187,85 +1277,89 @@ public class HLog implements HConstants, Syncable {
// could have lost some edits.
fs.rename(logfiles[i].getPath(),
getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
+ */
}
}
+
+ // Step 2: Some regionserver log files have been read into memory.
+ // Assign them to the appropriate region directory.
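+ // Thread subclass that records any IOException so the caller can rethrow it after all writers finish.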
+ class ThreadWithException extends Thread {
+ ThreadWithException(String name) { super(name); }
+ public IOException exception = null;
+ }
+ List<ThreadWithException> threadList =
+ new ArrayList<ThreadWithException>(logEntries.size());
ExecutorService threadPool =
Executors.newFixedThreadPool(logWriterThreads);
- for (final byte[] key : logEntries.keySet()) {
- Thread thread = new Thread(Bytes.toStringBinary(key)) {
+ for (final byte [] region: logEntries.keySet()) {
+ ThreadWithException thread =
+ new ThreadWithException(Bytes.toStringBinary(region)) {
@Override
public void run() {
- LinkedList<HLog.Entry> entries = logEntries.get(key);
+ LinkedList<HLog.Entry> entries = logEntries.get(region);
LOG.debug("Thread got " + entries.size() + " to process");
+ if(entries.size() <= 0) {
+ LOG.warn("Got a region with no entries to process.");
+ return;
+ }
long threadTime = System.currentTimeMillis();
try {
int count = 0;
+ // get the logfile associated with this region. 2 logs often
+ // write to the same region, so persist this info across logs
+ WriterAndPath wap = logWriters.get(region);
+ if (wap == null) {
+ // first write to this region, make new logfile
+ assert entries.size() > 0;
+ Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+ .getTableDir(rootDir,
+ entries.getFirst().getKey().getTablename()),
+ HRegionInfo.encodeRegionName(region)),
+ HREGION_OLDLOGFILE_NAME);
+
+ // If splitLog() was running when the user restarted his
+ // cluster, then we could already have a 'logfile'.
+ // Since we don't delete logs until everything is written to
+ // their respective regions, we can safely remove this tmp.
+ if (fs.exists(logfile)) {
+ LOG.warn("Deleting old hlog file: " + logfile);
+ // TODO: Archive?
+ fs.delete(logfile, true);
+ }
+
+ // associate an OutputStream with this logfile
+ Writer w = createWriter(fs, logfile, conf);
+ wap = new WriterAndPath(logfile, w);
+ logWriters.put(region, wap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new hlog file writer for path "
+ + logfile + " and region " + Bytes.toStringBinary(region));
+ }
+ }
+
// Items were added to the linkedlist oldest first. Pull them
// out in that order.
- for (ListIterator<HLog.Entry> i =
- entries.listIterator(entries.size());
+ for (ListIterator<HLog.Entry> i = entries.listIterator(entries.size());
i.hasPrevious();) {
- HLog.Entry logEntry = i.previous();
- WriterAndPath wap = logWriters.get(key);
- if (wap == null) {
- Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
- .getTableDir(rootDir, logEntry.getKey().getTablename()),
- HRegionInfo.encodeRegionName(key)),
- HREGION_OLDLOGFILE_NAME);
- Path oldlogfile = null;
- Reader old = null;
- if (fs.exists(logfile)) {
- FileStatus stat = fs.getFileStatus(logfile);
- if (stat.getLen() <= 0) {
- LOG.warn("Old hlog file " + logfile + " is zero " +
- "length. Deleting existing file");
- fs.delete(logfile, false);
- } else {
- LOG.warn("Old hlog file " + logfile + " already " +
- "exists. Copying existing file to new file");
- oldlogfile = new Path(logfile.toString() + ".old");
- fs.rename(logfile, oldlogfile);
- old = getReader(fs, oldlogfile, conf);
- }
- }
- Writer w = createWriter(fs, logfile, conf);
- wap = new WriterAndPath(logfile, w);
- logWriters.put(key, wap);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new hlog file writer for path "
- + logfile + " and region " + Bytes.toStringBinary(key));
- }
-
- if (old != null) {
- // Copy from existing log file
- HLog.Entry entry;
- for (; (entry = old.next()) != null; count++) {
- if (LOG.isDebugEnabled() && count > 0
- && count % 10000 == 0) {
- LOG.debug("Copied " + count + " edits");
- }
- w.append(entry);
- }
- old.close();
- fs.delete(oldlogfile, true);
- }
- }
- wap.w.append(logEntry);
+ wap.w.append(i.previous());
count++;
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + count + " total edits to "
- + Bytes.toStringBinary(key) + " in "
+ + Bytes.toStringBinary(region) + " in "
+ (System.currentTimeMillis() - threadTime) + "ms");
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.warn("Got while writing region " + Bytes.toStringBinary(key)
- + " log " + e);
+ LOG.warn("Got while writing region "
+ + Bytes.toStringBinary(region) + " log " + e);
e.printStackTrace();
+ exception = e;
}
}
};
+ threadList.add(thread);
threadPool.execute(thread);
}
threadPool.shutdown();
@@ -1274,9 +1368,19 @@ public class HLog implements HConstants, Syncable {
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
- }catch(InterruptedException ex) {
- LOG.warn("Hlog writers were interrupted, possible data loss!");
+ } catch (InterruptedException ex) {
+ LOG.warn("Hlog writers were interrupted during splitLog(). "
+ +"Retaining log files to avoid data loss.");
+ throw new IOException(ex.getMessage(), ex.getCause());
}
+ // throw an exception if one of the threads reported one
+ for (ThreadWithException t : threadList) {
+ if (t.exception != null) {
+ throw t.exception;
+ }
+ }
+
+ // End of for loop. Rinse and repeat
}
} finally {
splits = new ArrayList<Path>(logWriters.size());
@@ -1286,9 +1390,79 @@ public class HLog implements HConstants, Syncable {
splits.add(wap.p);
}
}
+
+ // Step 3: All writes succeeded! Get rid of the now-unnecessary logs
+ for(Path p : finishedFiles) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully split Hlog file. Deleting " + p);
+ }
+ try {
+ if (!fs.delete(p, true) && LOG.isDebugEnabled()) {
+ LOG.debug("Delete of split Hlog (" + p + ") failed.");
+ }
+ } catch (IOException e) {
+ // don't throw error here. worst case = double-read
+ LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e);
+ }
+ }
+ for (Path p : corruptFiles) {
+ if (corruptDir.length() > 0) {
+ // store any corrupt logs for later analysis
+ Path cp = new Path(conf.get(HBASE_DIR), corruptDir);
+ if(!fs.exists(cp)) {
+ fs.mkdirs(cp);
+ }
+ Path newp = new Path(cp, p.getName());
+ if (!fs.exists(newp)) {
+ if (!fs.rename(p, newp)) {
+ LOG.warn("Rename of " + p + " to " + newp + " failed.");
+ } else {
+ LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp);
+ }
+ } else {
+ LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp +
+ ". Ignoring");
+ }
+ } else {
+ // data loss is less important than disk space, delete
+ try {
+ if (!fs.delete(p, true) ) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete of split Hlog " + p + " failed.");
+ }
+ } else {
+ LOG.warn("Corrupt Hlog (" + p + ") deleted!");
+ }
+ } catch (IOException e) {
+ LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e);
+ }
+ }
+ }
+
return splits;
}
+ /*
+ * @param conf
+ * @return True if append enabled and we have the syncFs in our path.
+ */
+ static boolean isAppend(final Configuration conf) {
+ boolean append = conf.getBoolean("dfs.support.append", false);
+ if (append) {
+ try {
+ // TODO: The implementation that comes back when we do a createWriter
+ // may not be using SequenceFile so the below is not a definitive test.
+ // Will do for now (hdfs-200).
+ SequenceFile.Writer.class.getMethod("syncFs", new Class> []{});
+ append = true;
+ } catch (SecurityException e) {
+ } catch (NoSuchMethodException e) {
+ append = false;
+ }
+ }
+ return append;
+ }
+
/**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
@@ -1345,6 +1519,64 @@ public class HLog implements HConstants, Syncable {
}
}
+ /*
+ * Recover log.
+ * Try and open log in append mode.
+ * Doing this, we get a hold of the file the crashed writer
+ * was writing to. Once we have it, close it. This will
+ * allow a subsequent reader to see up to the last sync.
+ * @param fs
+ * @param p
+ * @param append
+ */
+ public static void recoverLog(final FileSystem fs, final Path p,
+ final boolean append) throws IOException {
+ if (!append) {
+ return;
+ }
+
+ // lease recovery not needed for local file system case.
+ // currently, local file system doesn't implement append either.
+ if (!(fs instanceof DistributedFileSystem)) {
+ return;
+ }
+
+ LOG.debug("Recovering DFS lease for path " + p);
+ long startWaiting = System.currentTimeMillis();
+
+ // Trying recovery
+ boolean recovered = false;
+ while (!recovered) {
+ try {
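+ // Opening for append forces lease recovery; closing right away lets readers see edits up to the last sync.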
+ FSDataOutputStream out = fs.append(p);
+ out.close();
+ recovered = true;
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ if (e instanceof AlreadyBeingCreatedException) {
+ // We expect that we'll get this message while the lease is still
+ // within its soft limit, but if we get it past that, it means
+ // that the RS is holding onto the file even though it lost its
+ // znode. We could potentially abort after some time here.
+ long waitedFor = System.currentTimeMillis() - startWaiting;
+
+ if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
+ LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p
+ + ":" + e.getMessage());
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ } else {
+ throw new IOException("Failed to open " + p + " for append", e);
+ }
+ }
+ }
+ LOG.info("Past out lease recovery");
+ }
+
/**
* Construct the HLog directory name
*
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index d77875a6499..d872f345ebf 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -21,29 +21,42 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.io.OutputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;
+/**
+ * Implementation of {@link HLog.Writer} that delegates to
+ * {@link SequenceFile.Writer}.
+ */
public class SequenceFileLogWriter implements HLog.Writer {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+ // The sequence file we delegate to.
+ private SequenceFile.Writer writer;
+ // The dfsclient out stream gotten made accessible or null if not available.
+ private OutputStream dfsClient_out;
+ // The syncFs method from hdfs-200 or null if not available.
+ private Method syncFs;
- SequenceFile.Writer writer;
- FSDataOutputStream writer_out;
-
- public SequenceFileLogWriter() { }
+ public SequenceFileLogWriter() {
+ super();
+ }
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
- writer = SequenceFile.createWriter(fs, conf, path,
+ // Create a SF.Writer instance.
+ this.writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
@@ -58,19 +71,39 @@ public class SequenceFileLogWriter implements HLog.Writer {
// Get at the private FSDataOutputStream inside in SequenceFile so we can
// call sync on it. Make it accessible. Stash it aside for call up in
// the sync method.
- final Field fields[] = writer.getClass().getDeclaredFields();
+ final Field fields [] = this.writer.getClass().getDeclaredFields();
final String fieldName = "out";
for (int i = 0; i < fields.length; ++i) {
if (fieldName.equals(fields[i].getName())) {
try {
+ // Make the 'out' field up in SF.Writer accessible.
fields[i].setAccessible(true);
- this.writer_out = (FSDataOutputStream)fields[i].get(writer);
+ FSDataOutputStream out =
+ (FSDataOutputStream)fields[i].get(this.writer);
+ this.dfsClient_out = out.getWrappedStream();
break;
} catch (IllegalAccessException ex) {
throw new IOException("Accessing " + fieldName, ex);
}
}
}
+
+ // Now do dirty work to see if the hdfs-200 syncFs method is available.
+ Method m = null;
+ if (conf.getBoolean("dfs.support.append", false)) {
+ try {
+ // function pointer to writer.syncFs()
+ m = this.writer.getClass().getMethod("syncFs", new Class> []{});
+ } catch (SecurityException e) {
+ throw new IOException("Failed test for syncfs", e);
+ } catch (NoSuchMethodException e) {
+ // Not available
+ }
+ }
+ this.syncFs = m;
+ LOG.info((this.syncFs != null)?
+ "Using syncFs -- HDFS-200": "syncFs -- HDFS-200 -- not available");
}
@Override
@@ -86,12 +119,25 @@ public class SequenceFileLogWriter implements HLog.Writer {
@Override
public void sync() throws IOException {
this.writer.sync();
-
- this.writer.syncFs();
+ if (this.syncFs != null) {
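+ // Invoke the hdfs-200 syncFs reflectively so buffered edits are pushed out to HDFS when the method is available.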
+ try {
+ this.syncFs.invoke(this.writer, HLog.NO_ARGS);
+ } catch (Exception e) {
+ throw new IOException("Reflection", e);
+ }
+ }
}
@Override
public long getLength() throws IOException {
return this.writer.getLength();
}
-}
+
+ /**
+ * @return The dfsclient out stream up inside SF.Writer made accessible, or
+ * null if not available.
+ */
+ public OutputStream getDFSCOutputStream() {
+ return this.dfsClient_out;
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index efd5f7335dc..4e290a157ea 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -28,15 +28,14 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -129,6 +128,7 @@ public class MiniHBaseCluster implements HConstants {
*/
public static class MiniHBaseClusterRegionServer extends HRegionServer {
private static int index = 0;
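+ // Started on the way out of run() to shut down this region server's filesystem.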
+ private Thread shutdownThread = null;
public MiniHBaseClusterRegionServer(Configuration conf)
throws IOException {
@@ -159,11 +159,20 @@ public class MiniHBaseCluster implements HConstants {
@Override
protected void init(MapWritable c) throws IOException {
super.init(c);
- // Change shutdown hook to only shutdown the FileSystem added above by
- // {@link #getFileSystem(HBaseConfiguration)
- if (getFileSystem() instanceof DistributedFileSystem) {
- Thread t = new SingleFileSystemShutdownThread(getFileSystem());
- Runtime.getRuntime().addShutdownHook(t);
+ // Run this thread to shutdown our filesystem on way out.
+ this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } finally {
+ // Run this on the way out.
+ if (this.shutdownThread != null) {
+ this.shutdownThread.start();
+ Threads.shutdown(this.shutdownThread, 30000);
+ }
}
}
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index cb4eab545af..76a6b864e72 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -19,6 +19,11 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -27,15 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
private Path dir;
@@ -322,4 +322,70 @@ public class TestHLog extends HBaseTestCase implements HConstants {
}
}
}
-}
+
+ /**
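+ * Verifies that appended edits and the cache-flush marker can be read back from the log.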
+ * @throws IOException
+ */
+ public void testAppend() throws IOException {
+ final int COL_COUNT = 10;
+ final byte [] tableName = Bytes.toBytes("tablename");
+ final byte [] row = Bytes.toBytes("row");
+ this.conf.setBoolean("dfs.support.append", true);
+ Reader reader = null;
+ HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
+ try {
+ // Write columns named 1, 2, 3, etc. and then values of single byte
+ // 1, 2, 3...
+ long timestamp = System.currentTimeMillis();
+ WALEdit cols = new WALEdit();
+ for (int i = 0; i < COL_COUNT; i++) {
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[] { (byte)(i + '0') }));
+ }
+ HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ log.append(hri, tableName, cols, System.currentTimeMillis());
+ long logSeqId = log.startCacheFlush();
+ log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+ log = null;
+ // Now open a reader on the log and assert append worked.
+ reader = HLog.getReader(fs, filename, conf);
+ HLog.Entry entry = reader.next();
+ assertEquals(COL_COUNT, entry.getEdit().size());
+ int idx = 0;
+ for (KeyValue val : entry.getEdit().getKeyValues()) {
+ assertTrue(Bytes.equals(hri.getRegionName(),
+ entry.getKey().getRegionName()));
+ assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+ assertTrue(Bytes.equals(row, val.getRow()));
+ assertEquals((byte)(idx + '0'), val.getValue()[0]);
+ System.out.println(entry.getKey() + " " + val);
+ idx++;
+ }
+
+ // Get next row... the meta flushed row.
+ entry = reader.next();
+ assertEquals(1, entry.getEdit().size());
+ for (KeyValue val : entry.getEdit().getKeyValues()) {
+ assertTrue(Bytes.equals(hri.getRegionName(),
+ entry.getKey().getRegionName()));
+ assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+ assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
+ assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
+ assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+ val.getValue()));
+ System.out.println(entry.getKey() + " " + val);
+ }
+ } finally {
+ if (log != null) {
+ log.closeAndDelete();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+}
\ No newline at end of file