diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 5b1f13cc274..da507b13d0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -166,7 +166,7 @@ public class HFileOutputFormat2
/**
* Mapredue job will create a temp path for outputting results. If out != null, it means that
* the caller has set the temp working dir; If out == null, it means we need to set it here.
- * Used by HFileOutputFormat2 and MultiHFileOutputFormat. MultiHFileOutputFormat will give us
+ * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
* temp working dir at the table level and HFileOutputFormat2 has to set it here within this
* constructor.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
deleted file mode 100644
index 3c90b5987a8..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-
-import com.google.common.annotations.VisibleForTesting;
-/**
- * Create 3 level tree directory, first level is using table name as parent directory and then use
- * family name as child directory, and all related HFiles for one family are under child directory
- * -tableName1
- * -columnFamilyName1
- * -columnFamilyName2
- * -HFiles
- * -tableName2
- * -columnFamilyName1
- * -HFiles
- * -columnFamilyName2
- *
- */
-@InterfaceAudience.Public
-@VisibleForTesting
-public class MultiHFileOutputFormat extends FileOutputFormat {
- private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
-
- @Override
- public RecordWriter
- getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
- return createMultiHFileRecordWriter(context);
- }
-
- static RecordWriter
- createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
-
- // Get the path of the output directory
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = outputDir.getFileSystem(conf);
-
- // Map of tables to writers
- final Map> tableWriters = new HashMap<>();
-
- return new RecordWriter() {
- @Override
- public void write(ImmutableBytesWritable tableName, V cell)
- throws IOException, InterruptedException {
- RecordWriter tableWriter = tableWriters.get(tableName);
- // if there is new table, verify that table directory exists
- if (tableWriter == null) {
- // using table name as directory name
- final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
- fs.mkdirs(tableOutputDir);
- LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
- + tableOutputDir.toString());
-
- // Create writer for one specific table
- tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
- // Put table into map
- tableWriters.put(tableName, tableWriter);
- }
- // Write into tableWriter
- // in the original code, it does not use Row
- tableWriter.write(null, cell);
- }
-
- @Override
- public void close(TaskAttemptContext c) throws IOException, InterruptedException {
- for (RecordWriter writer : tableWriters.values()) {
- writer.close(c);
- }
- }
- };
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
new file mode 100644
index 00000000000..03256bf9338
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Create 3 level tree directory, first level is using table name as parent directory and then use
+ * family name as child directory, and all related HFiles for one family are under child directory
+ * -tableName1
+ * -columnFamilyName1
+ * -HFile (region1)
+ * -columnFamilyName2
+ * -HFile1 (region1)
+ * -HFile2 (region2)
+ * -HFile3 (region3)
+ * -tableName2
+ * -columnFamilyName1
+ * -HFile (region1)
+ * family directory and its hfiles match the output of HFileOutputFormat2
+ * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
+ */
+
+@InterfaceAudience.Public
+@VisibleForTesting
+public class MultiTableHFileOutputFormat extends FileOutputFormat {
+ private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
+
+ @Override
+ public RecordWriter
+ getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+ return createMultiHFileRecordWriter(context);
+ }
+
+ static RecordWriter
+ createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
+
+ // Get the path of the output directory
+ final Path outputPath = FileOutputFormat.getOutputPath(context);
+ final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = outputDir.getFileSystem(conf);
+
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Admin admin = conn.getAdmin();
+
+ // Map of existing tables, avoid calling getTable() everytime
+ final Map tables = new HashMap<>();
+
+ // Map of tables to writers
+ final Map> tableWriters = new HashMap<>();
+
+ return new RecordWriter() {
+ @Override
+ public void write(ImmutableBytesWritable tableName, V cell)
+ throws IOException, InterruptedException {
+ RecordWriter tableWriter = tableWriters.get(tableName);
+ // if there is new table, verify that table directory exists
+ if (tableWriter == null) {
+ // using table name as directory name
+ final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
+ fs.mkdirs(tableOutputDir);
+ LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
+ + tableOutputDir.toString());
+ // Configure for tableWriter, if table exist, write configuration of table into conf
+ Table table = null;
+ if (tables.containsKey(tableName)) {
+ table = tables.get(tableName);
+ } else {
+ table = getTable(tableName.copyBytes(), conn, admin);
+ tables.put(tableName, table);
+ }
+ if (table != null) {
+ configureForOneTable(conf, table.getTableDescriptor());
+ }
+ // Create writer for one specific table
+ tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
+ // Put table into map
+ tableWriters.put(tableName, tableWriter);
+ }
+ // Write into tableWriter
+ // in the original code, it does not use Row
+ tableWriter.write(null, cell);
+ }
+
+ @Override
+ public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+ for (RecordWriter writer : tableWriters.values()) {
+ writer.close(c);
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Configure for one table, should be used before creating a new HFileRecordWriter,
+ * Set compression algorithms and related configuration based on column families
+ */
+ private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor)
+ throws UnsupportedEncodingException {
+ HFileOutputFormat2.configureCompression(conf, tableDescriptor);
+ HFileOutputFormat2.configureBlockSize(tableDescriptor, conf);
+ HFileOutputFormat2.configureBloomType(tableDescriptor, conf);
+ HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
+ }
+
+ /**
+ * Configure a MapReduce Job to output HFiles for performing an incremental load into
+ * the multiple tables.
+ *
+ * - Inspects the tables to configure a partitioner based on their region boundaries
+ * - Writes the partitions file and configures the partitioner
+ * - Sets the number of reduce tasks to match the total number of all tables' regions
+ * - Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer)
+ *
+ *
+ * ConfigureIncrementalLoad has set up partitioner and reducer for mapreduce job.
+ * Caller needs to setup input path, output path and mapper
+ *
+ * @param job
+ * @param tables A list of tables to inspects
+ * @throws IOException
+ */
+ public static void configureIncrementalLoad(Job job, List tables) throws IOException {
+ configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class);
+ }
+
+ public static void configureIncrementalLoad(Job job, List tables,
+ Class extends OutputFormat, ?>> cls) throws IOException {
+
+ Configuration conf = job.getConfiguration();
+ Map> tableSplitKeys =
+ MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables);
+ configureIncrementalLoad(job, tableSplitKeys, cls);
+ }
+
+ /**
+ * Same purpose as configureIncrementalLoad(Job job, List tables)
+ * Used when region startKeys of each table is available, input as >
+ *
+ * Caller needs to transfer TableName and byte[] to ImmutableBytesWritable
+ */
+ public static void configureIncrementalLoad(Job job, Map> tableSplitKeys) throws IOException {
+ configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class);
+ }
+
+ public static void configureIncrementalLoad(Job job, Map> tableSplitKeys, Class extends OutputFormat, ?>> cls) throws IOException {
+ Configuration conf = job.getConfiguration();
+
+ // file path to store
+ String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+ HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
+ LOG.info("Writing partition info into dir: " + partitionsPath.toString());
+ job.setPartitionerClass(MultiHFilePartitioner.class);
+ // get split keys for all the tables, and write them into partition file
+ MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys);
+ MultiHFilePartitioner.setPartitionFile(conf, partitionsPath);
+ partitionsPath.getFileSystem(conf).makeQualified(partitionsPath);
+ partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath);
+
+ // now only support Mapper output
+ // we can use KeyValueSortReducer directly to sort Mapper output
+ if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(KeyValueSortReducer.class);
+ } else {
+ LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+ }
+ int reducerNum = getReducerNumber(tableSplitKeys);
+ job.setNumReduceTasks(reducerNum);
+ LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count");
+
+ // setup output format
+ job.setOutputFormatClass(cls);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(KeyValue.class);
+
+ job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.initCredentials(job);
+ }
+
+ /**
+ * Check if table exist, should not dependent on HBase instance
+ * @return instance of table, if it exist
+ */
+ private static Table getTable(final byte[] tableName, Connection conn, Admin admin) {
+ if (conn == null || admin == null) {
+ LOG.info("can not get Connection or Admin");
+ return null;
+ }
+
+ try {
+ TableName table = TableName.valueOf(tableName);
+ if (admin.tableExists(table)) {
+ return conn.getTable(table);
+ }
+ } catch (IOException e) {
+ LOG.info("Exception found in getTable()" + e.toString());
+ return null;
+ }
+
+ LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist");
+ return null;
+ }
+
+ /**
+ * Get the number of reducers by tables' split keys
+ */
+ private static int getReducerNumber(
+ Map> tableSplitKeys) {
+ int reducerNum = 0;
+ for (Map.Entry> entry : tableSplitKeys.entrySet()) {
+ reducerNum += entry.getValue().size();
+ }
+ return reducerNum;
+ }
+
+ /**
+ * MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner
+ * The input is partitioned based on table's name and its region boundaries with the table.
+ * Two records are in the same partition if they have same table name and the their cells are
+ * in the same region
+ */
+ static class MultiHFilePartitioner extends Partitioner
+ implements Configurable {
+
+ public static final String DEFAULT_PATH = "_partition_multihfile.lst";
+ public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path";
+ private Configuration conf;
+ // map to receive from file
+ private Map> table_SplitKeys;
+ // each pair is map to one unique integer
+ private TreeMap partitionMap;
+
+ @Override
+ public void setConf(Configuration conf) {
+ try {
+ this.conf = conf;
+ partitionMap = new TreeMap<>();
+ table_SplitKeys = readTableSplitKeys(conf);
+
+ // initiate partitionMap by table_SplitKeys map
+ int splitNum = 0;
+ for (Map.Entry> entry : table_SplitKeys.entrySet()) {
+ ImmutableBytesWritable table = entry.getKey();
+ List list = entry.getValue();
+ for (ImmutableBytesWritable splitKey : list) {
+ partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++);
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Can't read partitions file", e);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Set the path to the SequenceFile storing the sorted . It must be the case
+ * that for R reduces, there are R-1 keys in the SequenceFile.
+ */
+ public static void setPartitionFile(Configuration conf, Path p) {
+ conf.set(PARTITIONER_PATH, p.toString());
+ }
+
+ /**
+ * Get the path to the SequenceFile storing the sorted .
+ * @see #setPartitionFile(Configuration, Path)
+ */
+ public static String getPartitionFile(Configuration conf) {
+ return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+ }
+
+
+ /**
+ * Return map of
+ */
+ public static Map> getTablesRegionStartKeys(
+ Configuration conf, List tables) throws IOException {
+ final TreeMap> ret = new TreeMap<>();
+
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Admin admin = conn.getAdmin()) {
+ LOG.info("Looking up current regions for tables");
+ for (TableName tName : tables) {
+ RegionLocator table = conn.getRegionLocator(tName);
+ // if table not exist, use default split keys for this table
+ byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY };
+ if (admin.tableExists(tName)) {
+ byteKeys = table.getStartKeys();
+ }
+ List tableStartKeys = new ArrayList<>(byteKeys.length);
+ for (byte[] byteKey : byteKeys) {
+ tableStartKeys.add(new ImmutableBytesWritable(byteKey));
+ }
+ ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys);
+
+ }
+ return ret;
+ }
+ }
+
+ /**
+ * write into sequence file in order,
+ * and this format can be parsed by MultiHFilePartitioner
+ */
+ public static void writeTableSplitKeys(Configuration conf, Path partitionsPath,
+ Map> map) throws IOException {
+ LOG.info("Writing partition information to " + partitionsPath);
+
+ if (map == null || map.isEmpty()) {
+ throw new IllegalArgumentException("No regions passed for all tables");
+ }
+
+ SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath),
+ Writer.keyClass(ImmutableBytesWritable.class),
+ Writer.valueClass(ImmutableBytesWritable.class));
+
+ try {
+ for (Map.Entry> entry : map.entrySet()) {
+ ImmutableBytesWritable table = entry.getKey();
+ List list = entry.getValue();
+ if (list == null) {
+ throw new IOException("Split keys for a table can not be null");
+ }
+
+ TreeSet sorted = new TreeSet<>(list);
+
+ ImmutableBytesWritable first = sorted.first();
+ if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IllegalArgumentException(
+ "First region of table should have empty start key. Instead has: "
+ + Bytes.toStringBinary(first.get()));
+ }
+
+ for (ImmutableBytesWritable startKey : sorted) {
+ writer.append(table, startKey);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * read partition file into map
+ */
+ private Map> readTableSplitKeys(
+ Configuration conf) throws IOException {
+ String parts = getPartitionFile(conf);
+ LOG.info("Read partition info from file: " + parts);
+ final Path partFile = new Path(parts);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile));
+ // values are already sorted in file, so use list
+ final Map> map =
+ new TreeMap<>();
+ // key and value have same type
+ ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
+ ImmutableBytesWritable value =
+ ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
+ try {
+ while (reader.next(key, value)) {
+
+ List list = map.get(key);
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+ list.add(value);
+ map.put(key, list);
+
+ key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
+ value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, reader);
+ }
+ return map;
+ }
+
+ @Override
+ public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) {
+ byte[] row = CellUtil.cloneRow(value);
+ final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
+ ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+ //find splitKey by input rowKey
+ if (table_SplitKeys.containsKey(table)) {
+ List list = table_SplitKeys.get(table);
+ int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator());
+ if (index < 0) {
+ index = (index + 1) * (-1) - 1;
+ } else if (index == list.size()) {
+ index -= 1;
+ }
+ if (index < 0) {
+ index = 0;
+ LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY ");
+ }
+ splitId = list.get(index);
+ }
+
+ // find the id of the reducer for the input
+ Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId));
+ if (id == null) {
+ LOG.warn("Can not get reducer id for input record");
+ return -1;
+ }
+ return id.intValue() % numPartitions;
+ }
+
+ /**
+ * A class store pair, has two main usage
+ * 1. store tableName and one of its splitKey as a pair
+ * 2. implement comparable, so that partitioner can find splitKey of its input cell
+ */
+ static class TableSplitKeyPair extends Pair
+ implements Comparable {
+
+ private static final long serialVersionUID = -6485999667666325594L;
+
+ public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) {
+ super(a, b);
+ }
+
+ @Override
+ public int compareTo(TableSplitKeyPair other) {
+ if (this.getFirst().equals(other.getFirst())) {
+ return this.getSecond().compareTo(other.getSecond());
+ }
+ return this.getFirst().compareTo(other.getFirst());
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
deleted file mode 100644
index 958ed83f6ce..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test for{@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and
- * writes hfiles.
- */
-@Category(MediumTests.class)
-public class TestMultiHFileOutputFormat {
- private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
-
- private HBaseTestingUtility util = new HBaseTestingUtility();
-
- private static int ROWSPERSPLIT = 10;
-
- private static final int KEYLEN_DEFAULT = 10;
- private static final String KEYLEN_CONF = "randomkv.key.length";
-
- private static final int VALLEN_DEFAULT = 10;
- private static final String VALLEN_CONF = "randomkv.val.length";
-
- private static final byte[][] TABLES =
- { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
- Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
-
- private static final byte[][] FAMILIES =
- { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
- Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
-
- private static final byte[] QUALIFIER = Bytes.toBytes("data");
-
- public static void main(String[] args) throws Exception {
- new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
- }
-
- /**
- * Run small MR job. this MR job will write HFile into
- * testWritingDataIntoHFiles/tableNames/columFamilies/
- */
- @Test
- public void testWritingDataIntoHFiles() throws Exception {
- Configuration conf = util.getConfiguration();
- util.startMiniCluster();
- Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
- FileSystem fs = testDir.getFileSystem(conf);
- LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
-
- // Set down this value or we OOME in eclipse.
- conf.setInt("mapreduce.task.io.sort.mb", 20);
- // Write a few files by setting max file size.
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
-
- try {
- Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
-
- FileOutputFormat.setOutputPath(job, testDir);
-
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(Random_TableKV_GeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- job.setReducerClass(Table_KeyValueSortReducer.class);
- job.setOutputFormatClass(MultiHFileOutputFormat.class);
- job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
-
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- LOG.info("\nStarting test testWritingDataIntoHFiles\n");
- assertTrue(job.waitForCompletion(true));
- LOG.info("\nWaiting on checking MapReduce output\n");
- assertTrue(checkMROutput(fs, testDir, 0));
- } finally {
- testDir.getFileSystem(conf).delete(testDir, true);
- util.shutdownMiniCluster();
- }
- }
-
- /**
- * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the
- * created directory is correct or not A recursion method, the testDir had better be small size
- */
- private boolean checkMROutput(FileSystem fs, Path testDir, int level)
- throws FileNotFoundException, IOException {
- if (level >= 3) {
- return HFile.isHFileFormat(fs, testDir);
- }
- FileStatus[] fStats = fs.listStatus(testDir);
- if (fStats == null || fStats.length <= 0) {
- LOG.info("Created directory format is not correct");
- return false;
- }
-
- for (FileStatus stats : fStats) {
- // skip the _SUCCESS file created by MapReduce
- if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
- continue;
- if (level < 2 && !stats.isDirectory()) {
- LOG.info("Created directory format is not correct");
- return false;
- }
- boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
- if (flag == false) return false;
- }
- return true;
- }
-
- /**
- * Simple mapper that makes output. With no input data
- */
- static class Random_TableKV_GeneratingMapper
- extends Mapper {
-
- private int keyLength;
- private int valLength;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
- keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
- valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
- }
-
- @Override
- protected void map(NullWritable n1, NullWritable n2,
- Mapper.Context context)
- throws java.io.IOException, InterruptedException {
-
- byte keyBytes[] = new byte[keyLength];
- byte valBytes[] = new byte[valLength];
-
- ArrayList tables = new ArrayList<>();
- for (int i = 0; i < TABLES.length; i++) {
- tables.add(new ImmutableBytesWritable(TABLES[i]));
- }
-
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
- Random random = new Random();
-
- for (int i = 0; i < ROWSPERSPLIT; i++) {
- random.nextBytes(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
- random.nextBytes(valBytes);
-
- for (ImmutableBytesWritable table : tables) {
- for (byte[] family : FAMILIES) {
- Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
- context.write(table, kv);
- }
- }
- }
- }
- }
-
- /**
- * Simple Reducer that have input , with KeyValues have no order. and output
- * , with KeyValues are ordered
- */
-
- static class Table_KeyValueSortReducer
- extends Reducer {
- protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs,
- org.apache.hadoop.mapreduce.Reducer.Context context)
- throws java.io.IOException, InterruptedException {
- TreeSet map = new TreeSet<>(KeyValue.COMPARATOR);
- for (KeyValue kv : kvs) {
- try {
- map.add(kv.clone());
- } catch (CloneNotSupportedException e) {
- throw new java.io.IOException(e);
- }
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv : map) {
- context.write(table, kv);
- if (++index % 100 == 0) context.setStatus("Wrote " + index);
- }
- }
- }
-
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
new file mode 100644
index 00000000000..781eaa984a2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and
+ * writes hfiles.
+ */
+@Category(MediumTests.class)
+public class TestMultiTableHFileOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class);
+
+ private HBaseTestingUtility util = new HBaseTestingUtility();
+
+ private static int ROWSPERSPLIT = 10;
+
+ private static final int KEYLEN_DEFAULT = 10;
+ private static final String KEYLEN_CONF = "randomkv.key.length";
+
+ private static final int VALLEN_DEFAULT = 10;
+ private static final String VALLEN_CONF = "randomkv.val.length";
+
+ private static final byte[][] TABLES =
+ { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
+ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
+
+ private static final byte[][] FAMILIES =
+ { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+
+ private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+ /**
+ * Run small MR job. this MR job will write HFile into
+ * testWritingDataIntoHFiles/tableNames/columnFamilies/
+ */
+ @Test
+ public void testWritingDataIntoHFiles() throws Exception {
+ Configuration conf = util.getConfiguration();
+ util.startMiniCluster();
+ Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
+ FileSystem fs = testDir.getFileSystem(conf);
+ LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
+
+ // Set down this value or we OOME in eclipse.
+ conf.setInt("mapreduce.task.io.sort.mb", 20);
+ // Write a few files by setting max file size.
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+ try {
+ Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
+
+ FileOutputFormat.setOutputPath(job, testDir);
+
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setReducerClass(Table_KeyValueSortReducer.class);
+ job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
+ job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.initCredentials(job);
+ LOG.info("\nStarting test testWritingDataIntoHFiles\n");
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("\nWaiting on checking MapReduce output\n");
+ assertTrue(checkMROutput(fs, testDir, 0));
+ } finally {
+ testDir.getFileSystem(conf).delete(testDir, true);
+ util.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * check whether create directory and hfiles as format designed in MultiHFilePartitioner
+ * and also check whether the output file has same related configuration as created table
+ */
+ @Test
+ public void testMultiHFilePartitioner() throws Exception {
+ Configuration conf = util.getConfiguration();
+ util.startMiniCluster();
+ Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner");
+ FileSystem fs = testDir.getFileSystem(conf);
+ LOG.info("testMultiHFilePartitioner dir writing to : " + testDir);
+
+ // Set down this value or we OOME in eclipse.
+ conf.setInt("mapreduce.task.io.sort.mb", 20);
+ // Write a few files by setting max file size.
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+ // Create several tables for testing
+ List tables = new ArrayList();
+
+ // to store splitKeys for TABLE[0] for testing;
+ byte[][] testKeys = new byte[0][0];
+ for (int i = 0; i < TABLES.length; i++) {
+ TableName tableName = TableName.valueOf(TABLES[i]);
+ byte[][] splitKeys = generateRandomSplitKeys(3);
+ if (i == 0) {
+ testKeys = splitKeys;
+ }
+ HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+ for (int j = 0; j < FAMILIES.length; j++) {
+ HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]);
+ //only set Tables[0] configuration, and specify compression type and DataBlockEncode
+ if (i == 0) {
+ familyDescriptor.setCompressionType(Compression.Algorithm.GZ);
+ familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+ }
+ tableDescriptor.addFamily(familyDescriptor);
+ }
+ util.createTable(tableDescriptor, splitKeys, conf);
+ tables.add(tableName);
+ }
+ // set up for MapReduce job
+ try {
+ Job job = Job.getInstance(conf, "testMultiHFilePartitioner");
+ FileOutputFormat.setOutputPath(job, testDir);
+
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
+
+ LOG.info("Starting test testWritingDataIntoHFiles");
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("Waiting on checking MapReduce output");
+ assertTrue(checkMROutput(fs, testDir, 0));
+ assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys));
+ } finally {
+ for (int i = 0; i < TABLES.length; i++) {
+ TableName tName = TableName.valueOf(TABLES[i]);
+ util.deleteTable(tName);
+ }
+ fs.delete(testDir, true);
+ fs.close();
+ util.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * check the output hfile has same configuration as created test table
+ * and also check whether hfiles get split correctly
+ * only check TABLES[0]
+ */
+ private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException {
+ FileStatus[] fStats = fs.listStatus(testDir);
+ for (FileStatus stats : fStats) {
+ if (stats.getPath().getName().equals(new String(TABLES[0]))) {
+ FileStatus[] cfStats = fs.listStatus(stats.getPath());
+ for (FileStatus cfstat : cfStats) {
+ FileStatus[] hfStats = fs.listStatus(cfstat.getPath());
+
+ List firsttKeys = new ArrayList();
+ List lastKeys = new ArrayList();
+ for (FileStatus hfstat : hfStats) {
+ if (HFile.isHFileFormat(fs, hfstat)) {
+ HFile.Reader hfr =
+ HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf);
+ if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr
+ .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false;
+ firsttKeys.add(hfr.getFirstRowKey());
+ lastKeys.add(hfr.getLastRowKey());
+ }
+ }
+ if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check whether the Hfile has been split by region boundaries
+ * @param splitKeys split keys for that table
+ * @param firstKeys first rowKey for hfiles
+ * @param lastKeys last rowKey for hfiles
+ */
+ private boolean checkFileSplit(byte[][] splitKeys, List firstKeys, List lastKeys) {
+ Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR);
+ Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR);
+ Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR);
+
+ int is = 0, il = 0;
+ for (byte[] key : lastKeys) {
+ while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++;
+ if (is == splitKeys.length) {
+ break;
+ }
+ if (is > 0) {
+ if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false;
+ }
+ il++;
+ }
+
+ if (is == splitKeys.length) {
+ return il == lastKeys.size() - 1;
+ }
+ return true;
+ }
+
+
+ /**
+ * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the
+ * created directory is correct or not A recursion method, the testDir had better be small size
+ */
+ private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException {
+ if (level >= 3) {
+ return HFile.isHFileFormat(fs, testDir);
+ }
+ FileStatus[] fStats = fs.listStatus(testDir);
+ if (fStats == null || fStats.length <= 0) {
+ LOG.info("Created directory format is not correct");
+ return false;
+ }
+
+ for (FileStatus stats : fStats) {
+ // skip the _SUCCESS file created by MapReduce
+ if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
+ continue;
+ if (level < 2 && !stats.isDirectory()) {
+ LOG.info("Created directory format is not correct");
+ return false;
+ }
+ boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
+ if (flag == false) return false;
+ }
+ return true;
+ }
+
+
+ private byte[][] generateRandomSplitKeys(int numKeys) {
+ Random random = new Random();
+ byte[][] ret = new byte[numKeys][];
+ for (int i = 0; i < numKeys; i++) {
+ ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT);
+ }
+ return ret;
+ }
+
+
+ /**
+ * Simple mapper that makes output. With no input data
+ */
+ static class Random_TableKV_GeneratingMapper
+ extends Mapper {
+
+ private int keyLength;
+ private int valLength;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ }
+
+ @Override
+ protected void map(NullWritable n1, NullWritable n2,
+ Mapper.Context context)
+ throws java.io.IOException, InterruptedException {
+
+ byte keyBytes[] = new byte[keyLength];
+ byte valBytes[] = new byte[valLength];
+
+ ArrayList tables = new ArrayList<>();
+ for (int i = 0; i < TABLES.length; i++) {
+ tables.add(new ImmutableBytesWritable(TABLES[i]));
+ }
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+ Random random = new Random();
+
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ random.nextBytes(valBytes);
+
+ for (ImmutableBytesWritable table : tables) {
+ for (byte[] family : FAMILIES) {
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+ context.write(table, kv);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Simple Reducer that have input , with KeyValues have no order. and output
+ * , with KeyValues are ordered
+ */
+
+ static class Table_KeyValueSortReducer
+ extends Reducer {
+ protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs,
+ org.apache.hadoop.mapreduce.Reducer.Context context)
+ throws java.io.IOException, InterruptedException {
+ TreeSet map = new TreeSet<>(CellComparator.COMPARATOR);
+ for (KeyValue kv : kvs) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv : map) {
+ context.write(table, kv);
+ if (++index % 100 == 0) context.setStatus("Wrote " + index);
+ }
+ }
+ }
+}
\ No newline at end of file