HBASE-17933: [hbase-spark] Support Java api for bulkload

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Yi Liang 2017-04-21 18:10:03 -07:00 committed by Sean Busbey
parent 9a1aff447e
commit 49f707fba7
4 changed files with 371 additions and 12 deletions

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Run this example using command below:
*
* SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
* path/to/hbase-spark.jar {path/to/output/HFiles}
*
* This example writes HFiles to {path/to/output/HFiles}; the user can then run
* 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into the table and verify this example.
*/
public final class JavaHBaseBulkLoadExample {
  /** Not instantiable: this class only hosts the example's entry point. */
  private JavaHBaseBulkLoadExample() {}

  /**
   * Builds a small in-memory list of cells, parallelizes it into a JavaRDD and
   * hands it to JavaHBaseContext.bulkLoad, using args[0] as the staging directory.
   *
   * @param args command line arguments; args[0] is the output path
   */
  public static void main(String[] args) {
    if (args.length < 1) {
      System.out.println("JavaHBaseBulkLoadExample {outputPath}");
      return;
    }

    String tableName = "bulkload-table-test";
    String columnFamily1 = "f1";
    String columnFamily2 = "f2";

    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      // Each entry encodes one cell as "row,family,qualifier,value".
      // Rows are added in the order 1, 3, 2.
      List<String> list = new ArrayList<String>();
      // row1
      list.add("1," + columnFamily1 + ",b,1");
      // row3
      list.add("3," + columnFamily1 + ",a,2");
      list.add("3," + columnFamily1 + ",b,1");
      list.add("3," + columnFamily2 + ",a,1");
      // row2
      list.add("2," + columnFamily2 + ",a,3");
      list.add("2," + columnFamily2 + ",b,3");

      JavaRDD<String> rdd = jsc.parallelize(list);

      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), args[0],
          new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
    } finally {
      // Always stop the Spark context, even if the bulk load fails.
      jsc.stop();
    }
  }

  /**
   * Parses a "row,family,qualifier,value" string into a
   * (KeyFamilyQualifier, value bytes) pair.
   * Returns null when the input is null or does not split into exactly four fields.
   */
  public static class BulkLoadFunction
      implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {

    @Override
    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
      if (v1 == null) {
        return null;
      }

      String[] strs = v1.split(",");
      if (strs.length != 4) {
        return null;
      }

      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]),
          Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]));
      // Explicit type arguments instead of the raw Pair type avoid an unchecked conversion.
      return new Pair<KeyFamilyQualifier, byte[]>(kfq, Bytes.toBytes(strs[3]));
    }
  }
}

View File

@ -55,4 +55,14 @@ class FamiliesQualifiersValues extends Serializable {
qualifierValues.put(new ByteArrayWrapper(qualifier), value)
}
}
/**
 * Named alias for the "+=" method above, so that Java callers
 * do not have to invoke a symbolic method name.
 *
 * @param family    HBase column family
 * @param qualifier HBase column qualifier
 * @param value     HBase cell value
 */
def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit =
  this.+=(family, qualifier, value)
}

View File

@ -17,9 +17,12 @@
package org.apache.hadoop.hbase.spark
import java.util.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.hbase.classification.InterfaceAudience
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@ -268,7 +271,6 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* generates a new DStream based on Gets and the results
* they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize The number of gets to be batched together
* @param javaDStream Original DStream with data to iterate over
@ -291,6 +293,67 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
(r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
 * A simple abstraction over the HBaseContext.bulkLoad method.
 * It lets a user take a JavaRDD, convert each element into a
 * Pair(KeyFamilyQualifier, Array[Byte]) with mapFunc, and have
 * HFiles generated in stagingDir for a later bulk load.
 *
 * Elements for which mapFunc returns null are skipped instead of
 * failing with a NullPointerException.
 *
 * @param javaRdd                        The javaRDD we are bulk loading from
 * @param tableName                      The HBase table we are loading into
 * @param mapFunc                        A Function that will convert a value in JavaRDD
 *                                       to Pair(KeyFamilyQualifier, Array[Byte])
 * @param stagingDir                     The location on the FileSystem to bulk load into
 * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
 *                                       column family is written
 * @param compactionExclude              Compaction excluded for the HFiles
 * @param maxSize                        Max size for the HFiles before they roll
 */
def bulkLoad[T](javaRdd: JavaRDD[T],
                tableName: TableName,
                mapFunc : Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
                stagingDir: String,
                familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
                compactionExclude: Boolean,
                maxSize: Long):
Unit = {
  hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
    if (t == null) {
      // mapFunc signalled "nothing to emit" for this element; contribute no cells.
      Seq.empty[(KeyFamilyQualifier, Array[Byte])].iterator
    } else {
      val keyFamilyQualifier = t.getFirst
      val value = t.getSecond
      Seq((keyFamilyQualifier, value)).iterator
    }
  }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
}
/**
 * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
 * It lets a user take a JavaRDD, convert each element into a
 * Pair(ByteArrayWrapper, FamiliesQualifiersValues) with mapFunc, and have
 * HFiles generated in stagingDir for a later bulk load.
 *
 * Elements for which mapFunc returns null are dropped before the load
 * instead of failing with a NullPointerException.
 *
 * @param javaRdd                        The javaRDD we are bulk loading from
 * @param tableName                      The HBase table we are loading into
 * @param mapFunc                        A Function that will convert a value in JavaRDD
 *                                       to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
 * @param stagingDir                     The location on the FileSystem to bulk load into
 * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
 *                                       column family is written
 * @param compactionExclude              Compaction excluded for the HFiles
 * @param maxSize                        Max size for the HFiles before they roll
 */
