HBASE-16823 Add examples in HBase Spark module (Weiqing Yang)

tedyu 2016-10-14 10:19:54 -07:00
parent a68c0e2a34
commit 444dc866c0
3 changed files with 426 additions and 0 deletions


@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.datasources

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.spark.AvroSerdes
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* @param col0 Column #0, Type is String
* @param col1 Column #1, Type is Array[Byte]
*/
case class AvroHBaseRecord(col0: String,
col1: Array[Byte])
object AvroHBaseRecord {
val schemaString =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]},
| {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
| {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
| ] }""".stripMargin
val avroSchema: Schema = {
val p = new Schema.Parser
p.parse(schemaString)
}
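/**
 * Builds a GenericRecord that conforms to the schema above, serializes it to
 * Avro bytes with AvroSerdes, and stores those bytes in col1.
 */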
def apply(i: Int): AvroHBaseRecord = {
val user = new GenericData.Record(avroSchema)
user.put("name", s"name${"%03d".format(i)}")
user.put("favorite_number", i)
user.put("favorite_color", s"color${"%03d".format(i)}")
val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
favoriteArray.add(s"number${i}")
favoriteArray.add(s"number${i+1}")
user.put("favorite_array", favoriteArray)
import collection.JavaConverters._
val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
user.put("favorite_map", favoriteMap)
val avroByte = AvroSerdes.serialize(user, avroSchema)
AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
}
}
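/**
 * Writes AvroHBaseRecords to HBase and reads them back as DataFrames.
 * `catalog` maps col1 as raw binary for the initial write, while
 * `avroCatalog` and `avroCatalogInsert` bind col1 to the Avro schema so the
 * connector encodes and decodes it as a nested struct.
 */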
object AvroSource {
def catalog = s"""{
|"table":{"namespace":"default", "name":"ExampleAvrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
|}
|}""".stripMargin
def avroCatalog = s"""{
|"table":{"namespace":"default", "name":"ExampleAvrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def avroCatalogInsert = s"""{
|"table":{"namespace":"default", "name":"ExampleAvrotableInsert"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("AvroSourceExample")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val data = (0 to 255).map { i =>
AvroHBaseRecord(i)
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
val df = withCatalog(avroCatalog)
df.show
df.printSchema()
df.registerTempTable("ExampleAvrotable")
val c = sqlContext.sql("select count(1) from ExampleAvrotable")
c.show
val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
filtered.show
val collected = filtered.collect()
if (collected(0).getSeq[String](1)(0) != "number1") {
throw new UserCustomizedSampleException("value invalid")
}
if (collected(0).getSeq[String](1)(1) != "number2") {
throw new UserCustomizedSampleException("value invalid")
}
df.write.options(
Map("avroSchema"->AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog->avroCatalogInsert,
HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
val newDF = withCatalog(avroCatalogInsert)
newDF.show
newDF.printSchema()
if (newDF.count() != 256) {
throw new UserCustomizedSampleException("value invalid")
}
df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
.select("col0", "col1.favorite_color", "col1.favorite_number")
.show
df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
.select("col0", "col1.favorite_color", "col1.favorite_number")
.show
}
}


@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.datasources

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

class UserCustomizedSampleException(message: String = null, cause: Throwable = null) extends
RuntimeException(UserCustomizedSampleException.message(message, cause), cause)
object UserCustomizedSampleException {
def message(message: String, cause: Throwable) =
if (message != null) message
else if (cause != null) cause.toString()
else null
}
case class IntKeyRecord(
col0: Integer,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object IntKeyRecord {
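// Even values of i keep a positive row key and odd values are negated, so the
// 32 generated rows split evenly between non-negative and negative keys.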
def apply(i: Int): IntKeyRecord = {
IntKeyRecord(if (i % 2 == 0) i else -i,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i extra",
i.toByte)
}
}
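/**
 * Maps each IntKeyRecord field to its own column family with a matching
 * SQL type (int, boolean, double, float, bigint, smallint, string, tinyint)
 * and exercises range filters against the integer row key.
 */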
object DataType {
val cat = s"""{
|"table":{"namespace":"default", "name":"DataTypeExampleTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("DataTypeExample")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
// test populate table
val data = (0 until 32).map { i =>
IntKeyRecord(i)
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
// test less than 0
val df = withCatalog(cat)
val s = df.filter($"col0" < 0)
s.show
if (s.count() != 16) {
throw new UserCustomizedSampleException("value invalid")
}
// test less than or equal to -10. The number of results is 11
val num1 = df.filter($"col0" <= -10)
num1.show
val c1 = num1.count()
println(s"test result count should be 11: $c1")
// test less than or equal to -9. The number of results is 12
val num2 = df.filter($"col0" <= -9)
num2.show
val c2 = num2.count()
println(s"test result count should be 12: $c2")
// test greater than or equal to -9. The number of results is 21
val num3 = df.filter($"col0" >= -9)
num3.show
val c3 = num3.count()
println(s"test result count should be 21: $c3")
// test greater than or equal to 0. The number of results is 16
val num4 = df.filter($"col0" >= 0)
num4.show
val c4 = num4.count()
println(s"test result count should be 16: $c4")
// test greater than 10. The number of results is 10
val num5 = df.filter($"col0" > 10)
num5.show
val c5 = num5.count()
println(s"test result count should be 10: $c5")
// test "and". The number of results is 11
val num6 = df.filter($"col0" > -10 && $"col0" <= 10)
num6.show
val c6 = num6.count()
println(s"test result count should be 11: $c6")
//test "or". The number of results is 21
val num7 = df.filter($"col0" <= -10 || $"col0" > 10)
num7.show
val c7 = num7.count()
println(s"test result count should be 21: $c7")
//test "all". The number of results is 32
val num8 = df.filter($"col0" >= -100)
num8.show
val c8 = num8.count()
println(s"test result count should be 32: $c8")
//test "full query"
val df1 = withCatalog(cat)
df1.show()
val c_df = df1.count()
println(s"df count should be 32: $c_df")
if (c_df != 32) {
throw new UserCustomizedSampleException("value invalid")
}
}
}


@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.datasources

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord {
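// Row keys are zero-padded ("row000" through "row255"), so the lexicographic
// string comparisons used by the HBaseSource filters match numeric order.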
def apply(i: Int): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i extra",
i.toByte)
}
}
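/**
 * Writes 256 HBaseRecords to HBaseSourceExampleTable, reads them back as a
 * DataFrame, and runs row-key range filters plus a Spark SQL count.
 */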
object HBaseSource {
val cat = s"""{
|"table":{"namespace":"default", "name":"HBaseSourceExampleTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseSourceExample")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val data = (0 to 255).map { i =>
HBaseRecord(i)
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
val df = withCatalog(cat)
df.show
df.filter($"col0" <= "row005")
.select($"col0", $"col1").show
df.filter($"col0" === "row005" || $"col0" <= "row005")
.select($"col0", $"col1").show
df.filter($"col0" > "row250")
.select($"col0", $"col1").show
df.registerTempTable("table1")
val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'")
c.show()
}
}