hdfs-1915
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@827858 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e1f057be13
commit
3fe4832c21
|
@ -63,7 +63,7 @@ Release 0.21.0 - Unreleased
|
|||
Filters client-side
|
||||
HBASE-1890 hbase-1506 where assignment is done at regionserver doesn't
|
||||
work
|
||||
HBASE-1889 ClassNotFoundException on trunk for REST
|
||||
HBASE-1889 ClassNotFoundException on trunk for REST
|
||||
HBASE-1905 Remove unused config. hbase.hstore.blockCache.blockSize
|
||||
HBASE-1906 FilterList of prefix and columnvalue not working properly with
|
||||
deletes and multiple values
|
||||
|
@ -73,6 +73,8 @@ Release 0.21.0 - Unreleased
|
|||
Purtell)
|
||||
HBASE-1916 FindBugs and javac warnings cleanup
|
||||
HBASE-1908 ROOT not reassigned if only one regionserver left
|
||||
HBASE-1915 HLog.sync is called way too often, needs to be only called 1x per
|
||||
RPC
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -223,7 +223,7 @@ public interface HConstants {
|
|||
/**
|
||||
* Max length a row can have because of the limitation in TFile.
|
||||
*/
|
||||
static final int MAX_ROW_LENGTH = 1024*64;
|
||||
static final int MAX_ROW_LENGTH = Short.MAX_VALUE;
|
||||
|
||||
/** When we encode strings, we always specify UTF8 encoding */
|
||||
static final String UTF8_ENCODING = "UTF-8";
|
||||
|
|
|
@ -1760,6 +1760,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
try {
|
||||
cacheFlusher.reclaimMemStoreMemory();
|
||||
region.put(put, getLockFromId(put.getLockId()));
|
||||
|
||||
this.hlog.sync(region.getRegionInfo().isMetaRegion());
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
|
@ -1770,8 +1772,11 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
// Count of Puts processed.
|
||||
int i = 0;
|
||||
checkOpen();
|
||||
boolean isMetaRegion = false;
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
isMetaRegion = region.getRegionInfo().isMetaRegion();
|
||||
|
||||
this.cacheFlusher.reclaimMemStoreMemory();
|
||||
Integer[] locks = new Integer[puts.length];
|
||||
for (i = 0; i < puts.length; i++) {
|
||||
|
@ -1779,16 +1784,22 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
locks[i] = getLockFromId(puts[i].getLockId());
|
||||
region.put(puts[i], locks[i]);
|
||||
}
|
||||
|
||||
} catch (WrongRegionException ex) {
|
||||
LOG.debug("Batch puts: " + i, ex);
|
||||
return i;
|
||||
} catch (NotServingRegionException ex) {
|
||||
return i;
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
// All have been processed successfully.
|
||||
return -1;
|
||||
|
||||
this.hlog.sync(isMetaRegion);
|
||||
|
||||
if (i == puts.length) {
|
||||
return -1;
|
||||
} else {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1814,8 +1825,11 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
HRegion region = getRegion(regionName);
|
||||
try {
|
||||
cacheFlusher.reclaimMemStoreMemory();
|
||||
return region.checkAndPut(row, family, qualifier, value, put,
|
||||
boolean retval = region.checkAndPut(row, family, qualifier, value, put,
|
||||
getLockFromId(put.getLockId()), true);
|
||||
|
||||
this.hlog.sync(region.getRegionInfo().isMetaRegion());
|
||||
return retval;
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
|
@ -1962,8 +1976,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
Integer lid = getLockFromId(delete.getLockId());
|
||||
HRegion region = getRegion(regionName);
|
||||
region.delete(delete, lid, writeToWAL);
|
||||
} catch(WrongRegionException ex) {
|
||||
// ignore
|
||||
|
||||
this.hlog.sync(region.getRegionInfo().isMetaRegion());
|
||||
} catch (WrongRegionException ex) {
|
||||
} catch (NotServingRegionException ex) {
|
||||
// ignore
|
||||
} catch (Throwable t) {
|
||||
|
@ -1976,11 +1991,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
// Count of Deletes processed.
|
||||
int i = 0;
|
||||
checkOpen();
|
||||
boolean isMetaRegion = false;
|
||||
try {
|
||||
boolean writeToWAL = true;
|
||||
this.cacheFlusher.reclaimMemStoreMemory();
|
||||
Integer[] locks = new Integer[deletes.length];
|
||||
HRegion region = getRegion(regionName);
|
||||
isMetaRegion = region.getRegionInfo().isMetaRegion();
|
||||
|
||||
for (i = 0; i < deletes.length; i++) {
|
||||
this.requestCount.incrementAndGet();
|
||||
locks[i] = getLockFromId(deletes[i].getLockId());
|
||||
|
@ -1994,6 +2012,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
|
||||
this.hlog.sync(isMetaRegion);
|
||||
// All have been processed successfully.
|
||||
return -1;
|
||||
}
|
||||
|
@ -2438,8 +2458,12 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
return region.incrementColumnValue(row, family, qualifier, amount,
|
||||
long retval = region.incrementColumnValue(row, family, qualifier, amount,
|
||||
writeToWAL);
|
||||
|
||||
this.hlog.sync(region.getRegionInfo().isMetaRegion());
|
||||
|
||||
return retval;
|
||||
} catch (IOException e) {
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -51,7 +52,11 @@ public class LogFlusher extends Chore {
|
|||
protected void chore() {
|
||||
HLog hlog = log.get();
|
||||
if (hlog != null) {
|
||||
hlog.optionalSync();
|
||||
try {
|
||||
hlog.sync(true); // force a flush
|
||||
} catch (IOException e) {
|
||||
LOG.error("LogFlusher got exception while syncing: " + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -354,7 +354,6 @@ public class HLog implements HConstants, Syncable {
|
|||
}
|
||||
this.numEntries.set(0);
|
||||
this.editsSize.set(0);
|
||||
updateLock.notifyAll();
|
||||
}
|
||||
} finally {
|
||||
this.cacheFlushLock.unlock();
|
||||
|
@ -600,7 +599,6 @@ public class HLog implements HConstants, Syncable {
|
|||
LOG.debug("closing hlog writer in " + this.dir.toString());
|
||||
}
|
||||
this.writer.close();
|
||||
updateLock.notifyAll();
|
||||
}
|
||||
} finally {
|
||||
cacheFlushLock.unlock();
|
||||
|
@ -657,8 +655,9 @@ public class HLog implements HConstants, Syncable {
|
|||
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
|
||||
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
|
||||
doWrite(logKey, logEdit, sync, logKey.getWriteTime());
|
||||
|
||||
this.unflushedEntries.incrementAndGet();
|
||||
this.numEntries.incrementAndGet();
|
||||
updateLock.notifyAll();
|
||||
}
|
||||
if (this.editsSize.get() > this.logrollsize) {
|
||||
if (listener != null) {
|
||||
|
@ -710,7 +709,9 @@ public class HLog implements HConstants, Syncable {
|
|||
doWrite(logKey, kv, sync, now);
|
||||
this.numEntries.incrementAndGet();
|
||||
}
|
||||
updateLock.notifyAll();
|
||||
|
||||
// Only count 1 row as an unflushed entry.
|
||||
this.unflushedEntries.incrementAndGet();
|
||||
}
|
||||
if (this.editsSize.get() > this.logrollsize) {
|
||||
requestLogRoll();
|
||||
|
@ -718,40 +719,45 @@ public class HLog implements HConstants, Syncable {
|
|||
}
|
||||
|
||||
public void sync() throws IOException {
|
||||
lastLogFlushTime = System.currentTimeMillis();
|
||||
if (this.append && syncfs != null) {
|
||||
try {
|
||||
this.syncfs.invoke(this.writer, NO_ARGS);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Reflection", e);
|
||||
}
|
||||
} else {
|
||||
this.writer.sync();
|
||||
// Above is sequencefile.writer sync. It doesn't actually synce the
|
||||
// backing stream. Need to do the below to do that.
|
||||
if (this.writer_out != null) this.writer_out.sync();
|
||||
}
|
||||
this.unflushedEntries.set(0);
|
||||
sync(false);
|
||||
}
|
||||
|
||||
public void optionalSync() {
|
||||
if (!this.closed) {
|
||||
long now = System.currentTimeMillis();
|
||||
synchronized (updateLock) {
|
||||
if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
|
||||
this.unflushedEntries.get() > 0) {
|
||||
try {
|
||||
sync();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error flushing hlog", e);
|
||||
/**
|
||||
* Multiple threads will call sync() at the same time, only the winner
|
||||
* will actually flush if there is any race or build up.
|
||||
*
|
||||
* @param force sync regardless (for meta updates) if there is data
|
||||
* @throws IOException
|
||||
*/
|
||||
public void sync(boolean force) throws IOException {
|
||||
synchronized (this.updateLock) {
|
||||
if (this.closed)
|
||||
return;
|
||||
|
||||
if (this.unflushedEntries.get() == 0)
|
||||
return; // win
|
||||
|
||||
if (force || this.unflushedEntries.get() > this.flushlogentries) {
|
||||
try {
|
||||
lastLogFlushTime = System.currentTimeMillis();
|
||||
if (this.append && syncfs != null) {
|
||||
try {
|
||||
this.syncfs.invoke(this.writer, NO_ARGS);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Reflection", e);
|
||||
}
|
||||
} else {
|
||||
this.writer.sync();
|
||||
if (this.writer_out != null)
|
||||
this.writer_out.sync();
|
||||
}
|
||||
this.unflushedEntries.set(0);
|
||||
} catch (IOException e) {
|
||||
LOG.fatal("Could not append. Requesting close of hlog", e);
|
||||
requestLogRoll();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
long took = System.currentTimeMillis() - now;
|
||||
if (took > 1000) {
|
||||
LOG.warn(Thread.currentThread().getName() + " took " + took +
|
||||
"ms optional sync'ing hlog; editcount=" + this.numEntries.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -770,9 +776,6 @@ public class HLog implements HConstants, Syncable {
|
|||
try {
|
||||
this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
|
||||
this.writer.append(logKey, logEdit);
|
||||
if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
|
||||
sync();
|
||||
}
|
||||
long took = System.currentTimeMillis() - now;
|
||||
if (took > 1000) {
|
||||
LOG.warn(Thread.currentThread().getName() + " took " + took +
|
||||
|
@ -858,7 +861,6 @@ public class HLog implements HConstants, Syncable {
|
|||
if (seq != null && logSeqId >= seq.longValue()) {
|
||||
this.lastSeqWritten.remove(regionName);
|
||||
}
|
||||
updateLock.notifyAll();
|
||||
}
|
||||
} finally {
|
||||
this.cacheFlushLock.unlock();
|
||||
|
|
|
@ -48,6 +48,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
|||
this.conf.setBoolean("dfs.support.append", true);
|
||||
// Make block sizes small.
|
||||
this.conf.setInt("dfs.blocksize", 1024 * 1024);
|
||||
this.conf.setInt("hbase.regionserver.flushlogentries", 1);
|
||||
cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
|
||||
// Set the hbase.rootdir to be the home directory in mini dfs.
|
||||
this.conf.set(HConstants.HBASE_DIR,
|
||||
|
@ -125,8 +126,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
|||
assertEquals(bytes.length, read);
|
||||
out.close();
|
||||
in.close();
|
||||
// To be sure, set our flush to be at 100 edits.
|
||||
this.conf.setInt("hbase.regionserver.flushlogentries", 100);
|
||||
Path subdir = new Path(this.dir, "hlogdir");
|
||||
HLog wal = new HLog(this.fs, subdir, this.conf, null);
|
||||
final int total = 20;
|
||||
|
|
Loading…
Reference in New Issue