def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
                        tableName: TableName,
                        mapFunc : Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
                        stagingDir: String,
                        familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
                        compactionExclude: Boolean,
                        maxSize: Long):
Unit = {
  // The row mapping below is one-to-one, so null results must be removed up front.
  val rowPairs = javaRdd.map(mapFunc).rdd.filter(_ != null)
  hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](rowPairs,
    tableName, t => {
      (t.getFirst, t.getSecond)
    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
}
/**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new JavaRDD
@ -341,4 +404,5 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = {
  // Reuse the AnyRef tag for every T by casting it to the requested type.
  val anyRefTag = ClassTag.AnyRef
  anyRefTag.asInstanceOf[ClassTag[T]]
}
}

View File

@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.spark;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@ -38,17 +44,24 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.*;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.Tuple2;
import com.google.common.io.Files;
@Category({MiscTests.class, MediumTests.class})
@ -58,19 +71,22 @@ public class TestJavaHBaseContext implements Serializable {
// Logger for this test class.
protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
// Name of the table the tests create and operate on.
byte[] tableName = Bytes.toBytes("t1");
// The two column families ("c" and "d") used by the tests.
byte[] columnFamily = Bytes.toBytes("c");
byte[] columnFamily1 = Bytes.toBytes("d");
// String forms of the column families, used to build "row,family,qualifier,value" inputs
// and to compare against families read back from the table.
String columnFamilyStr = Bytes.toString(columnFamily);
String columnFamilyStr1 = Bytes.toString(columnFamily1);
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
jsc.addJar("spark.jar");
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
htu = HBaseTestingUtility.createLocalHTU();
htu = new HBaseTestingUtility();
try {
LOG.info("cleaning up test dir");
@ -91,7 +107,7 @@ public class TestJavaHBaseContext implements Serializable {
LOG.info(" - creating table " + Bytes.toString(tableName));
htu.createTable(TableName.valueOf(tableName),
columnFamily);
new byte[][]{columnFamily, columnFamily1});
LOG.info(" - created table");
} catch (Exception e1) {
throw new RuntimeException(e1);
@ -278,6 +294,173 @@ public class TestJavaHBaseContext implements Serializable {
Assert.assertEquals(stringJavaRDD.count(), 5);
}
/**
 * Writes HFiles through JavaHBaseContext.bulkLoad, loads them into the table
 * with LoadIncrementalHFiles and checks the cells of rows "1", "3" and "2".
 */
@Test
public void testBulkLoad() throws Exception {
  Path output = htu.getDataTestDir("testBulkLoad");
  // Add cell as String: "row,family,qualifier,value"
  List<String> list = new ArrayList<String>();
  // row1
  list.add("1," + columnFamilyStr + ",b,1");
  // row3
  list.add("3," + columnFamilyStr + ",a,2");
  list.add("3," + columnFamilyStr + ",b,1");
  list.add("3," + columnFamilyStr1 + ",a,1");
  // row2
  list.add("2," + columnFamilyStr + ",a,3");
  list.add("2," + columnFamilyStr + ",b,3");

  JavaRDD<String> rdd = jsc.parallelize(list);

  Configuration conf = htu.getConfiguration();
  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
  TableName targetTable = TableName.valueOf(tableName);

  hbaseContext.bulkLoad(rdd, targetTable, new BulkLoadFunction(), output.toUri().getPath(),
      new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);

  // Table and RegionLocator are managed by try-with-resources so they are
  // closed even when an assertion fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
       Admin admin = conn.getAdmin();
       Table table = conn.getTable(targetTable);
       org.apache.hadoop.hbase.client.RegionLocator locator =
           conn.getRegionLocator(targetTable)) {
    // Do bulk load
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
    load.doBulkLoad(output, admin, table, locator);

    // Assertions use JUnit's (expected, actual) order so failure messages read correctly.

    // Check row1
    List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
    Assert.assertEquals(1, cell1.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));

    // Check row3
    List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
    Assert.assertEquals(3, cell3.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
    Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));

    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));

    Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));

    // Check row2
    List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
    Assert.assertEquals(2, cell2.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
    Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));

    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
    Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
  }
}
/**
 * Writes HFiles through JavaHBaseContext.bulkLoadThinRows, loads them into the
 * table with LoadIncrementalHFiles and checks the cells of rows "1", "3" and "2".
 */
