From 70312ea6258a049b7be72663e1548fb3ae73e444 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sun, 3 Jan 2010 04:08:51 +0000 Subject: [PATCH] HBASE-2052 Upper bound of outstanding WALs can be overrun git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@895331 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/regionserver/LogRoller.java | 6 +- .../hadoop/hbase/regionserver/wal/HLog.java | 90 +++++++++++++------ .../hbase/regionserver/wal/TestHLog.java | 27 ++++++ 4 files changed, 92 insertions(+), 32 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 26ca92ba672..4a8bd490292 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -278,6 +278,7 @@ Release 0.21.0 - Unreleased HBASE-2036 Use Configuration instead of HBaseConfiguration (Enis Soztutar via Stack) HBASE-2085 StringBuffer -> StringBuilder - conversion of references as necessary (Kay Kay via Stack) + HBASE-2052 Upper bound of outstanding WALs can be overrun NEW FEATURES HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write diff --git a/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index af3d4b5c5af..c485837161b 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -82,9 +82,9 @@ class LogRoller extends Thread implements LogRollListener { rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH try { this.lastrolltime = now; - byte [] regionToFlush = server.getLog().rollWriter(); - if (regionToFlush != null) { - scheduleFlush(regionToFlush); + byte [][] regionsToFlush = server.getLog().rollWriter(); + if (regionsToFlush != null) { + for (byte [] r: regionsToFlush) scheduleFlush(r); } } catch (FailedLogCloseException e) { LOG.fatal("Forcing server shutdown", e); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java 
b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index f4f84c3d1ee..4895cba9e0a 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -154,7 +154,7 @@ public class HLog implements HConstants, Syncable { Collections.synchronizedSortedMap(new TreeMap()); /* - * Map of region to last sequence/edit id. + * Map of regions to first sequence/edit id in their memstore. */ private final ConcurrentSkipListMap lastSeqWritten = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); @@ -327,21 +327,21 @@ public class HLog implements HConstants, Syncable { * cacheFlushLock and then completeCacheFlush could be called which would wait * for the lock on this and consequently never release the cacheFlushLock * - * @return If lots of logs, flush the returned region so next time through + * @return If lots of logs, flush the returned regions so next time through * we can clean logs. Returns null if nothing to flush. * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException * @throws IOException */ - public byte [] rollWriter() throws FailedLogCloseException, IOException { + public byte [][] rollWriter() throws FailedLogCloseException, IOException { // Return if nothing to flush. if (this.writer != null && this.numEntries.get() <= 0) { return null; } - byte [] regionToFlush = null; + byte [][] regionsToFlush = null; this.cacheFlushLock.lock(); try { if (closed) { - return regionToFlush; + return regionsToFlush; } synchronized (updateLock) { // Clean up current writer. 
@@ -367,7 +367,7 @@ public class HLog implements HConstants, Syncable { } this.outputfiles.clear(); } else { - regionToFlush = cleanOldLogs(); + regionsToFlush = cleanOldLogs(); } } this.numEntries.set(0); @@ -376,7 +376,7 @@ public class HLog implements HConstants, Syncable { } finally { this.cacheFlushLock.unlock(); } - return regionToFlush; + return regionsToFlush; } /** @@ -433,8 +433,7 @@ public class HLog implements HConstants, Syncable { * we can clean logs. Returns null if nothing to flush. * @throws IOException */ - private byte [] cleanOldLogs() throws IOException { - byte [] regionToFlush = null; + private byte [][] cleanOldLogs() throws IOException { Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum(); // Get the set of all log files whose final ID is older than or // equal to the oldest pending region operation @@ -442,29 +441,60 @@ public class HLog implements HConstants, Syncable { new TreeSet(this.outputfiles.headMap( (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet()); // Now remove old log files (if any) - byte [] oldestRegion = null; - if (LOG.isDebugEnabled()) { - // Find region associated with oldest key -- helps debugging. - oldestRegion = getOldestRegion(oldestOutstandingSeqNum); - LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " + - " out of total " + this.outputfiles.size() + "; " + - "oldest outstanding seqnum is " + oldestOutstandingSeqNum + - " from region " + Bytes.toStringBinary(oldestRegion)); - } - if (sequenceNumbers.size() > 0) { + int logsToRemove = sequenceNumbers.size(); + if (logsToRemove > 0) { + if (LOG.isDebugEnabled()) { + // Find associated region; helps debugging. 
+ byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum); + LOG.debug("Found " + logsToRemove + " hlogs to remove " + + " out of total " + this.outputfiles.size() + "; " + + "oldest outstanding seqnum is " + oldestOutstandingSeqNum + + " from region " + Bytes.toString(oldestRegion)); + } for (Long seq : sequenceNumbers) { deleteLogFile(this.outputfiles.remove(seq), seq); } } - int countOfLogs = this.outputfiles.size() - sequenceNumbers.size(); - if (countOfLogs > this.maxLogs) { - regionToFlush = oldestRegion != null? - oldestRegion: getOldestRegion(oldestOutstandingSeqNum); - LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" + - this.maxLogs + "; forcing flush of region with oldest edits: " + - Bytes.toStringBinary(regionToFlush)); + + // If too many log files, figure which regions we need to flush. + byte [][] regions = null; + int logCount = this.outputfiles.size() - logsToRemove; + if (logCount > this.maxLogs && this.outputfiles != null && + this.outputfiles.size() > 0) { + regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(), + this.lastSeqWritten); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < regions.length; i++) { + if (i > 0) sb.append(", "); + sb.append(Bytes.toStringBinary(regions[i])); + } + LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" + + this.maxLogs + "; forcing flush of " + regions.length + " region(s): " + + sb.toString()); } - return regionToFlush; + return regions; + } + + /** + * Return regions (memstores) that have edits that are less than the passed + * oldestWALseqid. + * @param oldestWALseqid + * @param regionsToSeqids + * @return All regions whose seqid is less than oldestWALseqid (Not + * necessarily in order). Null if no regions found. + */ + static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid, + final Map regionsToSeqids) { + // This method is static so it can be unit tested more easily.
+ List regions = null; + for (Map.Entry e: regionsToSeqids.entrySet()) { + if (e.getValue().longValue() < oldestWALseqid) { + if (regions == null) regions = new ArrayList(); + regions.add(e.getKey()); + } + } + return regions == null? + null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY}); } /* @@ -614,7 +644,8 @@ public class HLog implements HConstants, Syncable { long seqNum = obtainSeqNum(); logKey.setLogSeqNum(seqNum); // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the + // write for each region (i.e. the first edit added to the particular + // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); @@ -661,7 +692,8 @@ public class HLog implements HConstants, Syncable { long seqNum [] = obtainSeqNum(edits.size()); synchronized (this.updateLock) { // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the + // write for each region (i.e. the first edit added to the particular + // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0])); diff --git a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 9a78911335f..06a25aee79d 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -183,6 +185,31 @@ public class TestHLog extends HBaseTestCase implements HConstants { assertEquals(total * 3, count); reader.close(); } + + /** + * Test the findMemstoresWithEditsOlderThan method. + * @throws IOException + */ + public void testFindMemstoresWithEditsOlderThan() throws IOException { + Map regionsToSeqids = new HashMap(); + for (int i = 0; i < 10; i++) { + Long l = new Long(i); + regionsToSeqids.put(l.toString().getBytes(), l); + } + byte [][] regions = + HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids); + assertEquals(1, regions.length); + assertTrue(Bytes.equals(regions[0], "0".getBytes())); + regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids); + int count = 3; + assertEquals(count, regions.length); + // Regions returned are not ordered. + for (int i = 0; i < count; i++) { + assertTrue(Bytes.equals(regions[i], "0".getBytes()) || + Bytes.equals(regions[i], "1".getBytes()) || + Bytes.equals(regions[i], "2".getBytes())); + } + } private void verifySplits(List splits, final int howmany) throws IOException {