HBASE-18161 Incremental Load support for Multiple-Table HFileOutputFormat (Densel Santhmayor)

This commit is contained in:
tedyu 2017-06-27 12:31:55 -07:00
parent 14957d4a7d
commit 293cb87d52
4 changed files with 775 additions and 1370 deletions

View File

@ -22,14 +22,20 @@ import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -88,24 +94,48 @@ import com.google.common.annotations.VisibleForTesting;
* all HFiles being written.
* <p>
* Using this class as part of a MapReduce job is best done
* using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
* using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}.
*/
@InterfaceAudience.Public
public class HFileOutputFormat2
extends FileOutputFormat<ImmutableBytesWritable, Cell> {
private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
static class TableInfo {
private HTableDescriptor hTableDescriptor;
private RegionLocator regionLocator;
public TableInfo(HTableDescriptor hTableDescriptor, RegionLocator regionLocator) {
this.hTableDescriptor = hTableDescriptor;
this.regionLocator = regionLocator;
}
public HTableDescriptor getHTableDescriptor() {
return hTableDescriptor;
}
public RegionLocator getRegionLocator() {
return regionLocator;
}
}
protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
protected static byte[] combineTableNameSuffix(byte[] tableName,
byte[] suffix ) {
return Bytes.add(tableName, tableSeparator, suffix);
}
// The following constants are private since these are used by
// HFileOutputFormat2 to internally transfer data between job setup and
// reducer run using conf.
// These should not be changed by the client.
private static final String COMPRESSION_FAMILIES_CONF_KEY =
static final String COMPRESSION_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.compression";
private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.bloomtype";
private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.blocksize";
private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
// This constant is public since the client can modify this when setting
@ -121,8 +151,10 @@ public class HFileOutputFormat2
public static final String LOCALITY_SENSITIVE_CONF_KEY =
"hbase.bulkload.locality.sensitive.enabled";
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
private static final String OUTPUT_TABLE_NAME_CONF_KEY =
static final String OUTPUT_TABLE_NAME_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.table.name";
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";
public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@ -133,270 +165,277 @@ public class HFileOutputFormat2
return createRecordWriter(context);
}
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createRecordWriter(final TaskAttemptContext context) throws IOException {
return new HFileRecordWriter<>(context, null);
protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
return combineTableNameSuffix(tableName, family);
}
protected static class HFileRecordWriter<V extends Cell>
extends RecordWriter<ImmutableBytesWritable, V> {
private final TaskAttemptContext context;
private final Path outputPath;
private final Path outputDir;
private final Configuration conf;
private final FileSystem fs;
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createRecordWriter(final TaskAttemptContext context)
throws IOException {
private final long maxsize;
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
if (writeTableNames==null || writeTableNames.isEmpty()) {
throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+ " cannot be empty");
}
final FileSystem fs = outputDir.getFileSystem(conf);
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String defaultCompressionStr = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
final Algorithm defaultCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr);
final boolean compactionExclude = conf.getBoolean(
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
private final Algorithm defaultCompression;
private final boolean compactionExclude;
final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
Bytes.toString(tableSeparator))).collect(Collectors.toSet());
private final Map<byte[], Algorithm> compressionMap;
private final Map<byte[], BloomType> bloomTypeMap;
private final Map<byte[], Integer> blockSizeMap;
// create a map from column family to the compression algorithm
final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
private final DataBlockEncoding overriddenEncoding;
private final Map<byte[], WriterLength> writers;
private byte[] previousRow;
private final byte[] now;
private boolean rollRequested;
/**
* Mapredue job will create a temp path for outputting results. If out != null, it means that
* the caller has set the temp working dir; If out == null, it means we need to set it here.
* Used by HFileOutputFormat2 and MultiTableHFileOutputFormat. MultiTableHFileOutputFormat will give us
* temp working dir at the table level and HFileOutputFormat2 has to set it here within this
* constructor.
*/
public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
throws IOException {
// Get the path of the temporary output file
context = taContext;
if (out == null) {
outputPath = FileOutputFormat.getOutputPath(context);
outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
} else {
outputPath = out;
outputDir = outputPath;
}
conf = context.getConfiguration();
fs = outputDir.getFileSystem(conf);
// These configs. are from hbase-*.xml
maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
compactionExclude =
conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
// create a map from column family to the compression algorithm
compressionMap = createFamilyCompressionMap(conf);
bloomTypeMap = createFamilyBloomTypeMap(conf);
blockSizeMap = createFamilyBlockSizeMap(conf);
// Config for data block encoding
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
if (dataBlockEncodingStr != null) {
overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
} else {
overriddenEncoding = null;
}
writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
previousRow = HConstants.EMPTY_BYTE_ARRAY;
now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
rollRequested = false;
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
final Map<byte[], DataBlockEncoding> datablockEncodingMap
= createFamilyDataBlockEncodingMap(conf);
final DataBlockEncoding overriddenEncoding;
if (dataBlockEncodingStr != null) {
overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
} else {
overriddenEncoding = null;
}
@Override
public void write(ImmutableBytesWritable row, V cell) throws IOException {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
return new RecordWriter<ImmutableBytesWritable, V>() {
// Map of families to writers and how much has been output on the writer.
private final Map<byte[], WriterLength> writers =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
private boolean rollRequested = false;
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();
return;
}
@Override
public void write(ImmutableBytesWritable row, V cell)
throws IOException {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
byte[] rowKey = CellUtil.cloneRow(kv);
long length = kv.getLength();
byte[] family = CellUtil.cloneFamily(kv);
WriterLength wl = this.writers.get(family);
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();
return;
}
// If this is a new column family, verify that the directory exists
if (wl == null) {
Path cfPath = new Path(outputDir, Bytes.toString(family));
fs.mkdirs(cfPath);
configureStoragePolicy(conf, fs, family, cfPath);
}
// If any of the HFiles for the column families has reached
// maxsize, we need to roll all the writers
if (wl != null && wl.written + length >= maxsize) {
this.rollRequested = true;
}
// This can only happen once a row is finished though
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
rollWriters();
}
// create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
if (tableName != null) {
try (Connection connection = ConnectionFactory.createConnection(conf);
RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey);
} catch (Throwable e) {
LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
e);
loc = null;
}
}
if (null == loc) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"failed to get region location, so use default writer: " + Bytes.toString(rowKey));
}
wl = getNewWriter(family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
}
InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
if (initialIsa.isUnresolved()) {
if (LOG.isTraceEnabled()) {
LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+ loc.getPort() + ", so use default writer");
}
wl = getNewWriter(family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
}
wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
}
byte[] rowKey = CellUtil.cloneRow(kv);
long length = kv.getLength();
byte[] family = CellUtil.cloneFamily(kv);
byte[] tableNameBytes = null;
if (writeMultipleTables) {
tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
"' not" + " expected");
}
} else {
wl = getNewWriter(family, conf, null);
tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
}
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
WriterLength wl = this.writers.get(tableAndFamily);
// If this is a new column family, verify that the directory exists
if (wl == null) {
Path writerPath = null;
if (writeMultipleTables) {
writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
.toString(family)));
}
else {
writerPath = new Path(outputDir, Bytes.toString(family));
}
fs.mkdirs(writerPath);
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
}
// If any of the HFiles for the column families has reached
// maxsize, we need to roll all the writers
if (wl != null && wl.written + length >= maxsize) {
this.rollRequested = true;
}
// This can only happen once a row is finished though
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
rollWriters();
}
// create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;
String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
try (Connection connection = ConnectionFactory.createConnection(conf);
RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey);
} catch (Throwable e) {
LOG.warn("There's something wrong when locating rowkey: " +
Bytes.toString(rowKey) + " for tablename: " + tableName, e);
loc = null;
} }
if (null == loc) {
if (LOG.isTraceEnabled()) {
LOG.trace("failed to get region location, so use default writer for rowkey: " +
Bytes.toString(rowKey));
}
wl = getNewWriter(tableNameBytes, family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
}
InetSocketAddress initialIsa =
new InetSocketAddress(loc.getHostname(), loc.getPort());
if (initialIsa.isUnresolved()) {
if (LOG.isTraceEnabled()) {
LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+ loc.getPort() + ", so use default writer");
}
wl = getNewWriter(tableNameBytes, family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
}
wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
});
}
}
} else {
wl = getNewWriter(tableNameBytes, family, conf, null);
}
}
// we now have the proper WAL writer. full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
// Copy the row so we know when a row transition.
this.previousRow = rowKey;
}
private void rollWriters() throws IOException {
for (WriterLength wl : this.writers.values()) {
if (wl.writer != null) {
LOG.info(
"Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
close(wl.writer);
}
wl.writer = null;
wl.written = 0;
}
this.rollRequested = false;
}
/*
* Create a new StoreFile.Writer.
* @param family
* @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
justification="Not important")
private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
conf, InetSocketAddress[] favoredNodes) throws IOException {
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
Path familydir = new Path(outputDir, Bytes.toString(family));
if (writeMultipleTables) {
familydir = new Path(outputDir,
new Path(Bytes.toString(tableName), Bytes.toString(family)));
}
WriterLength wl = new WriterLength();
Algorithm compression = compressionMap.get(tableAndFamily);
compression = compression == null ? defaultCompression : compression;
BloomType bloomType = bloomTypeMap.get(tableAndFamily);
bloomType = bloomType == null ? BloomType.NONE : bloomType;
Integer blockSize = blockSizeMap.get(tableAndFamily);
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
DataBlockEncoding encoding = overriddenEncoding;
encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
Configuration tempConf = new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
HFileContextBuilder contextBuilder = new HFileContextBuilder()
.withCompression(compression)
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(blockSize);
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
contextBuilder.withIncludesTags(true);
}
contextBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) {
wl.writer =
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
} else {
wl.writer =
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}
this.writers.put(tableAndFamily, wl);
return wl;
}
private void close(final StoreFileWriter w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(context.getTaskAttemptID().toString()));
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
w.close();
}
}
// we now have the proper WAL writer. full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
// Copy the row so we know when a row transition.
this.previousRow = rowKey;
}
private void rollWriters() throws IOException {
for (WriterLength wl : this.writers.values()) {
if (wl.writer != null) {
LOG.info(
"Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
@Override
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
for (WriterLength wl: this.writers.values()) {
close(wl.writer);
}
wl.writer = null;
wl.written = 0;
}
this.rollRequested = false;
}
/*
* Create a new StoreFile.Writer.
* @param family
* @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
justification = "Not important")
private WriterLength getNewWriter(byte[] family, Configuration conf,
InetSocketAddress[] favoredNodes) throws IOException {
WriterLength wl = new WriterLength();
Path familyDir = new Path(outputDir, Bytes.toString(family));
Algorithm compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
BloomType bloomType = bloomTypeMap.get(family);
bloomType = bloomType == null ? BloomType.NONE : bloomType;
Integer blockSize = blockSizeMap.get(family);
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
DataBlockEncoding encoding = overriddenEncoding;
encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
Configuration tempConf = new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
contextBuilder.withIncludesTags(true);
}
contextBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) {
wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familyDir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
} else {
wl.writer =
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familyDir).withBloomType(bloomType)
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}
this.writers.put(family, wl);
return wl;
}
private void close(final StoreFileWriter w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(context.getTaskAttemptID().toString()));
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
w.close();
}
}
@Override
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
for (WriterLength wl : this.writers.values()) {
close(wl.writer);
}
}
};
}
/**
* Configure block storage policy for CF after the directory is created.
*/
static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
byte[] family, Path cfPath) {
if (null == conf || null == fs || null == family || null == cfPath) {
byte[] tableAndFamily, Path cfPath) {
if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
return;
}
String policy =
conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
conf.get(STORAGE_POLICY_PROPERTY));
FSUtils.setStoragePolicy(fs, cfPath, policy);
}
@ -413,12 +452,29 @@ public class HFileOutputFormat2
* Return the start keys of all of the regions in this table,
* as a list of ImmutableBytesWritable.
*/
private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
throws IOException {
byte[][] byteKeys = table.getStartKeys();
ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(byteKeys.length);
for (byte[] byteKey : byteKeys) {
ret.add(new ImmutableBytesWritable(byteKey));
private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
boolean writeMultipleTables)
throws IOException {
ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
for(RegionLocator regionLocator : regionLocators)
{
TableName tableName = regionLocator.getName();
LOG.info("Looking up current regions for table " + tableName);
byte[][] byteKeys = regionLocator.getStartKeys();
for (byte[] byteKey : byteKeys) {
byte[] fullKey = byteKey; //HFileOutputFormat2 use case
if (writeMultipleTables)
{
//MultiTableHFileOutputFormat use case
fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
}
if (LOG.isDebugEnabled()) {
LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
(fullKey) + "]");
}
ret.add(new ImmutableBytesWritable(fullKey));
}
}
return ret;
}
@ -429,7 +485,7 @@ public class HFileOutputFormat2
*/
@SuppressWarnings("deprecation")
private static void writePartitions(Configuration conf, Path partitionsPath,
List<ImmutableBytesWritable> startKeys) throws IOException {
List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
LOG.info("Writing partition information to " + partitionsPath);
if (startKeys.isEmpty()) {
throw new IllegalArgumentException("No regions passed");
@ -440,14 +496,17 @@ public class HFileOutputFormat2
// so we need to remove it. Otherwise we would end up with an
// empty reducer with index 0
TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
ImmutableBytesWritable first = sorted.first();
if (writeMultipleTables) {
first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first
().get()));
}
if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
throw new IllegalArgumentException(
"First region of table should have empty start key. Instead has: "
+ Bytes.toStringBinary(first.get()));
}
sorted.remove(first);
sorted.remove(sorted.first());
// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
@ -499,17 +558,25 @@ public class HFileOutputFormat2
*/
public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
RegionLocator regionLocator) throws IOException {
configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
}
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
UnsupportedEncodingException {
static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);
if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
}
boolean writeMultipleTables = false;
if (MultiTableHFileOutputFormat.class.equals(cls)) {
writeMultipleTables = true;
conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
}
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
@ -528,28 +595,44 @@ public class HFileOutputFormat2
KeyValueSerialization.class.getName());
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
// record this table name for creating writer by favored nodes
LOG.info("bulkload locality sensitive enabled");
conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
}
/* Now get the region start keys for every table required */
List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size());
List<HTableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size());
for( TableInfo tableInfo : multiTableInfo )
{
regionLocators.add(tableInfo.getRegionLocator());
allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
tableDescriptors.add(tableInfo.getHTableDescriptor());
}
// Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table
conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
.toString(tableSeparator)));
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
// Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + regionLocator.getName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
"to match current region count for all tables");
job.setNumReduceTasks(startKeys.size());
configurePartitioner(job, startKeys);
configurePartitioner(job, startKeys, writeMultipleTables);
// Set compression algorithms based on column families
configureCompression(conf, tableDescriptor);
configureBloomType(tableDescriptor, conf);
configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(tableDescriptor, conf);
conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
tableDescriptors));
conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
tableDescriptors));
conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
tableDescriptors));
conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
}
public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
@ -560,11 +643,19 @@ public class HFileOutputFormat2
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
ArrayList<HTableDescriptor> singleTableDescriptor = new ArrayList<>(1);
singleTableDescriptor.add(tableDescriptor);
conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getNameAsString());
// Set compression algorithms based on column families
configureCompression(conf, tableDescriptor);
configureBloomType(tableDescriptor, conf);
configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(tableDescriptor, conf);
conf.set(COMPRESSION_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
@ -667,7 +758,7 @@ public class HFileOutputFormat2
continue;
}
try {
confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
@ -681,7 +772,8 @@ public class HFileOutputFormat2
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
* <code>splitPoints</code>. Cleans up the partitions file after job exists.
*/
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
writeMultipleTables)
throws IOException {
Configuration conf = job.getConfiguration();
// create the partitions file
@ -691,7 +783,7 @@ public class HFileOutputFormat2
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
fs.makeQualified(partitionsPath);
writePartitions(conf, partitionsPath, splitPoints);
writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
fs.deleteOnExit(partitionsPath);
// configure job to use it
@ -699,6 +791,34 @@ public class HFileOutputFormat2
TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
static String serializeColumnFamilyAttribute(Function<HColumnDescriptor, String> fn, List<HTableDescriptor> allTables)
throws UnsupportedEncodingException {
StringBuilder attributeValue = new StringBuilder();
int i = 0;
for (HTableDescriptor tableDescriptor : allTables) {
if (tableDescriptor == null) {
// could happen with mock table instance
// CODEREVIEW: Can I set an empty string in conf if mock table instance?
return "";
}
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
attributeValue.append('&');
}
attributeValue.append(URLEncoder.encode(
Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
"UTF-8"));
attributeValue.append('=');
attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
}
}
// Get rid of the last ampersand
return attributeValue.toString();
}
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
@ -708,134 +828,65 @@ public class HFileOutputFormat2
* @throws IOException
* on failure to read column family descriptors
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
throws UnsupportedEncodingException {
StringBuilder compressionConfigValue = new StringBuilder();
if(tableDescriptor == null){
// could happen with mock table instance
return;
}
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
int i = 0;
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
compressionConfigValue.append('&');
}
compressionConfigValue.append(URLEncoder.encode(
familyDescriptor.getNameAsString(), "UTF-8"));
compressionConfigValue.append('=');
compressionConfigValue.append(URLEncoder.encode(
familyDescriptor.getCompressionType().getName(), "UTF-8"));
}
// Get rid of the last ampersand
conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
}
static Function<HColumnDescriptor, String> compressionDetails = familyDescriptor ->
familyDescriptor.getCompressionType().getName();
/**
* Serialize column family to block size map to configuration.
* Invoked while configuring the MR job for incremental load.
* @param tableDescriptor to read the properties from
* @param conf to persist serialized values into
* Serialize column family to block size map to configuration. Invoked while
* configuring the MR job for incremental load.
*
* @param tableDescriptor
* to read the properties from
* @param conf
* to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
throws UnsupportedEncodingException {
StringBuilder blockSizeConfigValue = new StringBuilder();
if (tableDescriptor == null) {
// could happen with mock table instance
return;
}
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
int i = 0;
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
blockSizeConfigValue.append('&');
}
blockSizeConfigValue.append(URLEncoder.encode(
familyDescriptor.getNameAsString(), "UTF-8"));
blockSizeConfigValue.append('=');
blockSizeConfigValue.append(URLEncoder.encode(
String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
}
// Get rid of the last ampersand
conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
}
static Function<HColumnDescriptor, String> blockSizeDetails = familyDescriptor -> String
.valueOf(familyDescriptor.getBlocksize());
/**
* Serialize column family to bloom type map to configuration.
* Invoked while configuring the MR job for incremental load.
* @param tableDescriptor to read the properties from
* @param conf to persist serialized values into
* Serialize column family to bloom type map to configuration. Invoked while
* configuring the MR job for incremental load.
*
* @param tableDescriptor
* to read the properties from
* @param conf
* to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
throws UnsupportedEncodingException {
if (tableDescriptor == null) {
// could happen with mock table instance
return;
static Function<HColumnDescriptor, String> bloomTypeDetails = familyDescriptor -> {
String bloomType = familyDescriptor.getBloomFilterType().toString();
if (bloomType == null) {
bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
}
StringBuilder bloomTypeConfigValue = new StringBuilder();
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
int i = 0;
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
bloomTypeConfigValue.append('&');
}
bloomTypeConfigValue.append(URLEncoder.encode(
familyDescriptor.getNameAsString(), "UTF-8"));
bloomTypeConfigValue.append('=');
String bloomType = familyDescriptor.getBloomFilterType().toString();
if (bloomType == null) {
bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
}
bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
}
conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
}
return bloomType;
};
/**
* Serialize column family to data block encoding map to configuration.
* Invoked while configuring the MR job for incremental load.
*
* @param tableDescriptor to read the properties from
* @param conf to persist serialized values into
* @param tableDescriptor
* to read the properties from
* @param conf
* to persist serialized values into
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
Configuration conf) throws UnsupportedEncodingException {
if (tableDescriptor == null) {
// could happen with mock table instance
return;
static Function<HColumnDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
if (encoding == null) {
encoding = DataBlockEncoding.NONE;
}
StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
int i = 0;
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
dataBlockEncodingConfigValue.append('&');
}
dataBlockEncodingConfigValue.append(
URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
dataBlockEncodingConfigValue.append('=');
DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
if (encoding == null) {
encoding = DataBlockEncoding.NONE;
}
dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
"UTF-8"));
}
conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
dataBlockEncodingConfigValue.toString());
}
return encoding.toString();
};
}

View File

@ -6,504 +6,118 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.TreeMap;
import java.util.ArrayList;
import java.util.TreeSet;
import java.util.Collections;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
* Create 3 level tree directory, first level is using table name as parent directory and then use
* family name as child directory, and all related HFiles for one family are under child directory
* Create 3 level tree directory, first level is using table name as parent
* directory and then use family name as child directory, and all related HFiles
* for one family are under child directory
* -tableName1
* -columnFamilyName1
* -HFile (region1)
* -columnFamilyName2
* -HFile1 (region1)
* -HFile2 (region2)
* -HFile3 (region3)
* -columnFamilyName1
* -columnFamilyName2
* -HFiles
* -tableName2
* -columnFamilyName1
* -HFile (region1)
* family directory and its hfiles match the output of HFileOutputFormat2
* @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
* -columnFamilyName1
* -HFiles
* -columnFamilyName2
*/
@InterfaceAudience.Public
@VisibleForTesting
public class MultiTableHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
@Override
public RecordWriter<ImmutableBytesWritable, Cell>
getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
return createMultiHFileRecordWriter(context);
}
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
// Get the path of the output directory
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputDir.getFileSystem(conf);
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
// Map of existing tables, avoid calling getTable() everytime
final Map<ImmutableBytesWritable, Table> tables = new HashMap<>();
// Map of tables to writers
final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters = new HashMap<>();
return new RecordWriter<ImmutableBytesWritable, V>() {
@Override
public void write(ImmutableBytesWritable tableName, V cell)
throws IOException, InterruptedException {
RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
// if there is new table, verify that table directory exists
if (tableWriter == null) {
// using table name as directory name
final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
fs.mkdirs(tableOutputDir);
LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
+ tableOutputDir.toString());
// Configure for tableWriter, if table exist, write configuration of table into conf
Table table = null;
if (tables.containsKey(tableName)) {
table = tables.get(tableName);
} else {
table = getTable(tableName.copyBytes(), conn, admin);
tables.put(tableName, table);
}
if (table != null) {
configureForOneTable(conf, table.getTableDescriptor());
}
// Create writer for one specific table
tableWriter = new HFileOutputFormat2.HFileRecordWriter<>(context, tableOutputDir);
// Put table into map
tableWriters.put(tableName, tableWriter);
}
// Write <Row, Cell> into tableWriter
// in the original code, it does not use Row
tableWriter.write(null, cell);
}
@Override
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
writer.close(c);
}
if (conn != null) {
conn.close();
}
if (admin != null) {
admin.close();
}
}
};
}
/**
* Configure for one table, should be used before creating a new HFileRecordWriter,
* Set compression algorithms and related configuration based on column families
* Creates a composite key to use as a mapper output key when using
* MultiTableHFileOutputFormat.configureIncrementaLoad to set up bulk ingest job
*
* @param tableName Name of the Table - Eg: TableName.getNameAsString()
* @param suffix Usually represents a rowkey when creating a mapper key or column family
* @return byte[] representation of composite key
*/
private static void configureForOneTable(Configuration conf, final HTableDescriptor tableDescriptor)
throws UnsupportedEncodingException {
HFileOutputFormat2.configureCompression(conf, tableDescriptor);
HFileOutputFormat2.configureBlockSize(tableDescriptor, conf);
HFileOutputFormat2.configureBloomType(tableDescriptor, conf);
HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
public static byte[] createCompositeKey(byte[] tableName,
byte[] suffix) {
return combineTableNameSuffix(tableName, suffix);
}
/**
* Configure a MapReduce Job to output HFiles for performing an incremental load into
* the multiple tables.
* <ul>
* <li>Inspects the tables to configure a partitioner based on their region boundaries</li>
* <li>Writes the partitions file and configures the partitioner</li>
* <li>Sets the number of reduce tasks to match the total number of all tables' regions</li>
* <li>Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer)</li>
* </ul>
* Alternate api which accepts an ImmutableBytesWritable for the suffix
* @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
*/
public static byte[] createCompositeKey(byte[] tableName,
ImmutableBytesWritable suffix) {
return combineTableNameSuffix(tableName, suffix.get());
}
/**
* Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the
* suffix
* @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
*/
public static byte[] createCompositeKey(String tableName,
ImmutableBytesWritable suffix) {
return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
}
/**
* Analogous to
* {@link HFileOutputFormat2#configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)},
* this function will configure the requisite number of reducers to write HFiles for multple
* tables simultaneously
*
* ConfigureIncrementalLoad has set up partitioner and reducer for mapreduce job.
* Caller needs to setup input path, output path and mapper
*
* @param job
* @param tables A list of tables to inspects
* @param job See {@link org.apache.hadoop.mapreduce.Job}
* @param multiTableDescriptors Table descriptor and region locator pairs
* @throws IOException
*/
public static void configureIncrementalLoad(Job job, List<TableName> tables) throws IOException {
configureIncrementalLoad(job, tables, MultiTableHFileOutputFormat.class);
public static void configureIncrementalLoad(Job job, List<TableInfo>
multiTableDescriptors)
throws IOException {
MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
MultiTableHFileOutputFormat.class);
}
public static void configureIncrementalLoad(Job job, List<TableName> tables,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
final private static int validateCompositeKey(byte[] keyBytes) {
Configuration conf = job.getConfiguration();
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys =
MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables);
configureIncrementalLoad(job, tableSplitKeys, cls);
int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
// Either the separator was not found or a tablename wasn't present or a key wasn't present
if (separatorIdx == -1) {
throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
.toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
}
return separatorIdx;
}
/**
* Same purpose as configureIncrementalLoad(Job job, List<TableName> tables)
* Used when region startKeys of each table is available, input as <TableName, List<RegionStartKey>>
*
* Caller needs to transfer TableName and byte[] to ImmutableBytesWritable
*/
public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
List<ImmutableBytesWritable>> tableSplitKeys) throws IOException {
configureIncrementalLoad(job, tableSplitKeys, MultiTableHFileOutputFormat.class);
protected static byte[] getTableName(byte[] keyBytes) {
int separatorIdx = validateCompositeKey(keyBytes);
return Bytes.copy(keyBytes, 0, separatorIdx);
}
public static void configureIncrementalLoad(Job job, Map<ImmutableBytesWritable,
List<ImmutableBytesWritable>> tableSplitKeys, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
// file path to store <table, splitKey>
String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
final Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
LOG.info("Writing partition info into dir: " + partitionsPath.toString());
job.setPartitionerClass(MultiHFilePartitioner.class);
// get split keys for all the tables, and write them into partition file
MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys);
MultiHFilePartitioner.setPartitionFile(conf, partitionsPath);
partitionsPath.getFileSystem(conf).makeQualified(partitionsPath);
partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath);
// now only support Mapper output <ImmutableBytesWritable, KeyValue>
// we can use KeyValueSortReducer directly to sort Mapper output
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
int reducerNum = getReducerNumber(tableSplitKeys);
job.setNumReduceTasks(reducerNum);
LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count");
// setup output format
job.setOutputFormatClass(cls);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
protected static byte[] getSuffix(byte[] keyBytes) {
int separatorIdx = validateCompositeKey(keyBytes);
return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
}
/**
* Check if table exist, should not dependent on HBase instance
* @return instance of table, if it exist
*/
private static Table getTable(final byte[] tableName, Connection conn, Admin admin) {
if (conn == null || admin == null) {
LOG.info("can not get Connection or Admin");
return null;
}
try {
TableName table = TableName.valueOf(tableName);
if (admin.tableExists(table)) {
return conn.getTable(table);
}
} catch (IOException e) {
LOG.info("Exception found in getTable()" + e.toString());
return null;
}
LOG.warn("Table: '" + TableName.valueOf(tableName) + "' does not exist");
return null;
}
/**
* Get the number of reducers by tables' split keys
*/
private static int getReducerNumber(
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> tableSplitKeys) {
int reducerNum = 0;
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : tableSplitKeys.entrySet()) {
reducerNum += entry.getValue().size();
}
return reducerNum;
}
/**
* MultiTableHFileOutputFormat writes files based on partitions created by MultiHFilePartitioner
* The input is partitioned based on table's name and its region boundaries with the table.
* Two records are in the same partition if they have same table name and the their cells are
* in the same region
*/
static class MultiHFilePartitioner extends Partitioner<ImmutableBytesWritable, Cell>
implements Configurable {
public static final String DEFAULT_PATH = "_partition_multihfile.lst";
public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path";
private Configuration conf;
// map to receive <table, splitKeys> from file
private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> table_SplitKeys;
// each <table,splitKey> pair is map to one unique integer
private TreeMap<TableSplitKeyPair, Integer> partitionMap;
@Override
public void setConf(Configuration conf) {
try {
this.conf = conf;
partitionMap = new TreeMap<>();
table_SplitKeys = readTableSplitKeys(conf);
// initiate partitionMap by table_SplitKeys map
int splitNum = 0;
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : table_SplitKeys.entrySet()) {
ImmutableBytesWritable table = entry.getKey();
List<ImmutableBytesWritable> list = entry.getValue();
for (ImmutableBytesWritable splitKey : list) {
partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++);
}
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
@Override
public Configuration getConf() {
return conf;
}
/**
* Set the path to the SequenceFile storing the sorted <table, splitkey>. It must be the case
* that for <tt>R</tt> reduces, there are <tt>R-1</tt> keys in the SequenceFile.
*/
public static void setPartitionFile(Configuration conf, Path p) {
conf.set(PARTITIONER_PATH, p.toString());
}
/**
* Get the path to the SequenceFile storing the sorted <table, splitkey>.
* @see #setPartitionFile(Configuration, Path)
*/
public static String getPartitionFile(Configuration conf) {
return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
}
/**
* Return map of <tableName, the start keys of all of the regions in this table>
*/
public static Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> getTablesRegionStartKeys(
Configuration conf, List<TableName> tables) throws IOException {
final TreeMap<ImmutableBytesWritable, List<ImmutableBytesWritable>> ret = new TreeMap<>();
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
LOG.info("Looking up current regions for tables");
for (TableName tName : tables) {
RegionLocator table = conn.getRegionLocator(tName);
// if table not exist, use default split keys for this table
byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY };
if (admin.tableExists(tName)) {
byteKeys = table.getStartKeys();
}
List<ImmutableBytesWritable> tableStartKeys = new ArrayList<>(byteKeys.length);
for (byte[] byteKey : byteKeys) {
tableStartKeys.add(new ImmutableBytesWritable(byteKey));
}
ret.put(new ImmutableBytesWritable(tName.toBytes()), tableStartKeys);
}
return ret;
}
}
/**
* write <tableName, start key of each region in table> into sequence file in order,
* and this format can be parsed by MultiHFilePartitioner
*/
public static void writeTableSplitKeys(Configuration conf, Path partitionsPath,
Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map) throws IOException {
LOG.info("Writing partition information to " + partitionsPath);
if (map == null || map.isEmpty()) {
throw new IllegalArgumentException("No regions passed for all tables");
}
SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath),
Writer.keyClass(ImmutableBytesWritable.class),
Writer.valueClass(ImmutableBytesWritable.class));
try {
for (Map.Entry<ImmutableBytesWritable, List<ImmutableBytesWritable>> entry : map.entrySet()) {
ImmutableBytesWritable table = entry.getKey();
List<ImmutableBytesWritable> list = entry.getValue();
if (list == null) {
throw new IOException("Split keys for a table can not be null");
}
TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(list);
ImmutableBytesWritable first = sorted.first();
if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
throw new IllegalArgumentException(
"First region of table should have empty start key. Instead has: "
+ Bytes.toStringBinary(first.get()));
}
for (ImmutableBytesWritable startKey : sorted) {
writer.append(table, startKey);
}
}
} finally {
writer.close();
}
}
/**
* read partition file into map <table, splitKeys of this table>
*/
private Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> readTableSplitKeys(
Configuration conf) throws IOException {
String parts = getPartitionFile(conf);
LOG.info("Read partition info from file: " + parts);
final Path partFile = new Path(parts);
SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile));
// values are already sorted in file, so use list
final Map<ImmutableBytesWritable, List<ImmutableBytesWritable>> map =
new TreeMap<>();
// key and value have same type
ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
ImmutableBytesWritable value =
ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
try {
while (reader.next(key, value)) {
List<ImmutableBytesWritable> list = map.get(key);
if (list == null) {
list = new ArrayList<>();
}
list.add(value);
map.put(key, list);
key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf);
}
} finally {
IOUtils.cleanup(LOG, reader);
}
return map;
}
@Override
public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) {
byte[] row = CellUtil.cloneRow(value);
final ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
ImmutableBytesWritable splitId = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
//find splitKey by input rowKey
if (table_SplitKeys.containsKey(table)) {
List<ImmutableBytesWritable> list = table_SplitKeys.get(table);
int index = Collections.binarySearch(list, rowKey, new ImmutableBytesWritable.Comparator());
if (index < 0) {
index = (index + 1) * (-1) - 1;
} else if (index == list.size()) {
index -= 1;
}
if (index < 0) {
index = 0;
LOG.error("row key can not less than HConstants.EMPTY_BYTE_ARRAY ");
}
splitId = list.get(index);
}
// find the id of the reducer for the input
Integer id = partitionMap.get(new TableSplitKeyPair(table, splitId));
if (id == null) {
LOG.warn("Can not get reducer id for input record");
return -1;
}
return id.intValue() % numPartitions;
}
/**
* A class store pair<TableName, SplitKey>, has two main usage
* 1. store tableName and one of its splitKey as a pair
* 2. implement comparable, so that partitioner can find splitKey of its input cell
*/
static class TableSplitKeyPair extends Pair<ImmutableBytesWritable, ImmutableBytesWritable>
implements Comparable<TableSplitKeyPair> {
private static final long serialVersionUID = -6485999667666325594L;
public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) {
super(a, b);
}
@Override
public int compareTo(TableSplitKeyPair other) {
if (this.getFirst().equals(other.getFirst())) {
return this.getSecond().compareTo(other.getSecond());
}
return this.getFirst().compareTo(other.getFirst());
}
}
}
}
}

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@ -109,6 +108,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Simple test for {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.
@ -123,9 +125,9 @@ public class TestHFileOutputFormat2 {
private static final byte[][] FAMILIES
= { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
, Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
private static final TableName TABLE_NAME =
TableName.valueOf("TestTable");
, Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
"TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
private HBaseTestingUtility util = new HBaseTestingUtility();
@ -146,6 +148,9 @@ public class TestHFileOutputFormat2 {
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
private boolean multiTableMapper = false;
private TableName[] tables = null;
@Override
protected void setup(Context context) throws IOException,
@ -155,6 +160,13 @@ public class TestHFileOutputFormat2 {
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
false);
if (multiTableMapper) {
tables = TABLE_NAMES;
} else {
tables = new TableName[]{TABLE_NAMES[0]};
}
}
@Override
@ -170,19 +182,23 @@ public class TestHFileOutputFormat2 {
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
byte[] key;
for (int j = 0; j < tables.length; ++j) {
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
key = keyBytes;
if (multiTableMapper) {
key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
}
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(key, kv);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(new ImmutableBytesWritable(key), kv);
}
}
}
}
@ -196,31 +212,39 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable, Put> {
private int keyLength;
private static final int KEYLEN_DEFAULT=10;
private static final String KEYLEN_CONF="randomkv.key.length";
private static final int KEYLEN_DEFAULT = 10;
private static final String KEYLEN_CONF = "randomkv.key.length";
private int valLength;
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
private static final int VALLEN_DEFAULT = 10;
private static final String VALLEN_CONF = "randomkv.val.length";
private static final byte[] QUALIFIER = Bytes.toBytes("data");
private boolean multiTableMapper = false;
private TableName[] tables = null;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
false);
if (multiTableMapper) {
tables = TABLE_NAMES;
} else {
tables = new TableName[]{TABLE_NAMES[0]};
}
}
@Override
protected void map(
NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable,
ImmutableBytesWritable,Put>.Context context)
throws java.io.IOException ,InterruptedException
{
NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable,
ImmutableBytesWritable, Put>.Context context)
throws java.io.IOException, InterruptedException {
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
@ -229,20 +253,25 @@ public class TestHFileOutputFormat2 {
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
byte[] key;
for (int j = 0; j < tables.length; ++j) {
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
key = keyBytes;
if (multiTableMapper) {
key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
}
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Put p = new Put(keyBytes);
p.addColumn(family, QUALIFIER, valBytes);
// set TTL to very low so that the scan does not return any value
p.setTTL(1l);
context.write(key, p);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Put p = new Put(keyBytes);
p.addColumn(family, QUALIFIER, valBytes);
// set TTL to very low so that the scan does not return any value
p.setTTL(1l);
context.write(new ImmutableBytesWritable(key), p);
}
}
}
}
@ -365,7 +394,7 @@ public class TestHFileOutputFormat2 {
HFile.Reader rd =
HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
Map<byte[],byte[]> finfo = rd.loadFileInfo();
byte[] range = finfo.get("TIMERANGE".getBytes());
byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
assertNotNull(range);
// unmarshall and check values.
@ -438,6 +467,9 @@ public class TestHFileOutputFormat2 {
Path dir =
util.getDataTestDir("WritingTagData");
try {
conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
// turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
@ -537,6 +569,7 @@ public class TestHFileOutputFormat2 {
doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
}
//@Ignore("Wahtevs")
@Test
public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
@ -544,43 +577,80 @@ public class TestHFileOutputFormat2 {
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, String tableStr) throws Exception {
boolean putSortReducer, String tableStr) throws Exception {
doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
Arrays.asList(tableStr));
}
@Test
public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
doIncrementalLoadTest(false, false, true,
Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList
()));
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, List<String> tableStr) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
int hostCount = 1;
int regionNum = 5;
if(shouldKeepLocality) {
if (shouldKeepLocality) {
// We should change host count higher than hdfs replica count when MiniHBaseCluster supports
// explicit hostnames parameter just like MiniDFSCluster does.
hostCount = 3;
regionNum = 20;
}
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
String[] hostnames = new String[hostCount];
for(int i = 0; i < hostCount; ++i) {
for (int i = 0; i < hostCount; ++i) {
hostnames[i] = "datanode_" + i;
}
util.startMiniCluster(1, hostCount, hostnames);
TableName tableName = TableName.valueOf(tableStr);
Table table = util.createTable(tableName, FAMILIES, splitKeys);
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
FileSystem fs = testDir.getFileSystem(conf);
try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin =
util.getConnection().getAdmin();) {
Map<String, Table> allTables = new HashMap<>(tableStr.size());
List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
boolean writeMultipleTables = tableStr.size() > 1;
for (String tableStrSingle : tableStr) {
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
TableName tableName = TableName.valueOf(tableStrSingle);
Table table = util.createTable(tableName, FAMILIES, splitKeys);
RegionLocator r = util.getConnection().getRegionLocator(tableName);
assertEquals("Should start with empty table", 0, util.countRows(table));
int numRegions = r.getStartKeys().length;
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
// Generate the bulk load files
runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer);
allTables.put(tableStrSingle, table);
tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
}
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
// Generate the bulk load files
runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
for (Table tableSingle : allTables.values()) {
// This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
}
int numTableDirs = 0;
for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
Path tablePath = testDir;
if (writeMultipleTables) {
if (allTables.containsKey(tf.getPath().getName())) {
++numTableDirs;
tablePath = tf.getPath();
}
else {
continue;
}
}
// Make sure that a directory was created for every CF
int dir = 0;
for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
for (byte[] family : FAMILIES) {
if (Bytes.toString(family).equals(f.getPath().getName())) {
++dir;
@ -588,95 +658,132 @@ public class TestHFileOutputFormat2 {
}
}
assertEquals("Column family not found in FS.", FAMILIES.length, dir);
}
if (writeMultipleTables) {
assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
}
Admin admin = util.getConnection().getAdmin();
try {
// handle the split case
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
admin.disableTable(table.getName());
Table chosenTable = allTables.values().iterator().next();
// Choose a semi-random table if multiple tables are available
LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
admin.disableTable(chosenTable.getName());
util.waitUntilNoRegionsInTransition();
util.deleteTable(table.getName());
util.deleteTable(chosenTable.getName());
byte[][] newSplitKeys = generateRandomSplitKeys(14);
table = util.createTable(tableName, FAMILIES, newSplitKeys);
Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
while (util.getConnection().getRegionLocator(tableName)
.getAllRegionLocations().size() != 15 ||
!admin.isTableAvailable(table.getName())) {
while (util.getConnection().getRegionLocator(chosenTable.getName())
.getAllRegionLocations().size() != 15 ||
!admin.isTableAvailable(table.getName())) {
Thread.sleep(200);
LOG.info("Waiting for new region assignment to happen");
}
}
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r);
// Ensure data shows up
int expectedRows = 0;
if (putSortReducer) {
// no rows should be extracted
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
} else {
expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
assertEquals(FAMILIES.length, res.rawCells().length);
Cell first = res.rawCells()[0];
for (Cell kv : res.rawCells()) {
assertTrue(CellUtil.matchingRow(first, kv));
assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
}
for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
Path tableDir = testDir;
String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr);
if (writeMultipleTables) {
tableDir = new Path(testDir, tableNameStr);
}
results.close();
}
String tableDigestBefore = util.checksumRows(table);
// Check region locality
HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
for (HRegion region : util.getHBaseCluster().getRegions(tableName)) {
hbd.add(region.getHDFSBlocksDistribution());
}
for (String hostname : hostnames) {
float locality = hbd.getBlockLocalityIndex(hostname);
LOG.info("locality of [" + hostname + "]: " + locality);
assertEquals(100, (int) (locality * 100));
}
Table currentTable = allTables.get(tableNameStr);
TableName currentTableName = currentTable.getName();
new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
.getRegionLocator());
// Cause regions to reopen
admin.disableTable(tableName);
while (!admin.isTableDisabled(tableName)) {
Thread.sleep(200);
LOG.info("Waiting for table to disable");
// Ensure data shows up
int expectedRows = 0;
if (putSortReducer) {
// no rows should be extracted
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(currentTable));
} else {
expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(currentTable));
Scan scan = new Scan();
ResultScanner results = currentTable.getScanner(scan);
for (Result res : results) {
assertEquals(FAMILIES.length, res.rawCells().length);
Cell first = res.rawCells()[0];
for (Cell kv : res.rawCells()) {
assertTrue(CellUtil.matchingRow(first, kv));
assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
}
}
results.close();
}
String tableDigestBefore = util.checksumRows(currentTable);
// Check region locality
HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
hbd.add(region.getHDFSBlocksDistribution());
}
for (String hostname : hostnames) {
float locality = hbd.getBlockLocalityIndex(hostname);
LOG.info("locality of [" + hostname + "]: " + locality);
assertEquals(100, (int) (locality * 100));
}
// Cause regions to reopen
admin.disableTable(currentTableName);
while (!admin.isTableDisabled(currentTableName)) {
Thread.sleep(200);
LOG.info("Waiting for table to disable");
}
admin.enableTable(currentTableName);
util.waitTableAvailable(currentTableName);
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(currentTable));
}
admin.enableTable(tableName);
util.waitTableAvailable(tableName);
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table));
} finally {
for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
tableInfoSingle.getRegionLocator().close();
}
for (Entry<String, Table> singleTable : allTables.entrySet() ) {
singleTable.getValue().close();
util.deleteTable(singleTable.getValue().getName());
}
testDir.getFileSystem(conf).delete(testDir, true);
util.deleteTable(tableName);
util.shutdownMiniCluster();
}
}
private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir,
boolean putSortReducer) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job, putSortReducer);
HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
if (tableInfo.size() > 1) {
MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
int sum = 0;
for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
}
assertEquals(sum, job.getNumReduceTasks());
}
else {
RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
regionLocator);
assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
}
FileOutputFormat.setOutputPath(job, outDir);
assertFalse(util.getTestFileSystem().exists(outDir)) ;
assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
assertTrue(job.waitForCompletion(true));
}
@ -696,7 +803,10 @@ public class TestHFileOutputFormat2 {
getMockColumnFamiliesForCompression(numCfs);
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForCompression(table, familyToCompression);
HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.compressionDetails,
Arrays.asList(table.getTableDescriptor())));
// read back family specific compression setting from the configuration
Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
@ -707,14 +817,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
assertEquals("Compression configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForCompression(Table table,
Map<String, Compression.Algorithm> familyToCompression) throws IOException {
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@ -766,7 +876,9 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBloomType(table,
familyToBloomType);
HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
Arrays.asList(table.getTableDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@ -779,14 +891,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
assertEquals("BloomType configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForBloomType(Table table,
Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@ -835,7 +947,10 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBlockSize(table,
familyToBlockSize);
HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
.getTableDescriptor())));
// read back family specific data block encoding settings from the
// configuration
@ -849,14 +964,14 @@ public class TestHFileOutputFormat2 {
) {
assertEquals("BlockSize configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForBlockSize(Table table,
Map<String, Integer> familyToDataBlockEncoding) throws IOException {
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@ -910,7 +1025,10 @@ public class TestHFileOutputFormat2 {
setupMockColumnFamiliesForDataBlockEncoding(table,
familyToDataBlockEncoding);
HTableDescriptor tableDescriptor = table.getTableDescriptor();
HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
HFileOutputFormat2.serializeColumnFamilyAttribute
(HFileOutputFormat2.dataBlockEncodingDetails, Arrays
.asList(tableDescriptor)));
// read back family specific data block encoding settings from the
// configuration
@ -923,14 +1041,14 @@ public class TestHFileOutputFormat2 {
for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
assertEquals("DataBlockEncoding configuration incorrect for column family:"
+ entry.getKey(), entry.getValue(),
retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
}
}
}
private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
.setMaxVersions(1)
@ -995,7 +1113,7 @@ public class TestHFileOutputFormat2 {
// Setup table descriptor
Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
Mockito.doReturn(htd).when(table).getTableDescriptor();
for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
htd.addFamily(hcd);
@ -1099,15 +1217,15 @@ public class TestHFileOutputFormat2 {
util.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection();
Admin admin = conn.getAdmin();
Table table = util.createTable(TABLE_NAME, FAMILIES);
RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
final FileSystem fs = util.getDFSCluster().getFileSystem();
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath = new Path(
FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
Bytes.toString(FAMILIES[0])));
assertEquals(0, fs.listStatus(storePath).length);
@ -1117,8 +1235,8 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
testDir, false);
runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
.getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
}
@ -1132,12 +1250,12 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
admin.compact(TABLE_NAME);
admin.compact(TABLE_NAMES[0]);
try {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@ -1152,11 +1270,11 @@ public class TestHFileOutputFormat2 {
}
// a major compaction should work though
admin.majorCompact(TABLE_NAME);
admin.majorCompact(TABLE_NAMES[0]);
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@ -1182,13 +1300,13 @@ public class TestHFileOutputFormat2 {
Admin admin = conn.getAdmin()){
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
final FileSystem fs = util.getDFSCluster().getFileSystem();
Table table = util.createTable(TABLE_NAME, FAMILIES);
Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath = new Path(
FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
Bytes.toString(FAMILIES[0])));
assertEquals(0, fs.listStatus(storePath).length);
@ -1196,7 +1314,7 @@ public class TestHFileOutputFormat2 {
Put p = new Put(Bytes.toBytes("test"));
p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
table.put(p);
admin.flush(TABLE_NAME);
admin.flush(TABLE_NAMES[0]);
assertEquals(1, util.countRows(table));
quickPoll(new Callable<Boolean>() {
@Override
@ -1209,8 +1327,9 @@ public class TestHFileOutputFormat2 {
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
true);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
.getTableDescriptor(), regionLocator)), testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@ -1224,7 +1343,7 @@ public class TestHFileOutputFormat2 {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
admin.compact(TABLE_NAME);
admin.compact(TABLE_NAMES[0]);
try {
quickPoll(new Callable<Boolean>() {
@Override
@ -1238,7 +1357,7 @@ public class TestHFileOutputFormat2 {
}
// a major compaction should work though
admin.majorCompact(TABLE_NAME);
admin.majorCompact(TABLE_NAMES[0]);
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@ -1273,15 +1392,15 @@ public class TestHFileOutputFormat2 {
if ("newtable".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
byte[][] splitKeys = generateRandomSplitKeys(4);
try (Table table = util.createTable(tname, FAMILIES, splitKeys)) {
}
Table table = util.createTable(tname, FAMILIES, splitKeys);
} else if ("incremental".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
try(Connection c = ConnectionFactory.createConnection(conf);
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out");
runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
.getTableDescriptor(tname), regionLocator)), outDir, false);
}
} else {
throw new RuntimeException(
@ -1294,8 +1413,10 @@ public class TestHFileOutputFormat2 {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
"ONE_SSD");
conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
util.startMiniDFSCluster(3);
@ -1313,8 +1434,10 @@ public class TestHFileOutputFormat2 {
assertEquals("HOT", spB);
// alter table cf schema to change storage policies
HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
HFileOutputFormat2.configureStoragePolicy(conf, fs,
HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
HFileOutputFormat2.configureStoragePolicy(conf, fs,
HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
spA = getStoragePolicyName(fs, cf1Dir);
spB = getStoragePolicyName(fs, cf2Dir);
LOG.debug("Storage policy of cf 0: [" + spA + "].");
@ -1368,6 +1491,5 @@ public class TestHFileOutputFormat2 {
return null;
}
}

View File

@ -1,382 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for{@link MultiTableHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and
* writes hfiles.
*/
@Category(MediumTests.class)
public class TestMultiTableHFileOutputFormat {
private static final Log LOG = LogFactory.getLog(TestMultiTableHFileOutputFormat.class);
private HBaseTestingUtility util = new HBaseTestingUtility();
private static int ROWSPERSPLIT = 10;
private static final int KEYLEN_DEFAULT = 10;
private static final String KEYLEN_CONF = "randomkv.key.length";
private static final int VALLEN_DEFAULT = 10;
private static final String VALLEN_CONF = "randomkv.val.length";
private static final byte[][] TABLES =
{ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
private static final byte[][] FAMILIES =
{ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
private static final byte[] QUALIFIER = Bytes.toBytes("data");
/**
* Run small MR job. this MR job will write HFile into
* testWritingDataIntoHFiles/tableNames/columnFamilies/
*/
@Test
public void testWritingDataIntoHFiles() throws Exception {
Configuration conf = util.getConfiguration();
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
FileSystem fs = testDir.getFileSystem(conf);
LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files by setting max file size.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
try {
Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
FileOutputFormat.setOutputPath(job, testDir);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(Table_KeyValueSortReducer.class);
job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("\nStarting test testWritingDataIntoHFiles\n");
assertTrue(job.waitForCompletion(true));
LOG.info("\nWaiting on checking MapReduce output\n");
assertTrue(checkMROutput(fs, testDir, 0));
} finally {
testDir.getFileSystem(conf).delete(testDir, true);
util.shutdownMiniCluster();
}
}
/**
* check whether create directory and hfiles as format designed in MultiHFilePartitioner
* and also check whether the output file has same related configuration as created table
*/
@Test
public void testMultiHFilePartitioner() throws Exception {
Configuration conf = util.getConfiguration();
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner");
FileSystem fs = testDir.getFileSystem(conf);
LOG.info("testMultiHFilePartitioner dir writing to : " + testDir);
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files by setting max file size.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
// Create several tables for testing
List<TableName> tables = new ArrayList<TableName>();
// to store splitKeys for TABLE[0] for testing;
byte[][] testKeys = new byte[0][0];
for (int i = 0; i < TABLES.length; i++) {
TableName tableName = TableName.valueOf(TABLES[i]);
byte[][] splitKeys = generateRandomSplitKeys(3);
if (i == 0) {
testKeys = splitKeys;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
for (int j = 0; j < FAMILIES.length; j++) {
HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]);
//only set Tables[0] configuration, and specify compression type and DataBlockEncode
if (i == 0) {
familyDescriptor.setCompressionType(Compression.Algorithm.GZ);
familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
}
tableDescriptor.addFamily(familyDescriptor);
}
util.createTable(tableDescriptor, splitKeys, conf);
tables.add(tableName);
}
// set up for MapReduce job
try {
Job job = Job.getInstance(conf, "testMultiHFilePartitioner");
FileOutputFormat.setOutputPath(job, testDir);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
MultiTableHFileOutputFormat.configureIncrementalLoad(job, tables);
LOG.info("Starting test testWritingDataIntoHFiles");
assertTrue(job.waitForCompletion(true));
LOG.info("Waiting on checking MapReduce output");
assertTrue(checkMROutput(fs, testDir, 0));
assertTrue(checkFileConfAndSplitKeys(conf, fs, testDir, testKeys));
} finally {
for (int i = 0; i < TABLES.length; i++) {
TableName tName = TableName.valueOf(TABLES[i]);
util.deleteTable(tName);
}
fs.delete(testDir, true);
fs.close();
util.shutdownMiniCluster();
}
}
/**
* check the output hfile has same configuration as created test table
* and also check whether hfiles get split correctly
* only check TABLES[0]
*/
private boolean checkFileConfAndSplitKeys(Configuration conf, FileSystem fs, Path testDir, byte[][] splitKeys) throws IOException {
FileStatus[] fStats = fs.listStatus(testDir);
for (FileStatus stats : fStats) {
if (stats.getPath().getName().equals(new String(TABLES[0]))) {
FileStatus[] cfStats = fs.listStatus(stats.getPath());
for (FileStatus cfstat : cfStats) {
FileStatus[] hfStats = fs.listStatus(cfstat.getPath());
List<byte[]> firsttKeys = new ArrayList<byte[]>();
List<byte[]> lastKeys = new ArrayList<byte[]>();
for (FileStatus hfstat : hfStats) {
if (HFile.isHFileFormat(fs, hfstat)) {
HFile.Reader hfr =
HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), true, conf);
if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || !hfr
.getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false;
firsttKeys.add(hfr.getFirstRowKey());
lastKeys.add(hfr.getLastRowKey());
}
}
if (checkFileSplit(splitKeys, firsttKeys, lastKeys) == false) {
return false;
}
}
}
}
return true;
}
/**
* Check whether the Hfile has been split by region boundaries
* @param splitKeys split keys for that table
* @param firstKeys first rowKey for hfiles
* @param lastKeys last rowKey for hfiles
*/
private boolean checkFileSplit(byte[][] splitKeys, List<byte[]> firstKeys, List<byte[]> lastKeys) {
Collections.sort(firstKeys, Bytes.BYTES_RAWCOMPARATOR);
Collections.sort(lastKeys, Bytes.BYTES_RAWCOMPARATOR);
Arrays.sort(splitKeys, Bytes.BYTES_RAWCOMPARATOR);
int is = 0, il = 0;
for (byte[] key : lastKeys) {
while (is < splitKeys.length && Bytes.compareTo(key, splitKeys[is]) >= 0) is++;
if (is == splitKeys.length) {
break;
}
if (is > 0) {
if (Bytes.compareTo(firstKeys.get(il), splitKeys[is - 1]) < 0) return false;
}
il++;
}
if (is == splitKeys.length) {
return il == lastKeys.size() - 1;
}
return true;
}
/**
* MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the
* created directory is correct or not A recursion method, the testDir had better be small size
*/
private boolean checkMROutput(FileSystem fs, Path testDir, int level) throws IOException {
if (level >= 3) {
return HFile.isHFileFormat(fs, testDir);
}
FileStatus[] fStats = fs.listStatus(testDir);
if (fStats == null || fStats.length <= 0) {
LOG.info("Created directory format is not correct");
return false;
}
for (FileStatus stats : fStats) {
// skip the _SUCCESS file created by MapReduce
if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
continue;
if (level < 2 && !stats.isDirectory()) {
LOG.info("Created directory format is not correct");
return false;
}
boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
if (flag == false) return false;
}
return true;
}
private byte[][] generateRandomSplitKeys(int numKeys) {
Random random = new Random();
byte[][] ret = new byte[numKeys][];
for (int i = 0; i < numKeys; i++) {
ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT);
}
return ret;
}
/**
* Simple mapper that makes <TableName, KeyValue> output. With no input data
*/
static class Random_TableKV_GeneratingMapper
extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
private int keyLength;
private int valLength;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}
@Override
protected void map(NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
ArrayList<ImmutableBytesWritable> tables = new ArrayList<>();
for (int i = 0; i < TABLES.length; i++) {
tables.add(new ImmutableBytesWritable(TABLES[i]));
}
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
for (ImmutableBytesWritable table : tables) {
for (byte[] family : FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(table, kv);
}
}
}
}
}
/**
* Simple Reducer that have input <TableName, KeyValue>, with KeyValues have no order. and output
* <TableName, KeyValue>, with KeyValues are ordered
*/
static class Table_KeyValueSortReducer
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
for (KeyValue kv : kvs) {
try {
map.add(kv.clone());
} catch (CloneNotSupportedException e) {
throw new java.io.IOException(e);
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv : map) {
context.write(table, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}
}