HBASE-16261 MultiHFileOutputFormat Enhancement (Yi Liang)

Jerry He 2017-06-01 10:44:17 -07:00
parent 123086edad
commit c7a7f880dd
5 changed files with 892 additions and 324 deletions

HFileOutputFormat2.java

@ -166,7 +166,7 @@ public class HFileOutputFormat2
/**
* A MapReduce job will create a temp path for outputting results. If out != null, it means that
* the caller has set the temp working dir; if out == null, it means we need to set it here.
* Used by HFileOutputFormat2 and MultiHFileOutputFormat. MultiHFileOutputFormat will give us
* Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
* temp working dir at the table level and HFileOutputFormat2 has to set it here within this
* constructor.
*/

MultiHFileOutputFormat.java (file deleted)

@ -1,99 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import com.google.common.annotations.VisibleForTesting;
/**
* Creates a three-level directory tree: the first level uses the table name as the parent
* directory, the second level uses the family name as a child directory, and all HFiles for one
* family sit under that child directory:
* <pre>
*   -tableName1
*     -columnFamilyName1
*       -HFiles
*     -columnFamilyName2
*       -HFiles
*   -tableName2
*     -columnFamilyName1
*       -HFiles
*     -columnFamilyName2
*       -HFiles
* </pre>
*/
@InterfaceAudience.Public
@VisibleForTesting
public class MultiHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
@Override
public RecordWriter<ImmutableBytesWritable, Cell>
getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
return createMultiHFileRecordWriter(context);
}
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
// Get the path of the output directory
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputDir.getFileSystem(conf);
// Map of tables to writers
final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters = new HashMap<>();
return new RecordWriter<ImmutableBytesWritable, V>() {
@Override
public void write(ImmutableBytesWritable tableName, V cell)
throws IOException, InterruptedException {
RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
// if this is a new table, create its output directory
if (tableWriter == null) {
// using table name as directory name
final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
fs.mkdirs(tableOutputDir);
LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
+ tableOutputDir.toString());
// Create writer for one specific table
tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
// Put table into map
tableWriters.put(tableName, tableWriter);
}
// Write the cell through the table's writer; the row key argument is unused, so pass null
tableWriter.write(null, cell);
}
@Override
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
writer.close(c);
}
}
};
}
}

MultiTableHFileOutputFormat.java (new file)

@ -0,0 +1,509 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* Creates a three-level directory tree: the first level uses the table name as the parent
* directory, the second level uses the family name as a child directory, and all HFiles for one
* family sit under that child directory:
* <pre>
*   -tableName1
*     -columnFamilyName1
*       -HFile (region1)
*     -columnFamilyName2
*       -HFile1 (region1)
*       -HFile2 (region2)
*       -HFile3 (region3)
*   -tableName2
*     -columnFamilyName1
*       -HFile (region1)
* </pre>
* The family directory and its HFiles match the output of HFileOutputFormat2.
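* <p>A minimal mapper-side usage sketch (the table name and cell variable are hypothetical); the
* mapper keys each output cell by the destination table's name:
* <pre>{@code
* ImmutableBytesWritable tableKey = new ImmutableBytesWritable(Bytes.toBytes("table1"));
* context.write(tableKey, keyValue);
* }</pre>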
* @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
*/
@InterfaceAudience.Public
@VisibleForTesting
public class MultiTableHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
@Override
public RecordWriter<ImmutableBytesWritable, Cell>
getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
return createMultiHFileRecordWriter(context);
}
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
// Get the path of the output directory
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputDir.getFileSystem(conf);
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
// Cache of resolved tables, to avoid calling getTable() every time
final Map<ImmutableBytesWritable, Table> tables = new HashMap<>();
// Map of tables to writers
final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters = new HashMap<>();
return new RecordWriter<ImmutableBytesWritable, V>() {
@Override
public void write(ImmutableBytesWritable tableName, V cell)
throws IOException, InterruptedException {
RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
// if this is a new table, create its output directory
if (tableWriter == null) {
// using table name as directory name
final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
fs.mkdirs(tableOutputDir);
LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
+ tableOutputDir.toString());
// Configure for tableWriter, if table exist, write configuration of table into conf
Table table = null;
if (tables.containsKey(tableName)) {
table = tables.get(tableName);
} else {
table = getTable(tableName.copyBytes(), conn, admin);
tables.put(tableName, table);
}
if (table != null) {
configureForOneTable(conf, table.getTableDescriptor());
}
// Create writer for one specific table
tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
// Put table into map
tableWriters.put(tableName, tableWriter);
}
// Write the cell through the table's writer; the row key argument is unused, so pass null
tableWriter.write(null, cell);
}
@Override
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
writer.close(c);
}
// close the Admin before its Connection
if (admin != null) {
admin.close();
}
if (conn != null) {
conn.close();
}
}
};
}
/**
* Configure the job for one table: set compression algorithms, block size, bloom type, and data
* block encoding based on its column families. Call before creating a new HFileRecordWriter.
*/
private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor)
throws UnsupportedEncodingException {
HFileOutputFormat2.configureCompression(conf, tableDescriptor);
HFileOutputFormat2.configureBlockSize(tableDescriptor, conf);
HFileOutputFormat2.configureBloomType(tableDescriptor, conf);
HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
}
/**
* Configure a MapReduce Job to output HFiles for performing an incremental load into
* multiple tables.
* <ul>
* <li>Inspects the tables to configure a partitioner based on their region boundaries</li>
* <li>Writes the partitions file and configures the partitioner</li>
* <li>Sets the number of reduce tasks to match the total number of all tables' regions</li>
* <li>Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer)</li>
* </ul>
*
* configureIncrementalLoad sets up the partitioner and reducer for the MapReduce job; the caller
* still needs to set up the input path, output path, and mapper.
*
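* <p>A minimal job setup sketch (the input format, mapper class, and paths are hypothetical):
* <pre>{@code
* Job job = Job.getInstance(conf, "multi-table-bulkload");
* FileOutputFormat.setOutputPath(job, outputDir);
* job.setInputFormatClass(MyInputFormat.class);
* job.setMapperClass(MyCellGeneratingMapper.class);
* job.setMapOutputKeyClass(ImmutableBytesWritable.class);
* job.setMapOutputValueClass(KeyValue.class);
* MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
* job.waitForCompletion(true);
* }</pre>
*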
* @param job the MapReduce Job to configure
* @param tables the list of tables to inspect
* @throws IOException
*/
public static void configureIncrementalLoad(Job job, List<TableName> tables) throws IOException {
configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class);
}
public static void configureIncrementalLoad(Job job, List<TableName> tables,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys =
MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables);
configureIncrementalLoad(job, tableSplitKeys, cls);
}
/**
* Same purpose as configureIncrementalLoad(Job job, List<TableName> tables). Use this overload
* when the region start keys of each table are already available, passed as
* <TableName, List<RegionStartKey>>.
*
* The caller needs to convert TableName and byte[] into ImmutableBytesWritable.
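*
* <p>A sketch of building the input map (the table name and split key are hypothetical):
* <pre>{@code
* Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> splitKeys = new HashMap<>();
* List<ImmutableBytesWritable> startKeys = new ArrayList<>();
* startKeys.add(new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY)); // first region
* startKeys.add(new ImmutableBytesWritable(Bytes.toBytes("splitRow")));   // second region
* splitKeys.put(new ImmutableBytesWritable(TableName.valueOf("table1").toBytes()), startKeys);
* MultiTableHFileOutputFormat.configureIncrementalLoad(job, splitKeys);
* }</pre>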
*/
public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
List<ImmutableBytesWritable>> tableSplitKeys) throws IOException {
configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class);
}
public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
List<ImmutableBytesWritable>> tableSplitKeys, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
// file path to store <table, splitKey>
String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
// makeQualified returns a new Path; keep the result so we write and read the same file
partitionsPath = partitionsPath.getFileSystem(conf).makeQualified(partitionsPath);
LOG.info("Writing partition info into file: " + partitionsPath);
job.setPartitionerClass(MultiHFilePartitioner.class);
// get split keys for all the tables, and write them into the partition file
MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys);
MultiHFilePartitioner.setPartitionFile(conf, partitionsPath);
partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath);
// currently only Mapper output of <ImmutableBytesWritable, KeyValue> is supported,
// so we can use KeyValueSortReducer directly to sort the Mapper output
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
int reducerNum = getReducerNumber(tableSplitKeys);
job.setNumReduceTasks(reducerNum);
LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count");
// setup output format
job.setOutputFormatClass(cls);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
}
/**
* Check whether the table exists, using only the supplied Connection and Admin
* @return the Table instance if the table exists, otherwise null
*/
private static Table getTable(final byte[] tableName, Connection conn, Admin admin) {
if (conn == null || admin == null) {
LOG.info("can not get Connection or Admin");
return null;
}
try {
TableName table = TableName.valueOf(tableName);
if (admin.tableExists(table)) {
return conn.getTable(table);
}
} catch (IOException e) {
LOG.info("Exception found in getTable()" + e.toString());
return null;
}
LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist");
return null;
}
/**
* Get the number of reducers from the tables' split keys: one reducer per region
*/
private static int getReducerNumber(
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys) {
int reducerNum = 0;
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : tableSplitKeys.entrySet()) {
reducerNum += entry.getValue().size();
}
return reducerNum;
}
/**
* MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner.
* The input is partitioned by table name and by the region boundaries within each table: two
* records fall in the same partition if they have the same table name and their cells belong to
* the same region.
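* <p>For example (hypothetical data): if table "t1" has regions with start keys {"", "m"} and
* table "t2" has a single region with start key "", there are three partitions: "t1" cells with
* rows "apple" or "cat" land in partition 0, a "t1" cell with row "zoo" lands in partition 1,
* and every "t2" cell lands in partition 2.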
*/
static class MultiHFilePartitioner extends Partitioner<ImmutableBytesWritable, Cell>
implements Configurable {
public static final String DEFAULT_PATH = "_partition_multihfile.lst";
public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path";
private Configuration conf;
// map to receive <table, splitKeys> from file
private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> table_SplitKeys;
// each <table, splitKey> pair is mapped to one unique integer
private TreeMap<TableSplitKeyPair, Integer> partitionMap;
@Override
public void setConf(Configuration conf) {
try {
this.conf = conf;
partitionMap = new TreeMap<>();
table_SplitKeys = readTableSplitKeys(conf);
// initiate partitionMap by table_SplitKeys map
int splitNum = 0;
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : table_SplitKeys.entrySet()) {
ImmutableBytesWritable table = entry.getKey();
List<ImmutableBytesWritable> list = entry.getValue();
for (ImmutableBytesWritable splitKey : list) {
partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
@Override
public Configuration getConf() {
return conf;
}
/**
* Set the path to the SequenceFile storing the sorted <table, splitkey> pairs. Every region of
* every table contributes one entry (including the empty start key of each table's first
* region), so for <tt>R</tt> reduces there are <tt>R</tt> keys in the SequenceFile.
*/
public static void setPartitionFile(Configuration conf, Path p) {
conf.set(PARTITIONER_PATH, p.toString());
}
/**
* Get the path to the SequenceFile storing the sorted <table, splitkey>.
* @see #setPartitionFile(Configuration, Path)
*/
public static String getPartitionFile(Configuration conf) {
return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
}
/**
* Return map of <tableName, the start keys of all of the regions in this table>
*/
public static Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> getTablesRegionStartKeys(
Configuration conf, List<TableName> tables) throws IOException {
final TreeMap<ImmutableBytesWritable, List<ImmutableBytesWritable>> ret = new TreeMap<>();
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
LOG.info("Looking up current regions for tables");
for (TableName tName : tables) {
RegionLocator table = conn.getRegionLocator(tName);
// if the table does not exist, fall back to a single empty start key
byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY };
if (admin.tableExists(tName)) {
byteKeys = table.getStartKeys();
}
List<ImmutableBytesWritable> tableStartKeys = new ArrayList<>(byteKeys.length);
for (byte[] byteKey : byteKeys) {
tableStartKeys.add(new ImmutableBytesWritable(byteKey));
}
ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys);
}
return ret;
}
}
/**
* Write the <tableName, start key of each region in the table> pairs into the sequence file in
* order; this format is parsed back by MultiHFilePartitioner
*/
public static void writeTableSplitKeys(Configuration conf, Path partitionsPath,
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map) throws IOException {
LOG.info("Writing partition information to " + partitionsPath);
if (map == null || map.isEmpty()) {
throw new IllegalArgumentException("No regions passed for any table");
}
SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath),
Writer.keyClass(ImmutableBytesWritable.class),
Writer.valueClass(ImmutableBytesWritable.class));
try {
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : map.entrySet()) {
ImmutableBytesWritable table = entry.getKey();
List<ImmutableBytesWritable> list = entry.getValue();
if (list == null) {
throw new IOException("Split keys for a table can not be null");
}
TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(list);
ImmutableBytesWritable first = sorted.first();
if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
throw new IllegalArgumentException(
"First region of table should have empty start key. Instead has: "
+ Bytes.toStringBinary(first.get()));
}
for (ImmutableBytesWritable startKey : sorted) {
writer.append(table, startKey);
}
}
} finally {
writer.close();
}
}
/**
* read partition file into map <table, splitKeys of this table>
*/
private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> readTableSplitKeys(
Configuration conf) throws IOException {
String parts = getPartitionFile(conf);
LOG.info("Read partition info from file: " + parts);
final Path partFile = new Path(parts);
SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile));
// values are already sorted in file, so use list
final Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map =
new TreeMap<>();
// key and value have same type
ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
ImmutableBytesWritable value =
ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
try {
while (reader.next(key, value)) {
List<ImmutableBytesWritable> list = map.get(key);
if (list == null) {
list = new ArrayList<>();
}
list.add(value);
map.put(key, list);
key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
}
} finally {
IOUtils.cleanup(LOG, reader);
}
return map;
}
@Override
public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) {
byte[] row = CellUtil.cloneRow(value);
final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
// find the region start key that covers the input rowKey
if (table_SplitKeys.containsKey(table)) {
List<ImmutableBytesWritable> list = table_SplitKeys.get(table);
int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator());
if (index < 0) {
// key not found: binarySearch returns -(insertionPoint) - 1, so this recovers
// insertionPoint - 1, the index of the greatest start key <= rowKey
index = (index + 1) * (-1) - 1;
}
if (index < 0) {
index = 0;
LOG.error("row key can not be less than HConstants.EMPTY_BYTE_ARRAY");
}
splitId = list.get(index);
}
// find the id of the reducer for the input
Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId));
if (id == null) {
LOG.warn("Can not get reducer id for input record");
return -1;
}
return id.intValue() % numPartitions;
}
/**
* A class to store a pair<TableName, SplitKey>. It has two main usages:
* 1. store a table name and one of its split keys as a pair
* 2. implement Comparable, so that the partitioner can find the split key for its input cell
*/
static class TableSplitKeyPair extends Pair<ImmutableBytesWritable, ImmutableBytesWritable>
implements Comparable<TableSplitKeyPair> {
private static final long serialVersionUID = -6485999667666325594L;
public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) {
super(a, b);
}
@Override
public int compareTo(TableSplitKeyPair other) {
if (this.getFirst().equals(other.getFirst())) {
return this.getSecond().compareTo(other.getSecond());
}
return this.getFirst().compareTo(other.getFirst());
}
}
}
}

TestMultiHFileOutputFormat.java (file deleted)

@ -1,224 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for {@link MultiHFileOutputFormat}. Sets up and runs a MapReduce job that creates output
* directories and writes HFiles.
*/
@Category(MediumTests.class)
public class TestMultiHFileOutputFormat {
private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
private HBaseTestingUtility util = new HBaseTestingUtility();
private static int ROWSPERSPLIT = 10;
private static final int KEYLEN_DEFAULT = 10;
private static final String KEYLEN_CONF = "randomkv.key.length";
private static final int VALLEN_DEFAULT = 10;
private static final String VALLEN_CONF = "randomkv.val.length";
private static final byte[][] TABLES =
{ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
private static final byte[][] FAMILIES =
{ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
private static final byte[] QUALIFIER = Bytes.toBytes("data");
public static void main(String[] args) throws Exception {
new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
}
/**
* Run a small MR job that writes HFiles into
* testWritingDataIntoHFiles/tableName/columnFamily/
*/
@Test
public void testWritingDataIntoHFiles() throws Exception {
Configuration conf = util.getConfiguration();
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
FileSystem fs = testDir.getFileSystem(conf);
LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files by setting max file size.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
try {
Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
FileOutputFormat.setOutputPath(job, testDir);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(Table_KeyValueSortReducer.class);
job.setOutputFormatClass(MultiHFileOutputFormat.class);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("\nStarting test testWritingDataIntoHFiles\n");
assertTrue(job.waitForCompletion(true));
LOG.info("\nWaiting on checking MapReduce output\n");
assertTrue(checkMROutput(fs, testDir, 0));
} finally {
testDir.getFileSystem(conf).delete(testDir, true);
util.shutdownMiniCluster();
}
}
/**
* MR outputs a 3-level directory tree, tableName -> columnFamilyName -> HFile. This method checks
* whether the created directory tree is correct. It recurses, so testDir should be small.
*/
private boolean checkMROutput(FileSystem fs, Path testDir, int level)
throws FileNotFoundException, IOException {
if (level >= 3) {
return HFile.isHFileFormat(fs, testDir);
}
FileStatus[] fStats = fs.listStatus(testDir);
if (fStats == null || fStats.length <= 0) {
LOG.info("Created directory format is not correct");
return false;
}
for (FileStatus stats : fStats) {
// skip the _SUCCESS file created by MapReduce
if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
continue;
if (level < 2 && !stats.isDirectory()) {
LOG.info("Created directory format is not correct");
return false;
}
boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
if (flag == false) return false;
}
return true;
}
/**
* Simple mapper that produces <TableName, KeyValue> output from no input data
*/
static class Random_TableKV_GeneratingMapper
extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
private int keyLength;
private int valLength;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}
@Override
protected void map(NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
ArrayList<ImmutableBytesWritable> tables = new ArrayList<>();
for (int i = 0; i < TABLES.length; i++) {
tables.add(new ImmutableBytesWritable(TABLES[i]));
}
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
for (ImmutableBytesWritable table : tables) {
for (byte[] family : FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(table, kv);
}
}
}
}
}
/**
* Simple reducer that takes <TableName, KeyValue> input with the KeyValues in no particular
* order, and outputs <TableName, KeyValue> with the KeyValues sorted
*/
static class Table_KeyValueSortReducer
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<>(KeyValue.COMPARATOR);
for (KeyValue kv : kvs) {
try {
map.add(kv.clone());
} catch (CloneNotSupportedException e) {
throw new java.io.IOException(e);
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv : map) {
context.write(table, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}
}

TestMultiTableHFileOutputFormat.java (new file)

@ -0,0 +1,382 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for {@link MultiTableHFileOutputFormat}. Sets up and runs a MapReduce job that creates
* output directories and writes HFiles.
*/
@Category(MediumTests.class)
public class TestMultiTableHFileOutputFormat {
private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class);
private HBaseTestingUtility util = new HBaseTestingUtility();
private static int ROWSPERSPLIT = 10;
private static final int KEYLEN_DEFAULT = 10;
private static final String KEYLEN_CONF = "randomkv.key.length";
private static final int VALLEN_DEFAULT = 10;
private static final String VALLEN_CONF = "randomkv.val.length";
private static final byte[][] TABLES =
{ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
private static final byte[][] FAMILIES =
{ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
private static final byte[] QUALIFIER = Bytes.toBytes("data");
/**
* Run a small MR job that writes HFiles into
* testWritingDataIntoHFiles/tableName/columnFamily/
*/
@Test
public void testWritingDataIntoHFiles() throws Exception {
Configuration conf = util.getConfiguration();
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
FileSystem fs = testDir.getFileSystem(conf);
LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files by setting max file size.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
try {
Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
FileOutputFormat.setOutputPath(job, testDir);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(Table_KeyValueSortReducer.class);
job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("\nStarting test testWritingDataIntoHFiles\n");
assertTrue(job.waitForCompletion(true));
LOG.info("\nWaiting on checking MapReduce output\n");
assertTrue(checkMROutput(fs, testDir, 0));
} finally {
testDir.getFileSystem(conf).delete(testDir, true);
util.shutdownMiniCluster();
}
}
/**
* Check that directories and HFiles are created in the layout designed in MultiHFilePartitioner,
* and that each output file carries the same configuration as the created table
*/
@Test
public void testMultiHFilePartitioner() throws Exception {
Configuration conf = util.getConfiguration();
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner");
FileSystem fs = testDir.getFileSystem(conf);
LOG.info("testMultiHFilePartitioner dir writing to : " + testDir);
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files by setting max file size.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
// Create several tables for testing
List<TableName> tables = new ArrayList<TableName>();
// store the split keys of TABLES[0] for later verification
byte[][] testKeys = new byte[0][0];
for (int i = 0; i < TABLES.length; i++) {
TableName tableName = TableName.valueOf(TABLES[i]);
byte[][] splitKeys = generateRandomSplitKeys(3);
if (i == 0) {
testKeys = splitKeys;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
for (int j = 0; j < FAMILIES.length; j++) {
HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]);
// only customize TABLES[0]: set the compression type and data block encoding
if (i == 0) {
familyDescriptor.setCompressionType(Compression.Algorithm.GZ);
familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
}
tableDescriptor.addFamily(familyDescriptor);
}
util.createTable(tableDescriptor, splitKeys, conf);
tables.add(tableName);
}
// set up for MapReduce job
try {
Job job = Job.getInstance(conf, "testMultiHFilePartitioner");
FileOutputFormat.setOutputPath(job, testDir);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
LOG.info("Starting test testWritingDataIntoHFiles");
assertTrue(job.waitForCompletion(true));
LOG.info("Waiting on checking MapReduce output");
assertTrue(checkMROutput(fs, testDir, 0));
assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys));
} finally {
for (int i = 0; i < TABLES.length; i++) {
TableName tName = TableName.valueOf(TABLES[i]);
util.deleteTable(tName);
}
fs.delete(testDir, true);
fs.close();
util.shutdownMiniCluster();
}
}
/**
* Check that each output HFile has the same configuration as the created test table, and that
* the HFiles were split correctly. Only TABLES[0] is checked.
*/
private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException {
FileStatus[] fStats = fs.listStatus(testDir);
for (FileStatus stats : fStats) {
if (stats.getPath().getName().equals(Bytes.toString(TABLES[0]))) {
FileStatus[] cfStats = fs.listStatus(stats.getPath());
for (FileStatus cfstat : cfStats) {
FileStatus[] hfStats = fs.listStatus(cfstat.getPath());
List<byte[]> firstKeys = new ArrayList<byte[]>();
List<byte[]> lastKeys = new ArrayList<byte[]>();
for (FileStatus hfstat : hfStats) {
if (HFile.isHFileFormat(fs, hfstat)) {
HFile.Reader hfr =
HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf);
if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr
.getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false;
firstKeys.add(hfr.getFirstRowKey());
lastKeys.add(hfr.getLastRowKey());
}
}
if (!checkFileSplit(splitKeys, firstKeys, lastKeys)) {
return false;
}
}
}
}
return true;
}
/**
* Check whether the HFiles have been split along region boundaries
* @param splitKeys split keys for that table
* @param firstKeys first rowKey for hfiles
* @param lastKeys last rowKey for hfiles
*/
private boolean checkFileSplit(byte[][] splitKeys, List<byte[]> firstKeys, List<byte[]> lastKeys) {
Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR);
Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR);
Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR);
int is = 0, il = 0;
for (byte[] key : lastKeys) {
while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++;
if (is == splitKeys.length) {
break;
}
if (is > 0) {
if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false;
}
il++;
}
if (is == splitKeys.length) {
return il == lastKeys.size() - 1;
}
return true;
}
/**
* MR outputs a 3-level directory tree, tableName -> columnFamilyName -> HFile. This method checks
* whether the created directory tree is correct. It recurses, so testDir should be small.
*/
private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException {
if (level >= 3) {
return HFile.isHFileFormat(fs, testDir);
}
FileStatus[] fStats = fs.listStatus(testDir);
if (fStats == null || fStats.length <= 0) {
LOG.info("Created directory format is not correct");
return false;
}
for (FileStatus stats : fStats) {
// skip the _SUCCESS file created by MapReduce
if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
continue;
if (level < 2 && !stats.isDirectory()) {
LOG.info("Created directory format is not correct");
return false;
}
boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
if (flag == false) return false;
}
return true;
}
private byte[][] generateRandomSplitKeys(int numKeys) {
Random random = new Random();
byte[][] ret = new byte[numKeys][];
for (int i = 0; i < numKeys; i++) {
ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT);
}
return ret;
}
/**
* Simple mapper that produces <TableName, KeyValue> output from no input data
*/
static class Random_TableKV_GeneratingMapper
extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
private int keyLength;
private int valLength;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}
@Override
protected void map(NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
ArrayList<ImmutableBytesWritable> tables = new ArrayList<>();
for (int i = 0; i < TABLES.length; i++) {
tables.add(new ImmutableBytesWritable(TABLES[i]));
}
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
for (ImmutableBytesWritable table : tables) {
for (byte[] family : FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(table, kv);
}
}
}
}
}
/**
* Simple reducer that takes <TableName, KeyValue> input with the KeyValues in no particular
* order, and outputs <TableName, KeyValue> with the KeyValues sorted
*/
static class Table_KeyValueSortReducer
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
for (KeyValue kv : kvs) {
try {
map.add(kv.clone());
} catch (CloneNotSupportedException e) {
throw new java.io.IOException(e);
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv : map) {
context.write(table, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}
}