From e95358a7fc3f554dcbb351c8b7295cafc01e8c23 Mon Sep 17 00:00:00 2001
From: Ted Malaska
Date: Mon, 7 Sep 2015 09:19:52 -0500
Subject: [PATCH] HBASE-14181 Add Spark DataFrame DataSource to HBase-Spark Module

Signed-off-by: Sean Busbey
---
 .../protobuf/generated/FilterProtos.java      | 1825 ++++++++++++++++-
 hbase-protocol/src/main/protobuf/Filter.proto |   13 +
 hbase-spark/pom.xml                           |   12 +
 .../hbase/spark/SparkSQLPushDownFilter.java   |  186 ++
 .../ColumnFamilyQualifierMapKeyWrapper.scala  |   73 +
 .../hadoop/hbase/spark/DefaultSource.scala    |  982 +++++++++
 .../hadoop/hbase/spark/HBaseContext.scala     |    6 +
 .../hbase/spark/DefaultSourceSuite.scala      |  462 +++++
 8 files changed, 3556 insertions(+), 3 deletions(-)
 create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
 create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
 create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
 create mode 100644 hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala

diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
index e5583712a9f..fe18cae374b 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
@@ -17300,6 +17300,1797 @@ public final class FilterProtos {
     // @@protoc_insertion_point(class_scope:hbase.pb.MultiRowRangeFilter)
   }
 
+  public interface SQLPredicatePushDownColumnFilterOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // required bytes column_family = 1;
+    /**
+     * required bytes column_family = 1;
+     */
+    boolean hasColumnFamily();
+    /**
+     * required bytes column_family = 1;
+     */
+    com.google.protobuf.ByteString getColumnFamily();
+
+    // required bytes qualifier = 2;
+    /**
+     * required bytes qualifier = 2;
+     */
+    boolean hasQualifier();
+    /**
+     * required bytes qualifier = 2;
+     */
+    com.google.protobuf.ByteString getQualifier();
+
+    // repeated bytes get_point_list = 3;
+    /**
+     * repeated bytes get_point_list = 3;
+     */
+    java.util.List getGetPointListList();
+    /**
+     * repeated bytes get_point_list = 3;
+     */
+    int getGetPointListCount();
+    /**
+     * repeated bytes get_point_list = 3;
+     */
+    com.google.protobuf.ByteString getGetPointList(int index);
+
+    // repeated .hbase.pb.RowRange range_list = 4;
+    /**
+     * repeated .hbase.pb.RowRange range_list = 4;
+     */
+    java.util.List
+        getRangeListList();
+    /**
+     * repeated .hbase.pb.RowRange range_list = 4;
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index);
+    /**
+     * repeated .hbase.pb.RowRange range_list = 4;
+     */
+    int getRangeListCount();
+    /**
+     * repeated .hbase.pb.RowRange range_list = 4;
+     */
+    java.util.List
+        getRangeListOrBuilderList();
+    /**
+     * repeated .hbase.pb.RowRange range_list = 4;
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder(
+        int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.SQLPredicatePushDownColumnFilter}
+   */
+  public static final class SQLPredicatePushDownColumnFilter extends
+      com.google.protobuf.GeneratedMessage
+      implements SQLPredicatePushDownColumnFilterOrBuilder {
+    // Use SQLPredicatePushDownColumnFilter.newBuilder() to construct.
+ private SQLPredicatePushDownColumnFilter(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SQLPredicatePushDownColumnFilter(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SQLPredicatePushDownColumnFilter defaultInstance; + public static SQLPredicatePushDownColumnFilter getDefaultInstance() { + return defaultInstance; + } + + public SQLPredicatePushDownColumnFilter getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SQLPredicatePushDownColumnFilter( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + columnFamily_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + qualifier_ = input.readBytes(); + break; + } + case 26: { + if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + getPointList_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000004; + } + getPointList_.add(input.readBytes()); + break; + } + case 34: { + if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + rangeList_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000008; + } + rangeList_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + getPointList_ = java.util.Collections.unmodifiableList(getPointList_); + } + if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + rangeList_ = java.util.Collections.unmodifiableList(rangeList_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public 
SQLPredicatePushDownColumnFilter parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SQLPredicatePushDownColumnFilter(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required bytes column_family = 1; + public static final int COLUMN_FAMILY_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString columnFamily_; + /** + * required bytes column_family = 1; + */ + public boolean hasColumnFamily() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required bytes column_family = 1; + */ + public com.google.protobuf.ByteString getColumnFamily() { + return columnFamily_; + } + + // required bytes qualifier = 2; + public static final int QUALIFIER_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString qualifier_; + /** + * required bytes qualifier = 2; + */ + public boolean hasQualifier() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required bytes qualifier = 2; + */ + public com.google.protobuf.ByteString getQualifier() { + return qualifier_; + } + + // repeated bytes get_point_list = 3; + public static final int GET_POINT_LIST_FIELD_NUMBER = 3; + private java.util.List getPointList_; + /** + * repeated bytes get_point_list = 3; + */ + public java.util.List + getGetPointListList() { + return getPointList_; + } + /** + * repeated bytes get_point_list = 3; + */ + public int getGetPointListCount() { + return getPointList_.size(); + } + /** + * repeated bytes get_point_list = 3; + */ + public com.google.protobuf.ByteString getGetPointList(int index) { + return getPointList_.get(index); + } + + // repeated .hbase.pb.RowRange range_list = 4; + public static final int RANGE_LIST_FIELD_NUMBER = 4; + private java.util.List rangeList_; + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public java.util.List getRangeListList() { + return rangeList_; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public java.util.List + getRangeListOrBuilderList() { + return rangeList_; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public int getRangeListCount() { + return rangeList_.size(); + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index) { + return rangeList_.get(index); + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder( + int index) { + return rangeList_.get(index); + } + + private void initFields() { + columnFamily_ = com.google.protobuf.ByteString.EMPTY; + qualifier_ = com.google.protobuf.ByteString.EMPTY; + getPointList_ = java.util.Collections.emptyList(); + rangeList_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasColumnFamily()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasQualifier()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 
0x00000001) == 0x00000001)) { + output.writeBytes(1, columnFamily_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, qualifier_); + } + for (int i = 0; i < getPointList_.size(); i++) { + output.writeBytes(3, getPointList_.get(i)); + } + for (int i = 0; i < rangeList_.size(); i++) { + output.writeMessage(4, rangeList_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, columnFamily_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, qualifier_); + } + { + int dataSize = 0; + for (int i = 0; i < getPointList_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(getPointList_.get(i)); + } + size += dataSize; + size += 1 * getGetPointListList().size(); + } + for (int i = 0; i < rangeList_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(4, rangeList_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter other = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) obj; + + boolean result = true; + result = result && (hasColumnFamily() == other.hasColumnFamily()); + if (hasColumnFamily()) { + result = result && getColumnFamily() + .equals(other.getColumnFamily()); + } + result = result && (hasQualifier() == other.hasQualifier()); + if (hasQualifier()) { + result = result && getQualifier() + .equals(other.getQualifier()); + } + result = result && getGetPointListList() + .equals(other.getGetPointListList()); + result = result && getRangeListList() + .equals(other.getRangeListList()); + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasColumnFamily()) { + hash = (37 * hash) + COLUMN_FAMILY_FIELD_NUMBER; + hash = (53 * hash) + getColumnFamily().hashCode(); + } + if (hasQualifier()) { + hash = (37 * hash) + QUALIFIER_FIELD_NUMBER; + hash = (53 * hash) + getQualifier().hashCode(); + } + if (getGetPointListCount() > 0) { + hash = (37 * hash) + GET_POINT_LIST_FIELD_NUMBER; + hash = (53 * hash) + getGetPointListList().hashCode(); + } + if (getRangeListCount() > 0) { + hash = (37 * hash) + RANGE_LIST_FIELD_NUMBER; + hash = (53 * hash) + getRangeListList().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static 
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code hbase.pb.SQLPredicatePushDownColumnFilter} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder { + 
public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getRangeListFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + columnFamily_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + qualifier_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + getPointList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000004); + if (rangeListBuilder_ == null) { + rangeList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); + } else { + rangeListBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter build() { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.columnFamily_ = columnFamily_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.qualifier_ = qualifier_; + if (((bitField0_ & 0x00000004) == 0x00000004)) { + getPointList_ = java.util.Collections.unmodifiableList(getPointList_); + bitField0_ = (bitField0_ & ~0x00000004); + } + result.getPointList_ = getPointList_; + if (rangeListBuilder_ == 
null) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { + rangeList_ = java.util.Collections.unmodifiableList(rangeList_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.rangeList_ = rangeList_; + } else { + result.rangeList_ = rangeListBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance()) return this; + if (other.hasColumnFamily()) { + setColumnFamily(other.getColumnFamily()); + } + if (other.hasQualifier()) { + setQualifier(other.getQualifier()); + } + if (!other.getPointList_.isEmpty()) { + if (getPointList_.isEmpty()) { + getPointList_ = other.getPointList_; + bitField0_ = (bitField0_ & ~0x00000004); + } else { + ensureGetPointListIsMutable(); + getPointList_.addAll(other.getPointList_); + } + onChanged(); + } + if (rangeListBuilder_ == null) { + if (!other.rangeList_.isEmpty()) { + if (rangeList_.isEmpty()) { + rangeList_ = other.rangeList_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureRangeListIsMutable(); + rangeList_.addAll(other.rangeList_); + } + onChanged(); + } + } else { + if (!other.rangeList_.isEmpty()) { + if (rangeListBuilder_.isEmpty()) { + rangeListBuilder_.dispose(); + rangeListBuilder_ = null; + rangeList_ = other.rangeList_; + bitField0_ = (bitField0_ & ~0x00000008); + rangeListBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? 
+ getRangeListFieldBuilder() : null; + } else { + rangeListBuilder_.addAllMessages(other.rangeList_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasColumnFamily()) { + + return false; + } + if (!hasQualifier()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required bytes column_family = 1; + private com.google.protobuf.ByteString columnFamily_ = com.google.protobuf.ByteString.EMPTY; + /** + * required bytes column_family = 1; + */ + public boolean hasColumnFamily() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required bytes column_family = 1; + */ + public com.google.protobuf.ByteString getColumnFamily() { + return columnFamily_; + } + /** + * required bytes column_family = 1; + */ + public Builder setColumnFamily(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + columnFamily_ = value; + onChanged(); + return this; + } + /** + * required bytes column_family = 1; + */ + public Builder clearColumnFamily() { + bitField0_ = (bitField0_ & ~0x00000001); + columnFamily_ = getDefaultInstance().getColumnFamily(); + onChanged(); + return this; + } + + // required bytes qualifier = 2; + private com.google.protobuf.ByteString qualifier_ = com.google.protobuf.ByteString.EMPTY; + /** + * required bytes qualifier = 2; + */ + public boolean hasQualifier() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required bytes qualifier = 2; + */ + public com.google.protobuf.ByteString getQualifier() { + return qualifier_; + } + /** + * required bytes qualifier = 2; + */ + public Builder setQualifier(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + qualifier_ = value; + onChanged(); + return this; + } + /** + * required bytes qualifier = 2; + */ + public Builder clearQualifier() { + bitField0_ = (bitField0_ & ~0x00000002); + qualifier_ = getDefaultInstance().getQualifier(); + onChanged(); + return this; + } + + // repeated bytes get_point_list = 3; + private java.util.List getPointList_ = java.util.Collections.emptyList(); + private void ensureGetPointListIsMutable() { + if (!((bitField0_ & 0x00000004) == 0x00000004)) { + getPointList_ = new java.util.ArrayList(getPointList_); + bitField0_ |= 0x00000004; + } + } + /** + * repeated bytes get_point_list = 3; + */ + public java.util.List + getGetPointListList() { + return java.util.Collections.unmodifiableList(getPointList_); + } + /** + * repeated bytes get_point_list = 3; + */ + public int getGetPointListCount() { + return getPointList_.size(); + } + /** + * repeated bytes get_point_list = 3; + */ + public com.google.protobuf.ByteString getGetPointList(int index) { + return 
getPointList_.get(index); + } + /** + * repeated bytes get_point_list = 3; + */ + public Builder setGetPointList( + int index, com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureGetPointListIsMutable(); + getPointList_.set(index, value); + onChanged(); + return this; + } + /** + * repeated bytes get_point_list = 3; + */ + public Builder addGetPointList(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureGetPointListIsMutable(); + getPointList_.add(value); + onChanged(); + return this; + } + /** + * repeated bytes get_point_list = 3; + */ + public Builder addAllGetPointList( + java.lang.Iterable values) { + ensureGetPointListIsMutable(); + super.addAll(values, getPointList_); + onChanged(); + return this; + } + /** + * repeated bytes get_point_list = 3; + */ + public Builder clearGetPointList() { + getPointList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000004); + onChanged(); + return this; + } + + // repeated .hbase.pb.RowRange range_list = 4; + private java.util.List rangeList_ = + java.util.Collections.emptyList(); + private void ensureRangeListIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + rangeList_ = new java.util.ArrayList(rangeList_); + bitField0_ |= 0x00000008; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder> rangeListBuilder_; + + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public java.util.List getRangeListList() { + if (rangeListBuilder_ == null) { + return java.util.Collections.unmodifiableList(rangeList_); + } else { + return rangeListBuilder_.getMessageList(); + } + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public int getRangeListCount() { + if (rangeListBuilder_ == null) { + return rangeList_.size(); + } else { + return rangeListBuilder_.getCount(); + } + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index) { + if (rangeListBuilder_ == null) { + return rangeList_.get(index); + } else { + return rangeListBuilder_.getMessage(index); + } + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder setRangeList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) { + if (rangeListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureRangeListIsMutable(); + rangeList_.set(index, value); + onChanged(); + } else { + rangeListBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder setRangeList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) { + if (rangeListBuilder_ == null) { + ensureRangeListIsMutable(); + rangeList_.set(index, builderForValue.build()); + onChanged(); + } else { + rangeListBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder addRangeList(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) { + if (rangeListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + 
ensureRangeListIsMutable(); + rangeList_.add(value); + onChanged(); + } else { + rangeListBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder addRangeList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) { + if (rangeListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureRangeListIsMutable(); + rangeList_.add(index, value); + onChanged(); + } else { + rangeListBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder addRangeList( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) { + if (rangeListBuilder_ == null) { + ensureRangeListIsMutable(); + rangeList_.add(builderForValue.build()); + onChanged(); + } else { + rangeListBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder addRangeList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) { + if (rangeListBuilder_ == null) { + ensureRangeListIsMutable(); + rangeList_.add(index, builderForValue.build()); + onChanged(); + } else { + rangeListBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder addAllRangeList( + java.lang.Iterable values) { + if (rangeListBuilder_ == null) { + ensureRangeListIsMutable(); + super.addAll(values, rangeList_); + onChanged(); + } else { + rangeListBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder clearRangeList() { + if (rangeListBuilder_ == null) { + rangeList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); + onChanged(); + } else { + rangeListBuilder_.clear(); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public Builder removeRangeList(int index) { + if (rangeListBuilder_ == null) { + ensureRangeListIsMutable(); + rangeList_.remove(index); + onChanged(); + } else { + rangeListBuilder_.remove(index); + } + return this; + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder getRangeListBuilder( + int index) { + return getRangeListFieldBuilder().getBuilder(index); + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder( + int index) { + if (rangeListBuilder_ == null) { + return rangeList_.get(index); } else { + return rangeListBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public java.util.List + getRangeListOrBuilderList() { + if (rangeListBuilder_ != null) { + return rangeListBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(rangeList_); + } + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder addRangeListBuilder() { + return getRangeListFieldBuilder().addBuilder( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.getDefaultInstance()); + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public 
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder addRangeListBuilder( + int index) { + return getRangeListFieldBuilder().addBuilder( + index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.getDefaultInstance()); + } + /** + * repeated .hbase.pb.RowRange range_list = 4; + */ + public java.util.List + getRangeListBuilderList() { + return getRangeListFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder> + getRangeListFieldBuilder() { + if (rangeListBuilder_ == null) { + rangeListBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>( + rangeList_, + ((bitField0_ & 0x00000008) == 0x00000008), + getParentForChildren(), + isClean()); + rangeList_ = null; + } + return rangeListBuilder_; + } + + // @@protoc_insertion_point(builder_scope:hbase.pb.SQLPredicatePushDownColumnFilter) + } + + static { + defaultInstance = new SQLPredicatePushDownColumnFilter(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:hbase.pb.SQLPredicatePushDownColumnFilter) + } + + public interface SQLPredicatePushDownFilterOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + java.util.List + getColumnFilterListList(); + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getColumnFilterList(int index); + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + int getColumnFilterListCount(); + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + java.util.List + getColumnFilterListOrBuilderList(); + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder( + int index); + } + /** + * Protobuf type {@code hbase.pb.SQLPredicatePushDownFilter} + */ + public static final class SQLPredicatePushDownFilter extends + com.google.protobuf.GeneratedMessage + implements SQLPredicatePushDownFilterOrBuilder { + // Use SQLPredicatePushDownFilter.newBuilder() to construct. 
+ private SQLPredicatePushDownFilter(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SQLPredicatePushDownFilter(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SQLPredicatePushDownFilter defaultInstance; + public static SQLPredicatePushDownFilter getDefaultInstance() { + return defaultInstance; + } + + public SQLPredicatePushDownFilter getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SQLPredicatePushDownFilter( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + columnFilterList_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000001; + } + columnFilterList_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + columnFilterList_ = java.util.Collections.unmodifiableList(columnFilterList_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public SQLPredicatePushDownFilter parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SQLPredicatePushDownFilter(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + public static final int COLUMN_FILTER_LIST_FIELD_NUMBER = 
1; + private java.util.List columnFilterList_; + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public java.util.List getColumnFilterListList() { + return columnFilterList_; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public java.util.List + getColumnFilterListOrBuilderList() { + return columnFilterList_; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public int getColumnFilterListCount() { + return columnFilterList_.size(); + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getColumnFilterList(int index) { + return columnFilterList_.get(index); + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder( + int index) { + return columnFilterList_.get(index); + } + + private void initFields() { + columnFilterList_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + for (int i = 0; i < getColumnFilterListCount(); i++) { + if (!getColumnFilterList(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < columnFilterList_.size(); i++) { + output.writeMessage(1, columnFilterList_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + for (int i = 0; i < columnFilterList_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, columnFilterList_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter other = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) obj; + + boolean result = true; + result = result && getColumnFilterListList() + .equals(other.getColumnFilterListList()); + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (getColumnFilterListCount() > 0) { + hash = (37 * hash) + COLUMN_FILTER_LIST_FIELD_NUMBER; + hash = (53 * hash) + getColumnFilterListList().hashCode(); + } 
+ hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code hbase.pb.SQLPredicatePushDownFilter} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements 
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilterOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getColumnFilterListFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (columnFilterListBuilder_ == null) { + columnFilterList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + } else { + columnFilterListBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter build() { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter(this); + int from_bitField0_ = bitField0_; + if (columnFilterListBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001)) { + columnFilterList_ = java.util.Collections.unmodifiableList(columnFilterList_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.columnFilterList_ = columnFilterList_; + } else { + result.columnFilterList_ = columnFilterListBuilder_.build(); + } + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder 
mergeFrom(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.getDefaultInstance()) return this; + if (columnFilterListBuilder_ == null) { + if (!other.columnFilterList_.isEmpty()) { + if (columnFilterList_.isEmpty()) { + columnFilterList_ = other.columnFilterList_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureColumnFilterListIsMutable(); + columnFilterList_.addAll(other.columnFilterList_); + } + onChanged(); + } + } else { + if (!other.columnFilterList_.isEmpty()) { + if (columnFilterListBuilder_.isEmpty()) { + columnFilterListBuilder_.dispose(); + columnFilterListBuilder_ = null; + columnFilterList_ = other.columnFilterList_; + bitField0_ = (bitField0_ & ~0x00000001); + columnFilterListBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getColumnFilterListFieldBuilder() : null; + } else { + columnFilterListBuilder_.addAllMessages(other.columnFilterList_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + for (int i = 0; i < getColumnFilterListCount(); i++) { + if (!getColumnFilterList(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + private java.util.List columnFilterList_ = + java.util.Collections.emptyList(); + private void ensureColumnFilterListIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + columnFilterList_ = new java.util.ArrayList(columnFilterList_); + bitField0_ |= 0x00000001; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder> columnFilterListBuilder_; + + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public java.util.List getColumnFilterListList() { + if (columnFilterListBuilder_ == null) { + return java.util.Collections.unmodifiableList(columnFilterList_); + } else { + return columnFilterListBuilder_.getMessageList(); + } + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public int getColumnFilterListCount() { + if (columnFilterListBuilder_ == null) { + return columnFilterList_.size(); + } else { + return columnFilterListBuilder_.getCount(); + } + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter 
getColumnFilterList(int index) { + if (columnFilterListBuilder_ == null) { + return columnFilterList_.get(index); + } else { + return columnFilterListBuilder_.getMessage(index); + } + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder setColumnFilterList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) { + if (columnFilterListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureColumnFilterListIsMutable(); + columnFilterList_.set(index, value); + onChanged(); + } else { + columnFilterListBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder setColumnFilterList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) { + if (columnFilterListBuilder_ == null) { + ensureColumnFilterListIsMutable(); + columnFilterList_.set(index, builderForValue.build()); + onChanged(); + } else { + columnFilterListBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder addColumnFilterList(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) { + if (columnFilterListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureColumnFilterListIsMutable(); + columnFilterList_.add(value); + onChanged(); + } else { + columnFilterListBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder addColumnFilterList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) { + if (columnFilterListBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureColumnFilterListIsMutable(); + columnFilterList_.add(index, value); + onChanged(); + } else { + columnFilterListBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder addColumnFilterList( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) { + if (columnFilterListBuilder_ == null) { + ensureColumnFilterListIsMutable(); + columnFilterList_.add(builderForValue.build()); + onChanged(); + } else { + columnFilterListBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder addColumnFilterList( + int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) { + if (columnFilterListBuilder_ == null) { + ensureColumnFilterListIsMutable(); + columnFilterList_.add(index, builderForValue.build()); + onChanged(); + } else { + columnFilterListBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder addAllColumnFilterList( + java.lang.Iterable values) { + if (columnFilterListBuilder_ == null) { + ensureColumnFilterListIsMutable(); + super.addAll(values, columnFilterList_); + onChanged(); + } else { 
+ columnFilterListBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder clearColumnFilterList() { + if (columnFilterListBuilder_ == null) { + columnFilterList_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + } else { + columnFilterListBuilder_.clear(); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public Builder removeColumnFilterList(int index) { + if (columnFilterListBuilder_ == null) { + ensureColumnFilterListIsMutable(); + columnFilterList_.remove(index); + onChanged(); + } else { + columnFilterListBuilder_.remove(index); + } + return this; + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder getColumnFilterListBuilder( + int index) { + return getColumnFilterListFieldBuilder().getBuilder(index); + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder( + int index) { + if (columnFilterListBuilder_ == null) { + return columnFilterList_.get(index); } else { + return columnFilterListBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public java.util.List + getColumnFilterListOrBuilderList() { + if (columnFilterListBuilder_ != null) { + return columnFilterListBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(columnFilterList_); + } + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder addColumnFilterListBuilder() { + return getColumnFilterListFieldBuilder().addBuilder( + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance()); + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder addColumnFilterListBuilder( + int index) { + return getColumnFilterListFieldBuilder().addBuilder( + index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance()); + } + /** + * repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1; + */ + public java.util.List + getColumnFilterListBuilderList() { + return getColumnFilterListFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder> + getColumnFilterListFieldBuilder() { + if (columnFilterListBuilder_ == null) { + columnFilterListBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, 
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>( + columnFilterList_, + ((bitField0_ & 0x00000001) == 0x00000001), + getParentForChildren(), + isClean()); + columnFilterList_ = null; + } + return columnFilterListBuilder_; + } + + // @@protoc_insertion_point(builder_scope:hbase.pb.SQLPredicatePushDownFilter) + } + + static { + defaultInstance = new SQLPredicatePushDownFilter(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:hbase.pb.SQLPredicatePushDownFilter) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_hbase_pb_Filter_descriptor; private static @@ -17450,6 +19241,16 @@ public final class FilterProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_hbase_pb_MultiRowRangeFilter_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -17512,9 +19313,15 @@ public final class FilterProtos { "ow\030\001 \001(\014\022\033\n\023start_row_inclusive\030\002 \001(\010\022\020\n" + "\010stop_row\030\003 \001(\014\022\032\n\022stop_row_inclusive\030\004 " + "\001(\010\"A\n\023MultiRowRangeFilter\022*\n\016row_range_" + - "list\030\001 \003(\0132\022.hbase.pb.RowRangeBB\n*org.ap" + - "ache.hadoop.hbase.protobuf.generatedB\014Fi" + - "lterProtosH\001\210\001\001\240\001\001" + "list\030\001 \003(\0132\022.hbase.pb.RowRange\"\214\001\n SQLPr" + + "edicatePushDownColumnFilter\022\025\n\rcolumn_fa" + + "mily\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\022\026\n\016get_poi" + + "nt_list\030\003 \003(\014\022&\n\nrange_list\030\004 \003(\0132\022.hbas" + + "e.pb.RowRange\"d\n\032SQLPredicatePushDownFil" + + "ter\022F\n\022column_filter_list\030\001 \003(\0132*.hbase." 
+ + "pb.SQLPredicatePushDownColumnFilterBB\n*o", + "rg.apache.hadoop.hbase.protobuf.generate" + + "dB\014FilterProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -17701,6 +19508,18 @@ public final class FilterProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_MultiRowRangeFilter_descriptor, new java.lang.String[] { "RowRangeList", }); + internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor = + getDescriptor().getMessageTypes().get(30); + internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor, + new java.lang.String[] { "ColumnFamily", "Qualifier", "GetPointList", "RangeList", }); + internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor = + getDescriptor().getMessageTypes().get(31); + internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor, + new java.lang.String[] { "ColumnFilterList", }); return null; } }; diff --git a/hbase-protocol/src/main/protobuf/Filter.proto b/hbase-protocol/src/main/protobuf/Filter.proto index 67d57172052..b2dc7d0de01 100644 --- a/hbase-protocol/src/main/protobuf/Filter.proto +++ b/hbase-protocol/src/main/protobuf/Filter.proto @@ -167,4 +167,17 @@ message RowRange { message MultiRowRangeFilter { repeated RowRange row_range_list = 1; +} + + +message SQLPredicatePushDownColumnFilter { + required bytes column_family = 1; + required bytes qualifier = 2; + repeated bytes get_point_list = 3; + repeated RowRange range_list = 4; + +} + +message SQLPredicatePushDownFilter { + repeated SQLPredicatePushDownColumnFilter column_filter_list = 1; } \ No newline at end of file diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml index e48f9e81ad6..8110629cad9 100644 --- a/hbase-spark/pom.xml +++ b/hbase-spark/pom.xml @@ -77,6 +77,12 @@ + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + org.apache.spark spark-streaming_${scala.binary.version} @@ -317,6 +323,12 @@ + + org.apache.hbase + hbase-protocol + ${project.version} + + org.apache.hbase hbase-hadoop-compat diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java new file mode 100644 index 00000000000..d0f10912291 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; +import org.apache.hadoop.hbase.util.ByteStringer; +import scala.collection.mutable.MutableList; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This filter will push down all qualifier logic given to us + * by SparkSQL so that we have make the filters at the region server level + * and avoid sending the data back to the client to be filtered. + */ +public class SparkSQLPushDownFilter extends FilterBase { + protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class); + + HashMap columnFamilyQualifierFilterMap; + + public SparkSQLPushDownFilter(HashMap columnFamilyQualifierFilterMap) { + this.columnFamilyQualifierFilterMap = columnFamilyQualifierFilterMap; + } + + /** + * This method will find the related filter logic for the given + * column family and qualifier then execute it. It will also + * not clone the in coming cell to avoid extra object creation + * + * @param c The Cell to be validated + * @return ReturnCode object to determine if skipping is required + * @throws IOException + */ + @Override + public ReturnCode filterKeyValue(Cell c) throws IOException { + + //Get filter if one exist + ColumnFilter filter = + columnFamilyQualifierFilterMap.get(new ColumnFamilyQualifierMapKeyWrapper( + c.getFamilyArray(), + c.getFamilyOffset(), + c.getFamilyLength(), + c.getQualifierArray(), + c.getQualifierOffset(), + c.getQualifierLength())); + + if (filter == null) { + //If no filter then just include values + return ReturnCode.INCLUDE; + } else { + //If there is a filter then run validation + if (filter.validate(c.getValueArray(), c.getValueOffset(), c.getValueLength())) { + return ReturnCode.INCLUDE; + } + } + //If validation fails then skip whole row + return ReturnCode.NEXT_ROW; + } + + /** + * @param pbBytes A pb serialized instance + * @return An instance of SparkSQLPushDownFilter + * @throws org.apache.hadoop.hbase.exceptions.DeserializationException + */ + @SuppressWarnings("unused") + public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes) + throws DeserializationException { + + HashMap columnFamilyQualifierFilterMap = new HashMap<>(); + + FilterProtos.SQLPredicatePushDownFilter proto; + try { + proto = FilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + final List columnFilterListList = + proto.getColumnFilterListList(); + + for (FilterProtos.SQLPredicatePushDownColumnFilter columnFilter: columnFilterListList) { + + byte[] columnFamily = columnFilter.getColumnFamily().toByteArray(); + byte[] qualifier = columnFilter.getQualifier().toByteArray(); + final ColumnFamilyQualifierMapKeyWrapper columnFamilyQualifierMapKeyWrapper = + new ColumnFamilyQualifierMapKeyWrapper(columnFamily, 0, columnFamily.length, + qualifier, 0, qualifier.length); + + final MutableList points = new MutableList<>(); + final MutableList scanRanges = new 
MutableList<>(); + + for (ByteString byteString: columnFilter.getGetPointListList()) { + points.$plus$eq(byteString.toByteArray()); + } + + for (FilterProtos.RowRange rowRange: columnFilter.getRangeListList()) { + ScanRange scanRange = new ScanRange(rowRange.getStopRow().toByteArray(), + rowRange.getStopRowInclusive(), + rowRange.getStartRow().toByteArray(), + rowRange.getStartRowInclusive()); + scanRanges.$plus$eq(scanRange); + } + + final ColumnFilter columnFilterObj = new ColumnFilter(null, null, points, scanRanges); + + columnFamilyQualifierFilterMap.put(columnFamilyQualifierMapKeyWrapper, columnFilterObj); + } + + return new SparkSQLPushDownFilter(columnFamilyQualifierFilterMap); + } + + /** + * @return The filter serialized using pb + */ + public byte[] toByteArray() { + + FilterProtos.SQLPredicatePushDownFilter.Builder builder = + FilterProtos.SQLPredicatePushDownFilter.newBuilder(); + + FilterProtos.SQLPredicatePushDownColumnFilter.Builder columnBuilder = + FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder(); + + FilterProtos.RowRange.Builder rowRangeBuilder = FilterProtos.RowRange.newBuilder(); + + for (Map.Entry entry : + columnFamilyQualifierFilterMap.entrySet()) { + + columnBuilder.setColumnFamily( + ByteStringer.wrap(entry.getKey().cloneColumnFamily())); + columnBuilder.setQualifier( + ByteStringer.wrap(entry.getKey().cloneQualifier())); + + final MutableList points = entry.getValue().points(); + + int pointLength = points.length(); + for (int i = 0; i < pointLength; i++) { + byte[] point = points.get(i).get(); + columnBuilder.addGetPointList(ByteStringer.wrap(point)); + + } + + final MutableList ranges = entry.getValue().ranges(); + int rangeLength = ranges.length(); + for (int i = 0; i < rangeLength; i++) { + ScanRange scanRange = ranges.get(i).get(); + rowRangeBuilder.clear(); + rowRangeBuilder.setStartRow(ByteStringer.wrap(scanRange.lowerBound())); + rowRangeBuilder.setStopRow(ByteStringer.wrap(scanRange.upperBound())); + rowRangeBuilder.setStartRowInclusive(scanRange.isLowerBoundEqualTo()); + rowRangeBuilder.setStopRowInclusive(scanRange.isUpperBoundEqualTo()); + + columnBuilder.addRangeList(rowRangeBuilder.build()); + } + + builder.addColumnFilterList(columnBuilder.build()); + columnBuilder.clear(); + } + return builder.build().toByteArray(); + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala new file mode 100644 index 00000000000..f223d1b8bd4 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
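
The filter above travels with the request as protobuf bytes: buildScan() in DefaultSource.scala (further down in this patch) attaches it to the Scan, toByteArray() serializes the per-column filter map, and the region server rebuilds it through parseFrom() before filterKeyValue() runs. A small round-trip sketch; the column family, qualifier, and point value are illustrative.

    import java.util
    import org.apache.hadoop.hbase.spark.{ColumnFamilyQualifierMapKeyWrapper, ColumnFilter, SparkSQLPushDownFilter}
    import org.apache.hadoop.hbase.util.Bytes

    val cf = Bytes.toBytes("c")
    val qual = Bytes.toBytes("b")
    val filterMap = new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]()
    filterMap.put(
      new ColumnFamilyQualifierMapKeyWrapper(cf, 0, cf.length, qual, 0, qual.length),
      new ColumnFilter(currentPoint = Bytes.toBytes("8")))

    val clientSide = new SparkSQLPushDownFilter(filterMap)
    val wireBytes  = clientSide.toByteArray                       // set on the Scan by buildScan()
    val serverSide = SparkSQLPushDownFilter.parseFrom(wireBytes)  // rebuilt at the region server
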
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.util.Bytes + +/** + * A wrapper class that will allow both columnFamily and qualifier to + * be the key of a hashMap. Also allow for finding the value in a hashmap + * with out cloning the HBase value from the HBase Cell object + * @param columnFamily ColumnFamily byte array + * @param columnFamilyOffSet Offset of columnFamily value in the array + * @param columnFamilyLength Length of the columnFamily value in the columnFamily array + * @param qualifier Qualifier byte array + * @param qualifierOffSet Offset of qualifier value in the array + * @param qualifierLength Length of the qualifier value with in the array + */ +class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte], + val columnFamilyOffSet:Int, + val columnFamilyLength:Int, + val qualifier:Array[Byte], + val qualifierOffSet:Int, + val qualifierLength:Int) + extends Serializable{ + + override def equals(other:Any): Boolean = { + val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper] + + Bytes.compareTo(columnFamily, + columnFamilyOffSet, + columnFamilyLength, + otherWrapper.columnFamily, + otherWrapper.columnFamilyOffSet, + otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier, + qualifierOffSet, + qualifierLength, + otherWrapper.qualifier, + otherWrapper.qualifierOffSet, + otherWrapper.qualifierLength) == 0 + } + + override def hashCode():Int = { + Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) + + Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength) + } + + def cloneColumnFamily():Array[Byte] = { + val resultArray = new Array[Byte](columnFamilyLength) + System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength) + resultArray + } + + def cloneQualifier():Array[Byte] = { + val resultArray = new Array[Byte](qualifierLength) + System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength) + resultArray + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala new file mode 100644 index 00000000000..a180de2236f --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -0,0 +1,982 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
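
The wrapper just added is what lets filterKeyValue() probe the filter map with the Cell's backing arrays directly: equals() and hashCode() operate on the (array, offset, length) slices, so no family or qualifier bytes are cloned on the read path. A brief illustration; the byte values are made up.

    import org.apache.hadoop.hbase.spark.ColumnFamilyQualifierMapKeyWrapper
    import org.apache.hadoop.hbase.util.Bytes

    val cf = Bytes.toBytes("c")
    val qual = Bytes.toBytes("b")
    val cloned = new ColumnFamilyQualifierMapKeyWrapper(cf, 0, cf.length, qual, 0, qual.length)

    // Same family and qualifier, but referenced as slices of one larger buffer,
    // the way a Cell exposes them at the region server
    val buffer = Bytes.toBytes("xcb")
    val sliced = new ColumnFamilyQualifierMapKeyWrapper(buffer, 1, 1, buffer, 2, 1)

    assert(cloned == sliced)                      // equals() compares the slices
    assert(cloned.hashCode == sliced.hashCode)    // so HashMap lookups match either form
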
+ */ + +package org.apache.hadoop.hbase.spark + +import java.util +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan} +import org.apache.hadoop.hbase.types._ +import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes} +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import scala.collection.mutable + +/** + * DefaultSource for integration with Spark's dataframe datasources. + * This class will produce a relationProvider based on input given to it from spark + * + * In all this DefaultSource support the following datasource functionality + * - Scan range pruning through filter push down logic based on rowKeys + * - Filter push down logic on HBase Cells + * - Qualifier filtering based on columns used in the SparkSQL statement + * - Type conversions of basic SQL types. All conversions will be + * Through the HBase Bytes object commands. + */ +class DefaultSource extends RelationProvider { + + val TABLE_KEY:String = "hbase.table" + val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping" + val BATCHING_NUM_KEY:String = "hbase.batching.num" + val CACHING_NUM_KEY:String = "hbase.caching.num" + val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources" + val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context" + + /** + * Is given input from SparkSQL to construct a BaseRelation + * @param sqlContext SparkSQL context + * @param parameters Parameters given to us from SparkSQL + * @return A BaseRelation Object + */ + override def createRelation(sqlContext: SQLContext, + parameters: Map[String, String]): + BaseRelation = { + + + val tableName = parameters.get(TABLE_KEY) + if (tableName.isEmpty) + new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'") + + val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "") + val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000") + val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000") + val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "") + val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true") + + val batchingNum:Int = try { + batchingNumStr.toInt + } catch { + case e:NumberFormatException => throw + new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '" + + batchingNumStr + "'", e) + } + + val cachingNum:Int = try { + cachingNumStr.toInt + } catch { + case e:NumberFormatException => throw + new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '" + + cachingNumStr + "'", e) + } + + new HBaseRelation(tableName.get, + generateSchemaMappingMap(schemaMappingString), + batchingNum.toInt, + cachingNum.toInt, + hbaseConfigResources, + useHBaseReources.equalsIgnoreCase("true"))(sqlContext) + } + + /** + * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of + * SchemaQualifierDefinitions with the original sql column name as the key + * @param schemaMappingString The schema mapping string from the SparkSQL map + * @return A map of definitions keyed by the SparkSQL column name + */ + def generateSchemaMappingMap(schemaMappingString:String): + java.util.HashMap[String, SchemaQualifierDefinition] = { + try { + val 
columnDefinitions = schemaMappingString.split(',') + val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]() + columnDefinitions.map(cd => { + val parts = cd.trim.split(' ') + + //Make sure we get three parts + // + if (parts.length == 3) { + val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') { + Array[String]("", "key") + } else { + parts(2).split(':') + } + resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0), + parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1))) + } else { + throw new IllegalArgumentException("Invalid value for schema mapping '" + cd + + "' should be ' :' " + + "for columns and ' :' for rowKeys") + } + }) + resultingMap + } catch { + case e:Exception => throw + new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY + + " '" + schemaMappingString + "'", e ) + } + } +} + +/** + * Implementation of Spark BaseRelation that will build up our scan logic + * , do the scan pruning, filter push down, and value conversions + * + * @param tableName HBase table that we plan to read from + * @param schemaMappingDefinition SchemaMapping information to map HBase + * Qualifiers to SparkSQL columns + * @param batchingNum The batching number to be applied to the + * scan object + * @param cachingNum The caching number to be applied to the + * scan object + * @param configResources Optional comma separated list of config resources + * to get based on their URI + * @param useHBaseContext If true this will look to see if + * HBaseContext.latest is populated to use that + * connection information + * @param sqlContext SparkSQL context + */ +class HBaseRelation (val tableName:String, + val schemaMappingDefinition: + java.util.HashMap[String, SchemaQualifierDefinition], + val batchingNum:Int, + val cachingNum:Int, + val configResources:String, + val useHBaseContext:Boolean) ( + @transient val sqlContext:SQLContext) + extends BaseRelation with PrunedFilteredScan with Logging { + + //create or get latest HBaseContext + @transient val hbaseContext:HBaseContext = if (useHBaseContext) { + LatestHBaseContextCache.latest + } else { + val config = HBaseConfiguration.create() + configResources.split(",").foreach( r => config.addResource(r)) + new HBaseContext(sqlContext.sparkContext, config) + } + + /** + * Generates a Spark SQL schema object so Spark SQL knows what is being + * provided by this BaseRelation + * + * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value + */ + override def schema: StructType = { + + val metadataBuilder = new MetadataBuilder() + + val structFieldArray = new Array[StructField](schemaMappingDefinition.size()) + + val schemaMappingDefinitionIt = schemaMappingDefinition.values().iterator() + var indexCounter = 0 + while (schemaMappingDefinitionIt.hasNext) { + val c = schemaMappingDefinitionIt.next() + + val metadata = metadataBuilder.putString("name", c.columnName).build() + val structField = + new StructField(c.columnName, c.columnSparkSqlType, nullable = true, metadata) + + structFieldArray(indexCounter) = structField + indexCounter += 1 + } + + val result = new StructType(structFieldArray) + result + } + + /** + * Here we are building the functionality to populate the resulting RDD[Row] + * Here is where we will do the following: + * - Filter push down + * - Scan or GetList pruning + * - Executing our scan(s) or/and GetList to generate result + * + * @param requiredColumns The columns that are being requested by the requesting query + * @param filters The filters that are being applied 
by the requesting query + * @return RDD will all the results from HBase needed for SparkSQL to + * execute the query on + */ + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + + val columnFilterCollection = buildColumnFilterCollection(filters) + + val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition] + requiredColumns.foreach( c => { + val definition = schemaMappingDefinition.get(c) + if (definition.columnFamilyBytes.length > 0) { + requiredQualifierDefinitionArray += definition + } + }) + + //Create a local variable so that scala doesn't have to + // serialize the whole HBaseRelation Object + val serializableDefinitionMap = schemaMappingDefinition + + + //retain the information for unit testing checks + DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection, + requiredQualifierDefinitionArray) + var resultRDD: RDD[Row] = null + + if (columnFilterCollection != null) { + val pushDownFilterJava = + new SparkSQLPushDownFilter( + columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition)) + + val getList = new util.ArrayList[Get]() + val rddList = new util.ArrayList[RDD[Row]]() + + val it = columnFilterCollection.columnFilterMap.iterator + while (it.hasNext) { + val e = it.next() + val columnDefinition = schemaMappingDefinition.get(e._1) + //check is a rowKey + if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) { + //add points to getList + e._2.points.foreach(p => { + val get = new Get(p) + requiredQualifierDefinitionArray.foreach( d => + get.addColumn(d.columnFamilyBytes, d.qualifierBytes)) + getList.add(get) + }) + + val rangeIt = e._2.ranges.iterator + + while (rangeIt.hasNext) { + val r = rangeIt.next() + + val scan = new Scan() + scan.setBatch(batchingNum) + scan.setCaching(cachingNum) + requiredQualifierDefinitionArray.foreach( d => + scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) + + if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) { + scan.setFilter(pushDownFilterJava) + } + + //Check if there is a lower bound + if (r.lowerBound != null && r.lowerBound.length > 0) { + + if (r.isLowerBoundEqualTo) { + //HBase startRow is inclusive: Therefore it acts like isLowerBoundEqualTo + // by default + scan.setStartRow(r.lowerBound) + } else { + //Since we don't equalTo we want the next value we need + // to add another byte to the start key. That new byte will be + // the min byte value. + val newArray = new Array[Byte](r.lowerBound.length + 1) + System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length) + + //new Min Byte + newArray(r.lowerBound.length) = Byte.MinValue + scan.setStartRow(newArray) + } + } + + //Check if there is a upperBound + if (r.upperBound != null && r.upperBound.length > 0) { + if (r.isUpperBoundEqualTo) { + //HBase stopRow is exclusive: therefore it DOESN'T ast like isUpperBoundEqualTo + // by default. 
So we need to add a new max byte to the stopRow key + val newArray = new Array[Byte](r.upperBound.length + 1) + System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length) + + //New Max Bytes + newArray(r.upperBound.length) = Byte.MaxValue + + scan.setStopRow(newArray) + } else { + //Here equalTo is false for Upper bound which is exclusive and + // HBase stopRow acts like that by default so no need to mutate the + // rowKey + scan.setStopRow(r.upperBound) + } + } + + val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => { + Row.fromSeq(requiredColumns.map(c => + DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2))) + }) + rddList.add(rdd) + } + } + } + + //If there is more then one RDD then we have to union them together + for (i <- 0 until rddList.size()) { + if (resultRDD == null) resultRDD = rddList.get(i) + else resultRDD = resultRDD.union(rddList.get(i)) + + } + + //If there are gets then we can get them from the driver and union that rdd in + // with the rest of the values. + if (getList.size() > 0) { + val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration) + try { + val table = connection.getTable(TableName.valueOf(tableName)) + try { + val results = table.get(getList) + val rowList = mutable.MutableList[Row]() + for (i <- 0 until results.length) { + val rowArray = requiredColumns.map(c => + DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i))) + rowList += Row.fromSeq(rowArray) + } + val getRDD = sqlContext.sparkContext.parallelize(rowList) + if (resultRDD == null) resultRDD = getRDD + else { + resultRDD = resultRDD.union(getRDD) + } + } finally { + table.close() + } + } finally { + connection.close() + } + } + } + if (resultRDD == null) { + val scan = new Scan() + scan.setBatch(batchingNum) + scan.setCaching(cachingNum) + requiredQualifierDefinitionArray.foreach( d => + scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) + + val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => { + Row.fromSeq(requiredColumns.map(c => DefaultSourceStaticUtils.getValue(c, + serializableDefinitionMap, r._2))) + }) + resultRDD=rdd + } + resultRDD + } + + /** + * Root recursive function that will loop over the filters provided by + * SparkSQL. Some filters are AND or OR functions and contain additional filters + * hence the need for recursion. + * + * @param filters Filters provided by SparkSQL. 
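
In practice the pruning above is driven entirely by the WHERE clause. A usage sketch mirroring the test suite further down in this patch; it assumes a reachable HBase cluster holding a table t1 with column family c and a Spark 1.x SQLContext, and the names and literal values are illustrative.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext("local", "hbase-spark-example")
    new HBaseContext(sc, HBaseConfiguration.create())   // cached and reused by the relation
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.load("org.apache.hadoop.hbase.spark",
      Map("hbase.columns.mapping" ->
            "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
          "hbase.table" -> "t1"))
    df.registerTempTable("hbaseTmp")

    // The row-key equality becomes a Get, the row-key range becomes a pruned Scan,
    // and the B_FIELD predicates ride along in the SparkSQLPushDownFilter.
    sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
      "WHERE (KEY_FIELD = 'get1' and B_FIELD < '3') or " +
      "(KEY_FIELD >= 'get3' and B_FIELD = '8')").show()
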
+ * Filters are joined with the AND operater + * @return A ColumnFilterCollection whish is a consolidated construct to + * hold the high level filter information + */ + def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = { + var superCollection: ColumnFilterCollection = null + + filters.foreach( f => { + val parentCollection = new ColumnFilterCollection + buildColumnFilterCollection(parentCollection, f) + if (superCollection == null) + superCollection = parentCollection + else + superCollection.mergeIntersect(parentCollection) + }) + superCollection + } + + /** + * Recursive function that will work to convert Spark Filter + * objects to ColumnFilterCollection + * + * @param parentFilterCollection Parent ColumnFilterCollection + * @param filter Current given filter from SparkSQL + */ + def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection, + filter:Filter): Unit = { + filter match { + + case EqualTo(attr, value) => + parentFilterCollection.mergeUnion(attr, + new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr, + schemaMappingDefinition, value.toString))) + + case LessThan(attr, value) => + parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, + new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, + schemaMappingDefinition, value.toString), false, + new Array[Byte](0), true))) + + case GreaterThan(attr, value) => + parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, + new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, + schemaMappingDefinition, value.toString), false))) + + case LessThanOrEqual(attr, value) => + parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, + new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, + schemaMappingDefinition, value.toString), true, + new Array[Byte](0), true))) + + case GreaterThanOrEqual(attr, value) => + parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, + new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, + schemaMappingDefinition, value.toString), true))) + + case Or(left, right) => + buildColumnFilterCollection(parentFilterCollection, left) + val rightSideCollection = new ColumnFilterCollection + buildColumnFilterCollection(rightSideCollection, right) + parentFilterCollection.mergeUnion(rightSideCollection) + case And(left, right) => + buildColumnFilterCollection(parentFilterCollection, left) + val rightSideCollection = new ColumnFilterCollection + buildColumnFilterCollection(rightSideCollection, right) + parentFilterCollection.mergeIntersect(rightSideCollection) + case _ => //nothing + } + } +} + +/** + * Construct to contains column data that spend SparkSQL and HBase + * + * @param columnName SparkSQL column name + * @param colType SparkSQL column type + * @param columnFamily HBase column family + * @param qualifier HBase qualifier name + */ +case class SchemaQualifierDefinition(columnName:String, + colType:String, + columnFamily:String, + qualifier:String) extends Serializable { + val columnFamilyBytes = Bytes.toBytes(columnFamily) + val qualifierBytes = Bytes.toBytes(qualifier) + val columnSparkSqlType:DataType = if (colType.equals("BOOLEAN")) BooleanType + else if (colType.equals("TINYINT")) IntegerType + else if (colType.equals("INT")) IntegerType + else if (colType.equals("BIGINT")) LongType + else if (colType.equals("FLOAT")) FloatType + else if (colType.equals("DOUBLE")) DoubleType + else if (colType.equals("STRING")) StringType + else if (colType.equals("TIMESTAMP")) TimestampType + else if 
(colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale) + else throw new IllegalArgumentException("Unsupported column type :" + colType) +} + +/** + * Construct to contain a single scan ranges information. Also + * provide functions to merge with other scan ranges through AND + * or OR operators + * + * @param upperBound Upper bound of scan + * @param isUpperBoundEqualTo Include upper bound value in the results + * @param lowerBound Lower bound of scan + * @param isLowerBoundEqualTo Include lower bound value in the results + */ +class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, + var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean) + extends Serializable { + + /** + * Function to merge another scan object through a AND operation + * @param other Other scan object + */ + def mergeIntersect(other:ScanRange): Unit = { + val upperBoundCompare = compareRange(upperBound, other.upperBound) + val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) + + upperBound = if (upperBoundCompare <0) upperBound else other.upperBound + lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound + + isLowerBoundEqualTo = if (lowerBoundCompare == 0) + isLowerBoundEqualTo && other.isLowerBoundEqualTo + else isLowerBoundEqualTo + + isUpperBoundEqualTo = if (upperBoundCompare == 0) + isUpperBoundEqualTo && other.isUpperBoundEqualTo + else isUpperBoundEqualTo + } + + /** + * Function to merge another scan object through a OR operation + * @param other Other scan object + */ + def mergeUnion(other:ScanRange): Unit = { + + val upperBoundCompare = compareRange(upperBound, other.upperBound) + val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) + + upperBound = if (upperBoundCompare >0) upperBound else other.upperBound + lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound + + isLowerBoundEqualTo = if (lowerBoundCompare == 0) + isLowerBoundEqualTo || other.isLowerBoundEqualTo + else isLowerBoundEqualTo + + isUpperBoundEqualTo = if (upperBoundCompare == 0) + isUpperBoundEqualTo || other.isUpperBoundEqualTo + else isUpperBoundEqualTo + } + + /** + * Common function to see if this scan over laps with another + * + * Reference Visual + * + * A B + * |---------------------------| + * LL--------------LU + * RL--------------RU + * + * A = lowest value is byte[0] + * B = highest value is null + * LL = Left Lower Bound + * LU = Left Upper Bound + * RL = Right Lower Bound + * RU = Right Upper Bound + * + * @param other Other scan object + * @return True is overlap false is not overlap + */ + def doesOverLap(other:ScanRange): Boolean = { + + var leftRange:ScanRange = null + var rightRange:ScanRange = null + + //First identify the Left range + // Also lower bound can't be null + if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) { + leftRange = this + rightRange = other + } else { + leftRange = other + rightRange = this + } + + //Then see if leftRange goes to null or if leftRange.upperBound + // upper is greater or equals to rightRange.lowerBound + leftRange.upperBound == null || + Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0 + } + + /** + * Special compare logic because we can have null values + * for left or right bound + * + * @param left Left byte array + * @param right Right byte array + * @return 0 for equals 1 is left is greater and -1 is right is greater + */ + def compareRange(left:Array[Byte], right:Array[Byte]): Int = { + if (left == null && right == null) 0 + else if (left 
== null && right != null) 1 + else if (left != null && right == null) -1 + else Bytes.compareTo(left, right) + } + override def toString:String = { + "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," + + Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")" + } +} + +/** + * Contains information related to a filters for a given column. + * This can contain many ranges or points. + * + * @param currentPoint the initial point when the filter is created + * @param currentRange the initial scanRange when the filter is created + */ +class ColumnFilter (currentPoint:Array[Byte] = null, + currentRange:ScanRange = null, + var points:mutable.MutableList[Array[Byte]] = + new mutable.MutableList[Array[Byte]](), + var ranges:mutable.MutableList[ScanRange] = + new mutable.MutableList[ScanRange]() ) extends Serializable { + //Collection of ranges + if (currentRange != null ) ranges.+=(currentRange) + + //Collection of points + if (currentPoint != null) points.+=(currentPoint) + + /** + * This will validate a give value through the filter's points and/or ranges + * the result will be if the value passed the filter + * + * @param value Value to be validated + * @param valueOffSet The offset of the value + * @param valueLength The length of the value + * @return True is the value passes the filter false if not + */ + def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = { + var result = false + + points.foreach( p => { + if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) { + result = true + } + }) + + ranges.foreach( r => { + val upperBoundPass = r.upperBound == null || + (r.isUpperBoundEqualTo && + Bytes.compareTo(r.upperBound, 0, r.upperBound.length, + value, valueOffSet, valueLength) >= 0) || + (!r.isUpperBoundEqualTo && + Bytes.compareTo(r.upperBound, 0, r.upperBound.length, + value, valueOffSet, valueLength) > 0) + + val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 + (r.isLowerBoundEqualTo && + Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length, + value, valueOffSet, valueLength) <= 0) || + (!r.isLowerBoundEqualTo && + Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length, + value, valueOffSet, valueLength) < 0) + + result = result || (upperBoundPass && lowerBoundPass) + }) + result + } + + /** + * This will allow us to merge filter logic that is joined to the existing filter + * through a OR operator + * + * @param other Filter to merge + */ + def mergeUnion(other:ColumnFilter): Unit = { + other.points.foreach( p => points += p) + + other.ranges.foreach( otherR => { + var doesOverLap = false + ranges.foreach{ r => + if (r.doesOverLap(otherR)) { + r.mergeUnion(otherR) + doesOverLap = true + }} + if (!doesOverLap) ranges.+=(otherR) + }) + } + + /** + * This will allow us to merge filter logic that is joined to the existing filter + * through a AND operator + * + * @param other Filter to merge + */ + def mergeIntersect(other:ColumnFilter): Unit = { + val survivingPoints = new mutable.MutableList[Array[Byte]]() + points.foreach( p => { + other.points.foreach( otherP => { + if (Bytes.equals(p, otherP)) { + survivingPoints.+=(p) + } + }) + }) + points = survivingPoints + + val survivingRanges = new mutable.MutableList[ScanRange]() + + other.ranges.foreach( otherR => { + ranges.foreach( r => { + if (r.doesOverLap(otherR)) { + r.mergeIntersect(otherR) + survivingRanges += r + } + }) + }) + ranges = survivingRanges + } + + override def toString:String = { + val strBuilder = new StringBuilder + 
strBuilder.append("(points:(") + var isFirst = true + points.foreach( p => { + if (isFirst) isFirst = false + else strBuilder.append(",") + strBuilder.append(Bytes.toString(p)) + }) + strBuilder.append("),ranges:") + isFirst = true + ranges.foreach( r => { + if (isFirst) isFirst = false + else strBuilder.append(",") + strBuilder.append(r) + }) + strBuilder.append("))") + strBuilder.toString() + } +} + +/** + * A collection of ColumnFilters indexed by column names. + * + * Also contains merge commends that will consolidate the filters + * per column name + */ +class ColumnFilterCollection { + val columnFilterMap = new mutable.HashMap[String, ColumnFilter] + + def clear(): Unit = { + columnFilterMap.clear() + } + + /** + * This will allow us to merge filter logic that is joined to the existing filter + * through a OR operator. This will merge a single columns filter + * + * @param column The column to be merged + * @param other The other ColumnFilter object to merge + */ + def mergeUnion(column:String, other:ColumnFilter): Unit = { + val existingFilter = columnFilterMap.get(column) + if (existingFilter.isEmpty) { + columnFilterMap.+=((column, other)) + } else { + existingFilter.get.mergeUnion(other) + } + } + + /** + * This will allow us to merge all filters in the existing collection + * to the filters in the other collection. All merges are done as a result + * of a OR operator + * + * @param other The other Column Filter Collection to be merged + */ + def mergeUnion(other:ColumnFilterCollection): Unit = { + other.columnFilterMap.foreach( e => { + mergeUnion(e._1, e._2) + }) + } + + /** + * This will allow us to merge all filters in the existing collection + * to the filters in the other collection. All merges are done as a result + * of a AND operator + * + * @param other The column filter from the other collection + */ + def mergeIntersect(other:ColumnFilterCollection): Unit = { + other.columnFilterMap.foreach( e => { + val existingColumnFilter = columnFilterMap.get(e._1) + if (existingColumnFilter.isEmpty) { + columnFilterMap += e + } else { + existingColumnFilter.get.mergeIntersect(e._2) + } + }) + } + + /** + * This will collect all the filter information in a way that is optimized + * for the HBase filter commend. Allowing the filter to be accessed + * with columnFamily and qualifier information + * + * @param schemaDefinitionMap Schema Map that will help us map the right filters + * to the correct columns + * @return HashMap oc column filters + */ + def generateFamilyQualifiterFilterMap(schemaDefinitionMap: + java.util.HashMap[String, + SchemaQualifierDefinition]): + util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter] = { + val familyQualifierFilterMap = + new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]() + + columnFilterMap.foreach( e => { + val definition = schemaDefinitionMap.get(e._1) + //Don't add rowKeyFilter + if (definition.columnFamilyBytes.size > 0) { + familyQualifierFilterMap.put( + new ColumnFamilyQualifierMapKeyWrapper( + definition.columnFamilyBytes, 0, definition.columnFamilyBytes.length, + definition.qualifierBytes, 0, definition.qualifierBytes.length), e._2) + } + }) + familyQualifierFilterMap + } + + override def toString:String = { + val strBuilder = new StringBuilder + columnFilterMap.foreach( e => strBuilder.append(e)) + strBuilder.toString() + } +} + +/** + * Status object to store static functions but also to hold last executed + * information that can be used for unit testing. 
+ */ +object DefaultSourceStaticUtils { + + val rawInteger = new RawInteger + val rawLong = new RawLong + val rawFloat = new RawFloat + val rawDouble = new RawDouble + val rawString = RawString.ASCENDING + + val byteRange = new ThreadLocal[PositionedByteRange]{ + override def initialValue(): PositionedByteRange = { + val range = new SimplePositionedMutableByteRange() + range.setOffset(0) + range.setPosition(0) + } + } + + def getFreshByteRange(bytes:Array[Byte]): PositionedByteRange = { + getFreshByteRange(bytes, 0, bytes.length) + } + + def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int): PositionedByteRange = { + byteRange.get().set(bytes).setLength(length).setOffset(offset) + } + + //This will contain the last 5 filters and required fields used in buildScan + // These values can be used in unit testing to make sure we are converting + // The Spark SQL input correctly + val lastFiveExecutionRules = + new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]() + + /** + * This method is to populate the lastFiveExecutionRules for unit test perposes + * This method is not thread safe. + * + * @param columnFilterCollection The filters in the last job + * @param requiredQualifierDefinitionArray The required columns in the last job + */ + def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection, + requiredQualifierDefinitionArray: + mutable.MutableList[SchemaQualifierDefinition]):Unit = { + lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting( + columnFilterCollection, requiredQualifierDefinitionArray)) + while (lastFiveExecutionRules.size() > 5) { + lastFiveExecutionRules.poll() + } + } + + /** + * This method will convert the result content from HBase into the + * SQL value type that is requested by the Spark SQL schema definition + * + * @param columnName The name of the SparkSQL Column + * @param schemaMappingDefinition The schema definition map + * @param r The result object from HBase + * @return The converted object type + */ + def getValue(columnName: String, + schemaMappingDefinition: + java.util.HashMap[String, SchemaQualifierDefinition], + r: Result): Any = { + + val columnDef = schemaMappingDefinition.get(columnName) + + if (columnDef == null) throw new IllegalArgumentException("Unknown column:" + columnName) + + + if (columnDef.columnFamilyBytes.isEmpty) { + val row = r.getRow + + columnDef.columnSparkSqlType match { + case IntegerType => rawInteger.decode(getFreshByteRange(row)) + case LongType => rawLong.decode(getFreshByteRange(row)) + case FloatType => rawFloat.decode(getFreshByteRange(row)) + case DoubleType => rawDouble.decode(getFreshByteRange(row)) + case StringType => rawString.decode(getFreshByteRange(row)) + case TimestampType => rawLong.decode(getFreshByteRange(row)) + case _ => Bytes.toString(row) + } + } else { + val cellByteValue = + r.getColumnLatestCell(columnDef.columnFamilyBytes, columnDef.qualifierBytes) + if (cellByteValue == null) null + else columnDef.columnSparkSqlType match { + case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength)) + case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength)) + case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength)) + case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray, + 
cellByteValue.getValueOffset, cellByteValue.getValueLength)) + case StringType => Bytes.toString(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength) + case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength)) + case _ => Bytes.toString(cellByteValue.getValueArray, + cellByteValue.getValueOffset, cellByteValue.getValueLength) + } + } + } + + /** + * This will convert the value from SparkSQL to be stored into HBase using the + * right byte Type + * + * @param columnName SparkSQL column name + * @param schemaMappingDefinition Schema definition map + * @param value String value from SparkSQL + * @return Returns the byte array to go into HBase + */ + def getByteValue(columnName: String, + schemaMappingDefinition: + java.util.HashMap[String, SchemaQualifierDefinition], + value: String): Array[Byte] = { + + val columnDef = schemaMappingDefinition.get(columnName) + + if (columnDef == null) { + throw new IllegalArgumentException("Unknown column:" + columnName) + } else { + columnDef.columnSparkSqlType match { + case IntegerType => + val result = new Array[Byte](Bytes.SIZEOF_INT) + val localDataRange = getFreshByteRange(result) + rawInteger.encode(localDataRange, value.toInt) + localDataRange.getBytes + case LongType => + val result = new Array[Byte](Bytes.SIZEOF_LONG) + val localDataRange = getFreshByteRange(result) + rawLong.encode(localDataRange, value.toLong) + localDataRange.getBytes + case FloatType => + val result = new Array[Byte](Bytes.SIZEOF_FLOAT) + val localDataRange = getFreshByteRange(result) + rawFloat.encode(localDataRange, value.toFloat) + localDataRange.getBytes + case DoubleType => + val result = new Array[Byte](Bytes.SIZEOF_DOUBLE) + val localDataRange = getFreshByteRange(result) + rawDouble.encode(localDataRange, value.toDouble) + localDataRange.getBytes + case StringType => + Bytes.toBytes(value) + case TimestampType => + val result = new Array[Byte](Bytes.SIZEOF_LONG) + val localDataRange = getFreshByteRange(result) + rawLong.encode(localDataRange, value.toLong) + localDataRange.getBytes + + case _ => Bytes.toBytes(value) + } + } + } +} + +class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection, + val requiredQualifierDefinitionArray: + mutable.MutableList[SchemaQualifierDefinition]) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index 9d14e2224db..57ae6b0e7db 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -68,6 +68,8 @@ class HBaseContext(@transient sc: SparkContext, val broadcastedConf = sc.broadcast(new SerializableWritable(config)) val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) + LatestHBaseContextCache.latest = this + if (tmpHdfsConfgFile != null && config != null) { val fs = FileSystem.newInstance(config) val tmpPath = new Path(tmpHdfsConfgFile) @@ -838,3 +840,7 @@ class HBaseContext(@transient sc: SparkContext, } } } + +object LatestHBaseContextCache { + var latest:HBaseContext = null +} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala new file mode 100644 index 00000000000..fb475b0fab0 --- /dev/null +++ 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.client.{Put, ConnectionFactory} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{TableNotFoundException, TableName, HBaseTestingUtility} +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.{SparkContext, Logging} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +class DefaultSourceSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily = "c" + + var sqlContext:SQLContext = null + var df:DataFrame = null + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + + logInfo(" - minicluster started") + try + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + catch { + case e: Exception => logInfo(" - no table " + tableName + " found") + + } + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + + sc = new SparkContext("local", "test") + + val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1)) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8)) + table.put(put) + put = new Put(Bytes.toBytes("get4")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10")) + put.addColumn(Bytes.toBytes(columnFamily), 
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get5"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+      table.put(put)
+    } finally {
+      table.close()
+      connection.close()
+    }
+
+    new HBaseContext(sc, TEST_UTIL.getConfiguration)
+    sqlContext = new SQLContext(sc)
+
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+        "hbase.table" -> "t1",
+        "hbase.batching.num" -> "100",
+        "cachingNum" -> "100"))
+
+    df.registerTempTable("hbaseTmp")
+  }
+
+  override def afterAll() {
+    TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    logInfo("shutting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+
+    sc.stop()
+  }
+
+
+  /**
+   * An example of querying three fields, using only row key points for the filter
+   */
+  test("Test rowKey point only rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 0)
+    assert(keyFieldFilter.points.length == 3)
+    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
+    assert(Bytes.toString(keyFieldFilter.points(1)).equals("get2"))
+    assert(Bytes.toString(keyFieldFilter.points(2)).equals("get3"))
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of querying three fields, using only cell points for the filter
+   */
+  test("Test cell point only rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
+    val bFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
+    assert(bFieldFilter.ranges.length == 0)
+    assert(bFieldFilter.points.length == 2)
+    val aFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("A_FIELD").get
+    assert(aFieldFilter.ranges.length == 0)
+    assert(aFieldFilter.points.length == 1)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of an OR between two row key ranges
+   * Also an example of less than and greater than
+   */
+  test("Test two range rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 2)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of an AND merge between two ranges; the result is one range
+   * Also an example of less than or equal to and greater than or equal to
+   */
+  test("Test one combined range rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 2)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * Do a select with no filters
+   */
+  test("Test select only query") {
+
+    val results = df.select("KEY_FIELD").take(10)
+    assert(results.length == 5)
+
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.columnFilterCollection == null)
+    assert(executionRules.requiredQualifierDefinitionArray.length == 0)
+
+  }
+
+  /**
+   * A complex query with one point and one range for both the
+   * rowKey and a column
+   */
+  test("Test SQL point and range combo") {
+    val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+      "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.ranges.head.upperBound == null)
+    assert(Bytes.toString(keyFieldFilter.ranges.head.lowerBound).equals("get3"))
+    assert(keyFieldFilter.ranges.head.isLowerBoundEqualTo)
+    assert(keyFieldFilter.points.length == 1)
+    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
+
+    val bFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
+    assert(bFieldFilter.ranges.length == 1)
+    assert(bFieldFilter.ranges.head.lowerBound.length == 0)
+    assert(Bytes.toString(bFieldFilter.ranges.head.upperBound).equals("3"))
+    assert(!bFieldFilter.ranges.head.isUpperBoundEqualTo)
+    assert(bFieldFilter.points.length == 1)
+    assert(Bytes.toString(bFieldFilter.points.head).equals("8"))
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 1)
+    assert(executionRules.requiredQualifierDefinitionArray.head.columnName.equals("B_FIELD"))
+    assert(executionRules.requiredQualifierDefinitionArray.head.columnFamily.equals("c"))
+    assert(executionRules.requiredQualifierDefinitionArray.head.qualifier.equals("b"))
+  }
+
+  /**
+   * A complex query with two complex ranges that don't merge into one
+   */
+  test("Test two complete range non merge rowKey query") {
+
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
+      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(results.length == 4)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 2)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * A complex query with two complex ranges that do merge into one
+   */
+  test("Test two complete range merge rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 5)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  test("test table that doesn't exist") {
+    intercept[TableNotFoundException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+          "hbase.table" -> "t1NotThere"))
+
+      df.registerTempTable("hbaseNonExistingTmp")
+
+      sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNonExistingTmp " +
+        "WHERE " +
+        "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+        "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
+    }
+  }
+
+  test("Test table with column that doesn't exist") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, C_FIELD STRING c:c,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseFactColumnTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+  }
+
+  test("Test table with INT column") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD INT c:i,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseIntTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
+      " where I_FIELD > 4 and I_FIELD < 10")
+
+    assert(result.count() == 2)
+
+    val localResult = result.take(3)
+
+    assert(localResult(0).getInt(2) == 8)
+  }
+
+  test("Test table with INT column defined at wrong type") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseIntWrongTypeTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+
+    assert(localResult(0).getString(2).length == 4)
+    assert(localResult(0).getString(2).charAt(0).toByte == 0)
+    assert(localResult(0).getString(2).charAt(1).toByte == 0)
+    assert(localResult(0).getString(2).charAt(2).toByte == 0)
+    assert(localResult(0).getString(2).charAt(3).toByte == 1)
+  }
+
+  test("Test improperly formatted column mapping") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD,STRING,:key, A_FIELD,STRING,c:a, B_FIELD,STRING,c:b, I_FIELD,STRING,c:i,",
+          "hbase.table" -> "t1"))
+
+      df.registerTempTable("hbaseBadTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseBadTmp")
+
+      val localResult = result.take(5)
+    }
+  }
+
+
+  test("Test bad column type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test bad hbase.batching.num type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1", "hbase.batching.num" -> "foo"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test bad hbase.caching.num type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1", "hbase.caching.num" -> "foo"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test table with sparse column") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseZTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+
+    assert(localResult(0).getString(2) == null)
+    assert(localResult(1).getString(2) == "FOO")
+    assert(localResult(2).getString(2) == null)
+    assert(localResult(3).getString(2) == "BAR")
+    assert(localResult(4).getString(2) == null)
+
+  }
+}