HBASE-15707 ImportTSV bulk output does not support tags with hfile.format.version=3 (huaxiang sun)

tedyu 2016-04-26 11:21:29 -07:00
parent 53d7316075
commit ebb5d421f9
3 changed files with 69 additions and 1 deletion
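
In short: ImportTSV's bulk output goes through HFileOutputFormat2, which built its HFileContext without ever enabling tags, so cell tags such as TTL were silently dropped from the written HFiles even when hfile.format.version=3 was configured. The patch gates tag inclusion on the configured format version (version 3, HFile.MIN_FORMAT_VERSION_WITH_TAGS, is the first that can persist tags) in both the MapReduce writer and the hbase-spark bulk-load path, and adds a test that round-trips a TTL tag through the RecordWriter.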

@@ -301,6 +301,11 @@ public class HFileOutputFormat2
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(blockSize);

    if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
      contextBuilder.withIncludesTags(true);
    }

    contextBuilder.withDataBlockEncoding(encoding);
    HFileContext hFileContext = contextBuilder.build();
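
From the client side, the guard above only engages when the job configuration asks for HFile v3. A minimal sketch of that setup, assuming the usual HFileOutputFormat2 job wiring (the helper class and job name are hypothetical; FORMAT_VERSION_KEY and MIN_FORMAT_VERSION_WITH_TAGS are real HFile constants):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.mapreduce.Job;

public class TagAwareBulkOutputSetup { // hypothetical helper, not part of the patch
  public static Job createJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // HFile v3 is the first format version that can persist cell tags;
    // below that, the new guard is a no-op and tags are still dropped.
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    Job job = Job.getInstance(conf, "bulk-output-with-tags");
    // The usual mapper/output-path/incremental-load wiring would follow;
    // with the setting above, the RecordWriter's HFileContext is built
    // with includesTags(true) and tags survive into the HFiles.
    return job;
  }
}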

@@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,7 +42,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -56,6 +60,9 @@ import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -73,6 +80,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -344,6 +352,56 @@ public class TestHFileOutputFormat2 {
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into
   * hfile.
   */
  @Test
  public void test_WritingTagData()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("WritingTagData");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");
      List<Tag> tags = new ArrayList<Tag>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(),
            new CacheConfig(conf), conf);
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
            cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
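
As a standalone illustration of the tag handling this test asserts, here is a sketch of the same TTL-tag encode/decode round trip (the class name is invented; the ArrayBackedTag, TagType, and Bytes calls mirror the ones in the test):

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

public class TtlTagRoundTrip { // illustrative only
  public static void main(String[] args) {
    // Encode a TTL tag the same way the test does.
    Tag ttl = new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670));
    // Decode: the tag value is the 4-byte int serialized above.
    int value = Bytes.toInt(ttl.getValueArray(), ttl.getValueOffset());
    // Prints the TTL tag type byte and the 978670 written above.
    System.out.println("type=" + ttl.getType() + ", value=" + value);
  }
}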

@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
@@ -886,6 +886,11 @@ class HBaseContext(@transient sc: SparkContext,
      .withChecksumType(HStore.getChecksumType(conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
      .withBlockSize(familyOptions.blockSize)

    if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
      contextBuilder.withIncludesTags(true)
    }

    contextBuilder.withDataBlockEncoding(DataBlockEncoding.
      valueOf(familyOptions.dataBlockEncoding))
    val hFileContext = contextBuilder.build()
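
Note that the hbase-spark change mirrors the HFileOutputFormat2 change above: the same format-version guard is applied when HBaseContext builds the HFileContext for its bulk-load output, so tags are preserved on either bulk-load route once hfile.format.version is at least 3. The import change earlier in this file simply brings HFile into scope for that check.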