From c7a7f880dd99a29183e54f0092c10e7a70186d9d Mon Sep 17 00:00:00 2001 From: Jerry He Date: Thu, 1 Jun 2017 10:44:17 -0700 Subject: [PATCH] HBASE-16261 MultiHFileOutputFormat Enhancement (Yi Liang) --- .../hbase/mapreduce/HFileOutputFormat2.java | 2 +- .../mapreduce/MultiHFileOutputFormat.java | 99 ---- .../MultiTableHFileOutputFormat.java | 509 ++++++++++++++++++ .../mapreduce/TestMultiHFileOutputFormat.java | 224 -------- .../TestMultiTableHFileOutputFormat.java | 382 +++++++++++++ 5 files changed, 892 insertions(+), 324 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 5b1f13cc274..da507b13d0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -166,7 +166,7 @@ public class HFileOutputFormat2 /** * Mapredue job will create a temp path for outputting results. If out != null, it means that * the caller has set the temp working dir; If out == null, it means we need to set it here. - * Used by HFileOutputFormat2 and MultiHFileOutputFormat. MultiHFileOutputFormat will give us + * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us * temp working dir at the table level and HFileOutputFormat2 has to set it here within this * constructor. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java deleted file mode 100644 index 3c90b5987a8..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; - -import com.google.common.annotations.VisibleForTesting; -/** - * Create 3 level tree directory, first level is using table name as parent directory and then use - * family name as child directory, and all related HFiles for one family are under child directory - * -tableName1 - * -columnFamilyName1 - * -columnFamilyName2 - * -HFiles - * -tableName2 - * -columnFamilyName1 - * -HFiles - * -columnFamilyName2 - *
- */ -@InterfaceAudience.Public -@VisibleForTesting -public class MultiHFileOutputFormat extends FileOutputFormat { - private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class); - - @Override - public RecordWriter - getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { - return createMultiHFileRecordWriter(context); - } - - static RecordWriter - createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException { - - // Get the path of the output directory - final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); - final Configuration conf = context.getConfiguration(); - final FileSystem fs = outputDir.getFileSystem(conf); - - // Map of tables to writers - final Map> tableWriters = new HashMap<>(); - - return new RecordWriter() { - @Override - public void write(ImmutableBytesWritable tableName, V cell) - throws IOException, InterruptedException { - RecordWriter tableWriter = tableWriters.get(tableName); - // if there is new table, verify that table directory exists - if (tableWriter == null) { - // using table name as directory name - final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes())); - fs.mkdirs(tableOutputDir); - LOG.info("Writing Table '" + tableName.toString() + "' data into following directory" - + tableOutputDir.toString()); - - // Create writer for one specific table - tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir); - // Put table into map - tableWriters.put(tableName, tableWriter); - } - // Write into tableWriter - // in the original code, it does not use Row - tableWriter.write(null, cell); - } - - @Override - public void close(TaskAttemptContext c) throws IOException, InterruptedException { - for (RecordWriter writer : tableWriters.values()) { - writer.close(c); - } - } - }; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java new file mode 100644 index 00000000000..03256bf9338 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java @@ -0,0 +1,509 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.TreeMap; +import java.util.ArrayList; +import java.util.TreeSet; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Create 3 level tree directory, first level is using table name as parent directory and then use + * family name as child directory, and all related HFiles for one family are under child directory + * -tableName1 + * -columnFamilyName1 + * -HFile (region1) + * -columnFamilyName2 + * -HFile1 (region1) + * -HFile2 (region2) + * -HFile3 (region3) + * -tableName2 + * -columnFamilyName1 + * -HFile (region1) + * family directory and its hfiles match the output of HFileOutputFormat2 + * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 + */ + +@InterfaceAudience.Public +@VisibleForTesting +public class MultiTableHFileOutputFormat extends FileOutputFormat { + private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class); + + @Override + public RecordWriter + getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { + return createMultiHFileRecordWriter(context); + } + + static RecordWriter + createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException { + + // Get the path of the output directory + final Path outputPath = FileOutputFormat.getOutputPath(context); + final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Configuration conf = context.getConfiguration(); + final FileSystem fs = outputDir.getFileSystem(conf); + + Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin(); + + // Map of existing tables, avoid calling getTable() everytime + final Map tables = new HashMap<>(); 
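+    // (Each cached Table's descriptor is later used by configureForOneTable() to set per-table
+    // compression, block size, bloom type and data block encoding before its writer is created.)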
+ + // Map of tables to writers + final Map> tableWriters = new HashMap<>(); + + return new RecordWriter() { + @Override + public void write(ImmutableBytesWritable tableName, V cell) + throws IOException, InterruptedException { + RecordWriter tableWriter = tableWriters.get(tableName); + // if there is new table, verify that table directory exists + if (tableWriter == null) { + // using table name as directory name + final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes())); + fs.mkdirs(tableOutputDir); + LOG.info("Writing Table '" + tableName.toString() + "' data into following directory" + + tableOutputDir.toString()); + // Configure for tableWriter, if table exist, write configuration of table into conf + Table table = null; + if (tables.containsKey(tableName)) { + table = tables.get(tableName); + } else { + table = getTable(tableName.copyBytes(), conn, admin); + tables.put(tableName, table); + } + if (table != null) { + configureForOneTable(conf, table.getTableDescriptor()); + } + // Create writer for one specific table + tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir); + // Put table into map + tableWriters.put(tableName, tableWriter); + } + // Write into tableWriter + // in the original code, it does not use Row + tableWriter.write(null, cell); + } + + @Override + public void close(TaskAttemptContext c) throws IOException, InterruptedException { + for (RecordWriter writer : tableWriters.values()) { + writer.close(c); + } + if (conn != null) { + conn.close(); + } + if (admin != null) { + admin.close(); + } + } + }; + } + + /** + * Configure for one table, should be used before creating a new HFileRecordWriter, + * Set compression algorithms and related configuration based on column families + */ + private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor) + throws UnsupportedEncodingException { + HFileOutputFormat2.configureCompression(conf, tableDescriptor); + HFileOutputFormat2.configureBlockSize(tableDescriptor, conf); + HFileOutputFormat2.configureBloomType(tableDescriptor, conf); + HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); + } + + /** + * Configure a MapReduce Job to output HFiles for performing an incremental load into + * the multiple tables. + *
+ * + * ConfigureIncrementalLoad has set up partitioner and reducer for mapreduce job. + * Caller needs to setup input path, output path and mapper + * + * @param job + * @param tables A list of tables to inspects + * @throws IOException + */ + public static void configureIncrementalLoad(Job job, List tables) throws IOException { + configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class); + } + + public static void configureIncrementalLoad(Job job, List tables, + Class> cls) throws IOException { + + Configuration conf = job.getConfiguration(); + Map> tableSplitKeys = + MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables); + configureIncrementalLoad(job, tableSplitKeys, cls); + } + + /** + * Same purpose as configureIncrementalLoad(Job job, List tables) + * Used when region startKeys of each table is available, input as > + * + * Caller needs to transfer TableName and byte[] to ImmutableBytesWritable + */ + public static void configureIncrementalLoad(Job job, Map> tableSplitKeys) throws IOException { + configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class); + } + + public static void configureIncrementalLoad(Job job, Map> tableSplitKeys, Class> cls) throws IOException { + Configuration conf = job.getConfiguration(); + + // file path to store + String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); + LOG.info("Writing partition info into dir: " + partitionsPath.toString()); + job.setPartitionerClass(MultiHFilePartitioner.class); + // get split keys for all the tables, and write them into partition file + MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys); + MultiHFilePartitioner.setPartitionFile(conf, partitionsPath); + partitionsPath.getFileSystem(conf).makeQualified(partitionsPath); + partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath); + + // now only support Mapper output + // we can use KeyValueSortReducer directly to sort Mapper output + if (KeyValue.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(KeyValueSortReducer.class); + } else { + LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); + } + int reducerNum = getReducerNumber(tableSplitKeys); + job.setNumReduceTasks(reducerNum); + LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count"); + + // setup output format + job.setOutputFormatClass(cls); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + } + + /** + * Check if table exist, should not dependent on HBase instance + * @return instance of table, if it exist + */ + private static Table getTable(final byte[] tableName, Connection conn, Admin admin) { + if (conn == null || admin == null) { + LOG.info("can not get Connection or Admin"); + return null; + } + + try { + TableName table = TableName.valueOf(tableName); + if (admin.tableExists(table)) { + return conn.getTable(table); + } + } catch (IOException e) { + LOG.info("Exception found in getTable()" + e.toString()); + return null; + } + + LOG.warn("Table: '" + 
TableName.valueOf(tableName) + "' does not exist"); + return null; + } + + /** + * Get the number of reducers by tables' split keys + */ + private static int getReducerNumber( + Map> tableSplitKeys) { + int reducerNum = 0; + for (Map.Entry> entry : tableSplitKeys.entrySet()) { + reducerNum += entry.getValue().size(); + } + return reducerNum; + } + + /** + * MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner + * The input is partitioned based on table's name and its region boundaries with the table. + * Two records are in the same partition if they have same table name and the their cells are + * in the same region + */ + static class MultiHFilePartitioner extends Partitioner + implements Configurable { + + public static final String DEFAULT_PATH = "_partition_multihfile.lst"; + public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path"; + private Configuration conf; + // map to receive from file + private Map> table_SplitKeys; + // each pair is map to one unique integer + private TreeMap partitionMap; + + @Override + public void setConf(Configuration conf) { + try { + this.conf = conf; + partitionMap = new TreeMap<>(); + table_SplitKeys = readTableSplitKeys(conf); + + // initiate partitionMap by table_SplitKeys map + int splitNum = 0; + for (Map.Entry> entry : table_SplitKeys.entrySet()) { + ImmutableBytesWritable table = entry.getKey(); + List list = entry.getValue(); + for (ImmutableBytesWritable splitKey : list) { + partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++); + } + } + } catch (IOException e) { + throw new IllegalArgumentException("Can't read partitions file", e); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Set the path to the SequenceFile storing the sorted . It must be the case + * that for R reduces, there are R-1 keys in the SequenceFile. + */ + public static void setPartitionFile(Configuration conf, Path p) { + conf.set(PARTITIONER_PATH, p.toString()); + } + + /** + * Get the path to the SequenceFile storing the sorted . 
+ * @see #setPartitionFile(Configuration, Path) + */ + public static String getPartitionFile(Configuration conf) { + return conf.get(PARTITIONER_PATH, DEFAULT_PATH); + } + + + /** + * Return map of + */ + public static Map> getTablesRegionStartKeys( + Configuration conf, List tables) throws IOException { + final TreeMap> ret = new TreeMap<>(); + + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin()) { + LOG.info("Looking up current regions for tables"); + for (TableName tName : tables) { + RegionLocator table = conn.getRegionLocator(tName); + // if table not exist, use default split keys for this table + byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY }; + if (admin.tableExists(tName)) { + byteKeys = table.getStartKeys(); + } + List tableStartKeys = new ArrayList<>(byteKeys.length); + for (byte[] byteKey : byteKeys) { + tableStartKeys.add(new ImmutableBytesWritable(byteKey)); + } + ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys); + + } + return ret; + } + } + + /** + * write into sequence file in order, + * and this format can be parsed by MultiHFilePartitioner + */ + public static void writeTableSplitKeys(Configuration conf, Path partitionsPath, + Map> map) throws IOException { + LOG.info("Writing partition information to " + partitionsPath); + + if (map == null || map.isEmpty()) { + throw new IllegalArgumentException("No regions passed for all tables"); + } + + SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), + Writer.keyClass(ImmutableBytesWritable.class), + Writer.valueClass(ImmutableBytesWritable.class)); + + try { + for (Map.Entry> entry : map.entrySet()) { + ImmutableBytesWritable table = entry.getKey(); + List list = entry.getValue(); + if (list == null) { + throw new IOException("Split keys for a table can not be null"); + } + + TreeSet sorted = new TreeSet<>(list); + + ImmutableBytesWritable first = sorted.first(); + if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "First region of table should have empty start key. 
Instead has: " + + Bytes.toStringBinary(first.get())); + } + + for (ImmutableBytesWritable startKey : sorted) { + writer.append(table, startKey); + } + } + } finally { + writer.close(); + } + } + + /** + * read partition file into map + */ + private Map> readTableSplitKeys( + Configuration conf) throws IOException { + String parts = getPartitionFile(conf); + LOG.info("Read partition info from file: " + parts); + final Path partFile = new Path(parts); + + SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile)); + // values are already sorted in file, so use list + final Map> map = + new TreeMap<>(); + // key and value have same type + ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + ImmutableBytesWritable value = + ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + try { + while (reader.next(key, value)) { + + List list = map.get(key); + if (list == null) { + list = new ArrayList<>(); + } + list.add(value); + map.put(key, list); + + key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + } + } finally { + IOUtils.cleanup(LOG, reader); + } + return map; + } + + @Override + public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) { + byte[] row = CellUtil.cloneRow(value); + final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row); + ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY); + //find splitKey by input rowKey + if (table_SplitKeys.containsKey(table)) { + List list = table_SplitKeys.get(table); + int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator()); + if (index < 0) { + index = (index + 1) * (-1) - 1; + } else if (index == list.size()) { + index -= 1; + } + if (index < 0) { + index = 0; + LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY "); + } + splitId = list.get(index); + } + + // find the id of the reducer for the input + Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId)); + if (id == null) { + LOG.warn("Can not get reducer id for input record"); + return -1; + } + return id.intValue() % numPartitions; + } + + /** + * A class store pair, has two main usage + * 1. store tableName and one of its splitKey as a pair + * 2. implement comparable, so that partitioner can find splitKey of its input cell + */ + static class TableSplitKeyPair extends Pair + implements Comparable { + + private static final long serialVersionUID = -6485999667666325594L; + + public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) { + super(a, b); + } + + @Override + public int compareTo(TableSplitKeyPair other) { + if (this.getFirst().equals(other.getFirst())) { + return this.getSecond().compareTo(other.getSecond()); + } + return this.getFirst().compareTo(other.getFirst()); + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java deleted file mode 100644 index 958ed83f6ce..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. 
See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertTrue; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Random; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Test for{@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and - * writes hfiles. - */ -@Category(MediumTests.class) -public class TestMultiHFileOutputFormat { - private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class); - - private HBaseTestingUtility util = new HBaseTestingUtility(); - - private static int ROWSPERSPLIT = 10; - - private static final int KEYLEN_DEFAULT = 10; - private static final String KEYLEN_CONF = "randomkv.key.length"; - - private static final int VALLEN_DEFAULT = 10; - private static final String VALLEN_CONF = "randomkv.val.length"; - - private static final byte[][] TABLES = - { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), - Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; - - private static final byte[][] FAMILIES = - { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), - Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; - - private static final byte[] QUALIFIER = Bytes.toBytes("data"); - - public static void main(String[] args) throws Exception { - new TestMultiHFileOutputFormat().testWritingDataIntoHFiles(); - } - - /** - * Run small MR job. this MR job will write HFile into - * testWritingDataIntoHFiles/tableNames/columFamilies/ - */ - @Test - public void testWritingDataIntoHFiles() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); - - // Set down this value or we OOME in eclipse. 
- conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - try { - Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); - - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - job.setReducerClass(Table_KeyValueSortReducer.class); - job.setOutputFormatClass(MultiHFileOutputFormat.class); - job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("\nStarting test testWritingDataIntoHFiles\n"); - assertTrue(job.waitForCompletion(true)); - LOG.info("\nWaiting on checking MapReduce output\n"); - assertTrue(checkMROutput(fs, testDir, 0)); - } finally { - testDir.getFileSystem(conf).delete(testDir, true); - util.shutdownMiniCluster(); - } - } - - /** - * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the - * created directory is correct or not A recursion method, the testDir had better be small size - */ - private boolean checkMROutput(FileSystem fs, Path testDir, int level) - throws FileNotFoundException, IOException { - if (level >= 3) { - return HFile.isHFileFormat(fs, testDir); - } - FileStatus[] fStats = fs.listStatus(testDir); - if (fStats == null || fStats.length <= 0) { - LOG.info("Created directory format is not correct"); - return false; - } - - for (FileStatus stats : fStats) { - // skip the _SUCCESS file created by MapReduce - if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) - continue; - if (level < 2 && !stats.isDirectory()) { - LOG.info("Created directory format is not correct"); - return false; - } - boolean flag = checkMROutput(fs, stats.getPath(), level + 1); - if (flag == false) return false; - } - return true; - } - - /** - * Simple mapper that makes output. 
With no input data - */ - static class Random_TableKV_GeneratingMapper - extends Mapper { - - private int keyLength; - private int valLength; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - - Configuration conf = context.getConfiguration(); - keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); - valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); - } - - @Override - protected void map(NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException, InterruptedException { - - byte keyBytes[] = new byte[keyLength]; - byte valBytes[] = new byte[valLength]; - - ArrayList tables = new ArrayList<>(); - for (int i = 0; i < TABLES.length; i++) { - tables.add(new ImmutableBytesWritable(TABLES[i])); - } - - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - - for (int i = 0; i < ROWSPERSPLIT; i++) { - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); - random.nextBytes(valBytes); - - for (ImmutableBytesWritable table : tables) { - for (byte[] family : FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(table, kv); - } - } - } - } - } - - /** - * Simple Reducer that have input , with KeyValues have no order. and output - * , with KeyValues are ordered - */ - - static class Table_KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, - org.apache.hadoop.mapreduce.Reducer.Context context) - throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet<>(KeyValue.COMPARATOR); - for (KeyValue kv : kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv : map) { - context.write(table, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java new file mode 100644 index 00000000000..781eaa984a2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and + * writes hfiles. + */ +@Category(MediumTests.class) +public class TestMultiTableHFileOutputFormat { + private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class); + + private HBaseTestingUtility util = new HBaseTestingUtility(); + + private static int ROWSPERSPLIT = 10; + + private static final int KEYLEN_DEFAULT = 10; + private static final String KEYLEN_CONF = "randomkv.key.length"; + + private static final int VALLEN_DEFAULT = 10; + private static final String VALLEN_CONF = "randomkv.val.length"; + + private static final byte[][] TABLES = + { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), + Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; + + private static final byte[][] FAMILIES = + { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; + + private static final byte[] QUALIFIER = Bytes.toBytes("data"); + + /** + * Run small MR job. this MR job will write HFile into + * testWritingDataIntoHFiles/tableNames/columnFamilies/ + */ + @Test + public void testWritingDataIntoHFiles() throws Exception { + Configuration conf = util.getConfiguration(); + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); + FileSystem fs = testDir.getFileSystem(conf); + LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); + + // Set down this value or we OOME in eclipse. + conf.setInt("mapreduce.task.io.sort.mb", 20); + // Write a few files by setting max file size. 
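+    // (The per-table writers delegate to HFileOutputFormat2's record writer, which starts a new
+    // HFile once a column family's output reaches roughly this size.)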
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + try { + Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); + + FileOutputFormat.setOutputPath(job, testDir); + + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(Random_TableKV_GeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + job.setReducerClass(Table_KeyValueSortReducer.class); + job.setOutputFormatClass(MultiTableHFileOutputFormat.class); + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("\nStarting test testWritingDataIntoHFiles\n"); + assertTrue(job.waitForCompletion(true)); + LOG.info("\nWaiting on checking MapReduce output\n"); + assertTrue(checkMROutput(fs, testDir, 0)); + } finally { + testDir.getFileSystem(conf).delete(testDir, true); + util.shutdownMiniCluster(); + } + } + + /** + * check whether create directory and hfiles as format designed in MultiHFilePartitioner + * and also check whether the output file has same related configuration as created table + */ + @Test + public void testMultiHFilePartitioner() throws Exception { + Configuration conf = util.getConfiguration(); + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner"); + FileSystem fs = testDir.getFileSystem(conf); + LOG.info("testMultiHFilePartitioner dir writing to : " + testDir); + + // Set down this value or we OOME in eclipse. + conf.setInt("mapreduce.task.io.sort.mb", 20); + // Write a few files by setting max file size. 
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + // Create several tables for testing + List tables = new ArrayList(); + + // to store splitKeys for TABLE[0] for testing; + byte[][] testKeys = new byte[0][0]; + for (int i = 0; i < TABLES.length; i++) { + TableName tableName = TableName.valueOf(TABLES[i]); + byte[][] splitKeys = generateRandomSplitKeys(3); + if (i == 0) { + testKeys = splitKeys; + } + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + for (int j = 0; j < FAMILIES.length; j++) { + HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]); + //only set Tables[0] configuration, and specify compression type and DataBlockEncode + if (i == 0) { + familyDescriptor.setCompressionType(Compression.Algorithm.GZ); + familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); + } + tableDescriptor.addFamily(familyDescriptor); + } + util.createTable(tableDescriptor, splitKeys, conf); + tables.add(tableName); + } + // set up for MapReduce job + try { + Job job = Job.getInstance(conf, "testMultiHFilePartitioner"); + FileOutputFormat.setOutputPath(job, testDir); + + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(Random_TableKV_GeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + + MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables); + + LOG.info("Starting test testWritingDataIntoHFiles"); + assertTrue(job.waitForCompletion(true)); + LOG.info("Waiting on checking MapReduce output"); + assertTrue(checkMROutput(fs, testDir, 0)); + assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys)); + } finally { + for (int i = 0; i < TABLES.length; i++) { + TableName tName = TableName.valueOf(TABLES[i]); + util.deleteTable(tName); + } + fs.delete(testDir, true); + fs.close(); + util.shutdownMiniCluster(); + } + } + + /** + * check the output hfile has same configuration as created test table + * and also check whether hfiles get split correctly + * only check TABLES[0] + */ + private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException { + FileStatus[] fStats = fs.listStatus(testDir); + for (FileStatus stats : fStats) { + if (stats.getPath().getName().equals(new String(TABLES[0]))) { + FileStatus[] cfStats = fs.listStatus(stats.getPath()); + for (FileStatus cfstat : cfStats) { + FileStatus[] hfStats = fs.listStatus(cfstat.getPath()); + + List firsttKeys = new ArrayList(); + List lastKeys = new ArrayList(); + for (FileStatus hfstat : hfStats) { + if (HFile.isHFileFormat(fs, hfstat)) { + HFile.Reader hfr = + HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf); + if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr + .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false; + firsttKeys.add(hfr.getFirstRowKey()); + lastKeys.add(hfr.getLastRowKey()); + } + } + if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) { + return false; + } + } + } + } + return true; + } + + /** + * Check whether the Hfile has been split by region boundaries + * @param splitKeys split keys for that table + * @param firstKeys first rowKey for hfiles + * @param lastKeys last rowKey for hfiles + */ + private boolean checkFileSplit(byte[][] splitKeys, List firstKeys, List lastKeys) { + Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR); + Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR); + 
Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR); + + int is = 0, il = 0; + for (byte[] key : lastKeys) { + while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++; + if (is == splitKeys.length) { + break; + } + if (is > 0) { + if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false; + } + il++; + } + + if (is == splitKeys.length) { + return il == lastKeys.size() - 1; + } + return true; + } + + + /** + * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the + * created directory is correct or not A recursion method, the testDir had better be small size + */ + private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException { + if (level >= 3) { + return HFile.isHFileFormat(fs, testDir); + } + FileStatus[] fStats = fs.listStatus(testDir); + if (fStats == null || fStats.length <= 0) { + LOG.info("Created directory format is not correct"); + return false; + } + + for (FileStatus stats : fStats) { + // skip the _SUCCESS file created by MapReduce + if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) + continue; + if (level < 2 && !stats.isDirectory()) { + LOG.info("Created directory format is not correct"); + return false; + } + boolean flag = checkMROutput(fs, stats.getPath(), level + 1); + if (flag == false) return false; + } + return true; + } + + + private byte[][] generateRandomSplitKeys(int numKeys) { + Random random = new Random(); + byte[][] ret = new byte[numKeys][]; + for (int i = 0; i < numKeys; i++) { + ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT); + } + return ret; + } + + + /** + * Simple mapper that makes output. With no input data + */ + static class Random_TableKV_GeneratingMapper + extends Mapper { + + private int keyLength; + private int valLength; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + } + + @Override + protected void map(NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException, InterruptedException { + + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + ArrayList tables = new ArrayList<>(); + for (int i = 0; i < TABLES.length; i++) { + tables.add(new ImmutableBytesWritable(TABLES[i])); + } + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + Random random = new Random(); + + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + + for (ImmutableBytesWritable table : tables) { + for (byte[] family : FAMILIES) { + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); + context.write(table, kv); + } + } + } + } + } + + /** + * Simple Reducer that have input , with KeyValues have no order. 
and output + * , with KeyValues are ordered + */ + + static class Table_KeyValueSortReducer + extends Reducer { + protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, + org.apache.hadoop.mapreduce.Reducer.Context context) + throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); + for (KeyValue kv : kvs) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(table, kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } + } +} \ No newline at end of file
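
Usage sketch (illustrative only, not part of this patch): a driver that feeds the new
MultiTableHFileOutputFormat.configureIncrementalLoad(Job, List<TableName>) API. The input and
output paths, the table names and MyTableKVMapper are hypothetical placeholders; the mapper must
emit <ImmutableBytesWritable table name, KeyValue> pairs, as the test mapper above does.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiTableBulkLoadDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "multi-table-bulkload");
        job.setJarByClass(MultiTableBulkLoadDriver.class);

        // Caller is responsible for input path, input format and mapper (placeholder mapper here).
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(MyTableKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // Root output directory; the job writes the tableName/columnFamily/HFile tree under it.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Sets up MultiHFilePartitioner, KeyValueSortReducer, one reduce task per region
        // across all listed tables, and MultiTableHFileOutputFormat as the output format.
        List<TableName> tables = Arrays.asList(
            TableName.valueOf("usertable"), TableName.valueOf("eventtable"));
        MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Since the family-level directories match HFileOutputFormat2 output, each table's subdirectory can
then be bulk-loaded with LoadIncrementalHFiles as usual.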