diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index da507b13d0d..f847608846a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -22,14 +22,20 @@ import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -88,24 +94,48 @@ import com.google.common.annotations.VisibleForTesting;
* all HFiles being written.
*
* Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
+ * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}.
*/
@InterfaceAudience.Public
public class HFileOutputFormat2
extends FileOutputFormat<ImmutableBytesWritable, Cell> {
private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
+ static class TableInfo {
+ private HTableDescriptor hTableDescriptor;
+ private RegionLocator regionLocator;
+
+ public TableInfo(HTableDescriptor hTableDescriptor, RegionLocator regionLocator) {
+ this.hTableDescriptor = hTableDescriptor;
+ this.regionLocator = regionLocator;
+ }
+
+ public HTableDescriptor getHTableDescriptor() {
+ return hTableDescriptor;
+ }
+
+ public RegionLocator getRegionLocator() {
+ return regionLocator;
+ }
+ }
+
+ protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
+
+ protected static byte[] combineTableNameSuffix(byte[] tableName,
+ byte[] suffix ) {
+ return Bytes.add(tableName, tableSeparator, suffix);
+ }
// The following constants are private since these are used by
// HFileOutputFormat2 to internally transfer data between job setup and
// reducer run using conf.
// These should not be changed by the client.
- private static final String COMPRESSION_FAMILIES_CONF_KEY =
+ static final String COMPRESSION_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.compression";
- private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+ static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.bloomtype";
- private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+ static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.blocksize";
- private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+ static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
// This constant is public since the client can modify this when setting
@@ -121,8 +151,10 @@ public class HFileOutputFormat2
public static final String LOCALITY_SENSITIVE_CONF_KEY =
"hbase.bulkload.locality.sensitive.enabled";
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
- private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+ static final String OUTPUT_TABLE_NAME_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.table.name";
+ static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+ "hbase.mapreduce.use.multi.table.hfileoutputformat";
public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@@ -133,270 +165,277 @@ public class HFileOutputFormat2
return createRecordWriter(context);
}
- static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
- createRecordWriter(final TaskAttemptContext context) throws IOException {
- return new HFileRecordWriter<>(context, null);
+ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
+ return combineTableNameSuffix(tableName, family);
}
- protected static class HFileRecordWriter<V extends Cell>
- extends RecordWriter<ImmutableBytesWritable, V> {
- private final TaskAttemptContext context;
- private final Path outputPath;
- private final Path outputDir;
- private final Configuration conf;
- private final FileSystem fs;
+ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+ createRecordWriter(final TaskAttemptContext context)
+ throws IOException {
- private final long maxsize;
+ // Get the path of the temporary output file
+ final Path outputPath = FileOutputFormat.getOutputPath(context);
+ final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Configuration conf = context.getConfiguration();
+ final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
+ final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+ if (writeTableNames==null || writeTableNames.isEmpty()) {
+ throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+ + " cannot be empty");
+ }
+ final FileSystem fs = outputDir.getFileSystem(conf);
+ // These configs. are from hbase-*.xml
+ final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE);
+ // Invented config. Add to hbase-*.xml if other than default compression.
+ final String defaultCompressionStr = conf.get("hfile.compression",
+ Compression.Algorithm.NONE.getName());
+ final Algorithm defaultCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr);
+ final boolean compactionExclude = conf.getBoolean(
+ "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
- private final Algorithm defaultCompression;
- private final boolean compactionExclude;
+ final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+ Bytes.toString(tableSeparator))).collect(Collectors.toSet());
- private final Map<byte[], Algorithm> compressionMap;
- private final Map<byte[], BloomType> bloomTypeMap;
- private final Map<byte[], Integer> blockSizeMap;
+ // create a map from column family to the compression algorithm
+ final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+ final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+ final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
- private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
- private final DataBlockEncoding overriddenEncoding;
-
- private final Map<byte[], WriterLength> writers;
- private byte[] previousRow;
- private final byte[] now;
- private boolean rollRequested;
-
- /**
- * Mapreduce job will create a temp path for outputting results. If out != null, it means that
- * the caller has set the temp working dir; If out == null, it means we need to set it here.
- * Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
- * temp working dir at the table level and HFileOutputFormat2 has to set it here within this
- * constructor.
- */
- public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
- throws IOException {
- // Get the path of the temporary output file
- context = taContext;
-
- if (out == null) {
- outputPath = FileOutputFormat.getOutputPath(context);
- outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
- } else {
- outputPath = out;
- outputDir = outputPath;
- }
-
- conf = context.getConfiguration();
- fs = outputDir.getFileSystem(conf);
-
- // These configs. are from hbase-*.xml
- maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
-
- // Invented config. Add to hbase-*.xml if other than default compression.
- String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
- defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
- compactionExclude =
- conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
- // create a map from column family to the compression algorithm
- compressionMap = createFamilyCompressionMap(conf);
- bloomTypeMap = createFamilyBloomTypeMap(conf);
- blockSizeMap = createFamilyBlockSizeMap(conf);
-
- // Config for data block encoding
- String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
- datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
- if (dataBlockEncodingStr != null) {
- overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
- } else {
- overriddenEncoding = null;
- }
-
- writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- previousRow = HConstants.EMPTY_BYTE_ARRAY;
- now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
- rollRequested = false;
+ String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+ final Map<byte[], DataBlockEncoding> datablockEncodingMap
+ = createFamilyDataBlockEncodingMap(conf);
+ final DataBlockEncoding overriddenEncoding;
+ if (dataBlockEncodingStr != null) {
+ overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+ } else {
+ overriddenEncoding = null;
}
- @Override
- public void write(ImmutableBytesWritable row, V cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ return new RecordWriter<ImmutableBytesWritable, V>() {
+ // Map of families to writers and how much has been output on the writer.
+ private final Map<byte[], WriterLength> writers =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+ private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+ private boolean rollRequested = false;
- // null input == user explicitly wants to flush
- if (row == null && kv == null) {
- rollWriters();
- return;
- }
+ @Override
+ public void write(ImmutableBytesWritable row, V cell)
+ throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- byte[] rowKey = CellUtil.cloneRow(kv);
- long length = kv.getLength();
- byte[] family = CellUtil.cloneFamily(kv);
- WriterLength wl = this.writers.get(family);
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
- // If this is a new column family, verify that the directory exists
- if (wl == null) {
- Path cfPath = new Path(outputDir, Bytes.toString(family));
- fs.mkdirs(cfPath);
- configureStoragePolicy(conf, fs, family, cfPath);
- }
-
- // If any of the HFiles for the column families has reached
- // maxsize, we need to roll all the writers
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
-
- // This can only happen once a row is finished though
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
- rollWriters();
- }
-
- // create a new WAL writer, if necessary
- if (wl == null || wl.writer == null) {
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- HRegionLocation loc = null;
- String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
- if (tableName != null) {
- try (Connection connection = ConnectionFactory.createConnection(conf);
- RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
- loc = locator.getRegionLocation(rowKey);
- } catch (Throwable e) {
- LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
- e);
- loc = null;
- }
- }
-
- if (null == loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "failed to get region location, so use default writer: " + Bytes.toString(rowKey));
- }
- wl = getNewWriter(family, conf, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
- }
- InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
- if (initialIsa.isUnresolved()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
- + loc.getPort() + ", so use default writer");
- }
- wl = getNewWriter(family, conf, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
- }
- wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
- }
+ byte[] rowKey = CellUtil.cloneRow(kv);
+ long length = kv.getLength();
+ byte[] family = CellUtil.cloneFamily(kv);
+ byte[] tableNameBytes = null;
+ if (writeMultipleTables) {
+ tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
+ if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+ throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
+ "' not" + " expected");
}
} else {
- wl = getNewWriter(family, conf, null);
+ tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
+ }
+ byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
+ WriterLength wl = this.writers.get(tableAndFamily);
+
+ // If this is a new column family, verify that the directory exists
+ if (wl == null) {
+ Path writerPath = null;
+ if (writeMultipleTables) {
+ writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
+ .toString(family)));
+ }
+ else {
+ writerPath = new Path(outputDir, Bytes.toString(family));
+ }
+ fs.mkdirs(writerPath);
+ configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
+ }
+
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
+
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new WAL writer, if necessary
+ if (wl == null || wl.writer == null) {
+ if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+ HRegionLocation loc = null;
+
+ String tableName = Bytes.toString(tableNameBytes);
+ if (tableName != null) {
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ RegionLocator locator =
+ connection.getRegionLocator(TableName.valueOf(tableName))) {
+ loc = locator.getRegionLocation(rowKey);
+ } catch (Throwable e) {
+ LOG.warn("There's something wrong when locating rowkey: " +
+ Bytes.toString(rowKey) + " for tablename: " + tableName, e);
+ loc = null;
+ } }
+
+ if (null == loc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("failed to get region location, so use default writer for rowkey: " +
+ Bytes.toString(rowKey));
+ }
+ wl = getNewWriter(tableNameBytes, family, conf, null);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+ }
+ InetSocketAddress initialIsa =
+ new InetSocketAddress(loc.getHostname(), loc.getPort());
+ if (initialIsa.isUnresolved()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+ + loc.getPort() + ", so use default writer");
+ }
+ wl = getNewWriter(tableNameBytes, family, conf, null);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+ }
+ wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
+ });
+ }
+ }
+ } else {
+ wl = getNewWriter(tableNameBytes, family, conf, null);
+ }
+ }
+
+ // we now have the proper WAL writer. full steam ahead
+ kv.updateLatestStamp(this.now);
+ wl.writer.append(kv);
+ wl.written += length;
+
+ // Copy the row so we know when a row transition.
+ this.previousRow = rowKey;
+ }
+
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info(
+ "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /*
+ * Create a new StoreFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new StoreFile.Writer.
+ * @throws IOException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+ justification="Not important")
+ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
+ conf, InetSocketAddress[] favoredNodes) throws IOException {
+ byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+ Path familydir = new Path(outputDir, Bytes.toString(family));
+ if (writeMultipleTables) {
+ familydir = new Path(outputDir,
+ new Path(Bytes.toString(tableName), Bytes.toString(family)));
+ }
+ WriterLength wl = new WriterLength();
+ Algorithm compression = compressionMap.get(tableAndFamily);
+ compression = compression == null ? defaultCompression : compression;
+ BloomType bloomType = bloomTypeMap.get(tableAndFamily);
+ bloomType = bloomType == null ? BloomType.NONE : bloomType;
+ Integer blockSize = blockSizeMap.get(tableAndFamily);
+ blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+ DataBlockEncoding encoding = overriddenEncoding;
+ encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
+ encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ HFileContextBuilder contextBuilder = new HFileContextBuilder()
+ .withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(blockSize);
+
+ if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+ contextBuilder.withIncludesTags(true);
+ }
+
+ contextBuilder.withDataBlockEncoding(encoding);
+ HFileContext hFileContext = contextBuilder.build();
+ if (null == favoredNodes) {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+ } else {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+ .withFavoredNodes(favoredNodes).build();
+ }
+
+ this.writers.put(tableAndFamily, wl);
+ return wl;
+ }
+
+ private void close(final StoreFileWriter w) throws IOException {
+ if (w != null) {
+ w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+ Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(true));
+ w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude));
+ w.appendTrackedTimestampsToMetadata();
+ w.close();
}
}
- // we now have the proper WAL writer. full steam ahead
- kv.updateLatestStamp(this.now);
- wl.writer.append(kv);
- wl.written += length;
-
- // Copy the row so we know when a row transition.
- this.previousRow = rowKey;
- }
-
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info(
- "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+ @Override
+ public void close(TaskAttemptContext c)
+ throws IOException, InterruptedException {
+ for (WriterLength wl: this.writers.values()) {
close(wl.writer);
}
- wl.writer = null;
- wl.written = 0;
}
- this.rollRequested = false;
- }
-
- /*
- * Create a new StoreFile.Writer.
- * @param family
- * @return A WriterLength, containing a new StoreFile.Writer.
- * @throws IOException
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
- justification = "Not important")
- private WriterLength getNewWriter(byte[] family, Configuration conf,
- InetSocketAddress[] favoredNodes) throws IOException {
- WriterLength wl = new WriterLength();
- Path familyDir = new Path(outputDir, Bytes.toString(family));
- Algorithm compression = compressionMap.get(family);
- compression = compression == null ? defaultCompression : compression;
- BloomType bloomType = bloomTypeMap.get(family);
- bloomType = bloomType == null ? BloomType.NONE : bloomType;
- Integer blockSize = blockSizeMap.get(family);
- blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
- DataBlockEncoding encoding = overriddenEncoding;
- encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
- encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
-
- if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
- contextBuilder.withIncludesTags(true);
- }
-
- contextBuilder.withDataBlockEncoding(encoding);
- HFileContext hFileContext = contextBuilder.build();
-
- if (null == favoredNodes) {
- wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familyDir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
- } else {
- wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withOutputDir(familyDir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
- .withFavoredNodes(favoredNodes).build();
- }
-
- this.writers.put(family, wl);
- return wl;
- }
-
- private void close(final StoreFileWriter w) throws IOException {
- if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
- Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
- w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
- Bytes.toBytes(compactionExclude));
- w.appendTrackedTimestampsToMetadata();
- w.close();
- }
- }
-
- @Override
- public void close(TaskAttemptContext c) throws IOException, InterruptedException {
- for (WriterLength wl : this.writers.values()) {
- close(wl.writer);
- }
- }
+ };
}
/**
* Configure block storage policy for CF after the directory is created.
*/
static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
- byte[] family, Path cfPath) {
- if (null == conf || null == fs || null == family || null == cfPath) {
+ byte[] tableAndFamily, Path cfPath) {
+ if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
return;
}
String policy =
- conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+ conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
conf.get(STORAGE_POLICY_PROPERTY));
FSUtils.setStoragePolicy(fs, cfPath, policy);
}
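The per-family maps and the storage policy lookup above are now keyed by the composite that combineTableNameSuffix produces: the table name and the column family joined by the ";" separator. A minimal illustrative sketch of that key and the derived storage-policy property follows; the table and family names are hypothetical and the class is not part of the patch.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.Bytes;

public class TableFamilyKeySketch {
  // Mirrors HFileOutputFormat2.tableSeparator and combineTableNameSuffix from the patch.
  private static final byte[] TABLE_SEPARATOR = ";".getBytes(StandardCharsets.UTF_8);

  static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, TABLE_SEPARATOR, suffix);
  }

  public static void main(String[] args) {
    byte[] tableAndFamily = combineTableNameSuffix(Bytes.toBytes("usertable"), Bytes.toBytes("cf1"));
    // "usertable;cf1" is the key used for the compression/bloom/blocksize/encoding maps.
    System.out.println(Bytes.toString(tableAndFamily));
    // configureStoragePolicy therefore reads a property of this shape for the family directory:
    System.out.println("hbase.hstore.storagepolicy." + Bytes.toString(tableAndFamily));
  }
}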
@@ -413,12 +452,29 @@ public class HFileOutputFormat2
* Return the start keys of all of the regions in this table,
* as a list of ImmutableBytesWritable.
*/
- private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
- throws IOException {
- byte[][] byteKeys = table.getStartKeys();
- ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(byteKeys.length);
- for (byte[] byteKey : byteKeys) {
- ret.add(new ImmutableBytesWritable(byteKey));
+ private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
+ boolean writeMultipleTables)
+ throws IOException {
+
+ ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+ for(RegionLocator regionLocator : regionLocators)
+ {
+ TableName tableName = regionLocator.getName();
+ LOG.info("Looking up current regions for table " + tableName);
+ byte[][] byteKeys = regionLocator.getStartKeys();
+ for (byte[] byteKey : byteKeys) {
+ byte[] fullKey = byteKey; //HFileOutputFormat2 use case
+ if (writeMultipleTables)
+ {
+ //MultiTableHFileOutputFormat use case
+ fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
+ (fullKey) + "]");
+ }
+ ret.add(new ImmutableBytesWritable(fullKey));
+ }
}
return ret;
}
@@ -429,7 +485,7 @@ public class HFileOutputFormat2
*/
@SuppressWarnings("deprecation")
private static void writePartitions(Configuration conf, Path partitionsPath,
- List<ImmutableBytesWritable> startKeys) throws IOException {
+ List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
LOG.info("Writing partition information to " + partitionsPath);
if (startKeys.isEmpty()) {
throw new IllegalArgumentException("No regions passed");
@@ -440,14 +496,17 @@ public class HFileOutputFormat2
// so we need to remove it. Otherwise we would end up with an
// empty reducer with index 0
TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
-
ImmutableBytesWritable first = sorted.first();
+ if (writeMultipleTables) {
+ first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first
+ ().get()));
+ }
if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
throw new IllegalArgumentException(
"First region of table should have empty start key. Instead has: "
+ Bytes.toStringBinary(first.get()));
}
- sorted.remove(first);
+ sorted.remove(sorted.first());
// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
@@ -499,17 +558,25 @@ public class HFileOutputFormat2
*/
public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
RegionLocator regionLocator) throws IOException {
- configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
+ ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+ singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+ configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
}
- static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
- RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
- UnsupportedEncodingException {
+ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);
+ if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
+ throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
+ }
+ boolean writeMultipleTables = false;
+ if (MultiTableHFileOutputFormat.class.equals(cls)) {
+ writeMultipleTables = true;
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+ }
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
@@ -528,28 +595,44 @@ public class HFileOutputFormat2
KeyValueSerialization.class.getName());
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- // record this table name for creating writer by favored nodes
LOG.info("bulkload locality sensitive enabled");
- conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
}
+ /* Now get the region start keys for every table required */
+ List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+ List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
+ List<HTableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
+
+ for( TableInfo tableInfo : multiTableInfo )
+ {
+ regionLocators.add(tableInfo.getRegionLocator());
+ allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+ tableDescriptors.add(tableInfo.getHTableDescriptor());
+ }
+ // Record table names for creating writers by favored nodes, and for decoding the compression,
+ // block size and other column family attributes serialized per table
+ conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
+ .toString(tableSeparator)));
+ List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
// Use table's region boundaries for TOP split points.
- LOG.info("Looking up current regions for table " + regionLocator.getName());
- List startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
- "to match current region count");
+ "to match current region count for all tables");
job.setNumReduceTasks(startKeys.size());
- configurePartitioner(job, startKeys);
+ configurePartitioner(job, startKeys, writeMultipleTables);
// Set compression algorithms based on column families
- configureCompression(conf, tableDescriptor);
- configureBloomType(tableDescriptor, conf);
- configureBlockSize(tableDescriptor, conf);
- configureDataBlockEncoding(tableDescriptor, conf);
+
+ conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
+ tableDescriptors));
+ conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
+ tableDescriptors));
+ conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
+ tableDescriptors));
+ conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+ LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
}
public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
@@ -560,11 +643,19 @@ public class HFileOutputFormat2
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
+ ArrayList<HTableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+ singleTableDescriptor.add(tableDescriptor);
+
+ conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getNameAsString());
// Set compression algorithms based on column families
- configureCompression(conf, tableDescriptor);
- configureBloomType(tableDescriptor, conf);
- configureBlockSize(tableDescriptor, conf);
- configureDataBlockEncoding(tableDescriptor, conf);
+ conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
+ conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
+ conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
+ conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
@@ -667,7 +758,7 @@ public class HFileOutputFormat2
continue;
}
try {
- confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
@@ -681,7 +772,8 @@ public class HFileOutputFormat2
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
* <code>splitPoints</code>. Cleans up the partitions file after job exists.
*/
- static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+ static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
+ writeMultipleTables)
throws IOException {
Configuration conf = job.getConfiguration();
// create the partitions file
@@ -691,7 +783,7 @@ public class HFileOutputFormat2
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
fs.makeQualified(partitionsPath);
- writePartitions(conf, partitionsPath, splitPoints);
+ writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
fs.deleteOnExit(partitionsPath);
// configure job to use it
@@ -699,6 +791,34 @@ public class HFileOutputFormat2
TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+ @VisibleForTesting
+ static String serializeColumnFamilyAttribute(Function<HColumnDescriptor, String> fn, List<HTableDescriptor> allTables)
+ throws UnsupportedEncodingException {
+ StringBuilder attributeValue = new StringBuilder();
+ int i = 0;
+ for (HTableDescriptor tableDescriptor : allTables) {
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ // CODEREVIEW: Can I set an empty string in conf if mock table instance?
+ return "";
+ }
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ attributeValue.append('&');
+ }
+ attributeValue.append(URLEncoder.encode(
+ Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
+ "UTF-8"));
+ attributeValue.append('=');
+ attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
+ }
+ }
+ // Get rid of the last ampersand
+ return attributeValue.toString();
+ }
+
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
@@ -708,134 +828,65 @@ public class HFileOutputFormat2
* @throws IOException
* on failure to read column family descriptors
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(
- value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
- static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
- throws UnsupportedEncodingException {
- StringBuilder compressionConfigValue = new StringBuilder();
- if(tableDescriptor == null){
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- compressionConfigValue.append('&');
- }
- compressionConfigValue.append(URLEncoder.encode(
- familyDescriptor.getNameAsString(), "UTF-8"));
- compressionConfigValue.append('=');
- compressionConfigValue.append(URLEncoder.encode(
- familyDescriptor.getCompressionType().getName(), "UTF-8"));
- }
- // Get rid of the last ampersand
- conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
- }
+ static Function<HColumnDescriptor, String> compressionDetails = familyDescriptor ->
+ familyDescriptor.getCompressionType().getName();
/**
- * Serialize column family to block size map to configuration.
- * Invoked while configuring the MR job for incremental load.
- * @param tableDescriptor to read the properties from
- * @param conf to persist serialized values into
+ * Serialize column family to block size map to configuration. Invoked while
+ * configuring the MR job for incremental load.
+ *
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
- throws UnsupportedEncodingException {
- StringBuilder blockSizeConfigValue = new StringBuilder();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- blockSizeConfigValue.append('&');
- }
- blockSizeConfigValue.append(URLEncoder.encode(
- familyDescriptor.getNameAsString(), "UTF-8"));
- blockSizeConfigValue.append('=');
- blockSizeConfigValue.append(URLEncoder.encode(
- String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
- }
- // Get rid of the last ampersand
- conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
- }
+ static Function<HColumnDescriptor, String> blockSizeDetails = familyDescriptor -> String
+ .valueOf(familyDescriptor.getBlocksize());
/**
- * Serialize column family to bloom type map to configuration.
- * Invoked while configuring the MR job for incremental load.
- * @param tableDescriptor to read the properties from
- * @param conf to persist serialized values into
+ * Serialize column family to bloom type map to configuration. Invoked while
+ * configuring the MR job for incremental load.
+ *
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
- throws UnsupportedEncodingException {
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
+ static Function<HColumnDescriptor, String> bloomTypeDetails = familyDescriptor -> {
+ String bloomType = familyDescriptor.getBloomFilterType().toString();
+ if (bloomType == null) {
+ bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
}
- StringBuilder bloomTypeConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- bloomTypeConfigValue.append('&');
- }
- bloomTypeConfigValue.append(URLEncoder.encode(
- familyDescriptor.getNameAsString(), "UTF-8"));
- bloomTypeConfigValue.append('=');
- String bloomType = familyDescriptor.getBloomFilterType().toString();
- if (bloomType == null) {
- bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
- }
- bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
- }
- conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
- }
+ return bloomType;
+ };
/**
* Serialize column family to data block encoding map to configuration.
* Invoked while configuring the MR job for incremental load.
*
- * @param tableDescriptor to read the properties from
- * @param conf to persist serialized values into
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
- Configuration conf) throws UnsupportedEncodingException {
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
+ static Function<HColumnDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
+ DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+ if (encoding == null) {
+ encoding = DataBlockEncoding.NONE;
}
- StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- dataBlockEncodingConfigValue.append('&');
- }
- dataBlockEncodingConfigValue.append(
- URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
- dataBlockEncodingConfigValue.append('=');
- DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
- if (encoding == null) {
- encoding = DataBlockEncoding.NONE;
- }
- dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
- "UTF-8"));
- }
- conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
- dataBlockEncodingConfigValue.toString());
- }
+ return encoding.toString();
+ };
+
}
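As the class javadoc now points at configureIncrementalLoad(Job, HTableDescriptor, RegionLocator), here is a minimal single-table driver sketch, assuming a pre-existing table named "my_table" and a mapper configured elsewhere in the job; the class name and argument handling are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SingleTableBulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hfile-prepare");
    job.setJarByClass(SingleTableBulkLoadDriver.class);
    // Input path, mapper class and map output key/value classes are omitted; the map output
    // value should be Put or KeyValue so configureIncrementalLoad picks the matching reducer.
    TableName tableName = TableName.valueOf("my_table"); // hypothetical table
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(tableName);
         RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      // Sets the output format, the total-order partitioner, one reducer per region, and the
      // serialized per-family compression/bloom/blocksize/encoding attributes described above.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    }
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}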
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
index 03256bf9338..fdcf30ea50d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
@@ -6,504 +6,118 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hbase.mapreduce;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.TreeMap;
-import java.util.ArrayList;
-import java.util.TreeSet;
-import java.util.Collections;
-
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
/**
- * Create 3 level tree directory, first level is using table name as parent directory and then use
- * family name as child directory, and all related HFiles for one family are under child directory
+ * Create 3 level tree directory, first level is using table name as parent
+ * directory and then use family name as child directory, and all related HFiles
+ * for one family are under child directory
* -tableName1
- * -columnFamilyName1
- * -HFile (region1)
- * -columnFamilyName2
- * -HFile1 (region1)
- * -HFile2 (region2)
- * -HFile3 (region3)
+ * -columnFamilyName1
+ * -columnFamilyName2
+ * -HFiles
* -tableName2
- * -columnFamilyName1
- * -HFile (region1)
- * family directory and its hfiles match the output of HFileOutputFormat2
- * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
+ * -columnFamilyName1
+ * -HFiles
+ * -columnFamilyName2
*/
-
@InterfaceAudience.Public
@VisibleForTesting
-public class MultiTableHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
- @Override
- public RecordWriter<ImmutableBytesWritable, Cell>
- getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
- return createMultiHFileRecordWriter(context);
- }
-
- static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
- createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
-
- // Get the path of the output directory
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = outputDir.getFileSystem(conf);
-
- Connection conn = ConnectionFactory.createConnection(conf);
- Admin admin = conn.getAdmin();
-
- // Map of existing tables, avoid calling getTable() everytime
- final Map<ImmutableBytesWritable, Table> tables = new HashMap<>();
-
- // Map of tables to writers
- final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters = new HashMap<>();
-
- return new RecordWriter<ImmutableBytesWritable, V>() {
- @Override
- public void write(ImmutableBytesWritable tableName, V cell)
- throws IOException, InterruptedException {
- RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
- // if there is new table, verify that table directory exists
- if (tableWriter == null) {
- // using table name as directory name
- final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
- fs.mkdirs(tableOutputDir);
- LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
- + tableOutputDir.toString());
- // Configure for tableWriter, if table exist, write configuration of table into conf
- Table table = null;
- if (tables.containsKey(tableName)) {
- table = tables.get(tableName);
- } else {
- table = getTable(tableName.copyBytes(), conn, admin);
- tables.put(tableName, table);
- }
- if (table != null) {
- configureForOneTable(conf, table.getTableDescriptor());
- }
- // Create writer for one specific table
- tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
- // Put table into map
- tableWriters.put(tableName, tableWriter);
- }
- // Write into tableWriter
- // in the original code, it does not use Row
- tableWriter.write(null, cell);
- }
-
- @Override
- public void close(TaskAttemptContext c) throws IOException, InterruptedException {
- for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
- writer.close(c);
- }
- if (conn != null) {
- conn.close();
- }
- if (admin != null) {
- admin.close();
- }
- }
- };
- }
-
/**
- * Configure for one table, should be used before creating a new HFileRecordWriter,
- * Set compression algorithms and related configuration based on column families
+ * Creates a composite key to use as a mapper output key when using
+ * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job
+ *
+ * @param tableName Name of the Table - Eg: TableName.getNameAsString()
+ * @param suffix Usually represents a rowkey when creating a mapper key or column family
+ * @return byte[] representation of composite key
*/
- private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor)
- throws UnsupportedEncodingException {
- HFileOutputFormat2.configureCompression(conf, tableDescriptor);
- HFileOutputFormat2.configureBlockSize(tableDescriptor, conf);
- HFileOutputFormat2.configureBloomType(tableDescriptor, conf);
- HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
+ public static byte[] createCompositeKey(byte[] tableName,
+ byte[] suffix) {
+ return combineTableNameSuffix(tableName, suffix);
}
/**
- * Configure a MapReduce Job to output HFiles for performing an incremental load into
- * the multiple tables.
- *
- * - Inspects the tables to configure a partitioner based on their region boundaries
- * - Writes the partitions file and configures the partitioner
- * - Sets the number of reduce tasks to match the total number of all tables' regions
- * - Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer)
- *
+ * Alternate api which accepts an ImmutableBytesWritable for the suffix
+ * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+ */
+ public static byte[] createCompositeKey(byte[] tableName,
+ ImmutableBytesWritable suffix) {
+ return combineTableNameSuffix(tableName, suffix.get());
+ }
+
+ /**
+ * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the
+ * suffix
+ * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+ */
+ public static byte[] createCompositeKey(String tableName,
+ ImmutableBytesWritable suffix) {
+ return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
+ }
+
+ /**
+ * Analogous to
+ * {@link HFileOutputFormat2#configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)},
+ * this function will configure the requisite number of reducers to write HFiles for multiple
+ * tables simultaneously
*
- * ConfigureIncrementalLoad has set up partitioner and reducer for mapreduce job.
- * Caller needs to setup input path, output path and mapper
- *
- * @param job
- * @param tables A list of tables to inspects
+ * @param job See {@link org.apache.hadoop.mapreduce.Job}
+ * @param multiTableDescriptors Table descriptor and region locator pairs
* @throws IOException
*/
- public static void configureIncrementalLoad(Job job, List<TableName> tables) throws IOException {
- configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class);
+ public static void configureIncrementalLoad(Job job, List<TableInfo>
+ multiTableDescriptors)
+ throws IOException {
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
+ MultiTableHFileOutputFormat.class);
}
- public static void configureIncrementalLoad(Job job, List<TableName> tables,
- Class<? extends OutputFormat<?, ?>> cls) throws IOException {
+ final private static int validateCompositeKey(byte[] keyBytes) {
- Configuration conf = job.getConfiguration();
- Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys =
- MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables);
- configureIncrementalLoad(job, tableSplitKeys, cls);
+ int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
+
+ // Either the separator was not found or a tablename wasn't present or a key wasn't present
+ if (separatorIdx == -1) {
+ throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
+ .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
+ }
+ return separatorIdx;
}
- /**
- * Same purpose as configureIncrementalLoad(Job job, List<TableName> tables)
- * Used when region startKeys of each table is available, input as <TableName, List<startKeys>>
- *
- * Caller needs to transfer TableName and byte[] to ImmutableBytesWritable
- */
- public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys) throws IOException {
- configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class);
+ protected static byte[] getTableName(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, 0, separatorIdx);
}
- public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
- Configuration conf = job.getConfiguration();
-
- // file path to store
- String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
- LOG.info("Writing partition info into dir: " + partitionsPath.toString());
- job.setPartitionerClass(MultiHFilePartitioner.class);
- // get split keys for all the tables, and write them into partition file
- MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys);
- MultiHFilePartitioner.setPartitionFile(conf, partitionsPath);
- partitionsPath.getFileSystem(conf).makeQualified(partitionsPath);
- partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath);
-
- // now only support Mapper output
- // we can use KeyValueSortReducer directly to sort Mapper output
- if (KeyValue.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(KeyValueSortReducer.class);
- } else {
- LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
- }
- int reducerNum = getReducerNumber(tableSplitKeys);
- job.setNumReduceTasks(reducerNum);
- LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count");
-
- // setup output format
- job.setOutputFormatClass(cls);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
-
- job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
+ protected static byte[] getSuffix(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
}
-
- /**
- * Check if table exist, should not dependent on HBase instance
- * @return instance of table, if it exist
- */
- private static Table getTable(final byte[] tableName, Connection conn, Admin admin) {
- if (conn == null || admin == null) {
- LOG.info("can not get Connection or Admin");
- return null;
- }
-
- try {
- TableName table = TableName.valueOf(tableName);
- if (admin.tableExists(table)) {
- return conn.getTable(table);
- }
- } catch (IOException e) {
- LOG.info("Exception found in getTable()" + e.toString());
- return null;
- }
-
- LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist");
- return null;
- }
-
- /**
- * Get the number of reducers by tables' split keys
- */
- private static int getReducerNumber(
- Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys) {
- int reducerNum = 0;
- for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : tableSplitKeys.entrySet()) {
- reducerNum += entry.getValue().size();
- }
- return reducerNum;
- }
-
- /**
- * MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner
- * The input is partitioned based on table's name and its region boundaries with the table.
- * Two records are in the same partition if they have same table name and the their cells are
- * in the same region
- */
- static class MultiHFilePartitioner extends Partitioner<ImmutableBytesWritable, Cell>
- implements Configurable {
-
- public static final String DEFAULT_PATH = "_partition_multihfile.lst";
- public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path";
- private Configuration conf;
- // map to receive from file
- private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> table_SplitKeys;
- // each pair is map to one unique integer
- private TreeMap<TableSplitKeyPair, Integer> partitionMap;
-
- @Override
- public void setConf(Configuration conf) {
- try {
- this.conf = conf;
- partitionMap = new TreeMap<>();
- table_SplitKeys = readTableSplitKeys(conf);
-
- // initiate partitionMap by table_SplitKeys map
- int splitNum = 0;
- for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : table_SplitKeys.entrySet()) {
- ImmutableBytesWritable table = entry.getKey();
- List<ImmutableBytesWritable> list = entry.getValue();
- for (ImmutableBytesWritable splitKey : list) {
- partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++);
- }
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("Can't read partitions file", e);
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Set the path to the SequenceFile storing the sorted <table, splitKey>. It must be the case
- * that for R reduces, there are R-1 keys in the SequenceFile.
- */
- public static void setPartitionFile(Configuration conf, Path p) {
- conf.set(PARTITIONER_PATH, p.toString());
- }
-
- /**
- * Get the path to the SequenceFile storing the sorted <table, splitKey>.
- * @see #setPartitionFile(Configuration, Path)
- */
- public static String getPartitionFile(Configuration conf) {
- return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
- }
-
-
- /**
- * Return map of <tableName, List<startKeys>>
- */
- public static Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> getTablesRegionStartKeys(
- Configuration conf, List<TableName> tables) throws IOException {
- final TreeMap<ImmutableBytesWritable, List<ImmutableBytesWritable>> ret = new TreeMap<>();
-
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Admin admin = conn.getAdmin()) {
- LOG.info("Looking up current regions for tables");
- for (TableName tName : tables) {
- RegionLocator table = conn.getRegionLocator(tName);
- // if table not exist, use default split keys for this table
- byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY };
- if (admin.tableExists(tName)) {
- byteKeys = table.getStartKeys();
- }
- List<ImmutableBytesWritable> tableStartKeys = new ArrayList<>(byteKeys.length);
- for (byte[] byteKey : byteKeys) {
- tableStartKeys.add(new ImmutableBytesWritable(byteKey));
- }
- ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys);
-
- }
- return ret;
- }
- }
-
- /**
- * write <table, splitKey> into sequence file in order,
- * and this format can be parsed by MultiHFilePartitioner
- */
- public static void writeTableSplitKeys(Configuration conf, Path partitionsPath,
- Map> map) throws IOException {
- LOG.info("Writing partition information to " + partitionsPath);
-
- if (map == null || map.isEmpty()) {
- throw new IllegalArgumentException("No regions passed for all tables");
- }
-
- SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath),
- Writer.keyClass(ImmutableBytesWritable.class),
- Writer.valueClass(ImmutableBytesWritable.class));
-
- try {
- for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : map.entrySet()) {
- ImmutableBytesWritable table = entry.getKey();
- List<ImmutableBytesWritable> list = entry.getValue();
- if (list == null) {
- throw new IOException("Split keys for a table can not be null");
- }
-
- TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(list);
-
- ImmutableBytesWritable first = sorted.first();
- if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IllegalArgumentException(
- "First region of table should have empty start key. Instead has: "
- + Bytes.toStringBinary(first.get()));
- }
-
- for (ImmutableBytesWritable startKey : sorted) {
- writer.append(table, startKey);
- }
- }
- } finally {
- writer.close();
- }
- }
-
- /**
- * Read the partition file back into a map of <tableName, list of region start keys>
- */
- private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> readTableSplitKeys(
- Configuration conf) throws IOException {
- String parts = getPartitionFile(conf);
- LOG.info("Read partition info from file: " + parts);
- final Path partFile = new Path(parts);
-
- SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile));
- // values are already sorted in file, so use list
- final Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map =
- new TreeMap<>();
- // key and value have same type
- ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
- ImmutableBytesWritable value =
- ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
- try {
- while (reader.next(key, value)) {
-
- List<ImmutableBytesWritable> list = map.get(key);
- if (list == null) {
- list = new ArrayList<>();
- }
- list.add(value);
- map.put(key, list);
-
- key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
- value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
- }
- } finally {
- IOUtils.cleanup(LOG, reader);
- }
- return map;
- }
-
- @Override
- public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) {
- byte[] row = CellUtil.cloneRow(value);
- final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
- ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
- //find splitKey by input rowKey
- if (table_SplitKeys.containsKey(table)) {
- List<ImmutableBytesWritable> list = table_SplitKeys.get(table);
- int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator());
- if (index < 0) {
- index = (index + 1) * (-1) - 1;
- } else if (index == list.size()) {
- index -= 1;
- }
- if (index < 0) {
- index = 0;
- LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY ");
- }
- splitId = list.get(index);
- }
-
- // find the id of the reducer for the input
- Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId));
- if (id == null) {
- LOG.warn("Can not get reducer id for input record");
- return -1;
- }
- return id.intValue() % numPartitions;
- }
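// Illustration only, not part of the patch: a self-contained sketch of the binary-search
// arithmetic used in getPartition() above, with plain Strings standing in for
// ImmutableBytesWritable. A row is mapped to the region whose start key is the greatest
// split key less than or equal to the row.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitKeyLookupSketch {
  public static void main(String[] args) {
    // Sorted region start keys for one table; the first is always the empty key.
    List<String> splitKeys = Arrays.asList("", "d", "m");
    for (String row : new String[] { "a", "d", "f", "z" }) {
      int index = Collections.binarySearch(splitKeys, row);
      if (index < 0) {
        // Same transform as getPartition(): convert the negative result to (insertion point - 1).
        index = (index + 1) * (-1) - 1;
      }
      if (index < 0) {
        index = 0; // cannot happen while the empty start key is present
      }
      // Prints: a -> "", d -> "d", f -> "d", z -> "m"
      System.out.println(row + " -> region starting at \"" + splitKeys.get(index) + "\"");
    }
  }
}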
-
- /**
- * A class that stores a <tableName, splitKey> pair. It has two main usages:
- * 1. store a table name and one of its split keys as a pair
- * 2. implement Comparable, so that the partitioner can find the split key for an input cell
- */
- static class TableSplitKeyPair extends Pair<ImmutableBytesWritable, ImmutableBytesWritable>
- implements Comparable<TableSplitKeyPair> {
-
- private static final long serialVersionUID = -6485999667666325594L;
-
- public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) {
- super(a, b);
- }
-
- @Override
- public int compareTo(TableSplitKeyPair other) {
- if (this.getFirst().equals(other.getFirst())) {
- return this.getSecond().compareTo(other.getSecond());
- }
- return this.getFirst().compareTo(other.getFirst());
- }
- }
- }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 3533f8a243f..87522b6e069 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -109,6 +108,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
* Simple test for {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.
@@ -123,9 +125,9 @@ public class TestHFileOutputFormat2 {
private static final byte[][] FAMILIES
= { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
- , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
- private static final TableName TABLE_NAME =
- TableName.valueOf("TestTable");
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+ private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
+ "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -146,6 +148,9 @@ public class TestHFileOutputFormat2 {
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
+ private boolean multiTableMapper = false;
+ private TableName[] tables = null;
+
@Override
protected void setup(Context context) throws IOException,
@@ -155,6 +160,13 @@ public class TestHFileOutputFormat2 {
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+ false);
+ if (multiTableMapper) {
+ tables = TABLE_NAMES;
+ } else {
+ tables = new TableName[]{TABLE_NAMES[0]};
+ }
}
@Override
@@ -170,19 +182,23 @@ public class TestHFileOutputFormat2 {
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
Random random = new Random();
- for (int i = 0; i < ROWSPERSPLIT; i++) {
+ byte[] key;
+ for (int j = 0; j < tables.length; ++j) {
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ random.nextBytes(valBytes);
+ key = keyBytes;
+ if (multiTableMapper) {
+ key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+ }
- random.nextBytes(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
- random.nextBytes(valBytes);
- ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
-
- for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
- Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
- context.write(key, kv);
+ for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+ context.write(new ImmutableBytesWritable(key), kv);
+ }
}
}
}
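// Illustration only, not part of the patch: what the composite key written by the mapper above
// is assumed to look like -- the destination table name, a separator, then the original row key.
// The ";" separator and the exact byte layout of MultiTableHFileOutputFormat.createCompositeKey()
// are assumptions made for this sketch.
import java.nio.charset.StandardCharsets;

public class CompositeKeySketch {
  static byte[] compositeKey(byte[] tableName, byte[] row) {
    byte[] sep = ";".getBytes(StandardCharsets.UTF_8); // assumed separator
    byte[] out = new byte[tableName.length + sep.length + row.length];
    System.arraycopy(tableName, 0, out, 0, tableName.length);
    System.arraycopy(sep, 0, out, tableName.length, sep.length);
    System.arraycopy(row, 0, out, tableName.length + sep.length, row.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] key = compositeKey("TestTable2".getBytes(StandardCharsets.UTF_8),
        "row-0001".getBytes(StandardCharsets.UTF_8));
    // Prints: TestTable2;row-0001
    System.out.println(new String(key, StandardCharsets.UTF_8));
  }
}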
@@ -196,31 +212,39 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable, Put> {
private int keyLength;
- private static final int KEYLEN_DEFAULT=10;
- private static final String KEYLEN_CONF="randomkv.key.length";
+ private static final int KEYLEN_DEFAULT = 10;
+ private static final String KEYLEN_CONF = "randomkv.key.length";
private int valLength;
- private static final int VALLEN_DEFAULT=10;
- private static final String VALLEN_CONF="randomkv.val.length";
- private static final byte [] QUALIFIER = Bytes.toBytes("data");
+ private static final int VALLEN_DEFAULT = 10;
+ private static final String VALLEN_CONF = "randomkv.val.length";
+ private static final byte[] QUALIFIER = Bytes.toBytes("data");
+ private boolean multiTableMapper = false;
+ private TableName[] tables = null;
@Override
protected void setup(Context context) throws IOException,
- InterruptedException {
+ InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+ false);
+ if (multiTableMapper) {
+ tables = TABLE_NAMES;
+ } else {
+ tables = new TableName[]{TABLE_NAMES[0]};
+ }
}
@Override
protected void map(
- NullWritable n1, NullWritable n2,
- Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
- throws java.io.IOException ,InterruptedException
- {
+ NullWritable n1, NullWritable n2,
+ Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
+ throws java.io.IOException, InterruptedException {
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
@@ -229,20 +253,25 @@ public class TestHFileOutputFormat2 {
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
- for (int i = 0; i < ROWSPERSPLIT; i++) {
+ byte[] key;
+ for (int j = 0; j < tables.length; ++j) {
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ random.nextBytes(valBytes);
+ key = keyBytes;
+ if (multiTableMapper) {
+ key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+ }
- random.nextBytes(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
- random.nextBytes(valBytes);
- ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
-
- for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
- Put p = new Put(keyBytes);
- p.addColumn(family, QUALIFIER, valBytes);
- // set TTL to very low so that the scan does not return any value
- p.setTTL(1l);
- context.write(key, p);
+ for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+ Put p = new Put(keyBytes);
+ p.addColumn(family, QUALIFIER, valBytes);
+ // set TTL to very low so that the scan does not return any value
+ p.setTTL(1l);
+ context.write(new ImmutableBytesWritable(key), p);
+ }
}
}
}
@@ -365,7 +394,7 @@ public class TestHFileOutputFormat2 {
HFile.Reader rd =
HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
Map<byte[], byte[]> finfo = rd.loadFileInfo();
- byte[] range = finfo.get("TIMERANGE".getBytes());
+ byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
assertNotNull(range);
// unmarshall and check values.
@@ -438,6 +467,9 @@ public class TestHFileOutputFormat2 {
Path dir =
util.getDataTestDir("WritingTagData");
try {
+ conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
+ // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+ conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
@@ -537,6 +569,7 @@ public class TestHFileOutputFormat2 {
doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
}
+ //@Ignore("Wahtevs")
@Test
public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
@@ -544,43 +577,80 @@ public class TestHFileOutputFormat2 {
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
- boolean putSortReducer, String tableStr) throws Exception {
+ boolean putSortReducer, String tableStr) throws Exception {
+ doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
+ Arrays.asList(tableStr));
+ }
+
+ @Test
+ public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
+ LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
+ doIncrementalLoadTest(false, false, true,
+ Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList
+ ()));
+ }
+
+ private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+ boolean putSortReducer, List<String> tableStr) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
- conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+ conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
int hostCount = 1;
int regionNum = 5;
- if(shouldKeepLocality) {
+ if (shouldKeepLocality) {
// We should change host count higher than hdfs replica count when MiniHBaseCluster supports
// explicit hostnames parameter just like MiniDFSCluster does.
hostCount = 3;
regionNum = 20;
}
- byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
String[] hostnames = new String[hostCount];
- for(int i = 0; i < hostCount; ++i) {
+ for (int i = 0; i < hostCount; ++i) {
hostnames[i] = "datanode_" + i;
}
util.startMiniCluster(1, hostCount, hostnames);
- TableName tableName = TableName.valueOf(tableStr);
- Table table = util.createTable(tableName, FAMILIES, splitKeys);
- Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
- FileSystem fs = testDir.getFileSystem(conf);
- try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin =
- util.getConnection().getAdmin();) {
+
+ Map<String, Table> allTables = new HashMap<>(tableStr.size());
+ List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
+ boolean writeMultipleTables = tableStr.size() > 1;
+ for (String tableStrSingle : tableStr) {
+ byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+ TableName tableName = TableName.valueOf(tableStrSingle);
+ Table table = util.createTable(tableName, FAMILIES, splitKeys);
+
+ RegionLocator r = util.getConnection().getRegionLocator(tableName);
assertEquals("Should start with empty table", 0, util.countRows(table));
int numRegions = r.getStartKeys().length;
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
- // Generate the bulk load files
- runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer);
+ allTables.put(tableStrSingle, table);
+ tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
+ }
+ Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+ // Generate the bulk load files
+ runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
+
+ for (Table tableSingle : allTables.values()) {
// This doesn't write into the table, just makes files
- assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
+ assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
+ }
+ int numTableDirs = 0;
+ for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
+ Path tablePath = testDir;
+
+ if (writeMultipleTables) {
+ if (allTables.containsKey(tf.getPath().getName())) {
+ ++numTableDirs;
+ tablePath = tf.getPath();
+ }
+ else {
+ continue;
+ }
+ }
// Make sure that a directory was created for every CF
int dir = 0;
- for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+ for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
for (byte[] family : FAMILIES) {
if (Bytes.toString(family).equals(f.getPath().getName())) {
++dir;
@@ -588,95 +658,132 @@ public class TestHFileOutputFormat2 {
}
}
assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+ }
+ if (writeMultipleTables) {
+ assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
+ }
+ Admin admin = util.getConnection().getAdmin();
+ try {
// handle the split case
if (shouldChangeRegions) {
- LOG.info("Changing regions in table");
- admin.disableTable(table.getName());
+ Table chosenTable = allTables.values().iterator().next();
+ // Choose a semi-random table if multiple tables are available
+ LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
+ admin.disableTable(chosenTable.getName());
util.waitUntilNoRegionsInTransition();
- util.deleteTable(table.getName());
+ util.deleteTable(chosenTable.getName());
byte[][] newSplitKeys = generateRandomSplitKeys(14);
- table = util.createTable(tableName, FAMILIES, newSplitKeys);
+ Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
- while (util.getConnection().getRegionLocator(tableName)
- .getAllRegionLocations().size() != 15 ||
- !admin.isTableAvailable(table.getName())) {
+ while (util.getConnection().getRegionLocator(chosenTable.getName())
+ .getAllRegionLocations().size() != 15 ||
+ !admin.isTableAvailable(table.getName())) {
Thread.sleep(200);
LOG.info("Waiting for new region assignment to happen");
}
}
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r);
-
- // Ensure data shows up
- int expectedRows = 0;
- if (putSortReducer) {
- // no rows should be extracted
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
- util.countRows(table));
- } else {
- expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
- util.countRows(table));
- Scan scan = new Scan();
- ResultScanner results = table.getScanner(scan);
- for (Result res : results) {
- assertEquals(FAMILIES.length, res.rawCells().length);
- Cell first = res.rawCells()[0];
- for (Cell kv : res.rawCells()) {
- assertTrue(CellUtil.matchingRow(first, kv));
- assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
- }
+ for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
+ Path tableDir = testDir;
+ String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
+ LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr);
+ if (writeMultipleTables) {
+ tableDir = new Path(testDir, tableNameStr);
}
- results.close();
- }
- String tableDigestBefore = util.checksumRows(table);
- // Check region locality
- HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
- for (HRegion region : util.getHBaseCluster().getRegions(tableName)) {
- hbd.add(region.getHDFSBlocksDistribution());
- }
- for (String hostname : hostnames) {
- float locality = hbd.getBlockLocalityIndex(hostname);
- LOG.info("locality of [" + hostname + "]: " + locality);
- assertEquals(100, (int) (locality * 100));
- }
+ Table currentTable = allTables.get(tableNameStr);
+ TableName currentTableName = currentTable.getName();
+ new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
+ .getRegionLocator());
- // Cause regions to reopen
- admin.disableTable(tableName);
- while (!admin.isTableDisabled(tableName)) {
- Thread.sleep(200);
- LOG.info("Waiting for table to disable");
+ // Ensure data shows up
+ int expectedRows = 0;
+ if (putSortReducer) {
+ // no rows should be extracted
+ assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ util.countRows(currentTable));
+ } else {
+ expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+ assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ util.countRows(currentTable));
+ Scan scan = new Scan();
+ ResultScanner results = currentTable.getScanner(scan);
+ for (Result res : results) {
+ assertEquals(FAMILIES.length, res.rawCells().length);
+ Cell first = res.rawCells()[0];
+ for (Cell kv : res.rawCells()) {
+ assertTrue(CellUtil.matchingRow(first, kv));
+ assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+ }
+ }
+ results.close();
+ }
+ String tableDigestBefore = util.checksumRows(currentTable);
+ // Check region locality
+ HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
+ for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
+ hbd.add(region.getHDFSBlocksDistribution());
+ }
+ for (String hostname : hostnames) {
+ float locality = hbd.getBlockLocalityIndex(hostname);
+ LOG.info("locality of [" + hostname + "]: " + locality);
+ assertEquals(100, (int) (locality * 100));
+ }
+
+ // Cause regions to reopen
+ admin.disableTable(currentTableName);
+ while (!admin.isTableDisabled(currentTableName)) {
+ Thread.sleep(200);
+ LOG.info("Waiting for table to disable");
+ }
+ admin.enableTable(currentTableName);
+ util.waitTableAvailable(currentTableName);
+ assertEquals("Data should remain after reopening of regions",
+ tableDigestBefore, util.checksumRows(currentTable));
}
- admin.enableTable(tableName);
- util.waitTableAvailable(tableName);
- assertEquals("Data should remain after reopening of regions",
- tableDigestBefore, util.checksumRows(table));
} finally {
+ for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+ tableInfoSingle.getRegionLocator().close();
+ }
+ for (Entry<String, Table> singleTable : allTables.entrySet()) {
+ singleTable.getValue().close();
+ util.deleteTable(singleTable.getValue().getName());
+ }
testDir.getFileSystem(conf).delete(testDir, true);
- util.deleteTable(tableName);
util.shutdownMiniCluster();
}
}
- private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
- RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
- UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
+ private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir,
+ boolean putSortReducer) throws IOException,
+ InterruptedException, ClassNotFoundException {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job, putSortReducer);
- HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
+ if (tableInfo.size() > 1) {
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
+ int sum = 0;
+ for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+ sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
+ }
+ assertEquals(sum, job.getNumReduceTasks());
+ }
+ else {
+ RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
+ HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
+ regionLocator);
+ assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
+ }
+
FileOutputFormat.setOutputPath(job, outDir);
assertFalse(util.getTestFileSystem().exists(outDir)) ;
- assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
-
assertTrue(job.waitForCompletion(true));
}
@@ -696,7 +803,10 @@ public class TestHFileOutputFormat2 {
getMockColumnFamiliesForCompression(numCfs);
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForCompression(table, familyToCompression);
- HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
+ conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
+ HFileOutputFormat2.serializeColumnFamilyAttribute
+ (HFileOutputFormat2.compressionDetails,
+ Arrays.asList(table.getTableDescriptor())));
// read back family specific compression setting from the configuration
Map<byte[], Compression.Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
@@ -707,14 +817,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
assertEquals("Compression configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
- retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+ retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForCompression(Table table,
Map<String, Compression.Algorithm> familyToCompression) throws IOException {
- HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@@ -766,7 +876,9 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBloomType(table,
familyToBloomType);
- HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
+ conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
+ HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
+ Arrays.asList(table.getTableDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@@ -779,14 +891,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
assertEquals("BloomType configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
- retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
+ retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForBloomType(Table table,
Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
- HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@@ -835,7 +947,10 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBlockSize(table,
familyToBlockSize);
- HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
+ conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
+ HFileOutputFormat2.serializeColumnFamilyAttribute
+ (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
+ .getTableDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@@ -849,14 +964,14 @@ public class TestHFileOutputFormat2 {
) {
assertEquals("BlockSize configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
- retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
+ retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForBlockSize(Table table,
Map<String, Integer> familyToDataBlockEncoding) throws IOException {
- HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@@ -910,7 +1025,10 @@ public class TestHFileOutputFormat2 {
setupMockColumnFamiliesForDataBlockEncoding(table,
familyToDataBlockEncoding);
HTableDescriptor tableDescriptor = table.getTableDescriptor();
- HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
+ conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ HFileOutputFormat2.serializeColumnFamilyAttribute
+ (HFileOutputFormat2.dataBlockEncodingDetails, Arrays
+ .asList(tableDescriptor)));
// read back family specific data block encoding settings from the
// configuration
@@ -923,14 +1041,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
assertEquals("DataBlockEncoding configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
- retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
+ retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
- HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@@ -995,7 +1113,7 @@ public class TestHFileOutputFormat2 {
// Setup table descriptor
Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
- HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+ HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
Mockito.doReturn(htd).when(table).getTableDescriptor();
for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
htd.addFamily(hcd);
@@ -1099,15 +1217,15 @@ public class TestHFileOutputFormat2 {
util.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection();
Admin admin = conn.getAdmin();
- Table table = util.createTable(TABLE_NAME, FAMILIES);
- RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
+ Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
+ RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
final FileSystem fs = util.getDFSCluster().getFileSystem();
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath = new Path(
- FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
- new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
+ FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
+ new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
Bytes.toString(FAMILIES[0])));
assertEquals(0, fs.listStatus(storePath).length);
@@ -1117,8 +1235,8 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
- runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
- testDir, false);
+ runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
+ .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
}
@@ -1132,12 +1250,12 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
- admin.compact(TABLE_NAME);
+ admin.compact(TABLE_NAMES[0]);
try {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@@ -1152,11 +1270,11 @@ public class TestHFileOutputFormat2 {
}
// a major compaction should work though
- admin.majorCompact(TABLE_NAME);
+ admin.majorCompact(TABLE_NAMES[0]);
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@@ -1182,13 +1300,13 @@ public class TestHFileOutputFormat2 {
Admin admin = conn.getAdmin()){
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
final FileSystem fs = util.getDFSCluster().getFileSystem();
- Table table = util.createTable(TABLE_NAME, FAMILIES);
+ Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath = new Path(
- FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
- new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
+ FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
+ new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
Bytes.toString(FAMILIES[0])));
assertEquals(0, fs.listStatus(storePath).length);
@@ -1196,7 +1314,7 @@ public class TestHFileOutputFormat2 {
Put p = new Put(Bytes.toBytes("test"));
p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
table.put(p);
- admin.flush(TABLE_NAME);
+ admin.flush(TABLE_NAMES[0]);
assertEquals(1, util.countRows(table));
quickPoll(new Callable<Boolean>() {
@Override
@@ -1209,8 +1327,9 @@ public class TestHFileOutputFormat2 {
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
true);
- RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
- runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
+ RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
+ runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
+ .getTableDescriptor(), regionLocator)), testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@@ -1224,7 +1343,7 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
- admin.compact(TABLE_NAME);
+ admin.compact(TABLE_NAMES[0]);
try {
quickPoll(new Callable<Boolean>() {
@Override
@@ -1238,7 +1357,7 @@ public class TestHFileOutputFormat2 {
}
// a major compaction should work though
- admin.majorCompact(TABLE_NAME);
+ admin.majorCompact(TABLE_NAMES[0]);
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -1273,15 +1392,15 @@ public class TestHFileOutputFormat2 {
if ("newtable".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
byte[][] splitKeys = generateRandomSplitKeys(4);
- try (Table table = util.createTable(tname, FAMILIES, splitKeys)) {
- }
+ Table table = util.createTable(tname, FAMILIES, splitKeys);
} else if ("incremental".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
try(Connection c = ConnectionFactory.createConnection(conf);
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out");
- runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
+ runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
+ .getTableDescriptor(tname), regionLocator)), outDir, false);
}
} else {
throw new RuntimeException(
@@ -1294,8 +1413,10 @@ public class TestHFileOutputFormat2 {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
- conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
- "ONE_SSD");
+
+ conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
+ Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
+ TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
util.startMiniDFSCluster(3);
@@ -1313,8 +1434,10 @@ public class TestHFileOutputFormat2 {
assertEquals("HOT", spB);
// alter table cf schema to change storage policies
- HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
- HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+ HFileOutputFormat2.configureStoragePolicy(conf, fs,
+ HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
+ HFileOutputFormat2.configureStoragePolicy(conf, fs,
+ HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
spA = getStoragePolicyName(fs, cf1Dir);
spB = getStoragePolicyName(fs, cf2Dir);
LOG.debug("Storage policy of cf 0: [" + spA + "].");
@@ -1368,6 +1491,5 @@ public class TestHFileOutputFormat2 {
return null;
}
-
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
deleted file mode 100644
index 781eaa984a2..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableHFileOutputFormat.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.PerformanceEvaluation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test for {@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that creates
- * output directories and writes hfiles.
- */
-@Category(MediumTests.class)
-public class TestMultiTableHFileOutputFormat {
- private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class);
-
- private HBaseTestingUtility util = new HBaseTestingUtility();
-
- private static int ROWSPERSPLIT = 10;
-
- private static final int KEYLEN_DEFAULT = 10;
- private static final String KEYLEN_CONF = "randomkv.key.length";
-
- private static final int VALLEN_DEFAULT = 10;
- private static final String VALLEN_CONF = "randomkv.val.length";
-
- private static final byte[][] TABLES =
- { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
- Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
-
- private static final byte[][] FAMILIES =
- { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
- Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
-
- private static final byte[] QUALIFIER = Bytes.toBytes("data");
-
- /**
- * Run a small MR job. This MR job will write HFiles into
- * testWritingDataIntoHFiles/tableNames/columnFamilies/
- */
- @Test
- public void testWritingDataIntoHFiles() throws Exception {
- Configuration conf = util.getConfiguration();
- util.startMiniCluster();
- Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
- FileSystem fs = testDir.getFileSystem(conf);
- LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
-
- // Set down this value or we OOME in eclipse.
- conf.setInt("mapreduce.task.io.sort.mb", 20);
- // Write a few files by setting max file size.
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
-
- try {
- Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
-
- FileOutputFormat.setOutputPath(job, testDir);
-
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(Random_TableKV_GeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- job.setReducerClass(Table_KeyValueSortReducer.class);
- job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
- job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
-
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- LOG.info("\nStarting test testWritingDataIntoHFiles\n");
- assertTrue(job.waitForCompletion(true));
- LOG.info("\nWaiting on checking MapReduce output\n");
- assertTrue(checkMROutput(fs, testDir, 0));
- } finally {
- testDir.getFileSystem(conf).delete(testDir, true);
- util.shutdownMiniCluster();
- }
- }
-
- /**
- * Check whether directories and hfiles are created in the format designed by MultiHFilePartitioner,
- * and also check whether the output files carry the same configuration as the created table
- */
- @Test
- public void testMultiHFilePartitioner() throws Exception {
- Configuration conf = util.getConfiguration();
- util.startMiniCluster();
- Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner");
- FileSystem fs = testDir.getFileSystem(conf);
- LOG.info("testMultiHFilePartitioner dir writing to : " + testDir);
-
- // Set down this value or we OOME in eclipse.
- conf.setInt("mapreduce.task.io.sort.mb", 20);
- // Write a few files by setting max file size.
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
-
- // Create several tables for testing
- List<TableName> tables = new ArrayList<>();
-
- // to store splitKeys for TABLES[0] for testing
- byte[][] testKeys = new byte[0][0];
- for (int i = 0; i < TABLES.length; i++) {
- TableName tableName = TableName.valueOf(TABLES[i]);
- byte[][] splitKeys = generateRandomSplitKeys(3);
- if (i == 0) {
- testKeys = splitKeys;
- }
- HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
- for (int j = 0; j < FAMILIES.length; j++) {
- HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]);
- // only set TABLES[0]'s configuration, specifying compression type and DataBlockEncoding
- if (i == 0) {
- familyDescriptor.setCompressionType(Compression.Algorithm.GZ);
- familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- }
- tableDescriptor.addFamily(familyDescriptor);
- }
- util.createTable(tableDescriptor, splitKeys, conf);
- tables.add(tableName);
- }
- // set up for MapReduce job
- try {
- Job job = Job.getInstance(conf, "testMultiHFilePartitioner");
- FileOutputFormat.setOutputPath(job, testDir);
-
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(Random_TableKV_GeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
-
- MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
-
- LOG.info("Starting test testWritingDataIntoHFiles");
- assertTrue(job.waitForCompletion(true));
- LOG.info("Waiting on checking MapReduce output");
- assertTrue(checkMROutput(fs, testDir, 0));
- assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys));
- } finally {
- for (int i = 0; i < TABLES.length; i++) {
- TableName tName = TableName.valueOf(TABLES[i]);
- util.deleteTable(tName);
- }
- fs.delete(testDir, true);
- fs.close();
- util.shutdownMiniCluster();
- }
- }
-
- /**
- * Check that the output hfiles have the same configuration as the created test table,
- * and also check whether the hfiles were split correctly.
- * Only TABLES[0] is checked.
- */
- private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException {
- FileStatus[] fStats = fs.listStatus(testDir);
- for (FileStatus stats : fStats) {
- if (stats.getPath().getName().equals(new String(TABLES[0]))) {
- FileStatus[] cfStats = fs.listStatus(stats.getPath());
- for (FileStatus cfstat : cfStats) {
- FileStatus[] hfStats = fs.listStatus(cfstat.getPath());
-
- List<byte[]> firstKeys = new ArrayList<>();
- List<byte[]> lastKeys = new ArrayList<>();
- for (FileStatus hfstat : hfStats) {
- if (HFile.isHFileFormat(fs, hfstat)) {
- HFile.Reader hfr =
- HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf);
- if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr
- .getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false;
- firstKeys.add(hfr.getFirstRowKey());
- lastKeys.add(hfr.getLastRowKey());
- }
- }
- if (checkFileSplit(splitKeys, firstKeys, lastKeys) == false) {
- return false;
- }
- }
- }
- }
- return true;
- }
-
- /**
- * Check whether the Hfile has been split by region boundaries
- * @param splitKeys split keys for that table
- * @param firstKeys first rowKey for hfiles
- * @param lastKeys last rowKey for hfiles
- */
- private boolean checkFileSplit(byte[][] splitKeys, List<byte[]> firstKeys, List<byte[]> lastKeys) {
- Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR);
- Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR);
- Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR);
-
- int is = 0, il = 0;
- for (byte[] key : lastKeys) {
- while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++;
- if (is == splitKeys.length) {
- break;
- }
- if (is > 0) {
- if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false;
- }
- il++;
- }
-
- if (is == splitKeys.length) {
- return il == lastKeys.size() - 1;
- }
- return true;
- }
-
-
- /**
- * MR will output a 3-level directory layout: tableName -> columnFamilyName -> HFile. This method
- * checks whether the created directory layout is correct. It recurses, so testDir should be small.
- */
- private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException {
- if (level >= 3) {
- return HFile.isHFileFormat(fs, testDir);
- }
- FileStatus[] fStats = fs.listStatus(testDir);
- if (fStats == null || fStats.length <= 0) {
- LOG.info("Created directory format is not correct");
- return false;
- }
-
- for (FileStatus stats : fStats) {
- // skip the _SUCCESS file created by MapReduce
- if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
- continue;
- if (level < 2 && !stats.isDirectory()) {
- LOG.info("Created directory format is not correct");
- return false;
- }
- boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
- if (flag == false) return false;
- }
- return true;
- }
-
-
- private byte[][] generateRandomSplitKeys(int numKeys) {
- Random random = new Random();
- byte[][] ret = new byte[numKeys][];
- for (int i = 0; i < numKeys; i++) {
- ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT);
- }
- return ret;
- }
-
-
- /**
- * Simple mapper that makes <table, KeyValue> output, with no input data.
- */
- static class Random_TableKV_GeneratingMapper
- extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
-
- private int keyLength;
- private int valLength;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
- keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
- valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
- }
-
- @Override
- protected void map(NullWritable n1, NullWritable n2,
- Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
- throws java.io.IOException, InterruptedException {
-
- byte keyBytes[] = new byte[keyLength];
- byte valBytes[] = new byte[valLength];
-
- ArrayList<ImmutableBytesWritable> tables = new ArrayList<>();
- for (int i = 0; i < TABLES.length; i++) {
- tables.add(new ImmutableBytesWritable(TABLES[i]));
- }
-
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
- Random random = new Random();
-
- for (int i = 0; i < ROWSPERSPLIT; i++) {
- random.nextBytes(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
- random.nextBytes(valBytes);
-
- for (ImmutableBytesWritable table : tables) {
- for (byte[] family : FAMILIES) {
- Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
- context.write(table, kv);
- }
- }
- }
- }
- }
-
- /**
- * Simple Reducer that takes <table, KeyValue> input where the KeyValues are unordered, and emits
- * <table, KeyValue> output with the KeyValues ordered
- */
-
- static class Table_KeyValueSortReducer
- extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
- protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
- org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
- throws java.io.IOException, InterruptedException {
- TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
- for (KeyValue kv : kvs) {
- try {
- map.add(kv.clone());
- } catch (CloneNotSupportedException e) {
- throw new java.io.IOException(e);
- }
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv : map) {
- context.write(table, kv);
- if (++index % 100 == 0) context.setStatus("Wrote " + index);
- }
- }
- }
-}
\ No newline at end of file