HBASE-6200 KeyComparator.compareWithoutRow can be wrong when families have the same prefix (Jieshan)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1354290 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-26 23:40:41 +00:00
parent f4165e27ce
commit 275bfdcc40
2 changed files with 107 additions and 34 deletions

View File

@ -2413,33 +2413,29 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Compare column, timestamp, and key type (everything except the row).
* This method is used both in the normal comparator and the "same-prefix"
* comparator. Note that we are assuming that row portions of both KVs have
* already been parsed and found identical, and we don't validate that
* assumption here.
* @param commonPrefix the length of the common prefix of the two
* key-values being compared, including row length and row
* Compare columnFamily, qualifier, timestamp, and key type (everything
* except the row). This method is used both in the normal comparator and
* the "same-prefix" comparator. Note that we are assuming that row portions
* of both KVs have already been parsed and found identical, and we don't
* validate that assumption here.
* @param commonPrefix
* the length of the common prefix of the two key-values being
* compared, including row length and row
*/
private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
int llength, byte[] right, int roffset, int rlength, short rowlength) {
// Compare column family. Start comparing past row and family length.
int lcolumnoffset = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE +
rowlength + loffset;
int rcolumnoffset = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE +
rowlength + roffset;
int lcolumnlength = llength - TIMESTAMP_TYPE_SIZE -
(lcolumnoffset - loffset);
int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
(rcolumnoffset - roffset);
/***
* KeyValue Format and commonLength:
* |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
* ------------------|-------commonLength--------|--------------
*/
int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
// If row matches, and no column in the 'left' AND put type is 'minimum',
// then return that left is larger than right.
// This supports 'last key on a row' - the magic is if there is no column
// in the left operand, and the left operand has a type of '0' - magical
// value, then we say the left is bigger. This will let us seek to the
// last key in a row.
// commonLength + TIMESTAMP_TYPE_SIZE
int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
// ColumnFamily + Qualifier length.
int lcolumnlength = llength - commonLengthWithTSAndType;
int rcolumnlength = rlength - commonLengthWithTSAndType;
byte ltype = left[loffset + (llength - 1)];
byte rtype = right[roffset + (rlength - 1)];
@ -2457,20 +2453,38 @@ public class KeyValue implements Writable, HeapSize {
return -1;
}
int lfamilyoffset = commonLength + loffset;
int rfamilyoffset = commonLength + roffset;
// Column family length.
int lfamilylength = left[lfamilyoffset - 1];
int rfamilylength = right[rfamilyoffset - 1];
// If left family size is not equal to right family size, we need not
// compare the qualifiers.
boolean sameFamilySize = (lfamilylength == rfamilylength);
int common = 0;
if (commonPrefix > 0) {
common = Math.max(0, commonPrefix -
rowlength - ROW_LENGTH_SIZE - FAMILY_LENGTH_SIZE);
common = Math.max(0, commonPrefix - commonLength);
if (!sameFamilySize) {
// Common should not be larger than Math.min(lfamilylength,
// rfamilylength).
common = Math.min(common, Math.min(lfamilylength, rfamilylength));
} else {
common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
}
final int comparisonResult = Bytes.compareTo(
left, lcolumnoffset + common, lcolumnlength - common,
right, rcolumnoffset + common, rcolumnlength - common);
if (comparisonResult != 0) {
return comparisonResult;
}
if (!sameFamilySize) {
// comparing column family is enough.
return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
- common, right, rfamilyoffset + common, rfamilylength - common);
}
// Compare family & qualifier together.
final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
lcolumnlength - common, right, rfamilyoffset + common,
rcolumnlength - common);
if (comparison != 0) {
return comparison;
}
return compareTimestampAndType(left, loffset, llength, right, roffset,
rlength, ltype, rtype);
}

View File

@ -341,6 +341,65 @@ public class TestKeyValue extends TestCase {
assertTrue(cmp > 0);
}
private void assertKVLessWithoutRow(KeyValue.KeyComparator c, int common, KeyValue less,
KeyValue greater) {
int cmp = c.compareIgnoringPrefix(common, less.getBuffer(), less.getOffset()
+ KeyValue.ROW_OFFSET, less.getKeyLength(), greater.getBuffer(),
greater.getOffset() + KeyValue.ROW_OFFSET, greater.getKeyLength());
assertTrue(cmp < 0);
cmp = c.compareIgnoringPrefix(common, greater.getBuffer(), greater.getOffset()
+ KeyValue.ROW_OFFSET, greater.getKeyLength(), less.getBuffer(),
less.getOffset() + KeyValue.ROW_OFFSET, less.getKeyLength());
assertTrue(cmp > 0);
}
public void testCompareWithoutRow() {
final KeyValue.KeyComparator c = KeyValue.KEY_COMPARATOR;
byte[] row = Bytes.toBytes("row");
byte[] fa = Bytes.toBytes("fa");
byte[] fami = Bytes.toBytes("fami");
byte[] fami1 = Bytes.toBytes("fami1");
byte[] qual0 = Bytes.toBytes("");
byte[] qual1 = Bytes.toBytes("qf1");
byte[] qual2 = Bytes.toBytes("qf2");
long ts = 1;
// 'fa:'
KeyValue kv_0 = new KeyValue(row, fa, qual0, ts, Type.Put);
// 'fami:'
KeyValue kv0_0 = new KeyValue(row, fami, qual0, ts, Type.Put);
// 'fami:qf1'
KeyValue kv0_1 = new KeyValue(row, fami, qual1, ts, Type.Put);
// 'fami:qf2'
KeyValue kv0_2 = new KeyValue(row, fami, qual2, ts, Type.Put);
// 'fami1:'
KeyValue kv1_0 = new KeyValue(row, fami1, qual0, ts, Type.Put);
// 'fami:qf1' < 'fami:qf2'
assertKVLessWithoutRow(c, 0, kv0_1, kv0_2);
// 'fami:qf1' < 'fami1:'
assertKVLessWithoutRow(c, 0, kv0_1, kv1_0);
// Test comparison by skipping the same prefix bytes.
/***
* KeyValue Format and commonLength:
* |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
* ------------------|-------commonLength--------|--------------
*/
int commonLength = KeyValue.ROW_LENGTH_SIZE + KeyValue.FAMILY_LENGTH_SIZE
+ row.length;
// 'fa:' < 'fami:'. They have commonPrefix + 2 same prefix bytes.
assertKVLessWithoutRow(c, commonLength + 2, kv_0, kv0_0);
// 'fami:' < 'fami:qf1'. They have commonPrefix + 4 same prefix bytes.
assertKVLessWithoutRow(c, commonLength + 4, kv0_0, kv0_1);
// 'fami:qf1' < 'fami1:'. They have commonPrefix + 4 same prefix bytes.
assertKVLessWithoutRow(c, commonLength + 4, kv0_1, kv1_0);
// 'fami:qf1' < 'fami:qf2'. They have commonPrefix + 6 same prefix bytes.
assertKVLessWithoutRow(c, commonLength + 6, kv0_1, kv0_2);
}
public void testFirstLastOnRow() {
final KVComparator c = KeyValue.COMPARATOR;
long ts = 1;