HBASE-4853 HBASE-4789 does overzealous pruning of seqids
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1205719 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2c1c5223f4
commit
054c0a2418
|
@ -1284,8 +1284,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// rows then)
|
||||
status.setStatus("Obtaining lock to block concurrent updates");
|
||||
this.updatesLock.writeLock().lock();
|
||||
long flushsize = this.memstoreSize.get();
|
||||
status.setStatus("Preparing to flush by snapshotting stores");
|
||||
long currentMemStoreSize = 0;
|
||||
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
|
||||
try {
|
||||
// Record the mvcc for all transactions in progress.
|
||||
|
@ -1307,8 +1307,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} finally {
|
||||
this.updatesLock.writeLock().unlock();
|
||||
}
|
||||
status.setStatus("Waiting for mvcc");
|
||||
LOG.debug("Finished snapshotting " + this + ", commencing wait for mvcc");
|
||||
String s = "Finished snapshotting " + this +
|
||||
", commencing wait for mvcc, flushsize=" + flushsize;
|
||||
status.setStatus(s);
|
||||
LOG.debug(s);
|
||||
|
||||
// wait for all in-progress transactions to commit to HLog before
|
||||
// we can start the flush. This prevents
|
||||
|
@ -1346,8 +1348,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
storeFlushers.clear();
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
currentMemStoreSize =
|
||||
this.addAndGetGlobalMemstoreSize(-this.memstoreSize.get());
|
||||
this.addAndGetGlobalMemstoreSize(-flushsize);
|
||||
} catch (Throwable t) {
|
||||
// An exception here means that the snapshot was not persisted.
|
||||
// The hlog needs to be replayed so its content is restored to memstore.
|
||||
|
@ -1385,14 +1386,17 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
long memstoresize = this.memstoreSize.get();
|
||||
String msg = "Finished memstore flush of ~" +
|
||||
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
|
||||
this + " in " + time + "ms, sequenceid=" + sequenceId +
|
||||
", compaction requested=" + compactionRequested +
|
||||
((wal == null)? "; wal=null": "");
|
||||
StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
|
||||
", currentsize=" +
|
||||
StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
|
||||
" for region " + this + " in " + time + "ms, sequenceid=" + sequenceId +
|
||||
", compaction requested=" + compactionRequested +
|
||||
((wal == null)? "; wal=null": "");
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
|
||||
|
||||
return compactionRequested;
|
||||
}
|
||||
|
|
|
@ -1538,11 +1538,6 @@ public class HLog implements Syncable {
|
|||
// Cleaning up of lastSeqWritten is in the finally clause because we
|
||||
// don't want to confuse getOldestOutstandingSeqNum()
|
||||
this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
|
||||
Long l = this.lastSeqWritten.remove(encodedRegionName);
|
||||
if (l != null) {
|
||||
LOG.warn("Why is there a raw encodedRegionName in lastSeqWritten? name=" +
|
||||
Bytes.toString(encodedRegionName) + ", seqid=" + l);
|
||||
}
|
||||
this.cacheFlushLock.unlock();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class TestGlobalMemStoreSize {
|
|||
cluster = TEST_UTIL.getHBaseCluster();
|
||||
LOG.info("Waiting for active/ready master");
|
||||
cluster.waitForActiveAndReadyMaster();
|
||||
|
||||
|
||||
// Create a table with regions
|
||||
byte [] table = Bytes.toBytes("TestGlobalMemStoreSize");
|
||||
byte [] family = Bytes.toBytes("family");
|
||||
|
@ -78,7 +78,7 @@ public class TestGlobalMemStoreSize {
|
|||
regionNum);
|
||||
assertEquals(regionNum,numRegions);
|
||||
waitForAllRegionsAssigned();
|
||||
|
||||
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
long globalMemStoreSize = 0;
|
||||
for (HRegionInfo regionInfo : server.getOnlineRegions()) {
|
||||
|
@ -89,29 +89,66 @@ public class TestGlobalMemStoreSize {
|
|||
assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(),
|
||||
globalMemStoreSize);
|
||||
}
|
||||
|
||||
|
||||
// check the global memstore size after flush
|
||||
int i = 0;
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
LOG.info("Starting flushes on " + server.getServerName() + ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
LOG.info("Starting flushes on " + server.getServerName() +
|
||||
", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
// If meta region on this server, flush it last since if we flush it first,
|
||||
// it could get edits while other flushes are running since this a
|
||||
// running cluster.
|
||||
HRegion meta = null;
|
||||
for (HRegionInfo regionInfo : server.getOnlineRegions()) {
|
||||
HRegion r = server.getFromOnlineRegions(regionInfo.getEncodedName());
|
||||
LOG.info("Flush " + r.toString() + " on " + server.getServerName() + ", " + r.flushcache() + ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize());;
|
||||
if (regionInfo.isMetaRegion()) {
|
||||
meta = r;
|
||||
continue;
|
||||
}
|
||||
flush(r, server);
|
||||
}
|
||||
// If meta, flush it last
|
||||
if (meta != null) flush(meta, server);
|
||||
LOG.info("Post flush on " + server.getServerName());
|
||||
long now = System.currentTimeMillis();
|
||||
long timeout = now + 3000;
|
||||
long timeout = now + 1000;
|
||||
while(server.getRegionServerAccounting().getGlobalMemstoreSize() != 0 &&
|
||||
timeout < System.currentTimeMillis()) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
assertEquals("Server=" + server.getServerName() + ", i=" + i++, 0,
|
||||
server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
long size = server.getRegionServerAccounting().getGlobalMemstoreSize();
|
||||
if (size > 0) {
|
||||
// If size > 0, see if its because the meta region got edits while
|
||||
// our test was running....
|
||||
for (HRegionInfo regionInfo : server.getOnlineRegions()) {
|
||||
HRegion r = server.getFromOnlineRegions(regionInfo.getEncodedName());
|
||||
long l = r.getMemstoreSize().longValue();
|
||||
if (l > 0) {
|
||||
LOG.info(r.toString() + " " + l + ", reflushing");
|
||||
r.flushcache();
|
||||
}
|
||||
}
|
||||
}
|
||||
size = server.getRegionServerAccounting().getGlobalMemstoreSize();
|
||||
assertEquals("Server=" + server.getServerName() + ", i=" + i++, 0, size);
|
||||
}
|
||||
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Flush and log stats on flush
|
||||
* @param r
|
||||
* @param server
|
||||
* @throws IOException
|
||||
*/
|
||||
private void flush(final HRegion r, final HRegionServer server)
|
||||
throws IOException {
|
||||
LOG.info("Flush " + r.toString() + " on " + server.getServerName() +
|
||||
", " + r.flushcache() + ", size=" +
|
||||
server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
}
|
||||
|
||||
/** figure out how many regions are currently being served. */
|
||||
private int getRegionCount() throws IOException {
|
||||
int total = 0;
|
||||
|
@ -143,5 +180,4 @@ public class TestGlobalMemStoreSize {
|
|||
} catch (InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue