HBASE-14406 The dataframe datasource filter is wrong, and will result in data loss or unexpected behavior (Ted Malaska)

This commit is contained in:
tedyu 2015-10-16 11:22:07 -07:00
parent 30cf4e3761
commit dae1775a11
12 changed files with 3460 additions and 2281 deletions

View File

@ -168,16 +168,3 @@ message RowRange {
message MultiRowRangeFilter { message MultiRowRangeFilter {
repeated RowRange row_range_list = 1; repeated RowRange row_range_list = 1;
} }
message SQLPredicatePushDownColumnFilter {
required bytes column_family = 1;
required bytes qualifier = 2;
repeated bytes get_point_list = 3;
repeated RowRange range_list = 4;
}
message SQLPredicatePushDownFilter {
repeated SQLPredicatePushDownColumnFilter column_filter_list = 1;
}

View File

@ -512,6 +512,14 @@
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
</dependencies> </dependencies>
@ -603,5 +611,43 @@
<surefire.skipSecondPart>true</surefire.skipSecondPart> <surefire.skipSecondPart>true</surefire.skipSecondPart>
</properties> </properties>
</profile> </profile>
<profile>
<id>compile-protobuf</id>
<activation>
<property>
<name>compile-protobuf</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<imports>
<param>${basedir}/src/main/protobuf</param>
</imports>
<source>
<directory>${basedir}/src/main/protobuf</directory>
<includes>
<include>Filter.proto</include>
</includes>
</source>
<!--<output>${project.build.directory}/generated-sources/java</output>-->
<output>${basedir}/src/main/java/</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles> </profiles>
</project> </project>

View File

@ -24,8 +24,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; import org.apache.hadoop.hbase.spark.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import scala.collection.mutable.MutableList; import scala.collection.mutable.MutableList;
import java.io.IOException; import java.io.IOException;
@ -38,51 +39,133 @@ import java.util.Map;
* by SparkSQL so that we have make the filters at the region server level * by SparkSQL so that we have make the filters at the region server level
* and avoid sending the data back to the client to be filtered. * and avoid sending the data back to the client to be filtered.
*/ */
public class SparkSQLPushDownFilter extends FilterBase { public class SparkSQLPushDownFilter extends FilterBase{
protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class); protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class);
HashMap<ColumnFamilyQualifierMapKeyWrapper, ColumnFilter> columnFamilyQualifierFilterMap; //The following values are populated with protobuffer
DynamicLogicExpression dynamicLogicExpression;
byte[][] valueFromQueryArray;
HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
currentCellToColumnIndexMap;
public SparkSQLPushDownFilter(HashMap<ColumnFamilyQualifierMapKeyWrapper, //The following values are transient
ColumnFilter> columnFamilyQualifierFilterMap) { HashMap<String, ByteArrayComparable> columnToCurrentRowValueMap = null;
this.columnFamilyQualifierFilterMap = columnFamilyQualifierFilterMap;
static final byte[] rowKeyFamily = new byte[0];
static final byte[] rowKeyQualifier = Bytes.toBytes("key");
public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
byte[][] valueFromQueryArray,
HashMap<ByteArrayComparable,
HashMap<ByteArrayComparable, String>>
currentCellToColumnIndexMap) {
this.dynamicLogicExpression = dynamicLogicExpression;
this.valueFromQueryArray = valueFromQueryArray;
this.currentCellToColumnIndexMap = currentCellToColumnIndexMap;
}
public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
byte[][] valueFromQueryArray,
MutableList<SchemaQualifierDefinition> columnDefinitions) {
this.dynamicLogicExpression = dynamicLogicExpression;
this.valueFromQueryArray = valueFromQueryArray;
//generate family qualifier to index mapping
this.currentCellToColumnIndexMap =
new HashMap<>();
for (int i = 0; i < columnDefinitions.size(); i++) {
SchemaQualifierDefinition definition = columnDefinitions.get(i).get();
ByteArrayComparable familyByteComparable =
new ByteArrayComparable(definition.columnFamilyBytes(),
0, definition.columnFamilyBytes().length);
HashMap<ByteArrayComparable, String> qualifierIndexMap =
currentCellToColumnIndexMap.get(familyByteComparable);
if (qualifierIndexMap == null) {
qualifierIndexMap = new HashMap<>();
currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap);
}
ByteArrayComparable qualifierByteComparable =
new ByteArrayComparable(definition.qualifierBytes(), 0,
definition.qualifierBytes().length);
qualifierIndexMap.put(qualifierByteComparable, definition.columnName());
}
} }
/**
* This method will find the related filter logic for the given
* column family and qualifier then execute it. It will also
* not clone the in coming cell to avoid extra object creation
*
* @param c The Cell to be validated
* @return ReturnCode object to determine if skipping is required
* @throws IOException
*/
@Override @Override
public ReturnCode filterKeyValue(Cell c) throws IOException { public ReturnCode filterKeyValue(Cell c) throws IOException {
//Get filter if one exist //If the map RowValueMap is empty then we need to populate
ColumnFilter filter = // the row key
columnFamilyQualifierFilterMap.get(new ColumnFamilyQualifierMapKeyWrapper( if (columnToCurrentRowValueMap == null) {
c.getFamilyArray(), columnToCurrentRowValueMap = new HashMap<>();
HashMap<ByteArrayComparable, String> qualifierColumnMap =
currentCellToColumnIndexMap.get(
new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length));
if (qualifierColumnMap != null) {
String rowKeyColumnName =
qualifierColumnMap.get(
new ByteArrayComparable(rowKeyQualifier, 0,
rowKeyQualifier.length));
//Make sure that the rowKey is part of the where clause
if (rowKeyColumnName != null) {
columnToCurrentRowValueMap.put(rowKeyColumnName,
new ByteArrayComparable(c.getRowArray(),
c.getRowOffset(), c.getRowLength()));
}
}
}
//Always populate the column value into the RowValueMap
ByteArrayComparable currentFamilyByteComparable =
new ByteArrayComparable(c.getFamilyArray(),
c.getFamilyOffset(), c.getFamilyOffset(),
c.getFamilyLength(), c.getFamilyLength());
c.getQualifierArray(),
HashMap<ByteArrayComparable, String> qualifierColumnMap =
currentCellToColumnIndexMap.get(
currentFamilyByteComparable);
if (qualifierColumnMap != null) {
String columnName =
qualifierColumnMap.get(
new ByteArrayComparable(c.getQualifierArray(),
c.getQualifierOffset(), c.getQualifierOffset(),
c.getQualifierLength())); c.getQualifierLength()));
if (filter == null) { if (columnName != null) {
//If no filter then just include values columnToCurrentRowValueMap.put(columnName,
return ReturnCode.INCLUDE; new ByteArrayComparable(c.getValueArray(),
} else { c.getValueOffset(), c.getValueLength()));
//If there is a filter then run validation }
if (filter.validate(c.getValueArray(), c.getValueOffset(), c.getValueLength())) { }
return ReturnCode.INCLUDE; return ReturnCode.INCLUDE;
} }
@Override
public boolean filterRow() throws IOException {
try {
boolean result =
dynamicLogicExpression.execute(columnToCurrentRowValueMap,
valueFromQueryArray);
columnToCurrentRowValueMap = null;
return !result;
} catch (Throwable e) {
log.error("Error running dynamic logic on row", e);
} }
//If validation fails then skip whole row return false;
return ReturnCode.NEXT_ROW;
} }
/** /**
* @param pbBytes A pb serialized instance * @param pbBytes A pb serialized instance
* @return An instance of SparkSQLPushDownFilter * @return An instance of SparkSQLPushDownFilter
@ -92,47 +175,55 @@ public class SparkSQLPushDownFilter extends FilterBase {
public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes) public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes)
throws DeserializationException { throws DeserializationException {
HashMap<ColumnFamilyQualifierMapKeyWrapper,
ColumnFilter> columnFamilyQualifierFilterMap = new HashMap<>();
FilterProtos.SQLPredicatePushDownFilter proto; FilterProtos.SQLPredicatePushDownFilter proto;
try { try {
proto = FilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes); proto = FilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e); throw new DeserializationException(e);
} }
final List<FilterProtos.SQLPredicatePushDownColumnFilter> columnFilterListList =
proto.getColumnFilterListList();
for (FilterProtos.SQLPredicatePushDownColumnFilter columnFilter: columnFilterListList) { //Load DynamicLogicExpression
DynamicLogicExpression dynamicLogicExpression =
DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression());
byte[] columnFamily = columnFilter.getColumnFamily().toByteArray(); //Load valuesFromQuery
byte[] qualifier = columnFilter.getQualifier().toByteArray(); final List<ByteString> valueFromQueryArrayList = proto.getValueFromQueryArrayList();
final ColumnFamilyQualifierMapKeyWrapper columnFamilyQualifierMapKeyWrapper = byte[][] valueFromQueryArray = new byte[valueFromQueryArrayList.size()][];
new ColumnFamilyQualifierMapKeyWrapper(columnFamily, 0, columnFamily.length, for (int i = 0; i < valueFromQueryArrayList.size(); i++) {
qualifier, 0, qualifier.length); valueFromQueryArray[i] = valueFromQueryArrayList.get(i).toByteArray();
final MutableList<byte[]> points = new MutableList<>();
final MutableList<ScanRange> scanRanges = new MutableList<>();
for (ByteString byteString: columnFilter.getGetPointListList()) {
points.$plus$eq(byteString.toByteArray());
} }
for (FilterProtos.RowRange rowRange: columnFilter.getRangeListList()) { //Load mapping from HBase family/qualifier to Spark SQL columnName
ScanRange scanRange = new ScanRange(rowRange.getStopRow().toByteArray(), HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
rowRange.getStopRowInclusive(), currentCellToColumnIndexMap = new HashMap<>();
rowRange.getStartRow().toByteArray(),
rowRange.getStartRowInclusive()); for (FilterProtos.SQLPredicatePushDownCellToColumnMapping
scanRanges.$plus$eq(scanRange); sqlPredicatePushDownCellToColumnMapping :
proto.getCellToColumnMappingList()) {
byte[] familyArray =
sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray();
ByteArrayComparable familyByteComparable =
new ByteArrayComparable(familyArray, 0, familyArray.length);
HashMap<ByteArrayComparable, String> qualifierMap =
currentCellToColumnIndexMap.get(familyByteComparable);
if (qualifierMap == null) {
qualifierMap = new HashMap<>();
currentCellToColumnIndexMap.put(familyByteComparable, qualifierMap);
}
byte[] qualifierArray =
sqlPredicatePushDownCellToColumnMapping.getQualifier().toByteArray();
ByteArrayComparable qualifierByteComparable =
new ByteArrayComparable(qualifierArray, 0 ,qualifierArray.length);
qualifierMap.put(qualifierByteComparable,
sqlPredicatePushDownCellToColumnMapping.getColumnName());
} }
final ColumnFilter columnFilterObj = new ColumnFilter(null, null, points, scanRanges); return new SparkSQLPushDownFilter(dynamicLogicExpression,
valueFromQueryArray, currentCellToColumnIndexMap);
columnFamilyQualifierFilterMap.put(columnFamilyQualifierMapKeyWrapper, columnFilterObj);
}
return new SparkSQLPushDownFilter(columnFamilyQualifierFilterMap);
} }
/** /**
@ -143,44 +234,27 @@ public class SparkSQLPushDownFilter extends FilterBase {
FilterProtos.SQLPredicatePushDownFilter.Builder builder = FilterProtos.SQLPredicatePushDownFilter.Builder builder =
FilterProtos.SQLPredicatePushDownFilter.newBuilder(); FilterProtos.SQLPredicatePushDownFilter.newBuilder();
FilterProtos.SQLPredicatePushDownColumnFilter.Builder columnBuilder = FilterProtos.SQLPredicatePushDownCellToColumnMapping.Builder columnMappingBuilder =
FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder(); FilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder();
FilterProtos.RowRange.Builder rowRangeBuilder = FilterProtos.RowRange.newBuilder();
for (Map.Entry<ColumnFamilyQualifierMapKeyWrapper, ColumnFilter> entry :
columnFamilyQualifierFilterMap.entrySet()) {
columnBuilder.setColumnFamily(
ByteStringer.wrap(entry.getKey().cloneColumnFamily()));
columnBuilder.setQualifier(
ByteStringer.wrap(entry.getKey().cloneQualifier()));
final MutableList<byte[]> points = entry.getValue().points();
int pointLength = points.length();
for (int i = 0; i < pointLength; i++) {
byte[] point = points.get(i).get();
columnBuilder.addGetPointList(ByteStringer.wrap(point));
builder.setDynamicLogicExpression(dynamicLogicExpression.toExpressionString());
for (byte[] valueFromQuery: valueFromQueryArray) {
builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery));
} }
final MutableList<ScanRange> ranges = entry.getValue().ranges(); for (Map.Entry<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
int rangeLength = ranges.length(); familyEntry : currentCellToColumnIndexMap.entrySet()) {
for (int i = 0; i < rangeLength; i++) { for (Map.Entry<ByteArrayComparable, String> qualifierEntry :
ScanRange scanRange = ranges.get(i).get(); familyEntry.getValue().entrySet()) {
rowRangeBuilder.clear(); columnMappingBuilder.setColumnFamily(
rowRangeBuilder.setStartRow(ByteStringer.wrap(scanRange.lowerBound())); ByteStringer.wrap(familyEntry.getKey().bytes()));
rowRangeBuilder.setStopRow(ByteStringer.wrap(scanRange.upperBound())); columnMappingBuilder.setQualifier(
rowRangeBuilder.setStartRowInclusive(scanRange.isLowerBoundEqualTo()); ByteStringer.wrap(qualifierEntry.getKey().bytes()));
rowRangeBuilder.setStopRowInclusive(scanRange.isUpperBoundEqualTo()); columnMappingBuilder.setColumnName(qualifierEntry.getValue());
builder.addCellToColumnMapping(columnMappingBuilder.build());
columnBuilder.addRangeList(rowRangeBuilder.build()); }
} }
builder.addColumnFilterList(columnBuilder.build());
columnBuilder.clear();
}
return builder.build().toByteArray(); return builder.build().toByteArray();
} }
} }

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used for filters
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.spark.protobuf.generated";
option java_outer_classname = "FilterProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SQLPredicatePushDownCellToColumnMapping {
required bytes column_family = 1;
required bytes qualifier = 2;
required string column_name = 3;
}
message SQLPredicatePushDownFilter {
required string dynamic_logic_expression = 1;
repeated bytes value_from_query_array = 2;
repeated SQLPredicatePushDownCellToColumnMapping cell_to_column_mapping = 3;
}

View File

@ -0,0 +1,47 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.util.Bytes
class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length:Int = -1)
extends Comparable[ByteArrayComparable] {
if (length == -1) {
length = bytes.length
}
override def compareTo(o: ByteArrayComparable): Int = {
Bytes.compareTo(bytes, offset, length, o.bytes, o.offset, o.length)
}
override def hashCode(): Int = {
Bytes.hashCode(bytes, offset, length)
}
override def equals (obj: Any): Boolean = {
obj match {
case b: ByteArrayComparable =>
Bytes.equals(bytes, offset, length, b.bytes, b.offset, b.length)
case _ =>
false
}
}
}

View File

@ -22,7 +22,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan} import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
import org.apache.hadoop.hbase.types._ import org.apache.hadoop.hbase.types._
import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes} import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange,
PositionedByteRange, Bytes}
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
@ -44,7 +45,7 @@ import scala.collection.mutable
* - Type conversions of basic SQL types. All conversions will be * - Type conversions of basic SQL types. All conversions will be
* Through the HBase Bytes object commands. * Through the HBase Bytes object commands.
*/ */
class DefaultSource extends RelationProvider { class DefaultSource extends RelationProvider with Logging {
val TABLE_KEY:String = "hbase.table" val TABLE_KEY:String = "hbase.table"
val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping" val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
@ -52,6 +53,7 @@ class DefaultSource extends RelationProvider {
val CACHING_NUM_KEY:String = "hbase.caching.num" val CACHING_NUM_KEY:String = "hbase.caching.num"
val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources" val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context" val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
/** /**
* Is given input from SparkSQL to construct a BaseRelation * Is given input from SparkSQL to construct a BaseRelation
@ -73,6 +75,7 @@ class DefaultSource extends RelationProvider {
val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000") val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "") val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true") val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")
val batchingNum:Int = try { val batchingNum:Int = try {
batchingNumStr.toInt batchingNumStr.toInt
@ -95,7 +98,8 @@ class DefaultSource extends RelationProvider {
batchingNum.toInt, batchingNum.toInt,
cachingNum.toInt, cachingNum.toInt,
hbaseConfigResources, hbaseConfigResources,
useHBaseReources.equalsIgnoreCase("true"))(sqlContext) useHBaseReources.equalsIgnoreCase("true"),
usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext)
} }
/** /**
@ -161,7 +165,8 @@ class HBaseRelation (val tableName:String,
val batchingNum:Int, val batchingNum:Int,
val cachingNum:Int, val cachingNum:Int,
val configResources:String, val configResources:String,
val useHBaseContext:Boolean) ( val useHBaseContext:Boolean,
val usePushDownColumnFilter:Boolean) (
@transient val sqlContext:SQLContext) @transient val sqlContext:SQLContext)
extends BaseRelation with PrunedFilteredScan with Logging { extends BaseRelation with PrunedFilteredScan with Logging {
@ -217,49 +222,53 @@ class HBaseRelation (val tableName:String,
*/ */
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val columnFilterCollection = buildColumnFilterCollection(filters) val pushDownTuple = buildPushDownPredicatesResource(filters)
val pushDownRowKeyFilter = pushDownTuple._1
var pushDownDynamicLogicExpression = pushDownTuple._2
val valueArray = pushDownTuple._3
if (!usePushDownColumnFilter) {
pushDownDynamicLogicExpression = null
}
logDebug("pushDownRowKeyFilter: " + pushDownRowKeyFilter.ranges)
if (pushDownDynamicLogicExpression != null) {
logDebug("pushDownDynamicLogicExpression: " +
pushDownDynamicLogicExpression.toExpressionString)
}
logDebug("valueArray: " + valueArray.length)
val requiredQualifierDefinitionList =
new mutable.MutableList[SchemaQualifierDefinition]
val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition]
requiredColumns.foreach( c => { requiredColumns.foreach( c => {
val definition = schemaMappingDefinition.get(c) val definition = schemaMappingDefinition.get(c)
if (definition.columnFamilyBytes.length > 0) { requiredQualifierDefinitionList += definition
requiredQualifierDefinitionArray += definition
}
}) })
//Create a local variable so that scala doesn't have to //Create a local variable so that scala doesn't have to
// serialize the whole HBaseRelation Object // serialize the whole HBaseRelation Object
val serializableDefinitionMap = schemaMappingDefinition val serializableDefinitionMap = schemaMappingDefinition
//retain the information for unit testing checks //retain the information for unit testing checks
DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection, DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
requiredQualifierDefinitionArray) pushDownDynamicLogicExpression)
var resultRDD: RDD[Row] = null var resultRDD: RDD[Row] = null
if (columnFilterCollection != null) {
val pushDownFilterJava =
new SparkSQLPushDownFilter(
columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition))
val getList = new util.ArrayList[Get]() val getList = new util.ArrayList[Get]()
val rddList = new util.ArrayList[RDD[Row]]() val rddList = new util.ArrayList[RDD[Row]]()
val it = columnFilterCollection.columnFilterMap.iterator
while (it.hasNext) {
val e = it.next()
val columnDefinition = schemaMappingDefinition.get(e._1)
//check is a rowKey
if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) {
//add points to getList //add points to getList
e._2.points.foreach(p => { pushDownRowKeyFilter.points.foreach(p => {
val get = new Get(p) val get = new Get(p)
requiredQualifierDefinitionArray.foreach( d => requiredQualifierDefinitionList.foreach( d => {
get.addColumn(d.columnFamilyBytes, d.qualifierBytes)) if (d.columnFamilyBytes.length > 0)
get.addColumn(d.columnFamilyBytes, d.qualifierBytes)
})
getList.add(get) getList.add(get)
}) })
val rangeIt = e._2.ranges.iterator val rangeIt = pushDownRowKeyFilter.ranges.iterator
while (rangeIt.hasNext) { while (rangeIt.hasNext) {
val r = rangeIt.next() val r = rangeIt.next()
@ -267,10 +276,15 @@ class HBaseRelation (val tableName:String,
val scan = new Scan() val scan = new Scan()
scan.setBatch(batchingNum) scan.setBatch(batchingNum)
scan.setCaching(cachingNum) scan.setCaching(cachingNum)
requiredQualifierDefinitionArray.foreach( d => requiredQualifierDefinitionList.foreach( d =>
if (d.columnFamilyBytes.length > 0)
scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) { if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
val pushDownFilterJava =
new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
valueArray, requiredQualifierDefinitionList)
scan.setFilter(pushDownFilterJava) scan.setFilter(pushDownFilterJava)
} }
@ -304,7 +318,6 @@ class HBaseRelation (val tableName:String,
//New Max Bytes //New Max Bytes
newArray(r.upperBound.length) = Byte.MaxValue newArray(r.upperBound.length) = Byte.MaxValue
scan.setStopRow(newArray) scan.setStopRow(newArray)
} else { } else {
//Here equalTo is false for Upper bound which is exclusive and //Here equalTo is false for Upper bound which is exclusive and
@ -320,8 +333,6 @@ class HBaseRelation (val tableName:String,
}) })
rddList.add(rdd) rddList.add(rdd)
} }
}
}
//If there is more then one RDD then we have to union them together //If there is more then one RDD then we have to union them together
for (i <- 0 until rddList.size()) { for (i <- 0 until rddList.size()) {
@ -333,7 +344,8 @@ class HBaseRelation (val tableName:String,
//If there are gets then we can get them from the driver and union that rdd in //If there are gets then we can get them from the driver and union that rdd in
// with the rest of the values. // with the rest of the values.
if (getList.size() > 0) { if (getList.size() > 0) {
val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration) val connection =
ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
try { try {
val table = connection.getTable(TableName.valueOf(tableName)) val table = connection.getTable(TableName.valueOf(tableName))
try { try {
@ -356,12 +368,12 @@ class HBaseRelation (val tableName:String,
connection.close() connection.close()
} }
} }
}
if (resultRDD == null) { if (resultRDD == null) {
val scan = new Scan() val scan = new Scan()
scan.setBatch(batchingNum) scan.setBatch(batchingNum)
scan.setCaching(cachingNum) scan.setCaching(cachingNum)
requiredQualifierDefinitionArray.foreach( d => requiredQualifierDefinitionList.foreach( d =>
scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => { val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
@ -373,79 +385,135 @@ class HBaseRelation (val tableName:String,
resultRDD resultRDD
} }
/** def buildPushDownPredicatesResource(filters: Array[Filter]):
* Root recursive function that will loop over the filters provided by (RowKeyFilter, DynamicLogicExpression, Array[Array[Byte]]) = {
* SparkSQL. Some filters are AND or OR functions and contain additional filters var superRowKeyFilter:RowKeyFilter = null
* hence the need for recursion. val queryValueList = new mutable.MutableList[Array[Byte]]
* var superDynamicLogicExpression: DynamicLogicExpression = null
* @param filters Filters provided by SparkSQL.
* Filters are joined with the AND operater
* @return A ColumnFilterCollection whish is a consolidated construct to
* hold the high level filter information
*/
def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = {
var superCollection: ColumnFilterCollection = null
filters.foreach( f => { filters.foreach( f => {
val parentCollection = new ColumnFilterCollection val rowKeyFilter = new RowKeyFilter()
buildColumnFilterCollection(parentCollection, f) val logicExpression = transverseFilterTree(rowKeyFilter, queryValueList, f)
if (superCollection == null) if (superDynamicLogicExpression == null) {
superCollection = parentCollection superDynamicLogicExpression = logicExpression
else superRowKeyFilter = rowKeyFilter
superCollection.mergeIntersect(parentCollection) } else {
}) superDynamicLogicExpression =
superCollection new AndLogicExpression(superDynamicLogicExpression, logicExpression)
superRowKeyFilter.mergeIntersect(rowKeyFilter)
} }
/** })
* Recursive function that will work to convert Spark Filter
* objects to ColumnFilterCollection val queryValueArray = queryValueList.toArray
*
* @param parentFilterCollection Parent ColumnFilterCollection if (superRowKeyFilter == null) {
* @param filter Current given filter from SparkSQL superRowKeyFilter = new RowKeyFilter
*/ }
def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection,
filter:Filter): Unit = { (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
}
def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
valueArray:mutable.MutableList[Array[Byte]],
filter:Filter): DynamicLogicExpression = {
filter match { filter match {
case EqualTo(attr, value) => case EqualTo(attr, value) =>
parentFilterCollection.mergeUnion(attr, val columnDefinition = schemaMappingDefinition.get(attr)
new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr, if (columnDefinition != null) {
schemaMappingDefinition, value.toString))) if (columnDefinition.columnFamily.isEmpty) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString), null))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString)
valueArray += byteValue
}
new EqualLogicExpression(attr, valueArray.length - 1, false)
case LessThan(attr, value) => case LessThan(attr, value) =>
parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, val columnDefinition = schemaMappingDefinition.get(attr)
if (columnDefinition != null) {
if (columnDefinition.columnFamily.isEmpty) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString), false, schemaMappingDefinition, value.toString), false,
new Array[Byte](0), true))) new Array[Byte](0), true)))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString)
valueArray += byteValue
}
new LessThanLogicExpression(attr, valueArray.length - 1)
case GreaterThan(attr, value) => case GreaterThan(attr, value) =>
parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, val columnDefinition = schemaMappingDefinition.get(attr)
if (columnDefinition != null) {
if (columnDefinition.columnFamily.isEmpty) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString), false))) schemaMappingDefinition, value.toString), false)))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString)
valueArray += byteValue
}
new GreaterThanLogicExpression(attr, valueArray.length - 1)
case LessThanOrEqual(attr, value) => case LessThanOrEqual(attr, value) =>
parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, val columnDefinition = schemaMappingDefinition.get(attr)
if (columnDefinition != null) {
if (columnDefinition.columnFamily.isEmpty) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString), true, schemaMappingDefinition, value.toString), true,
new Array[Byte](0), true))) new Array[Byte](0), true)))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString)
valueArray += byteValue
}
new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
case GreaterThanOrEqual(attr, value) => case GreaterThanOrEqual(attr, value) =>
parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, val columnDefinition = schemaMappingDefinition.get(attr)
if (columnDefinition != null) {
if (columnDefinition.columnFamily.isEmpty) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString), true))) schemaMappingDefinition, value.toString), true)))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(attr,
schemaMappingDefinition, value.toString)
valueArray += byteValue
}
new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
case Or(left, right) => case Or(left, right) =>
buildColumnFilterCollection(parentFilterCollection, left) val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
val rightSideCollection = new ColumnFilterCollection val rightSideRowKeyFilter = new RowKeyFilter
buildColumnFilterCollection(rightSideCollection, right) val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
parentFilterCollection.mergeUnion(rightSideCollection)
parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
new OrLogicExpression(leftExpression, rightExpression)
case And(left, right) => case And(left, right) =>
buildColumnFilterCollection(parentFilterCollection, left)
val rightSideCollection = new ColumnFilterCollection val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
buildColumnFilterCollection(rightSideCollection, right) val rightSideRowKeyFilter = new RowKeyFilter
parentFilterCollection.mergeIntersect(rightSideCollection) val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
case _ => //nothing parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
new AndLogicExpression(leftExpression, rightExpression)
case IsNull(attr) =>
new IsNullLogicExpression(attr, false)
case IsNotNull(attr) =>
new IsNullLogicExpression(attr, true)
case _ =>
new PassThroughLogicExpression
} }
} }
} }
@ -472,7 +540,7 @@ case class SchemaQualifierDefinition(columnName:String,
else if (colType.equals("DOUBLE")) DoubleType else if (colType.equals("DOUBLE")) DoubleType
else if (colType.equals("STRING")) StringType else if (colType.equals("STRING")) StringType
else if (colType.equals("TIMESTAMP")) TimestampType else if (colType.equals("TIMESTAMP")) TimestampType
else if (colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale) else if (colType.equals("DECIMAL")) StringType
else throw new IllegalArgumentException("Unsupported column type :" + colType) else throw new IllegalArgumentException("Unsupported column type :" + colType)
} }
@ -524,11 +592,11 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
isLowerBoundEqualTo = if (lowerBoundCompare == 0) isLowerBoundEqualTo = if (lowerBoundCompare == 0)
isLowerBoundEqualTo || other.isLowerBoundEqualTo isLowerBoundEqualTo || other.isLowerBoundEqualTo
else isLowerBoundEqualTo else if (lowerBoundCompare < 0) isLowerBoundEqualTo else other.isLowerBoundEqualTo
isUpperBoundEqualTo = if (upperBoundCompare == 0) isUpperBoundEqualTo = if (upperBoundCompare == 0)
isUpperBoundEqualTo || other.isUpperBoundEqualTo isUpperBoundEqualTo || other.isUpperBoundEqualTo
else isUpperBoundEqualTo else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo
} }
/** /**
@ -551,14 +619,15 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
* @param other Other scan object * @param other Other scan object
* @return True is overlap false is not overlap * @return True is overlap false is not overlap
*/ */
def doesOverLap(other:ScanRange): Boolean = { def getOverLapScanRange(other:ScanRange): ScanRange = {
var leftRange:ScanRange = null var leftRange:ScanRange = null
var rightRange:ScanRange = null var rightRange:ScanRange = null
//First identify the Left range //First identify the Left range
// Also lower bound can't be null // Also lower bound can't be null
if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) { if (compareRange(lowerBound, other.lowerBound) < 0 ||
compareRange(upperBound, other.upperBound) < 0) {
leftRange = this leftRange = this
rightRange = other rightRange = other
} else { } else {
@ -568,8 +637,12 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
//Then see if leftRange goes to null or if leftRange.upperBound //Then see if leftRange goes to null or if leftRange.upperBound
// upper is greater or equals to rightRange.lowerBound // upper is greater or equals to rightRange.lowerBound
leftRange.upperBound == null || if (leftRange.upperBound == null ||
Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0 Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0) {
new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo, rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
} else {
null
}
} }
/** /**
@ -586,9 +659,25 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
else if (left != null && right == null) -1 else if (left != null && right == null) -1
else Bytes.compareTo(left, right) else Bytes.compareTo(left, right)
} }
/**
*
* @return
*/
def containsPoint(point:Array[Byte]): Boolean = {
val lowerCompare = compareRange(point, lowerBound)
val upperCompare = compareRange(point, upperBound)
((isLowerBoundEqualTo && lowerCompare >= 0) ||
(!isLowerBoundEqualTo && lowerCompare > 0)) &&
((isUpperBoundEqualTo && upperCompare <= 0) ||
(!isUpperBoundEqualTo && upperCompare < 0))
}
override def toString:String = { override def toString:String = {
"ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," + "ScanRange:(upperBound:" + Bytes.toString(upperBound) +
Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")" ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" +
Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")"
} }
} }
@ -663,7 +752,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
other.ranges.foreach( otherR => { other.ranges.foreach( otherR => {
var doesOverLap = false var doesOverLap = false
ranges.foreach{ r => ranges.foreach{ r =>
if (r.doesOverLap(otherR)) { if (r.getOverLapScanRange(otherR) != null) {
r.mergeUnion(otherR) r.mergeUnion(otherR)
doesOverLap = true doesOverLap = true
}} }}
@ -692,7 +781,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
other.ranges.foreach( otherR => { other.ranges.foreach( otherR => {
ranges.foreach( r => { ranges.foreach( r => {
if (r.doesOverLap(otherR)) { if (r.getOverLapScanRange(otherR) != null) {
r.mergeIntersect(otherR) r.mergeIntersect(otherR)
survivingRanges += r survivingRanges += r
} }
@ -842,7 +931,8 @@ object DefaultSourceStaticUtils {
getFreshByteRange(bytes, 0, bytes.length) getFreshByteRange(bytes, 0, bytes.length)
} }
def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int): PositionedByteRange = { def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int):
PositionedByteRange = {
byteRange.get().set(bytes).setLength(length).setOffset(offset) byteRange.get().set(bytes).setLength(length).setOffset(offset)
} }
@ -856,14 +946,13 @@ object DefaultSourceStaticUtils {
* This method is to populate the lastFiveExecutionRules for unit test perposes * This method is to populate the lastFiveExecutionRules for unit test perposes
* This method is not thread safe. * This method is not thread safe.
* *
* @param columnFilterCollection The filters in the last job * @param rowKeyFilter The rowKey Filter logic used in the last query
* @param requiredQualifierDefinitionArray The required columns in the last job * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
*/ */
def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection, def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
requiredQualifierDefinitionArray: dynamicLogicExpression: DynamicLogicExpression):Unit = {
mutable.MutableList[SchemaQualifierDefinition]):Unit = {
lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting( lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
columnFilterCollection, requiredQualifierDefinitionArray)) rowKeyFilter, dynamicLogicExpression))
while (lastFiveExecutionRules.size() > 5) { while (lastFiveExecutionRules.size() > 5) {
lastFiveExecutionRules.poll() lastFiveExecutionRules.poll()
} }
@ -977,6 +1066,162 @@ object DefaultSourceStaticUtils {
} }
} }
class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection, /**
val requiredQualifierDefinitionArray: * Contains information related to a filters for a given column.
mutable.MutableList[SchemaQualifierDefinition]) * This can contain many ranges or points.
*
* @param currentPoint the initial point when the filter is created
* @param currentRange the initial scanRange when the filter is created
*/
class RowKeyFilter (currentPoint:Array[Byte] = null,
currentRange:ScanRange =
new ScanRange(null, true, new Array[Byte](0), true),
var points:mutable.MutableList[Array[Byte]] =
new mutable.MutableList[Array[Byte]](),
var ranges:mutable.MutableList[ScanRange] =
new mutable.MutableList[ScanRange]() ) extends Serializable {
//Collection of ranges
if (currentRange != null ) ranges.+=(currentRange)
//Collection of points
if (currentPoint != null) points.+=(currentPoint)
/**
* This will validate a give value through the filter's points and/or ranges
* the result will be if the value passed the filter
*
* @param value Value to be validated
* @param valueOffSet The offset of the value
* @param valueLength The length of the value
* @return True is the value passes the filter false if not
*/
def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
var result = false
points.foreach( p => {
if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
result = true
}
})
ranges.foreach( r => {
val upperBoundPass = r.upperBound == null ||
(r.isUpperBoundEqualTo &&
Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
value, valueOffSet, valueLength) >= 0) ||
(!r.isUpperBoundEqualTo &&
Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
value, valueOffSet, valueLength) > 0)
val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0
(r.isLowerBoundEqualTo &&
Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
value, valueOffSet, valueLength) <= 0) ||
(!r.isLowerBoundEqualTo &&
Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
value, valueOffSet, valueLength) < 0)
result = result || (upperBoundPass && lowerBoundPass)
})
result
}
/**
* This will allow us to merge filter logic that is joined to the existing filter
* through a OR operator
*
* @param other Filter to merge
*/
def mergeUnion(other:RowKeyFilter): Unit = {
other.points.foreach( p => points += p)
other.ranges.foreach( otherR => {
var doesOverLap = false
ranges.foreach{ r =>
if (r.getOverLapScanRange(otherR) != null) {
r.mergeUnion(otherR)
doesOverLap = true
}}
if (!doesOverLap) ranges.+=(otherR)
})
}
/**
* This will allow us to merge filter logic that is joined to the existing filter
* through a AND operator
*
* @param other Filter to merge
*/
def mergeIntersect(other:RowKeyFilter): Unit = {
val survivingPoints = new mutable.MutableList[Array[Byte]]()
val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
if (points == null || points.length == 0) {
other.points.foreach( otherP => {
didntSurviveFirstPassPoints += otherP
})
} else {
points.foreach(p => {
if (other.points.length == 0) {
didntSurviveFirstPassPoints += p
} else {
other.points.foreach(otherP => {
if (Bytes.equals(p, otherP)) {
survivingPoints += p
} else {
didntSurviveFirstPassPoints += p
}
})
}
})
}
val survivingRanges = new mutable.MutableList[ScanRange]()
if (ranges.length == 0) {
didntSurviveFirstPassPoints.foreach(p => {
survivingPoints += p
})
} else {
ranges.foreach(r => {
other.ranges.foreach(otherR => {
val overLapScanRange = r.getOverLapScanRange(otherR)
if (overLapScanRange != null) {
survivingRanges += overLapScanRange
}
})
didntSurviveFirstPassPoints.foreach(p => {
if (r.containsPoint(p)) {
survivingPoints += p
}
})
})
}
points = survivingPoints
ranges = survivingRanges
}
override def toString:String = {
val strBuilder = new StringBuilder
strBuilder.append("(points:(")
var isFirst = true
points.foreach( p => {
if (isFirst) isFirst = false
else strBuilder.append(",")
strBuilder.append(Bytes.toString(p))
})
strBuilder.append("),ranges:")
isFirst = true
ranges.foreach( r => {
if (isFirst) isFirst = false
else strBuilder.append(",")
strBuilder.append(r)
})
strBuilder.append("))")
strBuilder.toString()
}
}
class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter,
val dynamicLogicExpression: DynamicLogicExpression)

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.util
import org.apache.hadoop.hbase.util.Bytes
/**
* Dynamic logic for SQL push down logic there is an instance for most
* common operations and a pass through for other operations not covered here
*
* Logic can be nested with And or Or operators.
*
* A logic tree can be written out as a string and reconstructed from that string
*
*/
trait DynamicLogicExpression {
def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean
def toExpressionString: String = {
val strBuilder = new StringBuilder
appendToExpression(strBuilder)
strBuilder.toString()
}
def appendToExpression(strBuilder:StringBuilder)
}
class AndLogicExpression (val leftExpression:DynamicLogicExpression,
val rightExpression:DynamicLogicExpression)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) &&
rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append("( ")
strBuilder.append(leftExpression.toExpressionString)
strBuilder.append(" AND ")
strBuilder.append(rightExpression.toExpressionString)
strBuilder.append(" )")
}
}
class OrLogicExpression (val leftExpression:DynamicLogicExpression,
val rightExpression:DynamicLogicExpression)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) ||
rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append("( ")
strBuilder.append(leftExpression.toExpressionString)
strBuilder.append(" OR ")
strBuilder.append(rightExpression.toExpressionString)
strBuilder.append(" )")
}
}
class EqualLogicExpression (val columnName:String,
val valueFromQueryIndex:Int,
val isNot:Boolean) extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
currentRowValue != null &&
Bytes.equals(valueFromQuery,
0, valueFromQuery.length, currentRowValue.bytes,
currentRowValue.offset, currentRowValue.length) != isNot
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
val command = if (isNot) "!=" else "=="
strBuilder.append(columnName + " " + command + " " + valueFromQueryIndex)
}
}
class IsNullLogicExpression (val columnName:String,
val isNot:Boolean) extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
(currentRowValue == null) != isNot
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
val command = if (isNot) "isNotNull" else "isNull"
strBuilder.append(columnName + " " + command)
}
}
class GreaterThanLogicExpression (val columnName:String,
val valueFromQueryIndex:Int)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
currentRowValue != null &&
Bytes.compareTo(currentRowValue.bytes,
currentRowValue.offset, currentRowValue.length, valueFromQuery,
0, valueFromQuery.length) > 0
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append(columnName + " > " + valueFromQueryIndex)
}
}
class GreaterThanOrEqualLogicExpression (val columnName:String,
val valueFromQueryIndex:Int)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
currentRowValue != null &&
Bytes.compareTo(currentRowValue.bytes,
currentRowValue.offset, currentRowValue.length, valueFromQuery,
0, valueFromQuery.length) >= 0
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append(columnName + " >= " + valueFromQueryIndex)
}
}
class LessThanLogicExpression (val columnName:String,
val valueFromQueryIndex:Int)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
currentRowValue != null &&
Bytes.compareTo(currentRowValue.bytes,
currentRowValue.offset, currentRowValue.length, valueFromQuery,
0, valueFromQuery.length) < 0
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append(columnName + " < " + valueFromQueryIndex)
}
}
class LessThanOrEqualLogicExpression (val columnName:String,
val valueFromQueryIndex:Int)
extends DynamicLogicExpression{
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
val currentRowValue = columnToCurrentRowValueMap.get(columnName)
val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
currentRowValue != null &&
Bytes.compareTo(currentRowValue.bytes,
currentRowValue.offset, currentRowValue.length, valueFromQuery,
0, valueFromQuery.length) <= 0
}
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append(columnName + " <= " + valueFromQueryIndex)
}
}
class PassThroughLogicExpression() extends DynamicLogicExpression {
override def execute(columnToCurrentRowValueMap:
util.HashMap[String, ByteArrayComparable],
valueFromQueryValueArray: Array[Array[Byte]]): Boolean = true
override def appendToExpression(strBuilder: StringBuilder): Unit = {
strBuilder.append("Pass")
}
}
object DynamicLogicExpressionBuilder {
def build(expressionString:String): DynamicLogicExpression = {
val expressionAndOffset = build(expressionString.split(' '), 0)
expressionAndOffset._1
}
private def build(expressionArray:Array[String],
offSet:Int): (DynamicLogicExpression, Int) = {
if (expressionArray(offSet).equals("(")) {
val left = build(expressionArray, offSet + 1)
val right = build(expressionArray, left._2 + 1)
if (expressionArray(left._2).equals("AND")) {
(new AndLogicExpression(left._1, right._1), right._2 + 1)
} else if (expressionArray(left._2).equals("OR")) {
(new OrLogicExpression(left._1, right._1), right._2 + 1)
} else {
throw new Throwable("Unknown gate:" + expressionArray(left._2))
}
} else {
val command = expressionArray(offSet + 1)
if (command.equals("<")) {
(new LessThanLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt), offSet + 3)
} else if (command.equals("<=")) {
(new LessThanOrEqualLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt), offSet + 3)
} else if (command.equals(">")) {
(new GreaterThanLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt), offSet + 3)
} else if (command.equals(">=")) {
(new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt), offSet + 3)
} else if (command.equals("==")) {
(new EqualLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt, false), offSet + 3)
} else if (command.equals("!=")) {
(new EqualLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt, true), offSet + 3)
} else if (command.equals("isNull")) {
(new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
} else if (command.equals("isNotNull")) {
(new IsNullLogicExpression(expressionArray(offSet), true), offSet + 2)
} else if (command.equals("Pass")) {
(new PassThroughLogicExpression, offSet + 2)
} else {
throw new Throwable("Unknown logic command:" + command)
}
}
}
}

