From 3fe4832c21c7987e2c7cf302953ced381022b62b Mon Sep 17 00:00:00 2001 From: Ryan Rawson Date: Wed, 21 Oct 2009 00:38:25 +0000 Subject: [PATCH] hdfs-1915 git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@827858 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 +- .../org/apache/hadoop/hbase/HConstants.java | 2 +- .../hbase/regionserver/HRegionServer.java | 38 ++++++++-- .../hadoop/hbase/regionserver/LogFlusher.java | 7 +- .../hadoop/hbase/regionserver/wal/HLog.java | 76 ++++++++++--------- .../hbase/regionserver/wal/TestHLog.java | 3 +- 6 files changed, 81 insertions(+), 49 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8411ef3a69d..72ef6418811 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -63,7 +63,7 @@ Release 0.21.0 - Unreleased Filters client-side HBASE-1890 hbase-1506 where assignment is done at regionserver doesn't work - HBASE-1889 ClassNotFoundException on trunk for REST + HBASE-1889 ClassNotFoundException on trunk for REST HBASE-1905 Remove unused config. hbase.hstore.blockCache.blockSize HBASE-1906 FilterList of prefix and columnvalue not working properly with deletes and multiple values @@ -73,6 +73,8 @@ Release 0.21.0 - Unreleased Purtell) HBASE-1916 FindBugs and javac warnings cleanup HBASE-1908 ROOT not reassigned if only one regionserver left + HBASE-1915 HLog.sync is called way too often, needs to be only called 1x per + RPC IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 62d5c896abd..07446cf1042 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -223,7 +223,7 @@ public interface HConstants { /** * Max length a row can have because of the limitation in TFile. */ - static final int MAX_ROW_LENGTH = 1024*64; + static final int MAX_ROW_LENGTH = Short.MAX_VALUE; /** When we encode strings, we always specify UTF8 encoding */ static final String UTF8_ENCODING = "UTF-8"; diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4e7f45ad8a4..87e5b6ebe9c 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1760,6 +1760,8 @@ public class HRegionServer implements HConstants, HRegionInterface, try { cacheFlusher.reclaimMemStoreMemory(); region.put(put, getLockFromId(put.getLockId())); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } @@ -1770,8 +1772,11 @@ public class HRegionServer implements HConstants, HRegionInterface, // Count of Puts processed. int i = 0; checkOpen(); + boolean isMetaRegion = false; try { HRegion region = getRegion(regionName); + isMetaRegion = region.getRegionInfo().isMetaRegion(); + this.cacheFlusher.reclaimMemStoreMemory(); Integer[] locks = new Integer[puts.length]; for (i = 0; i < puts.length; i++) { @@ -1779,16 +1784,22 @@ public class HRegionServer implements HConstants, HRegionInterface, locks[i] = getLockFromId(puts[i].getLockId()); region.put(puts[i], locks[i]); } + } catch (WrongRegionException ex) { LOG.debug("Batch puts: " + i, ex); - return i; } catch (NotServingRegionException ex) { - return i; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } // All have been processed successfully. - return -1; + + this.hlog.sync(isMetaRegion); + + if (i == puts.length) { + return -1; + } else { + return i; + } } /** @@ -1814,8 +1825,11 @@ public class HRegionServer implements HConstants, HRegionInterface, HRegion region = getRegion(regionName); try { cacheFlusher.reclaimMemStoreMemory(); - return region.checkAndPut(row, family, qualifier, value, put, + boolean retval = region.checkAndPut(row, family, qualifier, value, put, getLockFromId(put.getLockId()), true); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + return retval; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } @@ -1962,8 +1976,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Integer lid = getLockFromId(delete.getLockId()); HRegion region = getRegion(regionName); region.delete(delete, lid, writeToWAL); - } catch(WrongRegionException ex) { - // ignore + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + } catch (WrongRegionException ex) { } catch (NotServingRegionException ex) { // ignore } catch (Throwable t) { @@ -1976,11 +1991,14 @@ public class HRegionServer implements HConstants, HRegionInterface, // Count of Deletes processed. int i = 0; checkOpen(); + boolean isMetaRegion = false; try { boolean writeToWAL = true; this.cacheFlusher.reclaimMemStoreMemory(); Integer[] locks = new Integer[deletes.length]; HRegion region = getRegion(regionName); + isMetaRegion = region.getRegionInfo().isMetaRegion(); + for (i = 0; i < deletes.length; i++) { this.requestCount.incrementAndGet(); locks[i] = getLockFromId(deletes[i].getLockId()); @@ -1994,6 +2012,8 @@ public class HRegionServer implements HConstants, HRegionInterface, } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } + + this.hlog.sync(isMetaRegion); // All have been processed successfully. return -1; } @@ -2438,8 +2458,12 @@ public class HRegionServer implements HConstants, HRegionInterface, requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); - return region.incrementColumnValue(row, family, qualifier, amount, + long retval = region.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + + return retval; } catch (IOException e) { checkFileSystem(); throw e; diff --git a/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java b/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java index 9568b852ed5..7989b990417 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,7 +52,11 @@ public class LogFlusher extends Chore { protected void chore() { HLog hlog = log.get(); if (hlog != null) { - hlog.optionalSync(); + try { + hlog.sync(true); // force a flush + } catch (IOException e) { + LOG.error("LogFlusher got exception while syncing: " + e); + } } } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index d07f51fa881..6f833eee124 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -354,7 +354,6 @@ public class HLog implements HConstants, Syncable { } this.numEntries.set(0); this.editsSize.set(0); - updateLock.notifyAll(); } } finally { this.cacheFlushLock.unlock(); @@ -600,7 +599,6 @@ public class HLog implements HConstants, Syncable { LOG.debug("closing hlog writer in " + this.dir.toString()); } this.writer.close(); - updateLock.notifyAll(); } } finally { cacheFlushLock.unlock(); @@ -657,8 +655,9 @@ public class HLog implements HConstants, Syncable { this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion(); doWrite(logKey, logEdit, sync, logKey.getWriteTime()); + + this.unflushedEntries.incrementAndGet(); this.numEntries.incrementAndGet(); - updateLock.notifyAll(); } if (this.editsSize.get() > this.logrollsize) { if (listener != null) { @@ -710,7 +709,9 @@ public class HLog implements HConstants, Syncable { doWrite(logKey, kv, sync, now); this.numEntries.incrementAndGet(); } - updateLock.notifyAll(); + + // Only count 1 row as an unflushed entry. + this.unflushedEntries.incrementAndGet(); } if (this.editsSize.get() > this.logrollsize) { requestLogRoll(); @@ -718,40 +719,45 @@ public class HLog implements HConstants, Syncable { } public void sync() throws IOException { - lastLogFlushTime = System.currentTimeMillis(); - if (this.append && syncfs != null) { - try { - this.syncfs.invoke(this.writer, NO_ARGS); - } catch (Exception e) { - throw new IOException("Reflection", e); - } - } else { - this.writer.sync(); - // Above is sequencefile.writer sync. It doesn't actually synce the - // backing stream. Need to do the below to do that. - if (this.writer_out != null) this.writer_out.sync(); - } - this.unflushedEntries.set(0); + sync(false); } - public void optionalSync() { - if (!this.closed) { - long now = System.currentTimeMillis(); - synchronized (updateLock) { - if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) && - this.unflushedEntries.get() > 0) { - try { - sync(); - } catch (IOException e) { - LOG.error("Error flushing hlog", e); + /** + * Multiple threads will call sync() at the same time, only the winner + * will actually flush if there is any race or build up. + * + * @param force sync regardless (for meta updates) if there is data + * @throws IOException + */ + public void sync(boolean force) throws IOException { + synchronized (this.updateLock) { + if (this.closed) + return; + + if (this.unflushedEntries.get() == 0) + return; // win + + if (force || this.unflushedEntries.get() > this.flushlogentries) { + try { + lastLogFlushTime = System.currentTimeMillis(); + if (this.append && syncfs != null) { + try { + this.syncfs.invoke(this.writer, NO_ARGS); + } catch (Exception e) { + throw new IOException("Reflection", e); + } + } else { + this.writer.sync(); + if (this.writer_out != null) + this.writer_out.sync(); } + this.unflushedEntries.set(0); + } catch (IOException e) { + LOG.fatal("Could not append. Requesting close of hlog", e); + requestLogRoll(); + throw e; } } - long took = System.currentTimeMillis() - now; - if (took > 1000) { - LOG.warn(Thread.currentThread().getName() + " took " + took + - "ms optional sync'ing hlog; editcount=" + this.numEntries.get()); - } } } @@ -770,9 +776,6 @@ public class HLog implements HConstants, Syncable { try { this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize()); this.writer.append(logKey, logEdit); - if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) { - sync(); - } long took = System.currentTimeMillis() - now; if (took > 1000) { LOG.warn(Thread.currentThread().getName() + " took " + took + @@ -858,7 +861,6 @@ public class HLog implements HConstants, Syncable { if (seq != null && logSeqId >= seq.longValue()) { this.lastSeqWritten.remove(regionName); } - updateLock.notifyAll(); } } finally { this.cacheFlushLock.unlock(); diff --git a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index fb2fd5fc6ce..8348c235182 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -48,6 +48,7 @@ public class TestHLog extends HBaseTestCase implements HConstants { this.conf.setBoolean("dfs.support.append", true); // Make block sizes small. this.conf.setInt("dfs.blocksize", 1024 * 1024); + this.conf.setInt("hbase.regionserver.flushlogentries", 1); cluster = new MiniDFSCluster(conf, 3, true, (String[])null); // Set the hbase.rootdir to be the home directory in mini dfs. this.conf.set(HConstants.HBASE_DIR, @@ -125,8 +126,6 @@ public class TestHLog extends HBaseTestCase implements HConstants { assertEquals(bytes.length, read); out.close(); in.close(); - // To be sure, set our flush to be at 100 edits. - this.conf.setInt("hbase.regionserver.flushlogentries", 100); Path subdir = new Path(this.dir, "hlogdir"); HLog wal = new HLog(this.fs, subdir, this.conf, null); final int total = 20;