@Test
public void testBulkLoadThinRows() throws Exception {
  Path output = htu.getDataTestDir("testBulkLoadThinRows");
  // because of the limitation of scala bulkLoadThinRows API
  // we need to provide data as <row, all cells in that row>
  List<List<String>> list = new ArrayList<List<String>>();
  // row1
  List<String> list1 = new ArrayList<String>();
  list1.add("1," + columnFamilyStr + ",b,1");
  list.add(list1);
  // row3
  List<String> list3 = new ArrayList<String>();
  list3.add("3," + columnFamilyStr + ",a,2");
  list3.add("3," + columnFamilyStr + ",b,1");
  list3.add("3," + columnFamilyStr1 + ",a,1");
  list.add(list3);
  // row2
  List<String> list2 = new ArrayList<String>();
  list2.add("2," + columnFamilyStr + ",a,3");
  list2.add("2," + columnFamilyStr + ",b,3");
  list.add(list2);

  JavaRDD<List<String>> rdd = jsc.parallelize(list);

  Configuration conf = htu.getConfiguration();
  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
  TableName targetTable = TableName.valueOf(tableName);

  hbaseContext.bulkLoadThinRows(rdd, targetTable, new BulkLoadThinRowsFunction(),
      output.toString(), new HashMap<byte[], FamilyHFileWriteOptions>(), false,
      HConstants.DEFAULT_MAX_FILE_SIZE);

  // Table and RegionLocator are managed by try-with-resources so they are
  // closed even when an assertion fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
       Admin admin = conn.getAdmin();
       Table table = conn.getTable(targetTable);
       org.apache.hadoop.hbase.client.RegionLocator locator =
           conn.getRegionLocator(targetTable)) {
    // Do bulk load
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
    load.doBulkLoad(output, admin, table, locator);

    // Assertions use JUnit's (expected, actual) order so failure messages read correctly.

    // Check row1
    List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
    Assert.assertEquals(1, cell1.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));

    // Check row3
    List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
    Assert.assertEquals(3, cell3.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
    Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));

    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));

    Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
    Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));

    // Check row2
    List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
    Assert.assertEquals(2, cell2.size());
    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
    Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
    Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));

    Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
    Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
    Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
  }
}
/**
 * Parses a "row,family,qualifier,value" string into a
 * (KeyFamilyQualifier, value bytes) pair.
 * Returns null when the input is null or does not split into exactly four fields.
 */
public static class BulkLoadFunction
    implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {

  private static final long serialVersionUID = 1L;

  @Override
  public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
    if (v1 == null) {
      return null;
    }

    String[] strs = v1.split(",");
    if (strs.length != 4) {
      return null;
    }

    KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]),
        Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]));
    // Explicit type arguments instead of the raw Pair type avoid an unchecked conversion.
    return new Pair<KeyFamilyQualifier, byte[]>(kfq, Bytes.toBytes(strs[3]));
  }
}
/**
 * Converts the cells of one row, each given as "row,family,qualifier,value",
 * into a (row key, FamiliesQualifiersValues) pair.
 * The row key is taken from the first cell in the list; it stays null if the list is empty.
 * Returns null when the input list itself is null.
 */
public static class BulkLoadThinRowsFunction
    implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {

  private static final long serialVersionUID = 1L;

  @Override
  public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list)
      throws Exception {
    if (list == null) {
      return null;
    }

    ByteArrayWrapper rowKey = null;
    FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
    for (String cell : list) {
      String[] strs = cell.split(",");
      if (rowKey == null) {
        rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
      }
      fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
    }
    // Explicit type arguments instead of the raw Pair type avoid an unchecked conversion.
    return new Pair<ByteArrayWrapper, FamiliesQualifiersValues>(rowKey, fqv);
  }
}
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
@ -335,4 +518,4 @@ public class TestJavaHBaseContext implements Serializable {
}
}
}
}