HBASE-3727 MultiHFileOutputFormat (yi liang)
This commit is contained in:
parent
84b7010a30
commit
60847a2d76
|
@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -131,224 +132,254 @@ public class HFileOutputFormat2
|
||||||
}
|
}
|
||||||
|
|
||||||
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
|
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
|
||||||
createRecordWriter(final TaskAttemptContext context)
|
createRecordWriter(final TaskAttemptContext context) throws IOException {
|
||||||
throws IOException {
|
return new HFileRecordWriter<V>(context, null);
|
||||||
|
}
|
||||||
|
|
||||||
// Get the path of the temporary output file
|
protected static class HFileRecordWriter<V extends Cell>
|
||||||
final Path outputPath = FileOutputFormat.getOutputPath(context);
|
extends RecordWriter<ImmutableBytesWritable, V> {
|
||||||
final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
|
private final TaskAttemptContext context;
|
||||||
final Configuration conf = context.getConfiguration();
|
private final Path outputPath;
|
||||||
final FileSystem fs = outputdir.getFileSystem(conf);
|
private final Path outputDir;
|
||||||
// These configs. are from hbase-*.xml
|
private final Configuration conf;
|
||||||
final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
|
private final FileSystem fs;
|
||||||
HConstants.DEFAULT_MAX_FILE_SIZE);
|
|
||||||
// Invented config. Add to hbase-*.xml if other than default compression.
|
|
||||||
final String defaultCompressionStr = conf.get("hfile.compression",
|
|
||||||
Compression.Algorithm.NONE.getName());
|
|
||||||
final Algorithm defaultCompression = HFileWriterImpl
|
|
||||||
.compressionByName(defaultCompressionStr);
|
|
||||||
final boolean compactionExclude = conf.getBoolean(
|
|
||||||
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
|
|
||||||
|
|
||||||
// create a map from column family to the compression algorithm
|
private final long maxsize;
|
||||||
final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
|
|
||||||
final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
|
|
||||||
final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
|
|
||||||
|
|
||||||
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
|
private final Algorithm defaultCompression;
|
||||||
final Map<byte[], DataBlockEncoding> datablockEncodingMap
|
private final boolean compactionExclude;
|
||||||
= createFamilyDataBlockEncodingMap(conf);
|
|
||||||
final DataBlockEncoding overriddenEncoding;
|
private final Map<byte[], Algorithm> compressionMap;
|
||||||
if (dataBlockEncodingStr != null) {
|
private final Map<byte[], BloomType> bloomTypeMap;
|
||||||
overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
|
private final Map<byte[], Integer> blockSizeMap;
|
||||||
} else {
|
|
||||||
overriddenEncoding = null;
|
private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
|
||||||
|
private final DataBlockEncoding overriddenEncoding;
|
||||||
|
|
||||||
|
private final Map<byte[], WriterLength> writers;
|
||||||
|
private byte[] previousRow;
|
||||||
|
private final byte[] now;
|
||||||
|
private boolean rollRequested;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mapredue job will create a temp path for outputting results. If out != null, it means that
|
||||||
|
* the caller has set the temp working dir; If out == null, it means we need to set it here.
|
||||||
|
* Used by HFileOutputFormat2 and MultiHFileOutputFormat. MultiHFileOutputFormat will give us
|
||||||
|
* temp working dir at the table level and HFileOutputFormat2 has to set it here within this
|
||||||
|
* constructor.
|
||||||
|
*/
|
||||||
|
public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
|
||||||
|
throws IOException {
|
||||||
|
// Get the path of the temporary output file
|
||||||
|
context = taContext;
|
||||||
|
|
||||||
|
if (out == null) {
|
||||||
|
outputPath = FileOutputFormat.getOutputPath(context);
|
||||||
|
outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
|
||||||
|
} else {
|
||||||
|
outputPath = out;
|
||||||
|
outputDir = outputPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = context.getConfiguration();
|
||||||
|
fs = outputDir.getFileSystem(conf);
|
||||||
|
|
||||||
|
// These configs. are from hbase-*.xml
|
||||||
|
maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
|
||||||
|
|
||||||
|
// Invented config. Add to hbase-*.xml if other than default compression.
|
||||||
|
String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
|
||||||
|
defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
|
||||||
|
compactionExclude =
|
||||||
|
conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
|
||||||
|
|
||||||
|
// create a map from column family to the compression algorithm
|
||||||
|
compressionMap = createFamilyCompressionMap(conf);
|
||||||
|
bloomTypeMap = createFamilyBloomTypeMap(conf);
|
||||||
|
blockSizeMap = createFamilyBlockSizeMap(conf);
|
||||||
|
|
||||||
|
// Config for data block encoding
|
||||||
|
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
|
||||||
|
datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
|
||||||
|
if (dataBlockEncodingStr != null) {
|
||||||
|
overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
|
||||||
|
} else {
|
||||||
|
overriddenEncoding = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
|
||||||
|
previousRow = HConstants.EMPTY_BYTE_ARRAY;
|
||||||
|
now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
|
||||||
|
rollRequested = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new RecordWriter<ImmutableBytesWritable, V>() {
|
@Override
|
||||||
// Map of families to writers and how much has been output on the writer.
|
public void write(ImmutableBytesWritable row, V cell) throws IOException {
|
||||||
private final Map<byte [], WriterLength> writers =
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||||
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
|
|
||||||
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
|
|
||||||
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
|
|
||||||
private boolean rollRequested = false;
|
|
||||||
|
|
||||||
@Override
|
// null input == user explicitly wants to flush
|
||||||
public void write(ImmutableBytesWritable row, V cell)
|
if (row == null && kv == null) {
|
||||||
throws IOException {
|
rollWriters();
|
||||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// null input == user explicitly wants to flush
|
byte[] rowKey = CellUtil.cloneRow(kv);
|
||||||
if (row == null && kv == null) {
|
long length = kv.getLength();
|
||||||
rollWriters();
|
byte[] family = CellUtil.cloneFamily(kv);
|
||||||
return;
|
WriterLength wl = this.writers.get(family);
|
||||||
}
|
|
||||||
|
|
||||||
byte [] rowKey = CellUtil.cloneRow(kv);
|
// If this is a new column family, verify that the directory exists
|
||||||
long length = kv.getLength();
|
if (wl == null) {
|
||||||
byte [] family = CellUtil.cloneFamily(kv);
|
fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
|
||||||
WriterLength wl = this.writers.get(family);
|
}
|
||||||
|
|
||||||
// If this is a new column family, verify that the directory exists
|
// If any of the HFiles for the column families has reached
|
||||||
if (wl == null) {
|
// maxsize, we need to roll all the writers
|
||||||
fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
|
if (wl != null && wl.written + length >= maxsize) {
|
||||||
}
|
this.rollRequested = true;
|
||||||
|
}
|
||||||
|
|
||||||
// If any of the HFiles for the column families has reached
|
// This can only happen once a row is finished though
|
||||||
// maxsize, we need to roll all the writers
|
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
|
||||||
if (wl != null && wl.written + length >= maxsize) {
|
rollWriters();
|
||||||
this.rollRequested = true;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// This can only happen once a row is finished though
|
// create a new WAL writer, if necessary
|
||||||
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
|
if (wl == null || wl.writer == null) {
|
||||||
rollWriters();
|
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
|
||||||
}
|
HRegionLocation loc = null;
|
||||||
|
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
|
||||||
// create a new WAL writer, if necessary
|
if (tableName != null) {
|
||||||
if (wl == null || wl.writer == null) {
|
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||||
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
|
RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
|
||||||
HRegionLocation loc = null;
|
loc = locator.getRegionLocation(rowKey);
|
||||||
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
|
} catch (Throwable e) {
|
||||||
if (tableName != null) {
|
LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
|
||||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
e);
|
||||||
RegionLocator locator =
|
loc = null;
|
||||||
connection.getRegionLocator(TableName.valueOf(tableName))) {
|
|
||||||
loc = locator.getRegionLocation(rowKey);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.warn("there's something wrong when locating rowkey: " +
|
|
||||||
Bytes.toString(rowKey), e);
|
|
||||||
loc = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (null == loc) {
|
if (null == loc) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(
|
||||||
|
"failed to get region location, so use default writer: " + Bytes.toString(rowKey));
|
||||||
|
}
|
||||||
|
wl = getNewWriter(family, conf, null);
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
|
||||||
|
}
|
||||||
|
InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
|
||||||
|
if (initialIsa.isUnresolved()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("failed to get region location, so use default writer: " +
|
LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
|
||||||
Bytes.toString(rowKey));
|
+ loc.getPort() + ", so use default writer");
|
||||||
}
|
}
|
||||||
wl = getNewWriter(family, conf, null);
|
wl = getNewWriter(family, conf, null);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
|
LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
|
||||||
}
|
|
||||||
InetSocketAddress initialIsa =
|
|
||||||
new InetSocketAddress(loc.getHostname(), loc.getPort());
|
|
||||||
if (initialIsa.isUnresolved()) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
|
|
||||||
+ loc.getPort() + ", so use default writer");
|
|
||||||
}
|
|
||||||
wl = getNewWriter(family, conf, null);
|
|
||||||
} else {
|
|
||||||
if(LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
|
|
||||||
}
|
|
||||||
wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
|
|
||||||
}
|
}
|
||||||
|
wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
wl = getNewWriter(family, conf, null);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// we now have the proper WAL writer. full steam ahead
|
|
||||||
kv.updateLatestStamp(this.now);
|
|
||||||
wl.writer.append(kv);
|
|
||||||
wl.written += length;
|
|
||||||
|
|
||||||
// Copy the row so we know when a row transition.
|
|
||||||
this.previousRow = rowKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void rollWriters() throws IOException {
|
|
||||||
for (WriterLength wl : this.writers.values()) {
|
|
||||||
if (wl.writer != null) {
|
|
||||||
LOG.info("Writer=" + wl.writer.getPath() +
|
|
||||||
((wl.written == 0)? "": ", wrote=" + wl.written));
|
|
||||||
close(wl.writer);
|
|
||||||
}
|
|
||||||
wl.writer = null;
|
|
||||||
wl.written = 0;
|
|
||||||
}
|
|
||||||
this.rollRequested = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create a new StoreFile.Writer.
|
|
||||||
* @param family
|
|
||||||
* @return A WriterLength, containing a new StoreFile.Writer.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
|
|
||||||
justification="Not important")
|
|
||||||
private WriterLength getNewWriter(byte[] family, Configuration conf,
|
|
||||||
InetSocketAddress[] favoredNodes) throws IOException {
|
|
||||||
WriterLength wl = new WriterLength();
|
|
||||||
Path familydir = new Path(outputdir, Bytes.toString(family));
|
|
||||||
Algorithm compression = compressionMap.get(family);
|
|
||||||
compression = compression == null ? defaultCompression : compression;
|
|
||||||
BloomType bloomType = bloomTypeMap.get(family);
|
|
||||||
bloomType = bloomType == null ? BloomType.NONE : bloomType;
|
|
||||||
Integer blockSize = blockSizeMap.get(family);
|
|
||||||
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
|
|
||||||
DataBlockEncoding encoding = overriddenEncoding;
|
|
||||||
encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
|
|
||||||
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
|
|
||||||
Configuration tempConf = new Configuration(conf);
|
|
||||||
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
|
|
||||||
HFileContextBuilder contextBuilder = new HFileContextBuilder()
|
|
||||||
.withCompression(compression)
|
|
||||||
.withChecksumType(HStore.getChecksumType(conf))
|
|
||||||
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
|
|
||||||
.withBlockSize(blockSize);
|
|
||||||
|
|
||||||
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
|
|
||||||
contextBuilder.withIncludesTags(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
contextBuilder.withDataBlockEncoding(encoding);
|
|
||||||
HFileContext hFileContext = contextBuilder.build();
|
|
||||||
|
|
||||||
if (null == favoredNodes) {
|
|
||||||
wl.writer =
|
|
||||||
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
|
|
||||||
.withOutputDir(familydir).withBloomType(bloomType)
|
|
||||||
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
|
|
||||||
} else {
|
} else {
|
||||||
wl.writer =
|
wl = getNewWriter(family, conf, null);
|
||||||
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
|
|
||||||
.withOutputDir(familydir).withBloomType(bloomType)
|
|
||||||
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
|
|
||||||
.withFavoredNodes(favoredNodes).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
this.writers.put(family, wl);
|
|
||||||
return wl;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void close(final StoreFileWriter w) throws IOException {
|
|
||||||
if (w != null) {
|
|
||||||
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
|
|
||||||
Bytes.toBytes(System.currentTimeMillis()));
|
|
||||||
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
|
|
||||||
Bytes.toBytes(context.getTaskAttemptID().toString()));
|
|
||||||
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
|
|
||||||
Bytes.toBytes(true));
|
|
||||||
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
|
|
||||||
Bytes.toBytes(compactionExclude));
|
|
||||||
w.appendTrackedTimestampsToMetadata();
|
|
||||||
w.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
// we now have the proper WAL writer. full steam ahead
|
||||||
public void close(TaskAttemptContext c)
|
kv.updateLatestStamp(this.now);
|
||||||
throws IOException, InterruptedException {
|
wl.writer.append(kv);
|
||||||
for (WriterLength wl: this.writers.values()) {
|
wl.written += length;
|
||||||
|
|
||||||
|
// Copy the row so we know when a row transition.
|
||||||
|
this.previousRow = rowKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void rollWriters() throws IOException {
|
||||||
|
for (WriterLength wl : this.writers.values()) {
|
||||||
|
if (wl.writer != null) {
|
||||||
|
LOG.info(
|
||||||
|
"Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
|
||||||
close(wl.writer);
|
close(wl.writer);
|
||||||
}
|
}
|
||||||
|
wl.writer = null;
|
||||||
|
wl.written = 0;
|
||||||
}
|
}
|
||||||
};
|
this.rollRequested = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Create a new StoreFile.Writer.
|
||||||
|
* @param family
|
||||||
|
* @return A WriterLength, containing a new StoreFile.Writer.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
|
||||||
|
justification = "Not important")
|
||||||
|
private WriterLength getNewWriter(byte[] family, Configuration conf,
|
||||||
|
InetSocketAddress[] favoredNodes) throws IOException {
|
||||||
|
WriterLength wl = new WriterLength();
|
||||||
|
Path familyDir = new Path(outputDir, Bytes.toString(family));
|
||||||
|
Algorithm compression = compressionMap.get(family);
|
||||||
|
compression = compression == null ? defaultCompression : compression;
|
||||||
|
BloomType bloomType = bloomTypeMap.get(family);
|
||||||
|
bloomType = bloomType == null ? BloomType.NONE : bloomType;
|
||||||
|
Integer blockSize = blockSizeMap.get(family);
|
||||||
|
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
|
||||||
|
DataBlockEncoding encoding = overriddenEncoding;
|
||||||
|
encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
|
||||||
|
encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
|
||||||
|
Configuration tempConf = new Configuration(conf);
|
||||||
|
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
|
||||||
|
HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
|
||||||
|
.withChecksumType(HStore.getChecksumType(conf))
|
||||||
|
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
|
||||||
|
|
||||||
|
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
|
||||||
|
contextBuilder.withIncludesTags(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
contextBuilder.withDataBlockEncoding(encoding);
|
||||||
|
HFileContext hFileContext = contextBuilder.build();
|
||||||
|
|
||||||
|
if (null == favoredNodes) {
|
||||||
|
wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
|
||||||
|
.withOutputDir(familyDir).withBloomType(bloomType)
|
||||||
|
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
|
||||||
|
} else {
|
||||||
|
wl.writer =
|
||||||
|
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
|
||||||
|
.withOutputDir(familyDir).withBloomType(bloomType)
|
||||||
|
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
|
||||||
|
.withFavoredNodes(favoredNodes).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.writers.put(family, wl);
|
||||||
|
return wl;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void close(final StoreFileWriter w) throws IOException {
|
||||||
|
if (w != null) {
|
||||||
|
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
|
||||||
|
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
|
||||||
|
Bytes.toBytes(context.getTaskAttemptID().toString()));
|
||||||
|
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
|
||||||
|
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
|
||||||
|
Bytes.toBytes(compactionExclude));
|
||||||
|
w.appendTrackedTimestampsToMetadata();
|
||||||
|
w.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
|
||||||
|
for (WriterLength wl : this.writers.values()) {
|
||||||
|
close(wl.writer);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -503,7 +534,7 @@ public class HFileOutputFormat2
|
||||||
TableMapReduceUtil.initCredentials(job);
|
TableMapReduceUtil.initCredentials(job);
|
||||||
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
|
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
|
public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
|
||||||
IOException {
|
IOException {
|
||||||
Configuration conf = job.getConfiguration();
|
Configuration conf = job.getConfiguration();
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
/**
|
||||||
|
* Create 3 level tree directory, first level is using table name as parent directory and then use
|
||||||
|
* family name as child directory, and all related HFiles for one family are under child directory
|
||||||
|
* -tableName1
|
||||||
|
* -columnFamilyName1
|
||||||
|
* -columnFamilyName2
|
||||||
|
* -HFiles
|
||||||
|
* -tableName2
|
||||||
|
* -columnFamilyName1
|
||||||
|
* -HFiles
|
||||||
|
* -columnFamilyName2
|
||||||
|
* <p>
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
@VisibleForTesting
|
||||||
|
public class MultiHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecordWriter<ImmutableBytesWritable, Cell>
|
||||||
|
getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
|
||||||
|
return createMultiHFileRecordWriter(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
|
||||||
|
createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
|
||||||
|
|
||||||
|
// Get the path of the output directory
|
||||||
|
final Path outputPath = FileOutputFormat.getOutputPath(context);
|
||||||
|
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
|
||||||
|
final Configuration conf = context.getConfiguration();
|
||||||
|
final FileSystem fs = outputDir.getFileSystem(conf);
|
||||||
|
|
||||||
|
// Map of tables to writers
|
||||||
|
final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters =
|
||||||
|
new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>>();
|
||||||
|
|
||||||
|
return new RecordWriter<ImmutableBytesWritable, V>() {
|
||||||
|
@Override
|
||||||
|
public void write(ImmutableBytesWritable tableName, V cell)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
|
||||||
|
// if there is new table, verify that table directory exists
|
||||||
|
if (tableWriter == null) {
|
||||||
|
// using table name as directory name
|
||||||
|
final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
|
||||||
|
fs.mkdirs(tableOutputDir);
|
||||||
|
LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
|
||||||
|
+ tableOutputDir.toString());
|
||||||
|
|
||||||
|
// Create writer for one specific table
|
||||||
|
tableWriter = new HFileOutputFormat2.HFileRecordWriter<V>(context, tableOutputDir);
|
||||||
|
// Put table into map
|
||||||
|
tableWriters.put(tableName, tableWriter);
|
||||||
|
}
|
||||||
|
// Write <Row, Cell> into tableWriter
|
||||||
|
// in the original code, it does not use Row
|
||||||
|
tableWriter.write(null, cell);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(TaskAttemptContext c) throws IOException, InterruptedException {
|
||||||
|
for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
|
||||||
|
writer.close(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,224 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.*;
|
||||||
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.mapred.FileOutputCommitter;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for{@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that output directories and
|
||||||
|
* writes hfiles.
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestMultiHFileOutputFormat {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
|
||||||
|
|
||||||
|
private HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static int ROWSPERSPLIT = 10;
|
||||||
|
|
||||||
|
private static final int KEYLEN_DEFAULT = 10;
|
||||||
|
private static final String KEYLEN_CONF = "randomkv.key.length";
|
||||||
|
|
||||||
|
private static final int VALLEN_DEFAULT = 10;
|
||||||
|
private static final String VALLEN_CONF = "randomkv.val.length";
|
||||||
|
|
||||||
|
private static final byte[][] TABLES =
|
||||||
|
{ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
|
||||||
|
Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
|
||||||
|
|
||||||
|
private static final byte[][] FAMILIES =
|
||||||
|
{ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
|
||||||
|
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
|
||||||
|
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("data");
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run small MR job. this MR job will write HFile into
|
||||||
|
* testWritingDataIntoHFiles/tableNames/columFamilies/
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWritingDataIntoHFiles() throws Exception {
|
||||||
|
Configuration conf = util.getConfiguration();
|
||||||
|
util.startMiniCluster();
|
||||||
|
Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
|
||||||
|
FileSystem fs = testDir.getFileSystem(conf);
|
||||||
|
LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
|
||||||
|
|
||||||
|
// Set down this value or we OOME in eclipse.
|
||||||
|
conf.setInt("mapreduce.task.io.sort.mb", 20);
|
||||||
|
// Write a few files by setting max file size.
|
||||||
|
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
|
||||||
|
|
||||||
|
FileOutputFormat.setOutputPath(job, testDir);
|
||||||
|
|
||||||
|
job.setInputFormatClass(NMapInputFormat.class);
|
||||||
|
job.setMapperClass(Random_TableKV_GeneratingMapper.class);
|
||||||
|
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
|
||||||
|
job.setMapOutputValueClass(KeyValue.class);
|
||||||
|
job.setReducerClass(Table_KeyValueSortReducer.class);
|
||||||
|
job.setOutputFormatClass(MultiHFileOutputFormat.class);
|
||||||
|
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
|
||||||
|
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
|
||||||
|
KeyValueSerialization.class.getName());
|
||||||
|
|
||||||
|
TableMapReduceUtil.addDependencyJars(job);
|
||||||
|
TableMapReduceUtil.initCredentials(job);
|
||||||
|
LOG.info("\nStarting test testWritingDataIntoHFiles\n");
|
||||||
|
assertTrue(job.waitForCompletion(true));
|
||||||
|
LOG.info("\nWaiting on checking MapReduce output\n");
|
||||||
|
assertTrue(checkMROutput(fs, testDir, 0));
|
||||||
|
} finally {
|
||||||
|
testDir.getFileSystem(conf).delete(testDir, true);
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the
|
||||||
|
* created directory is correct or not A recursion method, the testDir had better be small size
|
||||||
|
*/
|
||||||
|
private boolean checkMROutput(FileSystem fs, Path testDir, int level)
|
||||||
|
throws FileNotFoundException, IOException {
|
||||||
|
if (level >= 3) {
|
||||||
|
return HFile.isHFileFormat(fs, testDir);
|
||||||
|
}
|
||||||
|
FileStatus[] fStats = fs.listStatus(testDir);
|
||||||
|
if (fStats == null || fStats.length <= 0) {
|
||||||
|
LOG.info("Created directory format is not correct");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (FileStatus stats : fStats) {
|
||||||
|
// skip the _SUCCESS file created by MapReduce
|
||||||
|
if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
|
||||||
|
continue;
|
||||||
|
if (level < 2 && !stats.isDirectory()) {
|
||||||
|
LOG.info("Created directory format is not correct");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
|
||||||
|
if (flag == false) return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple mapper that makes <TableName, KeyValue> output. With no input data
|
||||||
|
*/
|
||||||
|
static class Random_TableKV_GeneratingMapper
|
||||||
|
extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
|
||||||
|
|
||||||
|
private int keyLength;
|
||||||
|
private int valLength;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(Context context) throws IOException, InterruptedException {
|
||||||
|
super.setup(context);
|
||||||
|
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
|
||||||
|
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void map(NullWritable n1, NullWritable n2,
|
||||||
|
Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
|
||||||
|
throws java.io.IOException, InterruptedException {
|
||||||
|
|
||||||
|
byte keyBytes[] = new byte[keyLength];
|
||||||
|
byte valBytes[] = new byte[valLength];
|
||||||
|
|
||||||
|
ArrayList<ImmutableBytesWritable> tables = new ArrayList<ImmutableBytesWritable>();
|
||||||
|
for (int i = 0; i < TABLES.length; i++) {
|
||||||
|
tables.add(new ImmutableBytesWritable(TABLES[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
int taskId = context.getTaskAttemptID().getTaskID().getId();
|
||||||
|
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
|
||||||
|
Random random = new Random();
|
||||||
|
|
||||||
|
for (int i = 0; i < ROWSPERSPLIT; i++) {
|
||||||
|
random.nextBytes(keyBytes);
|
||||||
|
// Ensure that unique tasks generate unique keys
|
||||||
|
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
|
||||||
|
random.nextBytes(valBytes);
|
||||||
|
|
||||||
|
for (ImmutableBytesWritable table : tables) {
|
||||||
|
for (byte[] family : FAMILIES) {
|
||||||
|
Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
|
||||||
|
context.write(table, kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple Reducer that have input <TableName, KeyValue>, with KeyValues have no order. and output
|
||||||
|
* <TableName, KeyValue>, with KeyValues are ordered
|
||||||
|
*/
|
||||||
|
|
||||||
|
static class Table_KeyValueSortReducer
|
||||||
|
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
|
||||||
|
protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
|
||||||
|
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
|
||||||
|
throws java.io.IOException, InterruptedException {
|
||||||
|
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
try {
|
||||||
|
map.add(kv.clone());
|
||||||
|
} catch (CloneNotSupportedException e) {
|
||||||
|
throw new java.io.IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
context.setStatus("Read " + map.getClass());
|
||||||
|
int index = 0;
|
||||||
|
for (KeyValue kv : map) {
|
||||||
|
context.write(table, kv);
|
||||||
|
if (++index % 100 == 0) context.setStatus("Wrote " + index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue