HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@793145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a21d4759a
commit
67f428cc42
|
@ -468,6 +468,7 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1643 ScanDeleteTracker takes comparator but it unused
|
||||
HBASE-1603 MR failed "RetriesExhaustedException: Trying to contact region server
|
||||
Some server for region TestTable..." -- deubugging
|
||||
HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync
|
||||
|
||||
OPTIMIZATIONS
|
||||
HBASE-1412 Change values for delete column and column family in KeyValue
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.EOFException;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
|
@ -63,6 +64,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|||
import org.apache.hadoop.io.SequenceFile.Metadata;
|
||||
import org.apache.hadoop.io.SequenceFile.Reader;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
||||
/**
|
||||
* HLog stores all the edits to the HStore.
|
||||
|
@ -112,7 +114,10 @@ public class HLog implements HConstants, Syncable {
|
|||
private final int flushlogentries;
|
||||
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
|
||||
private volatile long lastLogFlushTime;
|
||||
|
||||
private final boolean append;
|
||||
private final Method syncfs;
|
||||
private final static Object [] NO_ARGS = new Object []{};
|
||||
|
||||
/*
|
||||
* Current log file.
|
||||
*/
|
||||
|
@ -213,6 +218,21 @@ public class HLog implements HConstants, Syncable {
|
|||
", flushlogentries=" + this.flushlogentries +
|
||||
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
|
||||
rollWriter();
|
||||
// Test if syncfs is available.
|
||||
this.append = conf.getBoolean("dfs.support.append", false);
|
||||
Method m = null;
|
||||
if (this.append) {
|
||||
try {
|
||||
m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
|
||||
LOG.debug("Using syncFs--hadoop-4379");
|
||||
} catch (SecurityException e) {
|
||||
throw new IOException("Failed test for syncfs", e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
// This can happen
|
||||
LOG.info("syncFs--hadoop-4379 not available" );
|
||||
}
|
||||
}
|
||||
this.syncfs = m;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -585,7 +605,15 @@ public class HLog implements HConstants, Syncable {
|
|||
|
||||
public void sync() throws IOException {
|
||||
lastLogFlushTime = System.currentTimeMillis();
|
||||
this.writer.sync();
|
||||
if (this.append && syncfs != null) {
|
||||
try {
|
||||
this.syncfs.invoke(this.writer, NO_ARGS);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Reflection", e);
|
||||
}
|
||||
} else {
|
||||
this.writer.sync();
|
||||
}
|
||||
this.unflushedEntries.set(0);
|
||||
}
|
||||
|
||||
|
@ -821,15 +849,16 @@ public class HLog implements HConstants, Syncable {
|
|||
step * DEFAULT_NUMBER_CONCURRENT_LOG_READS +
|
||||
DEFAULT_NUMBER_CONCURRENT_LOG_READS;
|
||||
for (int i = (step * 10); i < endIndex; i++) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
|
||||
": " + logfiles[i].getPath() +
|
||||
", length=" + logfiles[i].getLen());
|
||||
}
|
||||
// Check for possibly empty file. With appends, currently Hadoop
|
||||
// reports a zero length even if the file has been sync'd. Revisit if
|
||||
// HADOOP-4751 is committed.
|
||||
long length = logfiles[i].getLen();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
|
||||
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
|
||||
}
|
||||
boolean append = conf.getBoolean("dfs.support.append", false);
|
||||
recoverLog(fs, logfiles[i].getPath(), append);
|
||||
SequenceFile.Reader in = null;
|
||||
int count = 0;
|
||||
try {
|
||||
|
@ -853,10 +882,10 @@ public class HLog implements HConstants, Syncable {
|
|||
key = new HLogKey();
|
||||
val = new KeyValue();
|
||||
}
|
||||
LOG.debug("Pushed " + count + " entries from " +
|
||||
LOG.debug("Pushed=" + count + " entries from " +
|
||||
logfiles[i].getPath());
|
||||
} catch (IOException e) {
|
||||
LOG.debug("IOE Pushed " + count + " entries from " +
|
||||
LOG.debug("IOE Pushed=" + count + " entries from " +
|
||||
logfiles[i].getPath());
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
if (!(e instanceof EOFException)) {
|
||||
|
@ -866,7 +895,7 @@ public class HLog implements HConstants, Syncable {
|
|||
}
|
||||
} catch (IOException e) {
|
||||
if (length <= 0) {
|
||||
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
|
||||
LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
|
@ -943,9 +972,6 @@ public class HLog implements HConstants, Syncable {
|
|||
fs.delete(oldlogfile, true);
|
||||
}
|
||||
}
|
||||
if (wap == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
wap.w.append(logEntry.getKey(), logEntry.getEdit());
|
||||
count++;
|
||||
}
|
||||
|
@ -1030,6 +1056,40 @@ public class HLog implements HConstants, Syncable {
|
|||
public static String getHLogDirectoryName(HServerInfo info) {
|
||||
return getHLogDirectoryName(HServerInfo.getServerName(info));
|
||||
}
|
||||
|
||||
/*
|
||||
* Recover log.
|
||||
* If append has been set, try and open log in append mode.
|
||||
* Doing this, we get a hold of the file that crashed writer
|
||||
* was writing to. Once we have it, close it. This will
|
||||
* allow subsequent reader to see up to last sync.
|
||||
* @param fs
|
||||
* @param p
|
||||
* @param append
|
||||
*/
|
||||
private static void recoverLog(final FileSystem fs, final Path p,
|
||||
final boolean append) {
|
||||
if (!append) {
|
||||
return;
|
||||
}
|
||||
// Trying recovery
|
||||
boolean recovered = false;
|
||||
while (!recovered) {
|
||||
try {
|
||||
FSDataOutputStream out = fs.append(p);
|
||||
out.close();
|
||||
recovered = true;
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
// ignore it and try again
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("Past out lease recovery");
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct the HLog directory name
|
||||
|
|
Loading…
Reference in New Issue