HBASE-5416 Improve performance of scans with some kind of filters (Max Lapan and Sergey)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1431103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-09 21:37:35 +00:00
parent afd485e88a
commit 39954ddcd4
18 changed files with 778 additions and 97 deletions

View File

@ -2048,6 +2048,19 @@ public class KeyValue implements Cell, HeapSize {
return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
}
/**
* Create a KeyValue that is smaller than all other possible KeyValues
* for the given row. That is any (valid) KeyValue on 'row' would sort
* _after_ the result.
*
* @param row - row key (arbitrary byte array)
* @return First possible KeyValue on passed <code>row</code>
*/
public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
return new KeyValue(row, roffset, rlength,
null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
}
/**
* Creates a KeyValue that is smaller than all other KeyValues that
* are older than the passed timestamp.

View File

@ -8992,6 +8992,10 @@ public final class ClientProtos {
// optional uint32 storeOffset = 12;
boolean hasStoreOffset();
int getStoreOffset();
// optional bool loadColumnFamiliesOnDemand = 13;
boolean hasLoadColumnFamiliesOnDemand();
boolean getLoadColumnFamiliesOnDemand();
}
public static final class Scan extends
com.google.protobuf.GeneratedMessage
@ -9170,6 +9174,16 @@ public final class ClientProtos {
return storeOffset_;
}
// optional bool loadColumnFamiliesOnDemand = 13;
public static final int LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER = 13;
private boolean loadColumnFamiliesOnDemand_;
public boolean hasLoadColumnFamiliesOnDemand() {
return ((bitField0_ & 0x00000400) == 0x00000400);
}
public boolean getLoadColumnFamiliesOnDemand() {
return loadColumnFamiliesOnDemand_;
}
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@ -9183,6 +9197,7 @@ public final class ClientProtos {
maxResultSize_ = 0L;
storeLimit_ = 0;
storeOffset_ = 0;
loadColumnFamiliesOnDemand_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -9250,6 +9265,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeUInt32(12, storeOffset_);
}
if (((bitField0_ & 0x00000400) == 0x00000400)) {
output.writeBool(13, loadColumnFamiliesOnDemand_);
}
getUnknownFields().writeTo(output);
}
@ -9307,6 +9325,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(12, storeOffset_);
}
if (((bitField0_ & 0x00000400) == 0x00000400)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(13, loadColumnFamiliesOnDemand_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -9384,6 +9406,11 @@ public final class ClientProtos {
result = result && (getStoreOffset()
== other.getStoreOffset());
}
result = result && (hasLoadColumnFamiliesOnDemand() == other.hasLoadColumnFamiliesOnDemand());
if (hasLoadColumnFamiliesOnDemand()) {
result = result && (getLoadColumnFamiliesOnDemand()
== other.getLoadColumnFamiliesOnDemand());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -9441,6 +9468,10 @@ public final class ClientProtos {
hash = (37 * hash) + STOREOFFSET_FIELD_NUMBER;
hash = (53 * hash) + getStoreOffset();
}
if (hasLoadColumnFamiliesOnDemand()) {
hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@ -9601,6 +9632,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000400);
storeOffset_ = 0;
bitField0_ = (bitField0_ & ~0x00000800);
loadColumnFamiliesOnDemand_ = false;
bitField0_ = (bitField0_ & ~0x00001000);
return this;
}
@ -9705,6 +9738,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000200;
}
result.storeOffset_ = storeOffset_;
if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
to_bitField0_ |= 0x00000400;
}
result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -9803,6 +9840,9 @@ public final class ClientProtos {
if (other.hasStoreOffset()) {
setStoreOffset(other.getStoreOffset());
}
if (other.hasLoadColumnFamiliesOnDemand()) {
setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -9922,6 +9962,11 @@ public final class ClientProtos {
storeOffset_ = input.readUInt32();
break;
}
case 104: {
bitField0_ |= 0x00001000;
loadColumnFamiliesOnDemand_ = input.readBool();
break;
}
}
}
}
@ -10654,6 +10699,27 @@ public final class ClientProtos {
return this;
}
// optional bool loadColumnFamiliesOnDemand = 13;
private boolean loadColumnFamiliesOnDemand_ ;
public boolean hasLoadColumnFamiliesOnDemand() {
return ((bitField0_ & 0x00001000) == 0x00001000);
}
public boolean getLoadColumnFamiliesOnDemand() {
return loadColumnFamiliesOnDemand_;
}
public Builder setLoadColumnFamiliesOnDemand(boolean value) {
bitField0_ |= 0x00001000;
loadColumnFamiliesOnDemand_ = value;
onChanged();
return this;
}
public Builder clearLoadColumnFamiliesOnDemand() {
bitField0_ = (bitField0_ & ~0x00001000);
loadColumnFamiliesOnDemand_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:Scan)
}
@ -21446,7 +21512,7 @@ public final class ClientProtos {
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\027\n\006mutat" +
"e\030\002 \002(\0132\007.Mutate\022\035\n\tcondition\030\003 \001(\0132\n.Co" +
"ndition\"<\n\016MutateResponse\022\027\n\006result\030\001 \001(" +
"\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\243\002\n\004Scan\022\027" +
"\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\307\002\n\004Scan\022\027" +
"\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribute\030\002 \003" +
"(\0132\016.NameBytesPair\022\020\n\010startRow\030\003 \001(\014\022\017\n\007" +
"stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.Filter\022\035" +
@ -21454,50 +21520,50 @@ public final class ClientProtos {
"ions\030\007 \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true" +
"\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001" +
"(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 " +
"\001(\r\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Re" +
"gionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tsc" +
"annerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014c" +
"loseScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\" +
"\n\014ScanResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022" +
"\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region",
"\030\001 \002(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n" +
"\017LockRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030" +
"\002 \001(\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(" +
"\0132\020.RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021U" +
"nlockRowResponse\"\260\001\n\024BulkLoadHFileReques" +
"t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nf" +
"amilyPath\030\002 \003(\0132 .BulkLoadHFileRequest.F" +
"amilyPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamil" +
"yPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025B" +
"ulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026",
"CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013s" +
"erviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007" +
"request\030\004 \002(\014\"d\n\031CoprocessorServiceReque" +
"st\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004" +
"call\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032C" +
"oprocessorServiceResponse\022 \n\006region\030\001 \002(" +
"\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.Nam" +
"eBytesPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(" +
"\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionR" +
"esult\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texcepti",
"on\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest" +
"\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006ac" +
"tion\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010" +
"\".\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Acti" +
"onResult2\223\003\n\rClientService\022 \n\003get\022\013.GetR" +
"equest\032\014.GetResponse\022)\n\006mutate\022\016.MutateR" +
"equest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRe" +
"quest\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRo" +
"wRequest\032\020.LockRowResponse\0222\n\tunlockRow\022" +
"\021.UnlockRowRequest\032\022.UnlockRowResponse\022>",
"\n\rbulkLoadHFile\022\025.BulkLoadHFileRequest\032\026" +
".BulkLoadHFileResponse\022F\n\013execService\022\032." +
"CoprocessorServiceRequest\032\033.CoprocessorS" +
"erviceResponse\022&\n\005multi\022\r.MultiRequest\032\016" +
".MultiResponseBB\n*org.apache.hadoop.hbas" +
"e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" +
"\001\001"
"\001(\r\022\"\n\032loadColumnFamiliesOnDemand\030\r \001(\010\"" +
"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Region" +
"Specifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscanne" +
"rId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014close" +
"Scanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Sc" +
"anResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\ts" +
"cannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003t",
"tl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002" +
"(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017Loc" +
"kRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(" +
"\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020." +
"RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021Unloc" +
"kRowResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamil" +
"yPath\030\002 \003(\0132 .BulkLoadHFileRequest.Famil" +
"yPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPat" +
"h\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkL",
"oadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026Copr" +
"ocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013servi" +
"ceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007requ" +
"est\030\004 \002(\014\"d\n\031CoprocessorServiceRequest\022 " +
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call" +
"\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032Copro" +
"cessorServiceResponse\022 \n\006region\030\001 \002(\0132\020." +
"RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameByt" +
"esPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007." +
"Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResul",
"t\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030\002" +
" \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006" +
"region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action" +
"\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\r" +
"MultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionRe" +
"sult2\223\003\n\rClientService\022 \n\003get\022\013.GetReque" +
"st\032\014.GetResponse\022)\n\006mutate\022\016.MutateReque" +
"st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
"t\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReq" +
"uest\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Un",
"lockRowRequest\032\022.UnlockRowResponse\022>\n\rbu" +
"lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
"kLoadHFileResponse\022F\n\013execService\022\032.Copr" +
"ocessorServiceRequest\032\033.CoprocessorServi" +
"ceResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
"tiResponseBB\n*org.apache.hadoop.hbase.pr" +
"otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -21597,7 +21663,7 @@ public final class ClientProtos {
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", },
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
internal_static_ScanRequest_descriptor =

View File

@ -186,6 +186,7 @@ message Scan {
optional uint64 maxResultSize = 10;
optional uint32 storeLimit = 11;
optional uint32 storeOffset = 12;
optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
}
/**

View File

@ -92,7 +92,7 @@ public class Scan extends OperationWithAttributes {
private int storeLimit = -1;
private int storeOffset = 0;
// If application wants to collect scan metrics, it needs to
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
@ -110,6 +110,7 @@ public class Scan extends OperationWithAttributes {
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
/**
* Create a Scan operation across all rows.
@ -159,6 +160,7 @@ public class Scan extends OperationWithAttributes {
maxResultSize = scan.getMaxResultSize();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
TimeRange ctr = scan.getTimeRange();
tr = new TimeRange(ctr.getMin(), ctr.getMax());
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@ -518,6 +520,41 @@ public class Scan extends OperationWithAttributes {
return cacheBlocks;
}
/**
* Set the value indicating whether loading CFs on demand should be allowed (cluster
* default is false). On-demand CF loading doesn't load column families until necessary, e.g.
* if you filter on one column, the other column family data will be loaded only for the rows
* that are included in result, not all rows like in normal case.
* With column-specific filters, like SingleColumnValueFilter w/filterIfMissing == true,
* this can deliver huge perf gains when there's a cf with lots of data; however, it can
* also lead to some inconsistent results, as follows:
* - if someone does a concurrent update to both column families in question you may get a row
* that never existed, e.g. for { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } }
* someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, concurrent scan
* filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
* { video => "my dog" } }.
* - if there's a concurrent split and you have more than 2 column families, some rows may be
* missing some column families.
*/
public void setLoadColumnFamiliesOnDemand(boolean value) {
this.loadColumnFamiliesOnDemand = value;
}
/**
* Get the raw loadColumnFamiliesOnDemand setting; if it's not set, can be null.
*/
public Boolean getLoadColumnFamiliesOnDemandValue() {
return this.loadColumnFamiliesOnDemand;
}
/**
* Get the logical value indicating whether on-demand CF loading should be allowed.
*/
public boolean doLoadColumnFamiliesOnDemand() {
return (this.loadColumnFamiliesOnDemand != null)
&& this.loadColumnFamiliesOnDemand.booleanValue();
}
/**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,
@ -547,7 +584,7 @@ public class Scan extends OperationWithAttributes {
* Useful for debugging, logging, and administration tools.
* @param maxCols a limit on the number of columns output prior to truncation
* @return Map
*/
*/
@Override
public Map<String, Object> toMap(int maxCols) {
// start with the fingerpring map and build on top of it
@ -564,6 +601,7 @@ public class Scan extends OperationWithAttributes {
map.put("caching", this.caching);
map.put("maxResultSize", this.maxResultSize);
map.put("cacheBlocks", this.cacheBlocks);
map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
List<Long> timeRange = new ArrayList<Long>();
timeRange.add(this.tr.getMin());
timeRange.add(this.tr.getMax());

View File

@ -171,6 +171,14 @@ public abstract class Filter {
*/
abstract public KeyValue getNextKeyHint(final KeyValue currentKV);
/**
* Check that given column family is essential for filter to check row. Most
* filters always return true here. But some could have more sophisticated
* logic which could significantly reduce scanning process by not even
* touching columns until we are 100% sure that it's data is needed in result.
*/
abstract public boolean isFamilyEssential(byte[] name);
/**
* @return The filter serialized using pb
*/

View File

@ -133,6 +133,16 @@ public abstract class FilterBase extends Filter {
return null;
}
/**
* By default, we require all scan's column families to be present. Our
* subclasses may be more precise.
*
* @inheritDoc
*/
public boolean isFamilyEssential(byte[] name) {
return true;
}
/**
* Given the filter's arguments it constructs the filter
* <p>

View File

@ -360,6 +360,16 @@ public class FilterList extends Filter {
return keyHint;
}
@Override
public boolean isFamilyEssential(byte[] name) {
for (Filter filter : filters) {
if (filter.isFamilyEssential(name)) {
return true;
}
}
return false;
}
@Override
public String toString() {
return toString(MAX_LOG_FILTERS);

View File

@ -136,6 +136,11 @@ public class FilterWrapper extends Filter {
}
}
@Override
public boolean isFamilyEssential(byte[] name) {
return filter.isFamilyEssential(name);
};
/**
* @param other
* @return true if and only if the fields of the filter that are serialized

View File

@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
/**
* A {@link Filter} that checks a single column value, but does not emit the
@ -96,16 +98,22 @@ public class SingleColumnValueExcludeFilter extends SingleColumnValueFilter {
matchedColumn,filterIfMissing,latestVersionOnly);
}
public ReturnCode filterKeyValue(KeyValue keyValue) {
ReturnCode superRetCode = super.filterKeyValue(keyValue);
if (superRetCode == ReturnCode.INCLUDE) {
// We cleaned result row in FilterRow to be consistent with scanning process.
public boolean hasFilterRow() {
return true;
}
// Here we remove from row all key values from testing column
public void filterRow(List<KeyValue> kvs) {
Iterator it = kvs.iterator();
while (it.hasNext()) {
KeyValue kv = (KeyValue)it.next();
// If the current column is actually the tested column,
// we will skip it instead.
if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
return ReturnCode.SKIP;
if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
it.remove();
}
}
return superRetCode;
}
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {

View File

@ -379,6 +379,15 @@ public class SingleColumnValueFilter extends FilterBase {
&& this.getLatestVersionOnly() == other.getLatestVersionOnly();
}
/**
* The only CF this filter needs is given column family. So, it's the only essential
* column in whole scan. If filterIfMissing == false, all families are essential,
* because of possibility of skipping the rows without any data in filtered CF.
*/
public boolean isFamilyEssential(byte[] name) {
return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
}
@Override
public String toString() {
return String.format("%s (%s, %s, %s, %s)",

View File

@ -138,6 +138,10 @@ public class SkipFilter extends FilterBase {
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
public boolean isFamilyEssential(byte[] name) {
return filter.isFamilyEssential(name);
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();

View File

@ -138,6 +138,10 @@ public class WhileMatchFilter extends FilterBase {
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
public boolean isFamilyEssential(byte[] name) {
return filter.isFamilyEssential(name);
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();

View File

@ -565,6 +565,10 @@ public final class ProtobufUtil {
if (scan.getMaxResultSize() > 0) {
scanBuilder.setMaxResultSize(scan.getMaxResultSize());
}
Boolean loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
if (loadColumnFamiliesOnDemand != null) {
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
}
scanBuilder.setMaxVersions(scan.getMaxVersions());
TimeRange timeRange = scan.getTimeRange();
if (!timeRange.isAllTime()) {
@ -648,6 +652,9 @@ public final class ProtobufUtil {
if (proto.hasStoreOffset()) {
scan.setRowOffsetPerColumnFamily(proto.getStoreOffset());
}
if (proto.hasLoadColumnFamiliesOnDemand()) {
scan.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
}
if (proto.hasTimeRange()) {
HBaseProtos.TimeRange timeRange = proto.getTimeRange();
long minStamp = 0;

View File

@ -184,6 +184,9 @@ public class HRegion implements HeapSize { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
private static final String MERGEDIR = ".merges";
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@ -280,6 +283,13 @@ public class HRegion implements HeapSize { // , Writable{
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
* The default setting for whether to enable on-demand CF loading for
* scan requests to this region. Requests can override it.
*/
private boolean isLoadingCfsOnDemandDefault = false;
/**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
@ -455,6 +465,8 @@ public class HRegion implements HeapSize { // , Writable{
.add(htd.getValues());
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
this.regionInfo = regionInfo;
this.htableDescriptor = htd;
this.rsServices = rsServices;
@ -915,6 +927,10 @@ public class HRegion implements HeapSize { // , Writable{
return mvcc;
}
public boolean isLoadingCfsOnDemandDefault() {
return this.isLoadingCfsOnDemandDefault;
}
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@ -3383,6 +3399,15 @@ public class HRegion implements HeapSize { // , Writable{
class RegionScannerImpl implements RegionScanner {
// Package local for testability
KeyValueHeap storeHeap = null;
/** Heap of key-values that are not essential for the provided filters and are thus read
* on demand, if on-demand column family loading is enabled.*/
KeyValueHeap joinedHeap = null;
/**
* If the joined heap data gathering is interrupted due to scan limits, this will
* contain the row for which we are populating the values.*/
private KeyValue joinedContinuationRow = null;
// KeyValue indicating that limit is reached when scanning
private final KeyValue KV_LIMIT = new KeyValue();
private final byte [] stopRow;
private Filter filter;
private List<KeyValue> results = new ArrayList<KeyValue>();
@ -3429,7 +3454,10 @@ public class HRegion implements HeapSize { // , Writable{
scannerReadPoints.put(this, this.readPt);
}
// Here we separate all scanners into two lists - scanner that provide data required
// by the filter to operate (scanners list) and all others (joinedScanners list).
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
@ -3438,9 +3466,17 @@ public class HRegion implements HeapSize { // , Writable{
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
}
}
RegionScannerImpl(Scan scan) throws IOException {
@ -3527,6 +3563,43 @@ public class HRegion implements HeapSize { // , Writable{
return next(outResults, batch, metric);
}
private void populateFromJoinedHeap(int limit, String metric) throws IOException {
assert joinedContinuationRow != null;
KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
if (kv != KV_LIMIT) {
// We are done with this row, reset the continuation.
joinedContinuationRow = null;
}
// As the data is obtained from two independent heaps, we need to
// ensure that result list is sorted, because Result relies on that.
Collections.sort(results, comparator);
}
/**
* Fetches records with currentRow into results list, until next row or limit (if not -1).
* @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
* @param limit Max amount of KVs to place in result list, -1 means no limit.
* @param currentRow Byte array with key we are fetching.
* @param offset offset for currentRow
* @param length length for currentRow
* @param metric Metric key to be passed into KeyValueHeap::next().
* @return KV_LIMIT if limit reached, next KeyValue otherwise.
*/
private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
short length, String metric) throws IOException {
KeyValue nextKv;
do {
heap.next(results, limit - results.size(), metric);
if (limit > 0 && results.size() == limit) {
return KV_LIMIT;
}
nextKv = heap.peek();
} while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
return nextKv;
}
/*
* @return True if a filter rules the scanner is over, done.
*/
@ -3536,6 +3609,11 @@ public class HRegion implements HeapSize { // , Writable{
private boolean nextInternal(int limit, String metric) throws IOException {
RpcCallContext rpcCall = HBaseServer.getCurrentCall();
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
// Then we loop and try again. Otherwise, we must get out on the first iteration via return,
// "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
// and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
while (true) {
if (rpcCall != null) {
// If a user specifies a too-restrictive or too-slow scanner, the
@ -3545,7 +3623,9 @@ public class HRegion implements HeapSize { // , Writable{
rpcCall.throwExceptionIfCallerDisconnected();
}
// Let's see what we have in the storeHeap.
KeyValue current = this.storeHeap.peek();
byte[] currentRow = null;
int offset = 0;
short length = 0;
@ -3554,38 +3634,47 @@ public class HRegion implements HeapSize { // , Writable{
offset = current.getRowOffset();
length = current.getRowLength();
}
if (isStopRow(currentRow, offset, length)) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
return false;
} else if (filterRowKey(currentRow, offset, length)) {
nextRow(currentRow, offset, length);
} else {
KeyValue nextKv;
do {
this.storeHeap.next(results, limit - results.size(), metric);
if (limit > 0 && results.size() == limit) {
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
return true; // we are expecting more yes, but also limited to how many we can return.
boolean stopRow = isStopRow(currentRow, offset, length);
// Check if we were getting data from the joinedHeap and hit the limit.
// If not, then it's main path - getting results from storeHeap.
if (joinedContinuationRow == null) {
// First, check if we are at a stop row. If so, there are no more results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
nextKv = this.storeHeap.peek();
} while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
return false;
}
final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) {
nextRow(currentRow, offset, length);
continue;
}
// now that we have an entire row, lets process with a filters:
KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length,
metric);
// Ok, we are good, let's try to get some results from the main heap.
if (nextKv == KV_LIMIT) {
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
return true; // We hit the limit.
}
// first filter with the filterRow(List)
stopRow = nextKv == null ||
isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
// save that the row was empty before filters applied to it.
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List).
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
if (results.isEmpty()) {
if (isEmptyRow) {
// this seems like a redundant step - we already consumed the row
// there're no left overs.
// the reasons for calling this method are:
@ -3594,12 +3683,48 @@ public class HRegion implements HeapSize { // , Writable{
nextRow(currentRow, offset, length);
// This row was totally filtered out, if this is NOT the last row,
// we should continue on.
// we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
return false;
}
return !stopRow;
// Ok, we are done with storeHeap for this row.
// Now we may need to fetch additional, non-essential data into row.
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
KeyValue nextJoinedKv = joinedHeap.peek();
// If joinedHeap is pointing to some other row, try to seek to a correct one.
// We don't need to recheck that row here - populateResult will take care of that.
boolean mayHaveData =
(nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
|| this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(limit, metric);
}
}
} else {
// Populating from the joined heap was stopped by limits, populate some more.
populateFromJoinedHeap(limit, metric);
}
// We may have just called populateFromJoinedMap and hit the limits. If that is
// the case, we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
return true;
}
// Finally, we are done with both joinedHeap and storeHeap.
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
nextRow(currentRow, offset, length);
if (!stopRow) continue;
}
// We are done. Return the result.
return !stopRow;
}
}
@ -3609,8 +3734,9 @@ public class HRegion implements HeapSize { // , Writable{
}
protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
KeyValue next;
while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
this.storeHeap.next(MOCKED_LIST);
}
results.clear();
@ -3621,7 +3747,7 @@ public class HRegion implements HeapSize { // , Writable{
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,
currentRow, offset, length) <= isScan);
currentRow, offset, length) <= isScan);
}
@Override
@ -3630,6 +3756,10 @@ public class HRegion implements HeapSize { // , Writable{
storeHeap.close();
storeHeap = null;
}
if (joinedHeap != null) {
joinedHeap.close();
joinedHeap = null;
}
// no need to sychronize here.
scannerReadPoints.remove(this);
this.filterClosed = true;
@ -3644,16 +3774,21 @@ public class HRegion implements HeapSize { // , Writable{
if (row == null) {
throw new IllegalArgumentException("Row cannot be null.");
}
boolean result = false;
startRegionOperation();
try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
KeyValue kv = KeyValue.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
return this.storeHeap.requestSeek(kv, true, true);
result = this.storeHeap.requestSeek(kv, true, true);
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
} finally {
closeRegionOperation();
}
return result;
}
}

View File

@ -2898,7 +2898,12 @@ public class HRegionServer implements ClientProtocol,
} else {
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
// if the request doesn't set this, get the default region setting.
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);

View File

@ -27,6 +27,9 @@ import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import java.util.List;
import java.util.ArrayList;
/**
* Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
* extends {@link SingleColumnValueFilter}, only the added functionality is
@ -52,16 +55,18 @@ public class TestSingleColumnValueExcludeFilter {
CompareOp.EQUAL, VAL_1);
// A 'match' situation
KeyValue kv;
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
// INCLUDE expected because test column has not yet passed
assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
// Test column will pass (will match), will SKIP because test columns are excluded
assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
// Test column has already passed and matched, all subsequent columns are INCLUDE
kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
List<KeyValue> kvs = new ArrayList<KeyValue>();
KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
filter.filterRow(kvs);
assertEquals("resultSize", kvs.size(), 2);
assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
// A 'mismatch' situation

View File

@ -67,9 +67,11 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -198,7 +200,7 @@ public class TestHRegion extends HBaseTestCase {
System.out.println(results);
assertEquals(0, results.size());
}
@Test
public void testToShowNPEOnRegionScannerReseek() throws Exception{
String method = "testToShowNPEOnRegionScannerReseek";
@ -2491,6 +2493,166 @@ public class TestHRegion extends HBaseTestCase {
}
}
/**
* Added for HBASE-5416
*
* Here we test scan optimization when only subset of CFs are used in filter
* conditions.
*/
public void testScanner_JoinedScanners() throws IOException {
byte [] tableName = Bytes.toBytes("testTable");
byte [] cf_essential = Bytes.toBytes("essential");
byte [] cf_joined = Bytes.toBytes("joined");
byte [] cf_alpha = Bytes.toBytes("alpha");
this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
try {
byte [] row1 = Bytes.toBytes("row1");
byte [] row2 = Bytes.toBytes("row2");
byte [] row3 = Bytes.toBytes("row3");
byte [] col_normal = Bytes.toBytes("d");
byte [] col_alpha = Bytes.toBytes("a");
byte [] filtered_val = Bytes.toBytes(3);
Put put = new Put(row1);
put.add(cf_essential, col_normal, Bytes.toBytes(1));
put.add(cf_joined, col_alpha, Bytes.toBytes(1));
region.put(put);
put = new Put(row2);
put.add(cf_essential, col_alpha, Bytes.toBytes(2));
put.add(cf_joined, col_normal, Bytes.toBytes(2));
put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
region.put(put);
put = new Put(row3);
put.add(cf_essential, col_normal, filtered_val);
put.add(cf_joined, col_normal, filtered_val);
region.put(put);
// Check two things:
// 1. result list contains expected values
// 2. result list is sorted properly
Scan scan = new Scan();
Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
CompareOp.NOT_EQUAL, filtered_val);
scan.setFilter(filter);
scan.setLoadColumnFamiliesOnDemand(true);
InternalScanner s = region.getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
assertTrue(s.next(results));
assertEquals(results.size(), 1);
results.clear();
assertTrue(s.next(results));
assertEquals(results.size(), 3);
assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
results.clear();
assertFalse(s.next(results));
assertEquals(results.size(), 0);
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
/**
* HBASE-5416
*
* Test case when scan limits amount of KVs returned on each next() call.
*/
public void testScanner_JoinedScannersWithLimits() throws IOException {
final byte [] tableName = Bytes.toBytes("testTable");
final byte [] cf_first = Bytes.toBytes("first");
final byte [] cf_second = Bytes.toBytes("second");
this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
try {
final byte [] col_a = Bytes.toBytes("a");
final byte [] col_b = Bytes.toBytes("b");
Put put;
for (int i = 0; i < 10; i++) {
put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
put.add(cf_first, col_a, Bytes.toBytes(i));
if (i < 5) {
put.add(cf_first, col_b, Bytes.toBytes(i));
put.add(cf_second, col_a, Bytes.toBytes(i));
put.add(cf_second, col_b, Bytes.toBytes(i));
}
region.put(put);
}
Scan scan = new Scan();
scan.setLoadColumnFamiliesOnDemand(true);
Filter bogusFilter = new FilterBase() {
@Override
public boolean isFamilyEssential(byte[] name) {
return Bytes.equals(name, cf_first);
}
};
scan.setFilter(bogusFilter);
InternalScanner s = region.getScanner(scan);
// Our data looks like this:
// r0: first:a, first:b, second:a, second:b
// r1: first:a, first:b, second:a, second:b
// r2: first:a, first:b, second:a, second:b
// r3: first:a, first:b, second:a, second:b
// r4: first:a, first:b, second:a, second:b
// r5: first:a
// r6: first:a
// r7: first:a
// r8: first:a
// r9: first:a
// But due to next's limit set to 3, we should get this:
// r0: first:a, first:b, second:a
// r0: second:b
// r1: first:a, first:b, second:a
// r1: second:b
// r2: first:a, first:b, second:a
// r2: second:b
// r3: first:a, first:b, second:a
// r3: second:b
// r4: first:a, first:b, second:a
// r4: second:b
// r5: first:a
// r6: first:a
// r7: first:a
// r8: first:a
// r9: first:a
List<KeyValue> results = new ArrayList<KeyValue>();
int index = 0;
while (true) {
boolean more = s.next(results, 3);
if ((index >> 1) < 5) {
if (index % 2 == 0)
assertEquals(results.size(), 3);
else
assertEquals(results.size(), 1);
}
else
assertEquals(results.size(), 1);
results.clear();
index++;
if (!more) break;
}
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
//////////////////////////////////////////////////////////////////////////////
// Split test
//////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,191 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test performance improvement of joined scanners optimization:
* https://issues.apache.org/jira/browse/HBASE-5416
*/
@Category(LargeTests.class)
public class TestJoinedScanners {
static final Log LOG = LogFactory.getLog(TestJoinedScanners.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString();
private static final byte[] tableName = Bytes.toBytes("testTable");
private static final byte[] cf_essential = Bytes.toBytes("essential");
private static final byte[] cf_joined = Bytes.toBytes("joined");
private static final byte[] col_name = Bytes.toBytes("a");
private static final byte[] flag_yes = Bytes.toBytes("Y");
private static final byte[] flag_no = Bytes.toBytes("N");
@Test
public void testJoinedScanners() throws Exception {
String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
int regionServersCount = 3;
HBaseTestingUtility htu = new HBaseTestingUtility();
final int DEFAULT_BLOCK_SIZE = 1024*1024;
htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
htu.getConfiguration().setInt("dfs.replication", 1);
htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
MiniHBaseCluster cluster = null;
try {
cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
byte [][] families = {cf_essential, cf_joined};
HTable ht = htu.createTable(
Bytes.toBytes(this.getClass().getSimpleName()), families);
long rows_to_insert = 10000;
int insert_batch = 20;
int flag_percent = 1;
int large_bytes = 128 * 1024;
long time = System.nanoTime();
LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = "
+ Float.toString(rows_to_insert * large_bytes / 1024 / 1024) + " MB");
byte [] val_large = new byte[large_bytes];
List<Put> puts = new ArrayList<Put>();
for (long i = 0; i < rows_to_insert; i++) {
Put put = new Put(Bytes.toBytes(Long.toString (i)));
if (i % 100 <= flag_percent) {
put.add(cf_essential, col_name, flag_yes);
}
else {
put.add(cf_essential, col_name, flag_no);
}
put.add(cf_joined, col_name, val_large);
puts.add(put);
if (puts.size() >= insert_batch) {
ht.put(puts);
puts.clear();
}
}
if (puts.size() >= 0) {
ht.put(puts);
puts.clear();
}
LOG.info("Data generated in "
+ Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
boolean slow = true;
for (int i = 0; i < 20; ++i) {
runScanner(ht, slow);
slow = !slow;
}
ht.close();
} finally {
if (cluster != null) {
htu.shutdownMiniCluster();
}
}
}
private void runScanner(HTable table, boolean slow) throws Exception {
long time = System.nanoTime();
Scan scan = new Scan();
scan.addColumn(cf_essential, col_name);
scan.addColumn(cf_joined, col_name);
SingleColumnValueFilter filter = new SingleColumnValueFilter(
cf_essential, col_name, CompareFilter.CompareOp.EQUAL, flag_yes);
filter.setFilterIfMissing(true);
scan.setFilter(filter);
scan.setLoadColumnFamiliesOnDemand(!slow);
ResultScanner result_scanner = table.getScanner(scan);
Result res;
long rows_count = 0;
while ((res = result_scanner.next()) != null) {
rows_count++;
}
double timeSec = (System.nanoTime() - time) / 1000000000.0;
result_scanner.close();
LOG.info((slow ? "Slow" : "Joined") + " scanner finished in " + Double.toString(timeSec)
+ " seconds, got " + Long.toString(rows_count/2) + " rows");
}
private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, byte[]... families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
Path path = new Path(DIR + callingMethod);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw new IOException("Failed delete of " + path);
}
}
return HRegion.createHRegion(info, path, conf, htd);
}
}