View File

@ -29,7 +29,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null @transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
val tableName = "t1" val t1TableName = "t1"
val t2TableName = "t2"
val columnFamily = "c" val columnFamily = "c"
var sqlContext:SQLContext = null var sqlContext:SQLContext = null
@ -41,50 +42,94 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
logInfo(" - minicluster started") logInfo(" - minicluster started")
try try
TEST_UTIL.deleteTable(TableName.valueOf(tableName)) TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
catch { catch {
case e: Exception => logInfo(" - no table " + tableName + " found") case e: Exception => logInfo(" - no table " + t1TableName + " found")
} }
logInfo(" - creating table " + tableName) try
TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}
logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table") logInfo(" - created table")
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration) val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
val table = connection.getTable(TableName.valueOf("t1")) try {
val t1Table = connection.getTable(TableName.valueOf("t1"))
try { try {
var put = new Put(Bytes.toBytes("get1")) var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
table.put(put) t1Table.put(put)
put = new Put(Bytes.toBytes("get2")) put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
table.put(put) t1Table.put(put)
put = new Put(Bytes.toBytes("get3")) put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
table.put(put) t1Table.put(put)
put = new Put(Bytes.toBytes("get4")) put = new Put(Bytes.toBytes("get4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
table.put(put) t1Table.put(put)
put = new Put(Bytes.toBytes("get5")) put = new Put(Bytes.toBytes("get5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8")) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
table.put(put) t1Table.put(put)
} finally {
t1Table.close()
}
val t2Table = connection.getTable(TableName.valueOf("t2"))
try {
var put = new Put(Bytes.toBytes(1))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
t2Table.put(put)
put = new Put(Bytes.toBytes(2))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
t2Table.put(put)
put = new Put(Bytes.toBytes(3))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t2Table.put(put)
put = new Put(Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
t2Table.put(put)
put = new Put(Bytes.toBytes(5))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t2Table.put(put)
} finally {
t2Table.close()
}
} finally { } finally {
table.close()
connection.close() connection.close()
} }
@ -98,23 +143,36 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
"hbase.batching.num" -> "100", "hbase.batching.num" -> "100",
"cachingNum" -> "100")) "cachingNum" -> "100"))
df.registerTempTable("hbaseTmp") df.registerTempTable("hbaseTable1")
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map("hbase.columns.mapping" ->
"KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
"hbase.table" -> "t2",
"hbase.batching.num" -> "100",
"cachingNum" -> "100"))
df.registerTempTable("hbaseTable2")
} }
override def afterAll() { override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(tableName)) TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster") logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster() TEST_UTIL.shutdownMiniCluster()
sc.stop() sc.stop()
} }
override def beforeEach(): Unit = {
DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
}
/** /**
* A example of query three fields and also only using rowkey points for the filter * A example of query three fields and also only using rowkey points for the filter
*/ */
test("Test rowKey point only rowKey query") { test("Test rowKey point only rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10) "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
@ -122,23 +180,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3) assert(results.length == 3)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 1) assert(executionRules.dynamicLogicExpression.toExpressionString.
val keyFieldFilter = equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
assert(keyFieldFilter.ranges.length == 0)
assert(keyFieldFilter.points.length == 3)
assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
assert(Bytes.toString(keyFieldFilter.points(1)).equals("get2"))
assert(Bytes.toString(keyFieldFilter.points(2)).equals("get3"))
assert(executionRules.requiredQualifierDefinitionArray.length == 2) assert(executionRules.rowKeyFilter.points.size == 3)
assert(executionRules.rowKeyFilter.ranges.size == 0)
} }
/** /**
* A example of query three fields and also only using cell points for the filter * A example of query three fields and also only using cell points for the filter
*/ */
test("Test cell point only rowKey query") { test("Test cell point only rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10) "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
@ -146,17 +199,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3) assert(results.length == 3)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 2) assert(executionRules.dynamicLogicExpression.toExpressionString.
val bFieldFilter = equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
assert(bFieldFilter.ranges.length == 0)
assert(bFieldFilter.points.length == 2)
val aFieldFilter =
executionRules.columnFilterCollection.columnFilterMap.get("A_FIELD").get
assert(aFieldFilter.ranges.length == 0)
assert(aFieldFilter.points.length == 1)
assert(executionRules.requiredQualifierDefinitionArray.length == 2)
} }
/** /**
@ -164,7 +208,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* Also an example of less then and greater then * Also an example of less then and greater then
*/ */
test("Test two range rowKey query") { test("Test two range rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10) "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
@ -172,13 +216,88 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3) assert(results.length == 3)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 1) assert(executionRules.dynamicLogicExpression.toExpressionString.
val keyFieldFilter = equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
assert(keyFieldFilter.ranges.length == 2)
assert(keyFieldFilter.points.length == 0)
assert(executionRules.requiredQualifierDefinitionArray.length == 2) assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
assert(scanRange1.isLowerBoundEqualTo)
assert(!scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
assert(scanRange2.upperBound == null)
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
}
/**
* A example of a OR merge between to ranges the result is one range
* Also an example of less then and greater then
*
* This example makes sure the code works for a int rowKey
*/
test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
"( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
assert(results.length == 5)
}
/**
* A example of a OR merge between to ranges the result is two ranges
* Also an example of less then and greater then
*
* This example makes sure the code works for a int rowKey
*/
test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
"( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
assert(scanRange1.isLowerBoundEqualTo)
assert(!scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes(4)))
assert(scanRange2.upperBound == null)
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
assert(results.length == 2)
} }
/** /**
@ -186,7 +305,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* Also an example of less then and equal to and greater then and equal to * Also an example of less then and equal to and greater then and equal to
*/ */
test("Test one combined range rowKey query") { test("Test one combined range rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10) "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
@ -194,13 +313,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 2) assert(results.length == 2)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 1) assert(executionRules.dynamicLogicExpression.toExpressionString.
val keyFieldFilter = equals("( KEY_FIELD <= 0 AND KEY_FIELD >= 1 )"))
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
assert(keyFieldFilter.ranges.length == 1) assert(executionRules.rowKeyFilter.points.size == 0)
assert(keyFieldFilter.points.length == 0) assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
assert(executionRules.requiredQualifierDefinitionArray.length == 2)
} }
/** /**
@ -211,10 +335,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val results = df.select("KEY_FIELD").take(10) val results = df.select("KEY_FIELD").take(10)
assert(results.length == 5) assert(results.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.columnFilterCollection == null)
assert(executionRules.requiredQualifierDefinitionArray.length == 0) assert(executionRules.dynamicLogicExpression == null)
} }
@ -223,38 +346,28 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* rowKey and the a column * rowKey and the a column
*/ */
test("Test SQL point and range combo") { test("Test SQL point and range combo") {
val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"(KEY_FIELD = 'get1' and B_FIELD < '3') or " + "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
"(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5) "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
"( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 1)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
assert(results.length == 3) assert(results.length == 3)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
val keyFieldFilter =
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
assert(keyFieldFilter.ranges.length == 1)
assert(keyFieldFilter.ranges.head.upperBound == null)
assert(Bytes.toString(keyFieldFilter.ranges.head.lowerBound).equals("get3"))
assert(keyFieldFilter.ranges.head.isLowerBoundEqualTo)
assert(keyFieldFilter.points.length == 1)
assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
val bFieldFilter =
executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
assert(bFieldFilter.ranges.length == 1)
assert(bFieldFilter.ranges.head.lowerBound.length == 0)
assert(Bytes.toString(bFieldFilter.ranges.head.upperBound).equals("3"))
assert(!bFieldFilter.ranges.head.isUpperBoundEqualTo)
assert(bFieldFilter.points.length == 1)
assert(Bytes.toString(bFieldFilter.points.head).equals("8"))
assert(executionRules.requiredQualifierDefinitionArray.length == 1)
assert(executionRules.requiredQualifierDefinitionArray.head.columnName.equals("B_FIELD"))
assert(executionRules.requiredQualifierDefinitionArray.head.columnFamily.equals("c"))
assert(executionRules.requiredQualifierDefinitionArray.head.qualifier.equals("b"))
} }
/** /**
@ -262,46 +375,145 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
*/ */
test("Test two complete range non merge rowKey query") { test("Test two complete range non merge rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " + "WHERE " +
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" + "( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10) "( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4) assert(results.length == 4)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
"( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
assert(executionRules.columnFilterCollection.columnFilterMap.size == 1) assert(executionRules.rowKeyFilter.points.size == 0)
val keyFieldFilter = assert(executionRules.rowKeyFilter.ranges.size == 2)
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
assert(keyFieldFilter.ranges.length == 2) val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(keyFieldFilter.points.length == 0) assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
assert(executionRules.requiredQualifierDefinitionArray.length == 2)
} }
/** /**
* A complex query with two complex ranges that does merge into one * A complex query with two complex ranges that does merge into one
*/ */
test("Test two complete range merge rowKey query") { test("Test two complete range merge rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " + "WHERE " +
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" + "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10) "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
"( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get2")))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3")))
assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5")))
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
}
test("Test OR logic with a one RowKey and One column") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
"( KEY_FIELD > 'get3' or B_FIELD <= '4')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 5) assert(results.length == 5)
assert(executionRules.columnFilterCollection.columnFilterMap.size == 1) assert(executionRules.dynamicLogicExpression.toExpressionString.
val keyFieldFilter = equals("( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get "( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"))
assert(keyFieldFilter.ranges.length == 1)
assert(keyFieldFilter.points.length == 0)
assert(executionRules.requiredQualifierDefinitionArray.length == 2) assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
//This is the main test for 14406
//Because the key is joined through a or with a qualifier
//There is no filter on the rowKey
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
} }
test("test table that doesn't exist") { test("Test OR logic with a two columns") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
"( A_FIELD > 'foo2' or B_FIELD < '4')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 5)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
"( A_FIELD > 2 OR B_FIELD < 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
test("Test single RowKey Or Column logic") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD >= 0 OR A_FIELD <= 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
test("Test table that doesn't exist") {
intercept[TableNotFoundException] { intercept[TableNotFoundException] {
df = sqlContext.load("org.apache.hadoop.hbase.spark", df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map("hbase.columns.mapping" -> Map("hbase.columns.mapping" ->
@ -315,6 +527,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" + "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count() "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
} }
DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
} }
test("Test table with column that doesn't exist") { test("Test table with column that doesn't exist") {
@ -325,11 +538,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseFactColumnTmp") df.registerTempTable("hbaseFactColumnTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseFactColumnTmp") val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
assert(result.count() == 5) assert(result.count() == 5)
val localResult = result.take(5) val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
} }
test("Test table with INT column") { test("Test table with INT column") {
@ -343,11 +559,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+ val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
" where I_FIELD > 4 and I_FIELD < 10") " where I_FIELD > 4 and I_FIELD < 10")
assert(result.count() == 2) val localResult = result.take(5)
val localResult = result.take(3)
assert(localResult.length == 2)
assert(localResult(0).getInt(2) == 8) assert(localResult(0).getInt(2) == 8)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( I_FIELD > 0 AND I_FIELD < 1 )"))
} }
test("Test table with INT column defined at wrong type") { test("Test table with INT column defined at wrong type") {
@ -358,11 +578,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp") df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp") val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
assert(result.count() == 5) val localResult = result.take(10)
assert(localResult.length == 5)
val localResult = result.take(5) val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
assert(localResult(0).getString(2).length == 4) assert(localResult(0).getString(2).length == 4)
assert(localResult(0).getString(2).charAt(0).toByte == 0) assert(localResult(0).getString(2).charAt(0).toByte == 0)
@ -380,9 +603,13 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseBadTmp") df.registerTempTable("hbaseBadTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseBadTmp") val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseBadTmp")
val localResult = result.take(5) val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
result.take(5)
} }
} }
@ -396,11 +623,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp") df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp") val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
assert(result.count() == 5) val localResult = result.take(10)
assert(localResult.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
val localResult = result.take(5)
} }
} }
@ -413,11 +644,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp") df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp") val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
assert(result.count() == 5) assert(result.count() == 5)
val localResult = result.take(5) val localResult = result.take(5)
localResult.length
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
} }
} }
@ -430,11 +668,16 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp") df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp") val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " +
"I_FIELD FROM hbaseIntWrongTypeTmp")
val localResult = result.take(10)
assert(localResult.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
assert(result.count() == 5)
val localResult = result.take(5)
} }
} }
@ -448,9 +691,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp") val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
assert(result.count() == 5) val localResult = result.take(10)
assert(localResult.length == 5)
val localResult = result.take(5)
assert(localResult(0).getString(2) == null) assert(localResult(0).getString(2) == null)
assert(localResult(1).getString(2) == "FOO") assert(localResult(1).getString(2) == "FOO")
@ -458,5 +700,27 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(localResult(3).getString(2) == "BAR") assert(localResult(3).getString(2) == "BAR")
assert(localResult(4).getString(2) == null) assert(localResult(4).getString(2) == null)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
}
test("Test with column logic disabled") {
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map("hbase.columns.mapping" ->
"KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
"hbase.table" -> "t1",
"hbase.push.down.column.filter" -> "false"))
df.registerTempTable("hbaseNoPushDownTmp")
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
"WHERE " +
"(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 2)
assert(executionRules.dynamicLogicExpression == null)
} }
} }

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.util
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Logging
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class DynamicLogicExpressionSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
test("Basic And Test") {
val leftLogic = new LessThanLogicExpression("Col1", 0)
val rightLogic = new GreaterThanLogicExpression("Col1", 1)
val andLogic = new AndLogicExpression(leftLogic, rightLogic)
val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
val valueFromQueryValueArray = new Array[Array[Byte]](2)
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
val expressionString = andLogic.toExpressionString
assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
}
test("Basic OR Test") {
val leftLogic = new LessThanLogicExpression("Col1", 0)
val rightLogic = new GreaterThanLogicExpression("Col1", 1)
val OrLogic = new OrLogicExpression(leftLogic, rightLogic)
val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
val valueFromQueryValueArray = new Array[Array[Byte]](2)
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(!OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
val expressionString = OrLogic.toExpressionString
assert(expressionString.equals("( Col1 < 0 OR Col1 > 1 )"))
val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(5)
assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(15)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
valueFromQueryValueArray(1) = Bytes.toBytes(10)
assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
}
test("Basic Command Test") {
val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
val lessLogic = new LessThanLogicExpression("Col1", 0)
val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
val equalLogic = new EqualLogicExpression("Col1", 0, false)
val notEqualLogic = new EqualLogicExpression("Col1", 0, true)
val passThrough = new PassThroughLogicExpression
val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
val valueFromQueryValueArray = new Array[Array[Byte]](1)
//great than
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(20)
assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
//great than and equal
valueFromQueryValueArray(0) = Bytes.toBytes(5)
assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(20)
assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
valueFromQueryValueArray))
//less than
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(5)
assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
//less than and equal
valueFromQueryValueArray(0) = Bytes.toBytes(20)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(20)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
//equal too
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(5)
assert(!equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
//not equal too
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(5)
assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
//pass through
valueFromQueryValueArray(0) = Bytes.toBytes(10)
assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
valueFromQueryValueArray(0) = Bytes.toBytes(5)
assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
}
}

View File

@ -37,6 +37,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val columnFamily = "c" val columnFamily = "c"
override def beforeAll() { override def beforeAll() {
TEST_UTIL.startMiniCluster() TEST_UTIL.startMiniCluster()
logInfo(" - minicluster started") logInfo(" - minicluster started")
@ -75,6 +76,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
(Bytes.toBytes("5"), (Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
var isFinished = false
val hbaseContext = new HBaseContext(sc, config) val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200)) val ssc = new StreamingContext(sc, Milliseconds(200))
@ -93,9 +96,19 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
put put
}) })
dStream.foreachRDD(rdd => {
if (rdd.count() == 0) {
isFinished = true
}
})
ssc.start() ssc.start()
ssc.awaitTerminationOrTimeout(1000) while (!isFinished) {
Thread.sleep(100)
}
ssc.stop(true, true)
val connection = ConnectionFactory.createConnection(config) val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1")) val table = connection.getTable(TableName.valueOf("t1"))