HBASE-5569 Do not collect deleted KVs when they are still in use by a scanner.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5856e90720
commit
41f31490ff
|
@ -696,7 +696,7 @@ public class KeyValue implements Writable, HeapSize {
|
||||||
return "empty";
|
return "empty";
|
||||||
}
|
}
|
||||||
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
|
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
|
||||||
"/vlen=" + getValueLength();
|
"/vlen=" + getValueLength() + "/ts=" + memstoreTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -280,10 +280,15 @@ public class ScanQueryMatcher {
|
||||||
if (!keepDeletedCells) {
|
if (!keepDeletedCells) {
|
||||||
// first ignore delete markers if the scanner can do so, and the
|
// first ignore delete markers if the scanner can do so, and the
|
||||||
// range does not include the marker
|
// range does not include the marker
|
||||||
|
//
|
||||||
|
// during flushes and compactions also ignore delete markers newer
|
||||||
|
// than the readpoint of any open scanner, this prevents deleted
|
||||||
|
// rows that could still be seen by a scanner from being collected
|
||||||
boolean includeDeleteMarker = seePastDeleteMarkers ?
|
boolean includeDeleteMarker = seePastDeleteMarkers ?
|
||||||
tr.withinTimeRange(timestamp) :
|
tr.withinTimeRange(timestamp) :
|
||||||
tr.withinOrAfterTimeRange(timestamp);
|
tr.withinOrAfterTimeRange(timestamp);
|
||||||
if (includeDeleteMarker) {
|
if (includeDeleteMarker
|
||||||
|
&& kv.getMemstoreTS() <= maxReadPointToTrackVersions) {
|
||||||
this.deletes.add(bytes, offset, qualLength, timestamp, type);
|
this.deletes.add(bytes, offset, qualLength, timestamp, type);
|
||||||
}
|
}
|
||||||
// Can't early out now, because DelFam come before any other keys
|
// Can't early out now, because DelFam come before any other keys
|
||||||
|
|
|
@ -404,7 +404,7 @@ public class TestKeyValue extends TestCase {
|
||||||
System.err.println("kv=" + kv);
|
System.err.println("kv=" + kv);
|
||||||
System.err.println("kvFromKey=" + kvFromKey);
|
System.err.println("kvFromKey=" + kvFromKey);
|
||||||
assertEquals(kvFromKey.toString(),
|
assertEquals(kvFromKey.toString(),
|
||||||
kv.toString().replaceAll("=[0-9]+$", "=0"));
|
kv.toString().replaceAll("=[0-9]+", "=0"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.Rule
|
@org.junit.Rule
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -55,11 +54,9 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
|
|
||||||
HRegion region = null;
|
HRegion region = null;
|
||||||
private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
private final String DIR = TEST_UTIL.getDataTestDir("TestIncrement").toString();
|
private final String DIR = TEST_UTIL.getDataTestDir("TestAtomicOperation").toString();
|
||||||
|
|
||||||
|
|
||||||
private final int MAX_VERSIONS = 2;
|
|
||||||
|
|
||||||
// Test names
|
// Test names
|
||||||
static final byte[] tableName = Bytes.toBytes("testtable");;
|
static final byte[] tableName = Bytes.toBytes("testtable");;
|
||||||
static final byte[] qual1 = Bytes.toBytes("qual1");
|
static final byte[] qual1 = Bytes.toBytes("qual1");
|
||||||
|
@ -260,8 +257,8 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
|
|
||||||
// create 100 threads, each will alternate between adding and
|
// create 100 threads, each will alternate between adding and
|
||||||
// removing a column
|
// removing a column
|
||||||
int numThreads = 100;
|
int numThreads = 10;
|
||||||
int opsPerThread = 1000;
|
int opsPerThread = 500;
|
||||||
AtomicOperation[] all = new AtomicOperation[numThreads];
|
AtomicOperation[] all = new AtomicOperation[numThreads];
|
||||||
|
|
||||||
AtomicLong timeStamps = new AtomicLong(0);
|
AtomicLong timeStamps = new AtomicLong(0);
|
||||||
|
@ -275,10 +272,12 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
for (int i=0; i<numOps; i++) {
|
for (int i=0; i<numOps; i++) {
|
||||||
try {
|
try {
|
||||||
// throw in some flushes
|
// throw in some flushes
|
||||||
if (r.nextFloat() < 0.001) {
|
if (i%10==0) {
|
||||||
|
synchronized(region) {
|
||||||
LOG.debug("flushing");
|
LOG.debug("flushing");
|
||||||
region.flushcache();
|
region.flushcache();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
long ts = timeStamps.incrementAndGet();
|
long ts = timeStamps.incrementAndGet();
|
||||||
RowMutations rm = new RowMutations(row);
|
RowMutations rm = new RowMutations(row);
|
||||||
if (op) {
|
if (op) {
|
||||||
|
@ -342,7 +341,7 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
|
|
||||||
// create 100 threads, each will alternate between adding and
|
// create 100 threads, each will alternate between adding and
|
||||||
// removing a column
|
// removing a column
|
||||||
int numThreads = 100;
|
int numThreads = 10;
|
||||||
int opsPerThread = 1000;
|
int opsPerThread = 1000;
|
||||||
AtomicOperation[] all = new AtomicOperation[numThreads];
|
AtomicOperation[] all = new AtomicOperation[numThreads];
|
||||||
|
|
||||||
|
@ -358,10 +357,12 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
for (int i=0; i<numOps; i++) {
|
for (int i=0; i<numOps; i++) {
|
||||||
try {
|
try {
|
||||||
// throw in some flushes
|
// throw in some flushes
|
||||||
if (r.nextFloat() < 0.001) {
|
if (i%10==0) {
|
||||||
|
synchronized(region) {
|
||||||
LOG.debug("flushing");
|
LOG.debug("flushing");
|
||||||
region.flushcache();
|
region.flushcache();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
long ts = timeStamps.incrementAndGet();
|
long ts = timeStamps.incrementAndGet();
|
||||||
List<Mutation> mrm = new ArrayList<Mutation>();
|
List<Mutation> mrm = new ArrayList<Mutation>();
|
||||||
if (op) {
|
if (op) {
|
||||||
|
@ -386,6 +387,7 @@ public class TestAtomicOperation extends HBaseTestCase {
|
||||||
RegionScanner rs = region.getScanner(s);
|
RegionScanner rs = region.getScanner(s);
|
||||||
List<KeyValue> r = new ArrayList<KeyValue>();
|
List<KeyValue> r = new ArrayList<KeyValue>();
|
||||||
while(rs.next(r));
|
while(rs.next(r));
|
||||||
|
rs.close();
|
||||||
if (r.size() != 1) {
|
if (r.size() != 1) {
|
||||||
LOG.debug(r);
|
LOG.debug(r);
|
||||||
failures.incrementAndGet();
|
failures.incrementAndGet();
|
||||||
|
|
|
@ -131,6 +131,7 @@ public class TestCompaction extends HBaseTestCase {
|
||||||
r.delete(new Delete(results.get(0).getRow()), null, false);
|
r.delete(new Delete(results.get(0).getRow()), null, false);
|
||||||
if (!result) break;
|
if (!result) break;
|
||||||
} while(true);
|
} while(true);
|
||||||
|
s.close();
|
||||||
// Flush
|
// Flush
|
||||||
r.flushcache();
|
r.flushcache();
|
||||||
// Major compact.
|
// Major compact.
|
||||||
|
|
Loading…
Reference in New Issue