From d5e206dfa06513c0ca26e7f2c93fc38f33703494 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 27 Jun 2017 12:33:39 -0700 Subject: [PATCH] HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor) --- .../hbase/mapreduce/HFileOutputFormat2.java | 803 ++++++++++-------- .../MultiTableHFileOutputFormat.java | 536 ++---------- .../mapreduce/TestHFileOutputFormat2.java | 424 +++++---- .../TestMultiTableHFileOutputFormat.java | 382 --------- 4 files changed, 775 insertions(+), 1370 deletions(-) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index da507b13d0d..f847608846a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -22,14 +22,20 @@ import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -88,24 +94,48 @@ import com.google.common.annotations.VisibleForTesting; * all HFiles being written. *

* Using this class as part of a MapReduce job is best done - * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}. + * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}. */ @InterfaceAudience.Public public class HFileOutputFormat2 extends FileOutputFormat { private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class); + static class TableInfo { + private HTableDescriptor hTableDescriptor; + private RegionLocator regionLocator; + + public TableInfo(HTableDescriptor hTableDescriptor, RegionLocator regionLocator) { + this.hTableDescriptor = hTableDescriptor; + this.regionLocator = regionLocator; + } + + public HTableDescriptor getHTableDescriptor() { + return hTableDescriptor; + } + + public RegionLocator getRegionLocator() { + return regionLocator; + } + } + + protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8); + + protected static byte[] combineTableNameSuffix(byte[] tableName, + byte[] suffix ) { + return Bytes.add(tableName, tableSeparator, suffix); + } // The following constants are private since these are used by // HFileOutputFormat2 to internally transfer data between job setup and // reducer run using conf. // These should not be changed by the client. - private static final String COMPRESSION_FAMILIES_CONF_KEY = + static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression"; - private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = + static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; - private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = + static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; - private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = + static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; // This constant is public since the client can modify this when setting @@ -121,8 +151,10 @@ public class HFileOutputFormat2 public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled"; private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; - private static final String OUTPUT_TABLE_NAME_CONF_KEY = + static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = + "hbase.mapreduce.use.multi.table.hfileoutputformat"; public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy"; public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; @@ -133,270 +165,277 @@ public class HFileOutputFormat2 return createRecordWriter(context); } - static RecordWriter - createRecordWriter(final TaskAttemptContext context) throws IOException { - return new HFileRecordWriter<>(context, null); + protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) { + return combineTableNameSuffix(tableName, family); } - protected static class HFileRecordWriter - extends RecordWriter { - private final TaskAttemptContext context; - private final Path outputPath; - private final Path outputDir; - private final Configuration conf; - private final FileSystem fs; + static RecordWriter + createRecordWriter(final TaskAttemptContext context) + throws IOException { - private final long maxsize; + // Get the path of the temporary output file + final Path outputPath = FileOutputFormat.getOutputPath(context); + final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Configuration conf = context.getConfiguration(); + final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ; + final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + if (writeTableNames==null || writeTableNames.isEmpty()) { + throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY + + " cannot be empty"); + } + final FileSystem fs = outputDir.getFileSystem(conf); + // These configs. are from hbase-*.xml + final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); + // Invented config. Add to hbase-*.xml if other than default compression. + final String defaultCompressionStr = conf.get("hfile.compression", + Compression.Algorithm.NONE.getName()); + final Algorithm defaultCompression = HFileWriterImpl + .compressionByName(defaultCompressionStr); + final boolean compactionExclude = conf.getBoolean( + "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); - private final Algorithm defaultCompression; - private final boolean compactionExclude; + final Set allTableNames = Arrays.stream(writeTableNames.split( + Bytes.toString(tableSeparator))).collect(Collectors.toSet()); - private final Map compressionMap; - private final Map bloomTypeMap; - private final Map blockSizeMap; + // create a map from column family to the compression algorithm + final Map compressionMap = createFamilyCompressionMap(conf); + final Map bloomTypeMap = createFamilyBloomTypeMap(conf); + final Map blockSizeMap = createFamilyBlockSizeMap(conf); - private final Map datablockEncodingMap; - private final DataBlockEncoding overriddenEncoding; - - private final Map writers; - private byte[] previousRow; - private final byte[] now; - private boolean rollRequested; - - /** - * Mapredue job will create a temp path for outputting results. If out != null, it means that - * the caller has set the temp working dir; If out == null, it means we need to set it here. - * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us - * temp working dir at the table level and HFileOutputFormat2 has to set it here within this - * constructor. - */ - public HFileRecordWriter(final TaskAttemptContext taContext, final Path out) - throws IOException { - // Get the path of the temporary output file - context = taContext; - - if (out == null) { - outputPath = FileOutputFormat.getOutputPath(context); - outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); - } else { - outputPath = out; - outputDir = outputPath; - } - - conf = context.getConfiguration(); - fs = outputDir.getFileSystem(conf); - - // These configs. are from hbase-*.xml - maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); - - // Invented config. Add to hbase-*.xml if other than default compression. - String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); - defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr); - compactionExclude = - conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false); - - // create a map from column family to the compression algorithm - compressionMap = createFamilyCompressionMap(conf); - bloomTypeMap = createFamilyBloomTypeMap(conf); - blockSizeMap = createFamilyBlockSizeMap(conf); - - // Config for data block encoding - String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); - datablockEncodingMap = createFamilyDataBlockEncodingMap(conf); - if (dataBlockEncodingStr != null) { - overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); - } else { - overriddenEncoding = null; - } - - writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); - previousRow = HConstants.EMPTY_BYTE_ARRAY; - now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); - rollRequested = false; + String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); + final Map datablockEncodingMap + = createFamilyDataBlockEncodingMap(conf); + final DataBlockEncoding overriddenEncoding; + if (dataBlockEncodingStr != null) { + overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); + } else { + overriddenEncoding = null; } - @Override - public void write(ImmutableBytesWritable row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + return new RecordWriter() { + // Map of families to writers and how much has been output on the writer. + private final Map writers = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; + private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + private boolean rollRequested = false; - // null input == user explicitly wants to flush - if (row == null && kv == null) { - rollWriters(); - return; - } + @Override + public void write(ImmutableBytesWritable row, V cell) + throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - byte[] rowKey = CellUtil.cloneRow(kv); - long length = kv.getLength(); - byte[] family = CellUtil.cloneFamily(kv); - WriterLength wl = this.writers.get(family); + // null input == user explicitly wants to flush + if (row == null && kv == null) { + rollWriters(); + return; + } - // If this is a new column family, verify that the directory exists - if (wl == null) { - Path cfPath = new Path(outputDir, Bytes.toString(family)); - fs.mkdirs(cfPath); - configureStoragePolicy(conf, fs, family, cfPath); - } - - // If any of the HFiles for the column families has reached - // maxsize, we need to roll all the writers - if (wl != null && wl.written + length >= maxsize) { - this.rollRequested = true; - } - - // This can only happen once a row is finished though - if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { - rollWriters(); - } - - // create a new WAL writer, if necessary - if (wl == null || wl.writer == null) { - if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { - HRegionLocation loc = null; - String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); - if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(conf); - RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { - loc = locator.getRegionLocation(rowKey); - } catch (Throwable e) { - LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), - e); - loc = null; - } - } - - if (null == loc) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "failed to get region location, so use default writer: " + Bytes.toString(rowKey)); - } - wl = getNewWriter(family, conf, null); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); - } - InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); - if (initialIsa.isUnresolved()) { - if (LOG.isTraceEnabled()) { - LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" - + loc.getPort() + ", so use default writer"); - } - wl = getNewWriter(family, conf, null); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); - } - wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); - } + byte[] rowKey = CellUtil.cloneRow(kv); + long length = kv.getLength(); + byte[] family = CellUtil.cloneFamily(kv); + byte[] tableNameBytes = null; + if (writeMultipleTables) { + tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); + if (!allTableNames.contains(Bytes.toString(tableNameBytes))) { + throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) + + "' not" + " expected"); } } else { - wl = getNewWriter(family, conf, null); + tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8); + } + byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); + WriterLength wl = this.writers.get(tableAndFamily); + + // If this is a new column family, verify that the directory exists + if (wl == null) { + Path writerPath = null; + if (writeMultipleTables) { + writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes + .toString(family))); + } + else { + writerPath = new Path(outputDir, Bytes.toString(family)); + } + fs.mkdirs(writerPath); + configureStoragePolicy(conf, fs, tableAndFamily, writerPath); + } + + // If any of the HFiles for the column families has reached + // maxsize, we need to roll all the writers + if (wl != null && wl.written + length >= maxsize) { + this.rollRequested = true; + } + + // This can only happen once a row is finished though + if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { + rollWriters(); + } + + // create a new WAL writer, if necessary + if (wl == null || wl.writer == null) { + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + HRegionLocation loc = null; + + String tableName = Bytes.toString(tableNameBytes); + if (tableName != null) { + try (Connection connection = ConnectionFactory.createConnection(conf); + RegionLocator locator = + connection.getRegionLocator(TableName.valueOf(tableName))) { + loc = locator.getRegionLocation(rowKey); + } catch (Throwable e) { + LOG.warn("There's something wrong when locating rowkey: " + + Bytes.toString(rowKey) + " for tablename: " + tableName, e); + loc = null; + } } + + if (null == loc) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to get region location, so use default writer for rowkey: " + + Bytes.toString(rowKey)); + } + wl = getNewWriter(tableNameBytes, family, conf, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); + } + InetSocketAddress initialIsa = + new InetSocketAddress(loc.getHostname(), loc.getPort()); + if (initialIsa.isUnresolved()) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" + + loc.getPort() + ", so use default writer"); + } + wl = getNewWriter(tableNameBytes, family, conf, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); + } + wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa + }); + } + } + } else { + wl = getNewWriter(tableNameBytes, family, conf, null); + } + } + + // we now have the proper WAL writer. full steam ahead + kv.updateLatestStamp(this.now); + wl.writer.append(kv); + wl.written += length; + + // Copy the row so we know when a row transition. + this.previousRow = rowKey; + } + + private void rollWriters() throws IOException { + for (WriterLength wl : this.writers.values()) { + if (wl.writer != null) { + LOG.info( + "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written)); + close(wl.writer); + } + wl.writer = null; + wl.written = 0; + } + this.rollRequested = false; + } + + /* + * Create a new StoreFile.Writer. + * @param family + * @return A WriterLength, containing a new StoreFile.Writer. + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", + justification="Not important") + private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration + conf, InetSocketAddress[] favoredNodes) throws IOException { + byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family); + Path familydir = new Path(outputDir, Bytes.toString(family)); + if (writeMultipleTables) { + familydir = new Path(outputDir, + new Path(Bytes.toString(tableName), Bytes.toString(family))); + } + WriterLength wl = new WriterLength(); + Algorithm compression = compressionMap.get(tableAndFamily); + compression = compression == null ? defaultCompression : compression; + BloomType bloomType = bloomTypeMap.get(tableAndFamily); + bloomType = bloomType == null ? BloomType.NONE : bloomType; + Integer blockSize = blockSizeMap.get(tableAndFamily); + blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; + DataBlockEncoding encoding = overriddenEncoding; + encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; + encoding = encoding == null ? DataBlockEncoding.NONE : encoding; + Configuration tempConf = new Configuration(conf); + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + HFileContextBuilder contextBuilder = new HFileContextBuilder() + .withCompression(compression) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(blockSize); + + if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { + contextBuilder.withIncludesTags(true); + } + + contextBuilder.withDataBlockEncoding(encoding); + HFileContext hFileContext = contextBuilder.build(); + if (null == favoredNodes) { + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); + } else { + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); + } + + this.writers.put(tableAndFamily, wl); + return wl; + } + + private void close(final StoreFileWriter w) throws IOException { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + w.appendTrackedTimestampsToMetadata(); + w.close(); } } - // we now have the proper WAL writer. full steam ahead - kv.updateLatestStamp(this.now); - wl.writer.append(kv); - wl.written += length; - - // Copy the row so we know when a row transition. - this.previousRow = rowKey; - } - - private void rollWriters() throws IOException { - for (WriterLength wl : this.writers.values()) { - if (wl.writer != null) { - LOG.info( - "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written)); + @Override + public void close(TaskAttemptContext c) + throws IOException, InterruptedException { + for (WriterLength wl: this.writers.values()) { close(wl.writer); } - wl.writer = null; - wl.written = 0; } - this.rollRequested = false; - } - - /* - * Create a new StoreFile.Writer. - * @param family - * @return A WriterLength, containing a new StoreFile.Writer. - * @throws IOException - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", - justification = "Not important") - private WriterLength getNewWriter(byte[] family, Configuration conf, - InetSocketAddress[] favoredNodes) throws IOException { - WriterLength wl = new WriterLength(); - Path familyDir = new Path(outputDir, Bytes.toString(family)); - Algorithm compression = compressionMap.get(family); - compression = compression == null ? defaultCompression : compression; - BloomType bloomType = bloomTypeMap.get(family); - bloomType = bloomType == null ? BloomType.NONE : bloomType; - Integer blockSize = blockSizeMap.get(family); - blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; - DataBlockEncoding encoding = overriddenEncoding; - encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; - encoding = encoding == null ? DataBlockEncoding.NONE : encoding; - Configuration tempConf = new Configuration(conf); - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize); - - if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { - contextBuilder.withIncludesTags(true); - } - - contextBuilder.withDataBlockEncoding(encoding); - HFileContext hFileContext = contextBuilder.build(); - - if (null == favoredNodes) { - wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familyDir).withBloomType(bloomType) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); - } else { - wl.writer = - new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) - .withOutputDir(familyDir).withBloomType(bloomType) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build(); - } - - this.writers.put(family, wl); - return wl; - } - - private void close(final StoreFileWriter w) throws IOException { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime())); - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true)); - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - w.appendTrackedTimestampsToMetadata(); - w.close(); - } - } - - @Override - public void close(TaskAttemptContext c) throws IOException, InterruptedException { - for (WriterLength wl : this.writers.values()) { - close(wl.writer); - } - } + }; } /** * Configure block storage policy for CF after the directory is created. */ static void configureStoragePolicy(final Configuration conf, final FileSystem fs, - byte[] family, Path cfPath) { - if (null == conf || null == fs || null == family || null == cfPath) { + byte[] tableAndFamily, Path cfPath) { + if (null == conf || null == fs || null == tableAndFamily || null == cfPath) { return; } String policy = - conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family), + conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily), conf.get(STORAGE_POLICY_PROPERTY)); FSUtils.setStoragePolicy(fs, cfPath, policy); } @@ -413,12 +452,29 @@ public class HFileOutputFormat2 * Return the start keys of all of the regions in this table, * as a list of ImmutableBytesWritable. */ - private static List getRegionStartKeys(RegionLocator table) - throws IOException { - byte[][] byteKeys = table.getStartKeys(); - ArrayList ret = new ArrayList<>(byteKeys.length); - for (byte[] byteKey : byteKeys) { - ret.add(new ImmutableBytesWritable(byteKey)); + private static List getRegionStartKeys(List regionLocators, + boolean writeMultipleTables) + throws IOException { + + ArrayList ret = new ArrayList<>(); + for(RegionLocator regionLocator : regionLocators) + { + TableName tableName = regionLocator.getName(); + LOG.info("Looking up current regions for table " + tableName); + byte[][] byteKeys = regionLocator.getStartKeys(); + for (byte[] byteKey : byteKeys) { + byte[] fullKey = byteKey; //HFileOutputFormat2 use case + if (writeMultipleTables) + { + //MultiTableHFileOutputFormat use case + fullKey = combineTableNameSuffix(tableName.getName(), byteKey); + } + if (LOG.isDebugEnabled()) { + LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary + (fullKey) + "]"); + } + ret.add(new ImmutableBytesWritable(fullKey)); + } } return ret; } @@ -429,7 +485,7 @@ public class HFileOutputFormat2 */ @SuppressWarnings("deprecation") private static void writePartitions(Configuration conf, Path partitionsPath, - List startKeys) throws IOException { + List startKeys, boolean writeMultipleTables) throws IOException { LOG.info("Writing partition information to " + partitionsPath); if (startKeys.isEmpty()) { throw new IllegalArgumentException("No regions passed"); @@ -440,14 +496,17 @@ public class HFileOutputFormat2 // so we need to remove it. Otherwise we would end up with an // empty reducer with index 0 TreeSet sorted = new TreeSet<>(startKeys); - ImmutableBytesWritable first = sorted.first(); + if (writeMultipleTables) { + first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first + ().get())); + } if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { throw new IllegalArgumentException( "First region of table should have empty start key. Instead has: " + Bytes.toStringBinary(first.get())); } - sorted.remove(first); + sorted.remove(sorted.first()); // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); @@ -499,17 +558,25 @@ public class HFileOutputFormat2 */ public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator) throws IOException { - configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class); + ArrayList singleTableInfo = new ArrayList<>(); + singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); + configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); } - static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, - RegionLocator regionLocator, Class> cls) throws IOException, - UnsupportedEncodingException { + static void configureIncrementalLoad(Job job, List multiTableInfo, Class> cls) throws IOException { Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(cls); + if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { + throw new IllegalArgumentException("Duplicate entries found in TableInfo argument"); + } + boolean writeMultipleTables = false; + if (MultiTableHFileOutputFormat.class.equals(cls)) { + writeMultipleTables = true; + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); + } // Based on the configured map output class, set the correct reducer to properly // sort the incoming values. // TODO it would be nice to pick one or the other of these formats. @@ -528,28 +595,44 @@ public class HFileOutputFormat2 KeyValueSerialization.class.getName()); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { - // record this table name for creating writer by favored nodes LOG.info("bulkload locality sensitive enabled"); - conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString()); } + /* Now get the region start keys for every table required */ + List allTableNames = new ArrayList<>(multiTableInfo.size()); + List regionLocators = new ArrayList<>( multiTableInfo.size()); + List tableDescriptors = new ArrayList<>( multiTableInfo.size()); + + for( TableInfo tableInfo : multiTableInfo ) + { + regionLocators.add(tableInfo.getRegionLocator()); + allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString()); + tableDescriptors.add(tableInfo.getHTableDescriptor()); + } + // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes + .toString(tableSeparator))); + List startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + regionLocator.getName()); - List startKeys = getRegionStartKeys(regionLocator); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + - "to match current region count"); + "to match current region count for all tables"); job.setNumReduceTasks(startKeys.size()); - configurePartitioner(job, startKeys); + configurePartitioner(job, startKeys, writeMultipleTables); // Set compression algorithms based on column families - configureCompression(conf, tableDescriptor); - configureBloomType(tableDescriptor, conf); - configureBlockSize(tableDescriptor, conf); - configureDataBlockEncoding(tableDescriptor, conf); + + conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, + tableDescriptors)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails, + tableDescriptors)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, + tableDescriptors)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors)); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + regionLocator.getName() + " output configured."); + LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); } public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws @@ -560,11 +643,19 @@ public class HFileOutputFormat2 job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat2.class); + ArrayList singleTableDescriptor = new ArrayList<>(1); + singleTableDescriptor.add(tableDescriptor); + + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getNameAsString()); // Set compression algorithms based on column families - configureCompression(conf, tableDescriptor); - configureBloomType(tableDescriptor, conf); - configureBlockSize(tableDescriptor, conf); - configureDataBlockEncoding(tableDescriptor, conf); + conf.set(COMPRESSION_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor)); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); @@ -667,7 +758,7 @@ public class HFileOutputFormat2 continue; } try { - confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(), + confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8), URLDecoder.decode(familySplit[1], "UTF-8")); } catch (UnsupportedEncodingException e) { // will not happen with UTF-8 encoding @@ -681,7 +772,8 @@ public class HFileOutputFormat2 * Configure job with a TotalOrderPartitioner, partitioning against * splitPoints. Cleans up the partitions file after job exists. */ - static void configurePartitioner(Job job, List splitPoints) + static void configurePartitioner(Job job, List splitPoints, boolean + writeMultipleTables) throws IOException { Configuration conf = job.getConfiguration(); // create the partitions file @@ -691,7 +783,7 @@ public class HFileOutputFormat2 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); fs.makeQualified(partitionsPath); - writePartitions(conf, partitionsPath, splitPoints); + writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); fs.deleteOnExit(partitionsPath); // configure job to use it @@ -699,6 +791,34 @@ public class HFileOutputFormat2 TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") + @VisibleForTesting + static String serializeColumnFamilyAttribute(Function fn, List allTables) + throws UnsupportedEncodingException { + StringBuilder attributeValue = new StringBuilder(); + int i = 0; + for (HTableDescriptor tableDescriptor : allTables) { + if (tableDescriptor == null) { + // could happen with mock table instance + // CODEREVIEW: Can I set an empty string in conf if mock table instance? + return ""; + } + Collection families = tableDescriptor.getFamilies(); + for (HColumnDescriptor familyDescriptor : families) { + if (i++ > 0) { + attributeValue.append('&'); + } + attributeValue.append(URLEncoder.encode( + Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())), + "UTF-8")); + attributeValue.append('='); + attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8")); + } + } + // Get rid of the last ampersand + return attributeValue.toString(); + } + /** * Serialize column family to compression algorithm map to configuration. * Invoked while configuring the MR job for incremental load. @@ -708,134 +828,65 @@ public class HFileOutputFormat2 * @throws IOException * on failure to read column family descriptors */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") @VisibleForTesting - static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor) - throws UnsupportedEncodingException { - StringBuilder compressionConfigValue = new StringBuilder(); - if(tableDescriptor == null){ - // could happen with mock table instance - return; - } - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - compressionConfigValue.append('&'); - } - compressionConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - compressionConfigValue.append('='); - compressionConfigValue.append(URLEncoder.encode( - familyDescriptor.getCompressionType().getName(), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString()); - } + static Function compressionDetails = familyDescriptor -> + familyDescriptor.getCompressionType().getName(); /** - * Serialize column family to block size map to configuration. - * Invoked while configuring the MR job for incremental load. - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into + * Serialize column family to block size map to configuration. Invoked while + * configuring the MR job for incremental load. + * + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - StringBuilder blockSizeConfigValue = new StringBuilder(); - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - blockSizeConfigValue.append('&'); - } - blockSizeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - blockSizeConfigValue.append('='); - blockSizeConfigValue.append(URLEncoder.encode( - String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString()); - } + static Function blockSizeDetails = familyDescriptor -> String + .valueOf(familyDescriptor.getBlocksize()); /** - * Serialize column family to bloom type map to configuration. - * Invoked while configuring the MR job for incremental load. - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into + * Serialize column family to bloom type map to configuration. Invoked while + * configuring the MR job for incremental load. + * + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf) - throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; + static Function bloomTypeDetails = familyDescriptor -> { + String bloomType = familyDescriptor.getBloomFilterType().toString(); + if (bloomType == null) { + bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; } - StringBuilder bloomTypeConfigValue = new StringBuilder(); - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - bloomTypeConfigValue.append('&'); - } - bloomTypeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - bloomTypeConfigValue.append('='); - String bloomType = familyDescriptor.getBloomFilterType().toString(); - if (bloomType == null) { - bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; - } - bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); - } - conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString()); - } + return bloomType; + }; /** * Serialize column family to data block encoding map to configuration. * Invoked while configuring the MR job for incremental load. * - * @param tableDescriptor to read the properties from - * @param conf to persist serialized values into + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, - Configuration conf) throws UnsupportedEncodingException { - if (tableDescriptor == null) { - // could happen with mock table instance - return; + static Function dataBlockEncodingDetails = familyDescriptor -> { + DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); + if (encoding == null) { + encoding = DataBlockEncoding.NONE; } - StringBuilder dataBlockEncodingConfigValue = new StringBuilder(); - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - dataBlockEncodingConfigValue.append('&'); - } - dataBlockEncodingConfigValue.append( - URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - dataBlockEncodingConfigValue.append('='); - DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); - if (encoding == null) { - encoding = DataBlockEncoding.NONE; - } - dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), - "UTF-8")); - } - conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, - dataBlockEncodingConfigValue.toString()); - } + return encoding.toString(); + }; + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java index 03256bf9338..fdcf30ea50d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java @@ -6,504 +6,118 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hbase.mapreduce; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.TreeMap; -import java.util.ArrayList; -import java.util.TreeSet; -import java.util.Collections; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; /** - * Create 3 level tree directory, first level is using table name as parent directory and then use - * family name as child directory, and all related HFiles for one family are under child directory + * Create 3 level tree directory, first level is using table name as parent + * directory and then use family name as child directory, and all related HFiles + * for one family are under child directory * -tableName1 - * -columnFamilyName1 - * -HFile (region1) - * -columnFamilyName2 - * -HFile1 (region1) - * -HFile2 (region2) - * -HFile3 (region3) + * -columnFamilyName1 + * -columnFamilyName2 + * -HFiles * -tableName2 - * -columnFamilyName1 - * -HFile (region1) - * family directory and its hfiles match the output of HFileOutputFormat2 - * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 + * -columnFamilyName1 + * -HFiles + * -columnFamilyName2 */ - @InterfaceAudience.Public @VisibleForTesting -public class MultiTableHFileOutputFormat extends FileOutputFormat { +public class MultiTableHFileOutputFormat extends HFileOutputFormat2 { private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class); - @Override - public RecordWriter - getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { - return createMultiHFileRecordWriter(context); - } - - static RecordWriter - createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException { - - // Get the path of the output directory - final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); - final Configuration conf = context.getConfiguration(); - final FileSystem fs = outputDir.getFileSystem(conf); - - Connection conn = ConnectionFactory.createConnection(conf); - Admin admin = conn.getAdmin(); - - // Map of existing tables, avoid calling getTable() everytime - final Map tables = new HashMap<>(); - - // Map of tables to writers - final Map> tableWriters = new HashMap<>(); - - return new RecordWriter() { - @Override - public void write(ImmutableBytesWritable tableName, V cell) - throws IOException, InterruptedException { - RecordWriter tableWriter = tableWriters.get(tableName); - // if there is new table, verify that table directory exists - if (tableWriter == null) { - // using table name as directory name - final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes())); - fs.mkdirs(tableOutputDir); - LOG.info("Writing Table '" + tableName.toString() + "' data into following directory" - + tableOutputDir.toString()); - // Configure for tableWriter, if table exist, write configuration of table into conf - Table table = null; - if (tables.containsKey(tableName)) { - table = tables.get(tableName); - } else { - table = getTable(tableName.copyBytes(), conn, admin); - tables.put(tableName, table); - } - if (table != null) { - configureForOneTable(conf, table.getTableDescriptor()); - } - // Create writer for one specific table - tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir); - // Put table into map - tableWriters.put(tableName, tableWriter); - } - // Write into tableWriter - // in the original code, it does not use Row - tableWriter.write(null, cell); - } - - @Override - public void close(TaskAttemptContext c) throws IOException, InterruptedException { - for (RecordWriter writer : tableWriters.values()) { - writer.close(c); - } - if (conn != null) { - conn.close(); - } - if (admin != null) { - admin.close(); - } - } - }; - } - /** - * Configure for one table, should be used before creating a new HFileRecordWriter, - * Set compression algorithms and related configuration based on column families + * Creates a composite key to use as a mapper output key when using + * MultiTableHFileOutputFormat.configureIncrementaLoad to set up bulk ingest job + * + * @param tableName Name of the Table - Eg: TableName.getNameAsString() + * @param suffix Usually represents a rowkey when creating a mapper key or column family + * @return byte[] representation of composite key */ - private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor) - throws UnsupportedEncodingException { - HFileOutputFormat2.configureCompression(conf, tableDescriptor); - HFileOutputFormat2.configureBlockSize(tableDescriptor, conf); - HFileOutputFormat2.configureBloomType(tableDescriptor, conf); - HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); + public static byte[] createCompositeKey(byte[] tableName, + byte[] suffix) { + return combineTableNameSuffix(tableName, suffix); } /** - * Configure a MapReduce Job to output HFiles for performing an incremental load into - * the multiple tables. - *

