HBASE-1500, 1513, 1516, 1520
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@784649 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8f41f8593f
commit
0f67b602c7
|
@ -171,6 +171,11 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1504 Remove left-over debug from 1304 commit
|
||||
HBASE-1518 Delete Trackers using compareRow, should just use raw
|
||||
binary comparator (Jon Gray via Stack)
|
||||
HBASE-1500 KeyValue$KeyComparator array overrun
|
||||
HBASE-1513 Compactions too slow
|
||||
HBASE-1516 Investigate if StoreScanner will not return the next row if
|
||||
earlied-out of previous row (Jon Gray)
|
||||
HBASE-1520 StoreFileScanner catches and ignore IOExceptions from HFile
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -130,7 +130,7 @@ public class KeyValue implements Writable, HeapSize {
|
|||
* @param tableName The table name.
|
||||
* @return The comparator.
|
||||
*/
|
||||
public static RawComparator<byte []> getRowComparator(byte [] tableName) {
|
||||
public static KeyComparator getRowComparator(byte [] tableName) {
|
||||
if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) {
|
||||
return ROOT_COMPARATOR.getRawComparator();
|
||||
}
|
||||
|
|
|
@ -697,7 +697,8 @@ public class HConnectionManager implements HConstants {
|
|||
// signifying that the region we're checking is actually the last
|
||||
// region in the table.
|
||||
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
|
||||
KeyValue.getRowComparator(tableName).compare(endKey, row) > 0) {
|
||||
KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
|
||||
row, 0, row.length) > 0) {
|
||||
return possibleRegion;
|
||||
}
|
||||
}
|
||||
|
@ -734,7 +735,8 @@ public class HConnectionManager implements HConstants {
|
|||
|
||||
// by nature of the map, we know that the start key has to be <
|
||||
// otherwise it wouldn't be in the headMap.
|
||||
if (KeyValue.getRowComparator(tableName).compare(endKey, row) <= 0) {
|
||||
if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
|
||||
row, 0, row.length) <= 0) {
|
||||
// delete any matching entry
|
||||
HRegionLocation rl =
|
||||
tableLocations.remove(matchingRegions.lastKey());
|
||||
|
|
|
@ -51,7 +51,7 @@ public class HalfHFileReader extends HFile.Reader {
|
|||
final boolean top;
|
||||
// This is the key we split around. Its the first possible entry on a row:
|
||||
// i.e. empty column and a timestamp of LATEST_TIMESTAMP.
|
||||
final byte [] splitkey;
|
||||
protected final byte [] splitkey;
|
||||
|
||||
/**
|
||||
* @param fs
|
||||
|
@ -83,36 +83,50 @@ public class HalfHFileReader extends HFile.Reader {
|
|||
final HFileScanner s = super.getScanner();
|
||||
return new HFileScanner() {
|
||||
final HFileScanner delegate = s;
|
||||
public boolean atEnd = false;
|
||||
|
||||
public ByteBuffer getKey() {
|
||||
if (atEnd) return null;
|
||||
return delegate.getKey();
|
||||
}
|
||||
|
||||
public String getKeyString() {
|
||||
if (atEnd) return null;
|
||||
|
||||
return delegate.getKeyString();
|
||||
}
|
||||
|
||||
public ByteBuffer getValue() {
|
||||
if (atEnd) return null;
|
||||
|
||||
return delegate.getValue();
|
||||
}
|
||||
|
||||
public String getValueString() {
|
||||
if (atEnd) return null;
|
||||
|
||||
return delegate.getValueString();
|
||||
}
|
||||
|
||||
public KeyValue getKeyValue() {
|
||||
if (atEnd) return null;
|
||||
|
||||
return delegate.getKeyValue();
|
||||
}
|
||||
|
||||
public boolean next() throws IOException {
|
||||
if (atEnd) return false;
|
||||
|
||||
boolean b = delegate.next();
|
||||
if (!b) {
|
||||
return b;
|
||||
}
|
||||
// constrain the bottom.
|
||||
if (!top) {
|
||||
ByteBuffer bb = getKey();
|
||||
if (getComparator().compare(bb.array(), bb.arrayOffset(), bb.limit(),
|
||||
splitkey, 0, splitkey.length) >= 0) {
|
||||
atEnd = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -151,7 +165,7 @@ public class HalfHFileReader extends HFile.Reader {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
boolean b = delegate.seekTo();
|
||||
if (!b) {
|
||||
return b;
|
||||
|
|
|
@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -33,22 +35,18 @@ import java.io.IOException;
|
|||
* and optionally the memcache-snapshot.
|
||||
*/
|
||||
public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
|
||||
private QueryMatcher matcher;
|
||||
|
||||
private KeyValueHeap heap;
|
||||
|
||||
private ScanDeleteTracker deleteTracker;
|
||||
private KeyValue.KVComparator comparator;
|
||||
|
||||
MinorCompactingStoreScanner(Store store,
|
||||
KeyValueScanner [] scanners) {
|
||||
Scan scan = new Scan();
|
||||
|
||||
// No max version, no ttl matching, start at first row, all columns.
|
||||
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
|
||||
null, Long.MAX_VALUE, store.comparator.getRawComparator(),
|
||||
store.versionsToReturn(Integer.MAX_VALUE));
|
||||
|
||||
comparator = store.comparator;
|
||||
deleteTracker = new ScanDeleteTracker(store.comparator.getRawComparator());
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
|
||||
for (KeyValueScanner scanner : scanners ) {
|
||||
scanner.seek(matcher.getStartKey());
|
||||
scanner.seek(firstKv);
|
||||
}
|
||||
|
||||
heap = new KeyValueHeap(scanners, store.comparator);
|
||||
|
@ -56,13 +54,12 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
|
||||
MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
|
||||
KeyValueScanner [] scanners) {
|
||||
Scan scan = new Scan();
|
||||
matcher = new ScanQueryMatcher(scan, Bytes.toBytes(cfName),
|
||||
null, Long.MAX_VALUE, comparator.getRawComparator(),
|
||||
Integer.MAX_VALUE);
|
||||
this.comparator = comparator;
|
||||
deleteTracker = new ScanDeleteTracker(comparator.getRawComparator());
|
||||
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
|
||||
for (KeyValueScanner scanner : scanners ) {
|
||||
scanner.seek(matcher.getStartKey());
|
||||
scanner.seek(firstKv);
|
||||
}
|
||||
|
||||
heap = new KeyValueHeap(scanners, comparator);
|
||||
|
@ -82,32 +79,94 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results) throws IOException {
|
||||
KeyValue peeked = heap.peek();
|
||||
if (peeked == null) {
|
||||
/**
|
||||
* High performance merge scan.
|
||||
* @param writer
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean next(HFile.Writer writer) throws IOException {
|
||||
KeyValue row = heap.peek();
|
||||
if (row == null) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
matcher.setRow(peeked.getRow());
|
||||
// between rows.
|
||||
deleteTracker.reset();
|
||||
|
||||
KeyValue kv;
|
||||
while ((kv = heap.peek()) != null) {
|
||||
// if delete type, output no matter what:
|
||||
if (kv.getType() != KeyValue.Type.Put.getCode())
|
||||
results.add(kv);
|
||||
|
||||
switch (matcher.match(kv)) {
|
||||
case INCLUDE:
|
||||
results.add(heap.next());
|
||||
continue;
|
||||
case DONE:
|
||||
if (results.isEmpty()) {
|
||||
matcher.setRow(heap.peek().getRow());
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
// check to see if this is a different row
|
||||
if (comparator.compareRows(row, kv) != 0) {
|
||||
// reached next row
|
||||
return true;
|
||||
}
|
||||
heap.next();
|
||||
|
||||
// if delete type, output no matter what:
|
||||
if (kv.getType() != KeyValue.Type.Put.getCode()) {
|
||||
deleteTracker.add(kv.getBuffer(),
|
||||
kv.getQualifierOffset(),
|
||||
kv.getQualifierLength(),
|
||||
kv.getTimestamp(),
|
||||
kv.getType());
|
||||
|
||||
writer.append(heap.next());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (deleteTracker.isDeleted(kv.getBuffer(),
|
||||
kv.getQualifierOffset(),
|
||||
kv.getQualifierLength(),
|
||||
kv.getTimestamp())) {
|
||||
heap.next();
|
||||
continue;
|
||||
}
|
||||
|
||||
writer.append(heap.next());
|
||||
}
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results) throws IOException {
|
||||
KeyValue row = heap.peek();
|
||||
if (row == null) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
// between rows.
|
||||
deleteTracker.reset();
|
||||
|
||||
KeyValue kv;
|
||||
while ((kv = heap.peek()) != null) {
|
||||
// check to see if this is a different row
|
||||
if (comparator.compareRows(row, kv) != 0) {
|
||||
// reached next row
|
||||
return true;
|
||||
}
|
||||
|
||||
// if delete type, output no matter what:
|
||||
if (kv.getType() != KeyValue.Type.Put.getCode()) {
|
||||
deleteTracker.add(kv.getBuffer(),
|
||||
kv.getQualifierOffset(),
|
||||
kv.getQualifierLength(),
|
||||
kv.getTimestamp(),
|
||||
kv.getType());
|
||||
|
||||
results.add(heap.next());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (deleteTracker.isDeleted(kv.getBuffer(),
|
||||
kv.getQualifierOffset(),
|
||||
kv.getQualifierLength(),
|
||||
kv.getTimestamp())) {
|
||||
heap.next();
|
||||
continue;
|
||||
}
|
||||
|
||||
results.add(heap.next());
|
||||
}
|
||||
close();
|
||||
return false;
|
||||
|
|
|
@ -54,7 +54,6 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
public ScanQueryMatcher(Scan scan, byte [] family,
|
||||
NavigableSet<byte[]> columns, long ttl,
|
||||
KeyValue.KeyComparator rowComparator, int maxVersions) {
|
||||
this.row = row;
|
||||
this.tr = scan.getTimeRange();
|
||||
this.oldestStamp = System.currentTimeMillis() - ttl;
|
||||
this.rowComparator = rowComparator;
|
||||
|
|
|
@ -834,30 +834,43 @@ public class Store implements HConstants {
|
|||
scanners[i] = new StoreFileScanner(filesToCompact.get(i).getReader().getScanner());
|
||||
}
|
||||
|
||||
InternalScanner scanner;
|
||||
if (majorCompaction) {
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(family.getMaxVersions());
|
||||
// TODO pass in the scanners/store files.
|
||||
scanner = new StoreScanner(this, scan, null);
|
||||
} else {
|
||||
scanner = new MinorCompactingStoreScanner(this, scanners);
|
||||
}
|
||||
InternalScanner scanner = null;
|
||||
|
||||
// since scanner.next() can return 'false' but still be delivering data,
|
||||
// we have to use a do/while loop.
|
||||
ArrayList<KeyValue> row = new ArrayList<KeyValue>();
|
||||
boolean more = true;
|
||||
while ( more ) {
|
||||
more = scanner.next(row);
|
||||
// output to writer:
|
||||
for (KeyValue kv : row) {
|
||||
writer.append(kv);
|
||||
try {
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(family.getMaxVersions());
|
||||
// TODO pass in the scanners/store files.
|
||||
scanner = new StoreScanner(this, scan, null);
|
||||
|
||||
// since scanner.next() can return 'false' but still be delivering data,
|
||||
// we have to use a do/while loop.
|
||||
ArrayList<KeyValue> row = new ArrayList<KeyValue>();
|
||||
boolean more = true;
|
||||
while ( more ) {
|
||||
more = scanner.next(row);
|
||||
// output to writer:
|
||||
for (KeyValue kv : row) {
|
||||
writer.append(kv);
|
||||
}
|
||||
row.clear();
|
||||
}
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
}
|
||||
} else {
|
||||
MinorCompactingStoreScanner scanner = null;
|
||||
try {
|
||||
scanner = new MinorCompactingStoreScanner(this, scanners);
|
||||
|
||||
while ( scanner.next(writer) ) { }
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
}
|
||||
row.clear();
|
||||
}
|
||||
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -333,7 +333,8 @@ public class StoreFile implements HConstants {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + (isTop()? ", half=top": ", half=bottom");
|
||||
return super.toString() + (isTop()? ", half=top": ", half=bottom") +
|
||||
" splitKey: " + KeyValue.keyToString(splitkey);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,6 +40,10 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
public StoreFileScanner(HFileScanner hfs) {
|
||||
this.hfs = hfs;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
|
||||
}
|
||||
|
||||
public KeyValue peek() {
|
||||
return cur;
|
||||
|
@ -49,10 +53,12 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
KeyValue retKey = cur;
|
||||
cur = hfs.getKeyValue();
|
||||
try {
|
||||
hfs.next();
|
||||
// only seek if we arent at the end. cur == null implies 'end'.
|
||||
if (cur != null)
|
||||
hfs.next();
|
||||
} catch(IOException e) {
|
||||
// Only occurs if the scanner is not seeked, this is never the case
|
||||
// as we seek immediately after construction in StoreScanner
|
||||
// Turn checked exception into runtime exception.
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return retKey;
|
||||
}
|
||||
|
|
|
@ -124,8 +124,6 @@ ChangedReadersObserver {
|
|||
* @return true if there are more rows, false if scanner is done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result) throws IOException {
|
||||
// this wont get us the next row if the previous round hasn't iterated
|
||||
// past all the cols from the previous row. Potential bug!
|
||||
KeyValue peeked = this.heap.peek();
|
||||
if (peeked == null) {
|
||||
close();
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -221,6 +222,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
kvs.add(new KeyValue(row1, fam4, null, null));
|
||||
|
||||
|
||||
//testing existing family
|
||||
byte [] family = fam2;
|
||||
try {
|
||||
|
@ -238,6 +240,55 @@ public class TestHRegion extends HBaseTestCase {
|
|||
ok = true;
|
||||
}
|
||||
assertEquals("Family " +new String(family)+ " does exist", true, ok);
|
||||
}
|
||||
|
||||
public void testDelete_mixed() throws IOException {
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] fam = Bytes.toBytes("info");
|
||||
byte [][] families = {fam};
|
||||
String method = this.getName();
|
||||
initHRegion(tableName, method, families);
|
||||
|
||||
byte [] row = Bytes.toBytes("table_name");
|
||||
// column names
|
||||
byte [] serverinfo = Bytes.toBytes("serverinfo");
|
||||
byte [] splitA = Bytes.toBytes("splitA");
|
||||
byte [] splitB = Bytes.toBytes("splitB");
|
||||
|
||||
// add some data:
|
||||
Put put = new Put(row);
|
||||
put.add(fam, splitA, Bytes.toBytes("reference_A"));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row);
|
||||
put.add(fam, splitB, Bytes.toBytes("reference_B"));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row);
|
||||
put.add(fam, serverinfo, Bytes.toBytes("ip_address"));
|
||||
region.put(put);
|
||||
|
||||
// ok now delete a split:
|
||||
Delete delete = new Delete(row);
|
||||
delete.deleteColumns(fam, splitA);
|
||||
region.delete(delete, null, true);
|
||||
|
||||
// assert some things:
|
||||
Get get = new Get(row).addColumn(fam, serverinfo);
|
||||
Result result = region.get(get, null);
|
||||
assertEquals(1, result.size());
|
||||
|
||||
get = new Get(row).addColumn(fam, splitA);
|
||||
result = region.get(get, null);
|
||||
assertEquals(0, result.size());
|
||||
|
||||
get = new Get(row).addColumn(fam, splitB);
|
||||
result = region.get(get, null);
|
||||
assertEquals(1, result.size());
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
//Visual test, since the method doesn't return anything
|
||||
|
|
Loading…
Reference in New Issue