HBASE-2052 Upper bound of outstanding WALs can be overrun

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@895331 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-01-03 04:08:51 +00:00
parent 94464432f8
commit 70312ea625
4 changed files with 92 additions and 32 deletions

View File

@ -278,6 +278,7 @@ Release 0.21.0 - Unreleased
HBASE-2036 Use Configuration instead of HBaseConfiguration (Enis Soztutar via Stack) HBASE-2036 Use Configuration instead of HBaseConfiguration (Enis Soztutar via Stack)
HBASE-2085 StringBuffer -> StringBuilder - conversion of references as necessary HBASE-2085 StringBuffer -> StringBuilder - conversion of references as necessary
(Kay Kay via Stack) (Kay Kay via Stack)
HBASE-2052 Upper bound of outstanding WALs can be overrun
NEW FEATURES NEW FEATURES
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write

View File

@ -82,9 +82,9 @@ class LogRoller extends Thread implements LogRollListener {
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try { try {
this.lastrolltime = now; this.lastrolltime = now;
byte [] regionToFlush = server.getLog().rollWriter(); byte [][] regionsToFlush = server.getLog().rollWriter();
if (regionToFlush != null) { if (regionsToFlush != null) {
scheduleFlush(regionToFlush); for (byte [] r: regionsToFlush) scheduleFlush(r);
} }
} catch (FailedLogCloseException e) { } catch (FailedLogCloseException e) {
LOG.fatal("Forcing server shutdown", e); LOG.fatal("Forcing server shutdown", e);

View File

@ -154,7 +154,7 @@ public class HLog implements HConstants, Syncable {
Collections.synchronizedSortedMap(new TreeMap<Long, Path>()); Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
/* /*
* Map of region to last sequence/edit id. * Map of regions to first sequence/edit id in their memstore.
*/ */
private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten = private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@ -327,21 +327,21 @@ public class HLog implements HConstants, Syncable {
* cacheFlushLock and then completeCacheFlush could be called which would wait * cacheFlushLock and then completeCacheFlush could be called which would wait
* for the lock on this and consequently never release the cacheFlushLock * for the lock on this and consequently never release the cacheFlushLock
* *
* @return If lots of logs, flush the returned region so next time through * @return If lots of logs, flush the returned regions so next time through
* we can clean logs. Returns null if nothing to flush. * we can clean logs. Returns null if nothing to flush.
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
* @throws IOException * @throws IOException
*/ */
public byte [] rollWriter() throws FailedLogCloseException, IOException { public byte [][] rollWriter() throws FailedLogCloseException, IOException {
// Return if nothing to flush. // Return if nothing to flush.
if (this.writer != null && this.numEntries.get() <= 0) { if (this.writer != null && this.numEntries.get() <= 0) {
return null; return null;
} }
byte [] regionToFlush = null; byte [][] regionsToFlush = null;
this.cacheFlushLock.lock(); this.cacheFlushLock.lock();
try { try {
if (closed) { if (closed) {
return regionToFlush; return regionsToFlush;
} }
synchronized (updateLock) { synchronized (updateLock) {
// Clean up current writer. // Clean up current writer.
@ -367,7 +367,7 @@ public class HLog implements HConstants, Syncable {
} }
this.outputfiles.clear(); this.outputfiles.clear();
} else { } else {
regionToFlush = cleanOldLogs(); regionsToFlush = cleanOldLogs();
} }
} }
this.numEntries.set(0); this.numEntries.set(0);
@ -376,7 +376,7 @@ public class HLog implements HConstants, Syncable {
} finally { } finally {
this.cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
} }
return regionToFlush; return regionsToFlush;
} }
/** /**
@ -433,8 +433,7 @@ public class HLog implements HConstants, Syncable {
* we can clean logs. Returns null if nothing to flush. * we can clean logs. Returns null if nothing to flush.
* @throws IOException * @throws IOException
*/ */
private byte [] cleanOldLogs() throws IOException { private byte [][] cleanOldLogs() throws IOException {
byte [] regionToFlush = null;
Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum(); Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
// Get the set of all log files whose final ID is older than or // Get the set of all log files whose final ID is older than or
// equal to the oldest pending region operation // equal to the oldest pending region operation
@ -442,29 +441,60 @@ public class HLog implements HConstants, Syncable {
new TreeSet<Long>(this.outputfiles.headMap( new TreeSet<Long>(this.outputfiles.headMap(
(Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet()); (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
// Now remove old log files (if any) // Now remove old log files (if any)
byte [] oldestRegion = null; int logsToRemove = sequenceNumbers.size();
if (LOG.isDebugEnabled()) { if (logsToRemove > 0) {
// Find region associated with oldest key -- helps debugging. if (LOG.isDebugEnabled()) {
oldestRegion = getOldestRegion(oldestOutstandingSeqNum); // Find associated region; helps debugging.
LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " + byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
" out of total " + this.outputfiles.size() + "; " + LOG.debug("Found " + logsToRemove + " hlogs to remove " +
"oldest outstanding seqnum is " + oldestOutstandingSeqNum + " out of total " + this.outputfiles.size() + "; " +
" from region " + Bytes.toStringBinary(oldestRegion)); "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
} " from region " + Bytes.toString(oldestRegion));
if (sequenceNumbers.size() > 0) { }
for (Long seq : sequenceNumbers) { for (Long seq : sequenceNumbers) {
deleteLogFile(this.outputfiles.remove(seq), seq); deleteLogFile(this.outputfiles.remove(seq), seq);
} }
} }
int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
if (countOfLogs > this.maxLogs) { // If too many log files, figure which regions we need to flush.
regionToFlush = oldestRegion != null? byte [][] regions = null;
oldestRegion: getOldestRegion(oldestOutstandingSeqNum); int logCount = this.outputfiles.size() - logsToRemove;
LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" + if (logCount > this.maxLogs && this.outputfiles != null &&
this.maxLogs + "; forcing flush of region with oldest edits: " + this.outputfiles.size() > 0) {
Bytes.toStringBinary(regionToFlush)); regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
this.lastSeqWritten);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
sb.toString());
} }
return regionToFlush; return regions;
}
/**
* Return regions (memstores) that have edits that are less than the passed
* <code>oldestWALseqid</code>.
* @param oldestWALseqid
* @param regionsToSeqids
* @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
* necessarily in order). Null if no regions found.
*/
static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid,
final Map<byte [], Long> regionsToSeqids) {
// This method is static so it can be unit tested the easier.
List<byte []> regions = null;
for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
if (e.getValue().longValue() < oldestWALseqid) {
if (regions == null) regions = new ArrayList<byte []>();
regions.add(e.getKey());
}
}
return regions == null?
null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
} }
/* /*
@ -614,7 +644,8 @@ public class HLog implements HConstants, Syncable {
long seqNum = obtainSeqNum(); long seqNum = obtainSeqNum();
logKey.setLogSeqNum(seqNum); logKey.setLogSeqNum(seqNum);
// The 'lastSeqWritten' map holds the sequence number of the oldest // The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the // write for each region (i.e. the first edit added to the particular
// memstore). When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush // region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten. // is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
@ -661,7 +692,8 @@ public class HLog implements HConstants, Syncable {
long seqNum [] = obtainSeqNum(edits.size()); long seqNum [] = obtainSeqNum(edits.size());
synchronized (this.updateLock) { synchronized (this.updateLock) {
// The 'lastSeqWritten' map holds the sequence number of the oldest // The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the // write for each region (i.e. the first edit added to the particular
// memstore). . When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush // region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten. // is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0])); this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -184,6 +186,31 @@ public class TestHLog extends HBaseTestCase implements HConstants {
reader.close(); reader.close();
} }
/**
* Test the findMemstoresWithEditsOlderThan method.
* @throws IOException
*/
public void testFindMemstoresWithEditsOlderThan() throws IOException {
Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
for (int i = 0; i < 10; i++) {
Long l = new Long(i);
regionsToSeqids.put(l.toString().getBytes(), l);
}
byte [][] regions =
HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids);
assertEquals(1, regions.length);
assertTrue(Bytes.equals(regions[0], "0".getBytes()));
regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids);
int count = 3;
assertEquals(count, regions.length);
// Regions returned are not ordered.
for (int i = 0; i < count; i++) {
assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
Bytes.equals(regions[i], "1".getBytes()) ||
Bytes.equals(regions[i], "2".getBytes()));
}
}
private void verifySplits(List<Path> splits, final int howmany) private void verifySplits(List<Path> splits, final int howmany)
throws IOException { throws IOException {
assertEquals(howmany, splits.size()); assertEquals(howmany, splits.size());