+ * Alternate api which accepts an ImmutableBytesWritable for the suffix + * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) + */ + public static byte[] createCompositeKey(byte[] tableName, + ImmutableBytesWritable suffix) { + return combineTableNameSuffix(tableName, suffix.get()); + } + + /** + * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the + * suffix + * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) + */ + public static byte[] createCompositeKey(String tableName, + ImmutableBytesWritable suffix) { + return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get()); + } + + /** + * Analogous to + * {@link HFileOutputFormat2#configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}, + * this function will configure the requisite number of reducers to write HFiles for multple + * tables simultaneously * - * ConfigureIncrementalLoad has set up partitioner and reducer for mapreduce job. - * Caller needs to setup input path, output path and mapper - * - * @param job - * @param tables A list of tables to inspects + * @param job See {@link org.apache.hadoop.mapreduce.Job} + * @param multiTableDescriptors Table descriptor and region locator pairs * @throws IOException */ - public static void configureIncrementalLoad(Job job, List tables) throws IOException { - configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class); + public static void configureIncrementalLoad(Job job, List + multiTableDescriptors) + throws IOException { + MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors, + MultiTableHFileOutputFormat.class); } - public static void configureIncrementalLoad(Job job, List tables, - Class> cls) throws IOException { + final private static int validateCompositeKey(byte[] keyBytes) { - Configuration conf = job.getConfiguration(); - Map> tableSplitKeys = - MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables); - configureIncrementalLoad(job, tableSplitKeys, cls); + int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator); + + // Either the separator was not found or a tablename wasn't present or a key wasn't present + if (separatorIdx == -1) { + throw new IllegalArgumentException("Invalid format for composite key [" + Bytes + .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key"); + } + return separatorIdx; } - /** - * Same purpose as configureIncrementalLoad(Job job, List tables) - * Used when region startKeys of each table is available, input as > - * - * Caller needs to transfer TableName and byte[] to ImmutableBytesWritable - */ - public static void configureIncrementalLoad(Job job, Map> tableSplitKeys) throws IOException { - configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class); + protected static byte[] getTableName(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, 0, separatorIdx); } - public static void configureIncrementalLoad(Job job, Map> tableSplitKeys, Class> cls) throws IOException { - Configuration conf = job.getConfiguration(); - - // file path to store - String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); - LOG.info("Writing partition info into dir: " + partitionsPath.toString()); - job.setPartitionerClass(MultiHFilePartitioner.class); - // get split keys for all the tables, and write them into partition file - MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys); - MultiHFilePartitioner.setPartitionFile(conf, partitionsPath); - partitionsPath.getFileSystem(conf).makeQualified(partitionsPath); - partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath); - - // now only support Mapper output - // we can use KeyValueSortReducer directly to sort Mapper output - if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); - } else { - LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); - } - int reducerNum = getReducerNumber(tableSplitKeys); - job.setNumReduceTasks(reducerNum); - LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count"); - - // setup output format - job.setOutputFormatClass(cls); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); - - job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); + protected static byte[] getSuffix(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1); } - - /** - * Check if table exist, should not dependent on HBase instance - * @return instance of table, if it exist - */ - private static Table getTable(final byte[] tableName, Connection conn, Admin admin) { - if (conn == null || admin == null) { - LOG.info("can not get Connection or Admin"); - return null; - } - - try { - TableName table = TableName.valueOf(tableName); - if (admin.tableExists(table)) { - return conn.getTable(table); - } - } catch (IOException e) { - LOG.info("Exception found in getTable()" + e.toString()); - return null; - } - - LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist"); - return null; - } - - /** - * Get the number of reducers by tables' split keys - */ - private static int getReducerNumber( - Map> tableSplitKeys) { - int reducerNum = 0; - for (Map.Entry> entry : tableSplitKeys.entrySet()) { - reducerNum += entry.getValue().size(); - } - return reducerNum; - } - - /** - * MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner - * The input is partitioned based on table's name and its region boundaries with the table. - * Two records are in the same partition if they have same table name and the their cells are - * in the same region - */ - static class MultiHFilePartitioner extends Partitioner - implements Configurable { - - public static final String DEFAULT_PATH = "_partition_multihfile.lst"; - public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path"; - private Configuration conf; - // map to receive from file - private Map> table_SplitKeys; - // each pair is map to one unique integer - private TreeMap partitionMap; - - @Override - public void setConf(Configuration conf) { - try { - this.conf = conf; - partitionMap = new TreeMap<>(); - table_SplitKeys = readTableSplitKeys(conf); - - // initiate partitionMap by table_SplitKeys map - int splitNum = 0; - for (Map.Entry> entry : table_SplitKeys.entrySet()) { - ImmutableBytesWritable table = entry.getKey(); - List list = entry.getValue(); - for (ImmutableBytesWritable splitKey : list) { - partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++); - } - } - } catch (IOException e) { - throw new IllegalArgumentException("Can't read partitions file", e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - /** - * Set the path to the SequenceFile storing the sorted . It must be the case - * that for R reduces, there are R-1 keys in the SequenceFile. - */ - public static void setPartitionFile(Configuration conf, Path p) { - conf.set(PARTITIONER_PATH, p.toString()); - } - - /** - * Get the path to the SequenceFile storing the sorted . - * @see #setPartitionFile(Configuration, Path) - */ - public static String getPartitionFile(Configuration conf) { - return conf.get(PARTITIONER_PATH, DEFAULT_PATH); - } - - - /** - * Return map of - */ - public static Map> getTablesRegionStartKeys( - Configuration conf, List tables) throws IOException { - final TreeMap> ret = new TreeMap<>(); - - try (Connection conn = ConnectionFactory.createConnection(conf); - Admin admin = conn.getAdmin()) { - LOG.info("Looking up current regions for tables"); - for (TableName tName : tables) { - RegionLocator table = conn.getRegionLocator(tName); - // if table not exist, use default split keys for this table - byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY }; - if (admin.tableExists(tName)) { - byteKeys = table.getStartKeys(); - } - List tableStartKeys = new ArrayList<>(byteKeys.length); - for (byte[] byteKey : byteKeys) { - tableStartKeys.add(new ImmutableBytesWritable(byteKey)); - } - ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys); - - } - return ret; - } - } - - /** - * write into sequence file in order, - * and this format can be parsed by MultiHFilePartitioner - */ - public static void writeTableSplitKeys(Configuration conf, Path partitionsPath, - Map> map) throws IOException { - LOG.info("Writing partition information to " + partitionsPath); - - if (map == null || map.isEmpty()) { - throw new IllegalArgumentException("No regions passed for all tables"); - } - - SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), - Writer.keyClass(ImmutableBytesWritable.class), - Writer.valueClass(ImmutableBytesWritable.class)); - - try { - for (Map.Entry> entry : map.entrySet()) { - ImmutableBytesWritable table = entry.getKey(); - List list = entry.getValue(); - if (list == null) { - throw new IOException("Split keys for a table can not be null"); - } - - TreeSet sorted = new TreeSet<>(list); - - ImmutableBytesWritable first = sorted.first(); - if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { - throw new IllegalArgumentException( - "First region of table should have empty start key. Instead has: " - + Bytes.toStringBinary(first.get())); - } - - for (ImmutableBytesWritable startKey : sorted) { - writer.append(table, startKey); - } - } - } finally { - writer.close(); - } - } - - /** - * read partition file into map - */ - private Map> readTableSplitKeys( - Configuration conf) throws IOException { - String parts = getPartitionFile(conf); - LOG.info("Read partition info from file: " + parts); - final Path partFile = new Path(parts); - - SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile)); - // values are already sorted in file, so use list - final Map> map = - new TreeMap<>(); - // key and value have same type - ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); - ImmutableBytesWritable value = - ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); - try { - while (reader.next(key, value)) { - - List list = map.get(key); - if (list == null) { - list = new ArrayList<>(); - } - list.add(value); - map.put(key, list); - - key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); - value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); - } - } finally { - IOUtils.cleanup(LOG, reader); - } - return map; - } - - @Override - public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) { - byte[] row = CellUtil.cloneRow(value); - final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row); - ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY); - //find splitKey by input rowKey - if (table_SplitKeys.containsKey(table)) { - List list = table_SplitKeys.get(table); - int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator()); - if (index < 0) { - index = (index + 1) * (-1) - 1; - } else if (index == list.size()) { - index -= 1; - } - if (index < 0) { - index = 0; - LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY "); - } - splitId = list.get(index); - } - - // find the id of the reducer for the input - Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId)); - if (id == null) { - LOG.warn("Can not get reducer id for input record"); - return -1; - } - return id.intValue() % numPartitions; - } - - /** - * A class store pair, has two main usage - * 1. store tableName and one of its splitKey as a pair - * 2. implement comparable, so that partitioner can find splitKey of its input cell - */ - static class TableSplitKeyPair extends Pair - implements Comparable { - - private static final long serialVersionUID = -6485999667666325594L; - - public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) { - super(a, b); - } - - @Override - public int compareTo(TableSplitKeyPair other) { - if (this.getFirst().equals(other.getFirst())) { - return this.getSecond().compareTo(other.getSecond()); - } - return this.getFirst().compareTo(other.getFirst()); - } - } - } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 3533f8a243f..87522b6e069 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -109,6 +108,9 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * Simple test for {@link HFileOutputFormat2}. * Sets up and runs a mapreduce job that writes hfile output. @@ -123,9 +125,9 @@ public class TestHFileOutputFormat2 { private static final byte[][] FAMILIES = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")) - , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; - private static final TableName TABLE_NAME = - TableName.valueOf("TestTable"); + , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; + private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", + "TestTable3").map(TableName::valueOf).toArray(TableName[]::new); private HBaseTestingUtility util = new HBaseTestingUtility(); @@ -146,6 +148,9 @@ public class TestHFileOutputFormat2 { private static final int VALLEN_DEFAULT=10; private static final String VALLEN_CONF="randomkv.val.length"; private static final byte [] QUALIFIER = Bytes.toBytes("data"); + private boolean multiTableMapper = false; + private TableName[] tables = null; + @Override protected void setup(Context context) throws IOException, @@ -155,6 +160,13 @@ public class TestHFileOutputFormat2 { Configuration conf = context.getConfiguration(); keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + false); + if (multiTableMapper) { + tables = TABLE_NAMES; + } else { + tables = new TableName[]{TABLE_NAMES[0]}; + } } @Override @@ -170,19 +182,23 @@ public class TestHFileOutputFormat2 { int taskId = context.getTaskAttemptID().getTaskID().getId(); assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - for (int i = 0; i < ROWSPERSPLIT; i++) { + byte[] key; + for (int j = 0; j < tables.length; ++j) { + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + key = keyBytes; + if (multiTableMapper) { + key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); + } - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - - for (byte[] family : TestHFileOutputFormat2.FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(key, kv); + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); + context.write(new ImmutableBytesWritable(key), kv); + } } } } @@ -196,31 +212,39 @@ public class TestHFileOutputFormat2 { ImmutableBytesWritable, Put> { private int keyLength; - private static final int KEYLEN_DEFAULT=10; - private static final String KEYLEN_CONF="randomkv.key.length"; + private static final int KEYLEN_DEFAULT = 10; + private static final String KEYLEN_CONF = "randomkv.key.length"; private int valLength; - private static final int VALLEN_DEFAULT=10; - private static final String VALLEN_CONF="randomkv.val.length"; - private static final byte [] QUALIFIER = Bytes.toBytes("data"); + private static final int VALLEN_DEFAULT = 10; + private static final String VALLEN_CONF = "randomkv.val.length"; + private static final byte[] QUALIFIER = Bytes.toBytes("data"); + private boolean multiTableMapper = false; + private TableName[] tables = null; @Override protected void setup(Context context) throws IOException, - InterruptedException { + InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + false); + if (multiTableMapper) { + tables = TABLE_NAMES; + } else { + tables = new TableName[]{TABLE_NAMES[0]}; + } } @Override protected void map( - NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException ,InterruptedException - { + NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException, InterruptedException { byte keyBytes[] = new byte[keyLength]; byte valBytes[] = new byte[valLength]; @@ -229,20 +253,25 @@ public class TestHFileOutputFormat2 { assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; Random random = new Random(); - for (int i = 0; i < ROWSPERSPLIT; i++) { + byte[] key; + for (int j = 0; j < tables.length; ++j) { + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + key = keyBytes; + if (multiTableMapper) { + key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); + } - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - - for (byte[] family : TestHFileOutputFormat2.FAMILIES) { - Put p = new Put(keyBytes); - p.addColumn(family, QUALIFIER, valBytes); - // set TTL to very low so that the scan does not return any value - p.setTTL(1l); - context.write(key, p); + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + Put p = new Put(keyBytes); + p.addColumn(family, QUALIFIER, valBytes); + // set TTL to very low so that the scan does not return any value + p.setTTL(1l); + context.write(new ImmutableBytesWritable(key), p); + } } } } @@ -365,7 +394,7 @@ public class TestHFileOutputFormat2 { HFile.Reader rd = HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); Map finfo = rd.loadFileInfo(); - byte[] range = finfo.get("TIMERANGE".getBytes()); + byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8")); assertNotNull(range); // unmarshall and check values. @@ -438,6 +467,9 @@ public class TestHFileOutputFormat2 { Path dir = util.getDataTestDir("WritingTagData"); try { + conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); + // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs + conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); Job job = new Job(conf); FileOutputFormat.setOutputPath(job, dir); context = createTestTaskAttemptContext(job); @@ -537,6 +569,7 @@ public class TestHFileOutputFormat2 { doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); } + //@Ignore("Wahtevs") @Test public void testMRIncrementalLoadWithPutSortReducer() throws Exception { LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); @@ -544,43 +577,80 @@ public class TestHFileOutputFormat2 { } private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, - boolean putSortReducer, String tableStr) throws Exception { + boolean putSortReducer, String tableStr) throws Exception { + doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, + Arrays.asList(tableStr)); + } + + @Test + public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { + LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); + doIncrementalLoadTest(false, false, true, + Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList + ())); + } + + private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, + boolean putSortReducer, List tableStr) throws Exception { util = new HBaseTestingUtility(); Configuration conf = util.getConfiguration(); - conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); + conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); int hostCount = 1; int regionNum = 5; - if(shouldKeepLocality) { + if (shouldKeepLocality) { // We should change host count higher than hdfs replica count when MiniHBaseCluster supports // explicit hostnames parameter just like MiniDFSCluster does. hostCount = 3; regionNum = 20; } - byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); String[] hostnames = new String[hostCount]; - for(int i = 0; i < hostCount; ++i) { + for (int i = 0; i < hostCount; ++i) { hostnames[i] = "datanode_" + i; } util.startMiniCluster(1, hostCount, hostnames); - TableName tableName = TableName.valueOf(tableStr); - Table table = util.createTable(tableName, FAMILIES, splitKeys); - Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); - FileSystem fs = testDir.getFileSystem(conf); - try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin = - util.getConnection().getAdmin();) { + + Map allTables = new HashMap<>(tableStr.size()); + List tableInfo = new ArrayList<>(tableStr.size()); + boolean writeMultipleTables = tableStr.size() > 1; + for (String tableStrSingle : tableStr) { + byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); + TableName tableName = TableName.valueOf(tableStrSingle); + Table table = util.createTable(tableName, FAMILIES, splitKeys); + + RegionLocator r = util.getConnection().getRegionLocator(tableName); assertEquals("Should start with empty table", 0, util.countRows(table)); int numRegions = r.getStartKeys().length; assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); - // Generate the bulk load files - runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer); + allTables.put(tableStrSingle, table); + tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); + } + Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); + // Generate the bulk load files + runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); + + for (Table tableSingle : allTables.values()) { // This doesn't write into the table, just makes files - assertEquals("HFOF should not touch actual table", 0, util.countRows(table)); + assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); + } + int numTableDirs = 0; + for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) { + Path tablePath = testDir; + + if (writeMultipleTables) { + if (allTables.containsKey(tf.getPath().getName())) { + ++numTableDirs; + tablePath = tf.getPath(); + } + else { + continue; + } + } // Make sure that a directory was created for every CF int dir = 0; - for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) { + for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) { for (byte[] family : FAMILIES) { if (Bytes.toString(family).equals(f.getPath().getName())) { ++dir; @@ -588,95 +658,132 @@ public class TestHFileOutputFormat2 { } } assertEquals("Column family not found in FS.", FAMILIES.length, dir); + } + if (writeMultipleTables) { + assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); + } + Admin admin = util.getConnection().getAdmin(); + try { // handle the split case if (shouldChangeRegions) { - LOG.info("Changing regions in table"); - admin.disableTable(table.getName()); + Table chosenTable = allTables.values().iterator().next(); + // Choose a semi-random table if multiple tables are available + LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); + admin.disableTable(chosenTable.getName()); util.waitUntilNoRegionsInTransition(); - util.deleteTable(table.getName()); + util.deleteTable(chosenTable.getName()); byte[][] newSplitKeys = generateRandomSplitKeys(14); - table = util.createTable(tableName, FAMILIES, newSplitKeys); + Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); - while (util.getConnection().getRegionLocator(tableName) - .getAllRegionLocations().size() != 15 || - !admin.isTableAvailable(table.getName())) { + while (util.getConnection().getRegionLocator(chosenTable.getName()) + .getAllRegionLocations().size() != 15 || + !admin.isTableAvailable(table.getName())) { Thread.sleep(200); LOG.info("Waiting for new region assignment to happen"); } } // Perform the actual load - new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r); - - // Ensure data shows up - int expectedRows = 0; - if (putSortReducer) { - // no rows should be extracted - assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, - util.countRows(table)); - } else { - expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; - assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, - util.countRows(table)); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - for (Result res : results) { - assertEquals(FAMILIES.length, res.rawCells().length); - Cell first = res.rawCells()[0]; - for (Cell kv : res.rawCells()) { - assertTrue(CellUtil.matchingRow(first, kv)); - assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); - } + for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { + Path tableDir = testDir; + String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); + LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); + if (writeMultipleTables) { + tableDir = new Path(testDir, tableNameStr); } - results.close(); - } - String tableDigestBefore = util.checksumRows(table); - // Check region locality - HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); - for (HRegion region : util.getHBaseCluster().getRegions(tableName)) { - hbd.add(region.getHDFSBlocksDistribution()); - } - for (String hostname : hostnames) { - float locality = hbd.getBlockLocalityIndex(hostname); - LOG.info("locality of [" + hostname + "]: " + locality); - assertEquals(100, (int) (locality * 100)); - } + Table currentTable = allTables.get(tableNameStr); + TableName currentTableName = currentTable.getName(); + new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo + .getRegionLocator()); - // Cause regions to reopen - admin.disableTable(tableName); - while (!admin.isTableDisabled(tableName)) { - Thread.sleep(200); - LOG.info("Waiting for table to disable"); + // Ensure data shows up + int expectedRows = 0; + if (putSortReducer) { + // no rows should be extracted + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(currentTable)); + } else { + expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(currentTable)); + Scan scan = new Scan(); + ResultScanner results = currentTable.getScanner(scan); + for (Result res : results) { + assertEquals(FAMILIES.length, res.rawCells().length); + Cell first = res.rawCells()[0]; + for (Cell kv : res.rawCells()) { + assertTrue(CellUtil.matchingRow(first, kv)); + assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + } + } + results.close(); + } + String tableDigestBefore = util.checksumRows(currentTable); + // Check region locality + HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); + for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { + hbd.add(region.getHDFSBlocksDistribution()); + } + for (String hostname : hostnames) { + float locality = hbd.getBlockLocalityIndex(hostname); + LOG.info("locality of [" + hostname + "]: " + locality); + assertEquals(100, (int) (locality * 100)); + } + + // Cause regions to reopen + admin.disableTable(currentTableName); + while (!admin.isTableDisabled(currentTableName)) { + Thread.sleep(200); + LOG.info("Waiting for table to disable"); + } + admin.enableTable(currentTableName); + util.waitTableAvailable(currentTableName); + assertEquals("Data should remain after reopening of regions", + tableDigestBefore, util.checksumRows(currentTable)); } - admin.enableTable(tableName); - util.waitTableAvailable(tableName); - assertEquals("Data should remain after reopening of regions", - tableDigestBefore, util.checksumRows(table)); } finally { + for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { + tableInfoSingle.getRegionLocator().close(); + } + for (Entry singleTable : allTables.entrySet() ) { + singleTable.getValue().close(); + util.deleteTable(singleTable.getValue().getName()); + } testDir.getFileSystem(conf).delete(testDir, true); - util.deleteTable(tableName); util.shutdownMiniCluster(); } } - private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor, - RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException, - UnsupportedEncodingException, InterruptedException, ClassNotFoundException { + private void runIncrementalPELoad(Configuration conf, List tableInfo, Path outDir, + boolean putSortReducer) throws IOException, + InterruptedException, ClassNotFoundException { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); setupRandomGeneratorMapper(job, putSortReducer); - HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator); + if (tableInfo.size() > 1) { + MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); + int sum = 0; + for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { + sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); + } + assertEquals(sum, job.getNumReduceTasks()); + } + else { + RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); + HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), + regionLocator); + assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); + } + FileOutputFormat.setOutputPath(job, outDir); assertFalse(util.getTestFileSystem().exists(outDir)) ; - assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); - assertTrue(job.waitForCompletion(true)); } @@ -696,7 +803,10 @@ public class TestHFileOutputFormat2 { getMockColumnFamiliesForCompression(numCfs); Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForCompression(table, familyToCompression); - HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor()); + conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.compressionDetails, + Arrays.asList(table.getTableDescriptor()))); // read back family specific compression setting from the configuration Map retrievedFamilyToCompressionMap = HFileOutputFormat2 @@ -707,14 +817,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToCompression.entrySet()) { assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToCompressionMap.get(entry.getKey().getBytes())); + retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForCompression(Table table, Map familyToCompression) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToCompression.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -766,7 +876,9 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForBloomType(table, familyToBloomType); - HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf); + conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, + Arrays.asList(table.getTableDescriptor()))); // read back family specific data block encoding settings from the // configuration @@ -779,14 +891,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToBloomType.entrySet()) { assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes())); + retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForBloomType(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -835,7 +947,10 @@ public class TestHFileOutputFormat2 { Table table = Mockito.mock(Table.class); setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); - HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf); + conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table + .getTableDescriptor()))); // read back family specific data block encoding settings from the // configuration @@ -849,14 +964,14 @@ public class TestHFileOutputFormat2 { ) { assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes())); + retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForBlockSize(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -910,7 +1025,10 @@ public class TestHFileOutputFormat2 { setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); HTableDescriptor tableDescriptor = table.getTableDescriptor(); - HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); + conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + HFileOutputFormat2.serializeColumnFamilyAttribute + (HFileOutputFormat2.dataBlockEncodingDetails, Arrays + .asList(tableDescriptor))); // read back family specific data block encoding settings from the // configuration @@ -923,14 +1041,14 @@ public class TestHFileOutputFormat2 { for (Entry entry : familyToDataBlockEncoding.entrySet()) { assertEquals("DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), entry.getValue(), - retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes())); + retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8"))); } } } private void setupMockColumnFamiliesForDataBlockEncoding(Table table, Map familyToDataBlockEncoding) throws IOException { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); for (Entry entry : familyToDataBlockEncoding.entrySet()) { mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) .setMaxVersions(1) @@ -995,7 +1113,7 @@ public class TestHFileOutputFormat2 { // Setup table descriptor Table table = Mockito.mock(Table.class); RegionLocator regionLocator = Mockito.mock(RegionLocator.class); - HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); + HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); Mockito.doReturn(htd).when(table).getTableDescriptor(); for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) { htd.addFamily(hcd); @@ -1099,15 +1217,15 @@ public class TestHFileOutputFormat2 { util.startMiniCluster(); try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); - Table table = util.createTable(TABLE_NAME, FAMILIES); - RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { + Table table = util.createTable(TABLE_NAMES[0], FAMILIES); + RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { final FileSystem fs = util.getDFSCluster().getFileSystem(); assertEquals("Should start with empty table", 0, util.countRows(table)); // deep inspection: get the StoreFile dir final Path storePath = new Path( - FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), - new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(), + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), + new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), Bytes.toString(FAMILIES[0]))); assertEquals(0, fs.listStatus(storePath).length); @@ -1117,8 +1235,8 @@ public class TestHFileOutputFormat2 { for (int i = 0; i < 2; i++) { Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); - runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME), - testDir, false); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table + .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); } @@ -1132,12 +1250,12 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME); + admin.compact(TABLE_NAMES[0]); try { quickPoll(new Callable() { @Override public Boolean call() throws Exception { - List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); for (HRegion region : regions) { for (Store store : region.getStores()) { store.closeAndArchiveCompactedFiles(); @@ -1152,11 +1270,11 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME); + admin.majorCompact(TABLE_NAMES[0]); quickPoll(new Callable() { @Override public Boolean call() throws Exception { - List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); for (HRegion region : regions) { for (Store store : region.getStores()) { store.closeAndArchiveCompactedFiles(); @@ -1182,13 +1300,13 @@ public class TestHFileOutputFormat2 { Admin admin = conn.getAdmin()){ Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); final FileSystem fs = util.getDFSCluster().getFileSystem(); - Table table = util.createTable(TABLE_NAME, FAMILIES); + Table table = util.createTable(TABLE_NAMES[0], FAMILIES); assertEquals("Should start with empty table", 0, util.countRows(table)); // deep inspection: get the StoreFile dir final Path storePath = new Path( - FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), - new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(), + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]), + new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), Bytes.toString(FAMILIES[0]))); assertEquals(0, fs.listStatus(storePath).length); @@ -1196,7 +1314,7 @@ public class TestHFileOutputFormat2 { Put p = new Put(Bytes.toBytes("test")); p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); table.put(p); - admin.flush(TABLE_NAME); + admin.flush(TABLE_NAMES[0]); assertEquals(1, util.countRows(table)); quickPoll(new Callable() { @Override @@ -1209,8 +1327,9 @@ public class TestHFileOutputFormat2 { conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); - RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME); - runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false); + RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table + .getTableDescriptor(), regionLocator)), testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); @@ -1224,7 +1343,7 @@ public class TestHFileOutputFormat2 { assertEquals(2, fs.listStatus(storePath).length); // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME); + admin.compact(TABLE_NAMES[0]); try { quickPoll(new Callable() { @Override @@ -1238,7 +1357,7 @@ public class TestHFileOutputFormat2 { } // a major compaction should work though - admin.majorCompact(TABLE_NAME); + admin.majorCompact(TABLE_NAMES[0]); quickPoll(new Callable() { @Override public Boolean call() throws Exception { @@ -1273,15 +1392,15 @@ public class TestHFileOutputFormat2 { if ("newtable".equals(args[0])) { TableName tname = TableName.valueOf(args[1]); byte[][] splitKeys = generateRandomSplitKeys(4); - try (Table table = util.createTable(tname, FAMILIES, splitKeys)) { - } + Table table = util.createTable(tname, FAMILIES, splitKeys); } else if ("incremental".equals(args[0])) { TableName tname = TableName.valueOf(args[1]); try(Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin(); RegionLocator regionLocator = c.getRegionLocator(tname)) { Path outDir = new Path("incremental-out"); - runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false); + runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin + .getTableDescriptor(tname), regionLocator)), outDir, false); } } else { throw new RuntimeException( @@ -1294,8 +1413,10 @@ public class TestHFileOutputFormat2 { util = new HBaseTestingUtility(); Configuration conf = util.getConfiguration(); conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); - conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]), - "ONE_SSD"); + + conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + + Bytes.toString(HFileOutputFormat2.combineTableNameSuffix( + TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD"); Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); util.startMiniDFSCluster(3); @@ -1313,8 +1434,10 @@ public class TestHFileOutputFormat2 { assertEquals("HOT", spB); // alter table cf schema to change storage policies - HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir); - HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir); + HFileOutputFormat2.configureStoragePolicy(conf, fs, + HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir); + HFileOutputFormat2.configureStoragePolicy(conf, fs, + HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir); spA = getStoragePolicyName(fs, cf1Dir); spB = getStoragePolicyName(fs, cf2Dir); LOG.debug("Storage policy of cf 0: [" + spA + "]."); @@ -1368,6 +1491,5 @@ public class TestHFileOutputFormat2 { return null; } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java deleted file mode 100644 index 781eaa984a2..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.PerformanceEvaluation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and - * writes hfiles. - */ -@Category(MediumTests.class) -public class TestMultiTableHFileOutputFormat { - private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class); - - private HBaseTestingUtility util = new HBaseTestingUtility(); - - private static int ROWSPERSPLIT = 10; - - private static final int KEYLEN_DEFAULT = 10; - private static final String KEYLEN_CONF = "randomkv.key.length"; - - private static final int VALLEN_DEFAULT = 10; - private static final String VALLEN_CONF = "randomkv.val.length"; - - private static final byte[][] TABLES = - { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), - Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; - - private static final byte[][] FAMILIES = - { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), - Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; - - private static final byte[] QUALIFIER = Bytes.toBytes("data"); - - /** - * Run small MR job. this MR job will write HFile into - * testWritingDataIntoHFiles/tableNames/columnFamilies/ - */ - @Test - public void testWritingDataIntoHFiles() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); - - // Set down this value or we OOME in eclipse. - conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - try { - Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); - - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - job.setReducerClass(Table_KeyValueSortReducer.class); - job.setOutputFormatClass(MultiTableHFileOutputFormat.class); - job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("\nStarting test testWritingDataIntoHFiles\n"); - assertTrue(job.waitForCompletion(true)); - LOG.info("\nWaiting on checking MapReduce output\n"); - assertTrue(checkMROutput(fs, testDir, 0)); - } finally { - testDir.getFileSystem(conf).delete(testDir, true); - util.shutdownMiniCluster(); - } - } - - /** - * check whether create directory and hfiles as format designed in MultiHFilePartitioner - * and also check whether the output file has same related configuration as created table - */ - @Test - public void testMultiHFilePartitioner() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testMultiHFilePartitioner dir writing to : " + testDir); - - // Set down this value or we OOME in eclipse. - conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - // Create several tables for testing - List tables = new ArrayList(); - - // to store splitKeys for TABLE[0] for testing; - byte[][] testKeys = new byte[0][0]; - for (int i = 0; i < TABLES.length; i++) { - TableName tableName = TableName.valueOf(TABLES[i]); - byte[][] splitKeys = generateRandomSplitKeys(3); - if (i == 0) { - testKeys = splitKeys; - } - HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); - for (int j = 0; j < FAMILIES.length; j++) { - HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]); - //only set Tables[0] configuration, and specify compression type and DataBlockEncode - if (i == 0) { - familyDescriptor.setCompressionType(Compression.Algorithm.GZ); - familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); - } - tableDescriptor.addFamily(familyDescriptor); - } - util.createTable(tableDescriptor, splitKeys, conf); - tables.add(tableName); - } - // set up for MapReduce job - try { - Job job = Job.getInstance(conf, "testMultiHFilePartitioner"); - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - - MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables); - - LOG.info("Starting test testWritingDataIntoHFiles"); - assertTrue(job.waitForCompletion(true)); - LOG.info("Waiting on checking MapReduce output"); - assertTrue(checkMROutput(fs, testDir, 0)); - assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys)); - } finally { - for (int i = 0; i < TABLES.length; i++) { - TableName tName = TableName.valueOf(TABLES[i]); - util.deleteTable(tName); - } - fs.delete(testDir, true); - fs.close(); - util.shutdownMiniCluster(); - } - } - - /** - * check the output hfile has same configuration as created test table - * and also check whether hfiles get split correctly - * only check TABLES[0] - */ - private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException { - FileStatus[] fStats = fs.listStatus(testDir); - for (FileStatus stats : fStats) { - if (stats.getPath().getName().equals(new String(TABLES[0]))) { - FileStatus[] cfStats = fs.listStatus(stats.getPath()); - for (FileStatus cfstat : cfStats) { - FileStatus[] hfStats = fs.listStatus(cfstat.getPath()); - - List firsttKeys = new ArrayList(); - List lastKeys = new ArrayList(); - for (FileStatus hfstat : hfStats) { - if (HFile.isHFileFormat(fs, hfstat)) { - HFile.Reader hfr = - HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf); - if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr - .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false; - firsttKeys.add(hfr.getFirstRowKey()); - lastKeys.add(hfr.getLastRowKey()); - } - } - if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) { - return false; - } - } - } - } - return true; - } - - /** - * Check whether the Hfile has been split by region boundaries - * @param splitKeys split keys for that table - * @param firstKeys first rowKey for hfiles - * @param lastKeys last rowKey for hfiles - */ - private boolean checkFileSplit(byte[][] splitKeys, List firstKeys, List lastKeys) { - Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR); - Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR); - Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR); - - int is = 0, il = 0; - for (byte[] key : lastKeys) { - while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++; - if (is == splitKeys.length) { - break; - } - if (is > 0) { - if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false; - } - il++; - } - - if (is == splitKeys.length) { - return il == lastKeys.size() - 1; - } - return true; - } - - - /** - * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the - * created directory is correct or not A recursion method, the testDir had better be small size - */ - private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException { - if (level >= 3) { - return HFile.isHFileFormat(fs, testDir); - } - FileStatus[] fStats = fs.listStatus(testDir); - if (fStats == null || fStats.length <= 0) { - LOG.info("Created directory format is not correct"); - return false; - } - - for (FileStatus stats : fStats) { - // skip the _SUCCESS file created by MapReduce - if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) - continue; - if (level < 2 && !stats.isDirectory()) { - LOG.info("Created directory format is not correct"); - return false; - } - boolean flag = checkMROutput(fs, stats.getPath(), level + 1); - if (flag == false) return false; - } - return true; - } - - - private byte[][] generateRandomSplitKeys(int numKeys) { - Random random = new Random(); - byte[][] ret = new byte[numKeys][]; - for (int i = 0; i < numKeys; i++) { - ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT); - } - return ret; - } - - - /** - * Simple mapper that makes output. With no input data - */ - static class Random_TableKV_GeneratingMapper - extends Mapper { - - private int keyLength; - private int valLength; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - - Configuration conf = context.getConfiguration(); - keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); - valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); - } - - @Override - protected void map(NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException, InterruptedException { - - byte keyBytes[] = new byte[keyLength]; - byte valBytes[] = new byte[valLength]; - - ArrayList tables = new ArrayList<>(); - for (int i = 0; i < TABLES.length; i++) { - tables.add(new ImmutableBytesWritable(TABLES[i])); - } - - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - - for (int i = 0; i < ROWSPERSPLIT; i++) { - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); - random.nextBytes(valBytes); - - for (ImmutableBytesWritable table : tables) { - for (byte[] family : FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(table, kv); - } - } - } - } - } - - /** - * Simple Reducer that have input , with KeyValues have no order. and output - * , with KeyValues are ordered - */ - - static class Table_KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, - org.apache.hadoop.mapreduce.Reducer.Context context) - throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); - for (KeyValue kv : kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv : map) { - context.write(table, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } - } -} \ No newline at end of file