diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index b8a5475e7e2..22a73c97380 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -131,224 +132,254 @@ public class HFileOutputFormat2 } static RecordWriter - createRecordWriter(final TaskAttemptContext context) - throws IOException { + createRecordWriter(final TaskAttemptContext context) throws IOException { + return new HFileRecordWriter(context, null); + } - // Get the path of the temporary output file - final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); - final Configuration conf = context.getConfiguration(); - final FileSystem fs = outputdir.getFileSystem(conf); - // These configs. are from hbase-*.xml - final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, - HConstants.DEFAULT_MAX_FILE_SIZE); - // Invented config. Add to hbase-*.xml if other than default compression. - final String defaultCompressionStr = conf.get("hfile.compression", - Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = HFileWriterImpl - .compressionByName(defaultCompressionStr); - final boolean compactionExclude = conf.getBoolean( - "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); + protected static class HFileRecordWriter + extends RecordWriter { + private final TaskAttemptContext context; + private final Path outputPath; + private final Path outputDir; + private final Configuration conf; + private final FileSystem fs; - // create a map from column family to the compression algorithm - final Map compressionMap = createFamilyCompressionMap(conf); - final Map bloomTypeMap = createFamilyBloomTypeMap(conf); - final Map blockSizeMap = createFamilyBlockSizeMap(conf); + private final long maxsize; - String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); - final Map datablockEncodingMap - = createFamilyDataBlockEncodingMap(conf); - final DataBlockEncoding overriddenEncoding; - if (dataBlockEncodingStr != null) { - overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); - } else { - overriddenEncoding = null; + private final Algorithm defaultCompression; + private final boolean compactionExclude; + + private final Map compressionMap; + private final Map bloomTypeMap; + private final Map blockSizeMap; + + private final Map datablockEncodingMap; + private final DataBlockEncoding overriddenEncoding; + + private final Map writers; + private byte[] previousRow; + private final byte[] now; + private boolean rollRequested; + + /** + * Mapredue job will create a temp path for outputting results. If out != null, it means that + * the caller has set the temp working dir; If out == null, it means we need to set it here. + * Used by HFileOutputFormat2 and MultiHFileOutputFormat. 
MultiHFileOutputFormat will give us + * temp working dir at the table level and HFileOutputFormat2 has to set it here within this + * constructor. + */ + public HFileRecordWriter(final TaskAttemptContext taContext, final Path out) + throws IOException { + // Get the path of the temporary output file + context = taContext; + + if (out == null) { + outputPath = FileOutputFormat.getOutputPath(context); + outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); + } else { + outputPath = out; + outputDir = outputPath; + } + + conf = context.getConfiguration(); + fs = outputDir.getFileSystem(conf); + + // These configs. are from hbase-*.xml + maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); + + // Invented config. Add to hbase-*.xml if other than default compression. + String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); + defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr); + compactionExclude = + conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false); + + // create a map from column family to the compression algorithm + compressionMap = createFamilyCompressionMap(conf); + bloomTypeMap = createFamilyBloomTypeMap(conf); + blockSizeMap = createFamilyBlockSizeMap(conf); + + // Config for data block encoding + String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); + datablockEncodingMap = createFamilyDataBlockEncodingMap(conf); + if (dataBlockEncodingStr != null) { + overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); + } else { + overriddenEncoding = null; + } + + writers = new TreeMap(Bytes.BYTES_COMPARATOR); + previousRow = HConstants.EMPTY_BYTE_ARRAY; + now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + rollRequested = false; } - return new RecordWriter() { - // Map of families to writers and how much has been output on the writer. 
- private final Map writers = - new TreeMap(Bytes.BYTES_COMPARATOR); - private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); - private boolean rollRequested = false; + @Override + public void write(ImmutableBytesWritable row, V cell) throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - @Override - public void write(ImmutableBytesWritable row, V cell) - throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + // null input == user explicitly wants to flush + if (row == null && kv == null) { + rollWriters(); + return; + } - // null input == user explicitly wants to flush - if (row == null && kv == null) { - rollWriters(); - return; - } + byte[] rowKey = CellUtil.cloneRow(kv); + long length = kv.getLength(); + byte[] family = CellUtil.cloneFamily(kv); + WriterLength wl = this.writers.get(family); - byte [] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); - byte [] family = CellUtil.cloneFamily(kv); - WriterLength wl = this.writers.get(family); + // If this is a new column family, verify that the directory exists + if (wl == null) { + fs.mkdirs(new Path(outputDir, Bytes.toString(family))); + } - // If this is a new column family, verify that the directory exists - if (wl == null) { - fs.mkdirs(new Path(outputdir, Bytes.toString(family))); - } + // If any of the HFiles for the column families has reached + // maxsize, we need to roll all the writers + if (wl != null && wl.written + length >= maxsize) { + this.rollRequested = true; + } - // If any of the HFiles for the column families has reached - // maxsize, we need to roll all the writers - if (wl != null && wl.written + length >= maxsize) { - this.rollRequested = true; - } + // This can only happen once a row is finished though + if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { + rollWriters(); + } - // This can only happen once a row is finished though - if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { - rollWriters(); - } - - // create a new WAL writer, if necessary - if (wl == null || wl.writer == null) { - if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { - HRegionLocation loc = null; - String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); - if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(conf); - RegionLocator locator = - connection.getRegionLocator(TableName.valueOf(tableName))) { - loc = locator.getRegionLocation(rowKey); - } catch (Throwable e) { - LOG.warn("there's something wrong when locating rowkey: " + - Bytes.toString(rowKey), e); - loc = null; - } + // create a new WAL writer, if necessary + if (wl == null || wl.writer == null) { + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + HRegionLocation loc = null; + String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + if (tableName != null) { + try (Connection connection = ConnectionFactory.createConnection(conf); + RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { + loc = locator.getRegionLocation(rowKey); + } catch (Throwable e) { + LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), + e); + loc = null; } + } - if (null == loc) { + if (null == loc) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "failed to get region location, so use default writer: " + Bytes.toString(rowKey)); + } + wl = getNewWriter(family, conf, null); + } else { 
+ if (LOG.isDebugEnabled()) { + LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); + } + InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); + if (initialIsa.isUnresolved()) { if (LOG.isTraceEnabled()) { - LOG.trace("failed to get region location, so use default writer: " + - Bytes.toString(rowKey)); + LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" + + loc.getPort() + ", so use default writer"); } wl = getNewWriter(family, conf, null); } else { if (LOG.isDebugEnabled()) { - LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); - } - InetSocketAddress initialIsa = - new InetSocketAddress(loc.getHostname(), loc.getPort()); - if (initialIsa.isUnresolved()) { - if (LOG.isTraceEnabled()) { - LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" - + loc.getPort() + ", so use default writer"); - } - wl = getNewWriter(family, conf, null); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); - } - wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); + LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); } + wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); } - } else { - wl = getNewWriter(family, conf, null); } - } - - // we now have the proper WAL writer. full steam ahead - kv.updateLatestStamp(this.now); - wl.writer.append(kv); - wl.written += length; - - // Copy the row so we know when a row transition. - this.previousRow = rowKey; - } - - private void rollWriters() throws IOException { - for (WriterLength wl : this.writers.values()) { - if (wl.writer != null) { - LOG.info("Writer=" + wl.writer.getPath() + - ((wl.written == 0)? "": ", wrote=" + wl.written)); - close(wl.writer); - } - wl.writer = null; - wl.written = 0; - } - this.rollRequested = false; - } - - /* Create a new StoreFile.Writer. - * @param family - * @return A WriterLength, containing a new StoreFile.Writer. - * @throws IOException - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", - justification="Not important") - private WriterLength getNewWriter(byte[] family, Configuration conf, - InetSocketAddress[] favoredNodes) throws IOException { - WriterLength wl = new WriterLength(); - Path familydir = new Path(outputdir, Bytes.toString(family)); - Algorithm compression = compressionMap.get(family); - compression = compression == null ? defaultCompression : compression; - BloomType bloomType = bloomTypeMap.get(family); - bloomType = bloomType == null ? BloomType.NONE : bloomType; - Integer blockSize = blockSizeMap.get(family); - blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; - DataBlockEncoding encoding = overriddenEncoding; - encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; - encoding = encoding == null ? 
DataBlockEncoding.NONE : encoding; - Configuration tempConf = new Configuration(conf); - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - HFileContextBuilder contextBuilder = new HFileContextBuilder() - .withCompression(compression) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) - .withBlockSize(blockSize); - - if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { - contextBuilder.withIncludesTags(true); - } - - contextBuilder.withDataBlockEncoding(encoding); - HFileContext hFileContext = contextBuilder.build(); - - if (null == favoredNodes) { - wl.writer = - new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familydir).withBloomType(bloomType) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); } else { - wl.writer = - new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) - .withOutputDir(familydir).withBloomType(bloomType) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build(); - } - - this.writers.put(family, wl); - return wl; - } - - private void close(final StoreFileWriter w) throws IOException { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())); - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)); - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - w.appendTrackedTimestampsToMetadata(); - w.close(); + wl = getNewWriter(family, conf, null); } } - @Override - public void close(TaskAttemptContext c) - throws IOException, InterruptedException { - for (WriterLength wl: this.writers.values()) { + // we now have the proper WAL writer. full steam ahead + kv.updateLatestStamp(this.now); + wl.writer.append(kv); + wl.written += length; + + // Copy the row so we know when a row transition. + this.previousRow = rowKey; + } + + private void rollWriters() throws IOException { + for (WriterLength wl : this.writers.values()) { + if (wl.writer != null) { + LOG.info( + "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written)); close(wl.writer); } + wl.writer = null; + wl.written = 0; } - }; + this.rollRequested = false; + } + + /* + * Create a new StoreFile.Writer. + * @param family + * @return A WriterLength, containing a new StoreFile.Writer. + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", + justification = "Not important") + private WriterLength getNewWriter(byte[] family, Configuration conf, + InetSocketAddress[] favoredNodes) throws IOException { + WriterLength wl = new WriterLength(); + Path familyDir = new Path(outputDir, Bytes.toString(family)); + Algorithm compression = compressionMap.get(family); + compression = compression == null ? defaultCompression : compression; + BloomType bloomType = bloomTypeMap.get(family); + bloomType = bloomType == null ? BloomType.NONE : bloomType; + Integer blockSize = blockSizeMap.get(family); + blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; + DataBlockEncoding encoding = overriddenEncoding; + encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; + encoding = encoding == null ? 
DataBlockEncoding.NONE : encoding; + Configuration tempConf = new Configuration(conf); + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize); + + if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { + contextBuilder.withIncludesTags(true); + } + + contextBuilder.withDataBlockEncoding(encoding); + HFileContext hFileContext = contextBuilder.build(); + + if (null == favoredNodes) { + wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familyDir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); + } else { + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familyDir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); + } + + this.writers.put(family, wl); + return wl; + } + + private void close(final StoreFileWriter w) throws IOException { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime())); + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true)); + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + w.appendTrackedTimestampsToMetadata(); + w.close(); + } + } + + @Override + public void close(TaskAttemptContext c) throws IOException, InterruptedException { + for (WriterLength wl : this.writers.values()) { + close(wl.writer); + } + } } /* @@ -503,7 +534,7 @@ public class HFileOutputFormat2 TableMapReduceUtil.initCredentials(job); LOG.info("Incremental table " + regionLocator.getName() + " output configured."); } - + public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws IOException { Configuration conf = job.getConfiguration(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java new file mode 100644 index 00000000000..7c1ebbc9bf4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; + +import com.google.common.annotations.VisibleForTesting; +/** + * Create 3 level tree directory, first level is using table name as parent directory and then use + * family name as child directory, and all related HFiles for one family are under child directory + * -tableName1 + * -columnFamilyName1 + * -columnFamilyName2 + * -HFiles + * -tableName2 + * -columnFamilyName1 + * -HFiles + * -columnFamilyName2 + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +@VisibleForTesting +public class MultiHFileOutputFormat extends FileOutputFormat { + private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class); + + @Override + public RecordWriter + getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { + return createMultiHFileRecordWriter(context); + } + + static RecordWriter + createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException { + + // Get the path of the output directory + final Path outputPath = FileOutputFormat.getOutputPath(context); + final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Configuration conf = context.getConfiguration(); + final FileSystem fs = outputDir.getFileSystem(conf); + + // Map of tables to writers + final Map> tableWriters = + new HashMap>(); + + return new RecordWriter() { + @Override + public void write(ImmutableBytesWritable tableName, V cell) + throws IOException, InterruptedException { + RecordWriter tableWriter = tableWriters.get(tableName); + // if there is new table, verify that table directory exists + if (tableWriter == null) { + // using table name as directory name + final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes())); + fs.mkdirs(tableOutputDir); + LOG.info("Writing Table '" + tableName.toString() + "' data into following directory" + + tableOutputDir.toString()); + + // Create writer for one specific table + tableWriter = new HFileOutputFormat2.HFileRecordWriter(context, tableOutputDir); + // Put table into map + tableWriters.put(tableName, tableWriter); + } + // Write into tableWriter + // in the original code, it does not use Row + tableWriter.write(null, cell); + } + + @Override + public void close(TaskAttemptContext c) throws IOException, InterruptedException { + for (RecordWriter writer : tableWriters.values()) { + writer.close(c); + } + } + }; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java new file mode 100644 index 00000000000..738ae5ff9b8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test for{@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and + * writes hfiles. + */ +@Category(MediumTests.class) +public class TestMultiHFileOutputFormat { + private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class); + + private HBaseTestingUtility util = new HBaseTestingUtility(); + + private static int ROWSPERSPLIT = 10; + + private static final int KEYLEN_DEFAULT = 10; + private static final String KEYLEN_CONF = "randomkv.key.length"; + + private static final int VALLEN_DEFAULT = 10; + private static final String VALLEN_CONF = "randomkv.val.length"; + + private static final byte[][] TABLES = + { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), + Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; + + private static final byte[][] FAMILIES = + { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; + + private static final byte[] QUALIFIER = Bytes.toBytes("data"); + + public static void main(String[] args) throws Exception { + new TestMultiHFileOutputFormat().testWritingDataIntoHFiles(); + } + + /** + * Run small MR job. this MR job will write HFile into + * testWritingDataIntoHFiles/tableNames/columFamilies/ + */ + @Test + public void testWritingDataIntoHFiles() throws Exception { + Configuration conf = util.getConfiguration(); + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); + FileSystem fs = testDir.getFileSystem(conf); + LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); + + // Set down this value or we OOME in eclipse. + conf.setInt("mapreduce.task.io.sort.mb", 20); + // Write a few files by setting max file size. 
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + try { + Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); + + FileOutputFormat.setOutputPath(job, testDir); + + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(Random_TableKV_GeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + job.setReducerClass(Table_KeyValueSortReducer.class); + job.setOutputFormatClass(MultiHFileOutputFormat.class); + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("\nStarting test testWritingDataIntoHFiles\n"); + assertTrue(job.waitForCompletion(true)); + LOG.info("\nWaiting on checking MapReduce output\n"); + assertTrue(checkMROutput(fs, testDir, 0)); + } finally { + testDir.getFileSystem(conf).delete(testDir, true); + util.shutdownMiniCluster(); + } + } + + /** + * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the + * created directory is correct or not A recursion method, the testDir had better be small size + */ + private boolean checkMROutput(FileSystem fs, Path testDir, int level) + throws FileNotFoundException, IOException { + if (level >= 3) { + return HFile.isHFileFormat(fs, testDir); + } + FileStatus[] fStats = fs.listStatus(testDir); + if (fStats == null || fStats.length <= 0) { + LOG.info("Created directory format is not correct"); + return false; + } + + for (FileStatus stats : fStats) { + // skip the _SUCCESS file created by MapReduce + if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) + continue; + if (level < 2 && !stats.isDirectory()) { + LOG.info("Created directory format is not correct"); + return false; + } + boolean flag = checkMROutput(fs, stats.getPath(), level + 1); + if (flag == false) return false; + } + return true; + } + + /** + * Simple mapper that makes output. 
With no input data + */ + static class Random_TableKV_GeneratingMapper + extends Mapper { + + private int keyLength; + private int valLength; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + } + + @Override + protected void map(NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException, InterruptedException { + + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + ArrayList tables = new ArrayList(); + for (int i = 0; i < TABLES.length; i++) { + tables.add(new ImmutableBytesWritable(TABLES[i])); + } + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + Random random = new Random(); + + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + + for (ImmutableBytesWritable table : tables) { + for (byte[] family : FAMILIES) { + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); + context.write(table, kv); + } + } + } + } + } + + /** + * Simple Reducer that have input , with KeyValues have no order. and output + * , with KeyValues are ordered + */ + + static class Table_KeyValueSortReducer + extends Reducer { + protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, + org.apache.hadoop.mapreduce.Reducer.Context context) + throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet(KeyValue.COMPARATOR); + for (KeyValue kv : kvs) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(table, kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } + } + +}
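
The patch adds the output format and a test, but leaves job wiring to the caller. Below is a minimal, hypothetical driver/mapper sketch showing how a MapReduce job might use MultiHFileOutputFormat; the class names (MultiTableHFileJobSketch, RoutingMapper), the "table,row,value" text input layout, and the column family/qualifier are illustrative only and not part of the patch. The job setup mirrors TestMultiHFileOutputFormat above: the map output key is the destination table name, the value is a KeyValue, the HBase serializations are registered explicitly, and a sorting reduce step is required because HFiles must be written in key order.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.MultiHFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiTableHFileJobSketch {

  /** Routes each input line ("table,row,value") to the table named in the record. */
  static class RoutingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    private static final byte[] FAMILY = Bytes.toBytes("cf");
    private static final byte[] QUALIFIER = Bytes.toBytes("data");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",", 3);
      // The key handed to MultiHFileOutputFormat is the destination *table name*;
      // the row key travels inside the KeyValue itself.
      ImmutableBytesWritable table = new ImmutableBytesWritable(Bytes.toBytes(parts[0]));
      KeyValue kv =
          new KeyValue(Bytes.toBytes(parts[1]), FAMILY, QUALIFIER, Bytes.toBytes(parts[2]));
      context.write(table, kv);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "multi-table-hfile-sketch");
    job.setJarByClass(MultiTableHFileJobSketch.class);

    job.setMapperClass(RoutingMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // With the table name as the reduce key, KeyValueSortReducer sorts all KeyValues of
    // one table in memory before they reach the writer; the patch's test uses the
    // equivalent Table_KeyValueSortReducer for the same purpose.
    job.setReducerClass(KeyValueSortReducer.class);

    job.setOutputFormatClass(MultiHFileOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Register the HBase serializations, exactly as TestMultiHFileOutputFormat does.
    job.getConfiguration().setStrings("io.serializations",
        conf.get("io.serializations"),
        "org.apache.hadoop.hbase.mapreduce.MutationSerialization",
        "org.apache.hadoop.hbase.mapreduce.ResultSerialization",
        "org.apache.hadoop.hbase.mapreduce.KeyValueSerialization");

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Unlike HFileOutputFormat2.configureIncrementalLoad, nothing in this patch wires up per-family compression, bloom type, block size, or region-aligned partitioning for the multi-table case, so HFileRecordWriter falls back to its defaults unless those family configuration keys are set on the job explicitly.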
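
A plausible follow-up step, not covered by this patch, is to bulk-load each per-table directory that MultiHFileOutputFormat produces (outputDir/tableName/columnFamily/HFiles) with the existing LoadIncrementalHFiles (completebulkload) tool, one table at a time. A sketch under that assumption, using the tool's standard "hfile dir, table name" arguments; the class name is illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadEachTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path outputDir = new Path(args[0]); // the MultiHFileOutputFormat output directory
    FileSystem fs = outputDir.getFileSystem(conf);
    for (FileStatus tableDir : fs.listStatus(outputDir)) {
      if (!tableDir.isDirectory()) {
        continue; // skip _SUCCESS and other non-directory entries
      }
      // Directory name is the table name, per the MultiHFileOutputFormat layout.
      String tableName = tableDir.getPath().getName();
      ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
          new String[] { tableDir.getPath().toString(), tableName });
    }
  }
}
```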