HBASE-15035 bulkloading hfiles with tags that require splits do not preserve tags

Jonathan M Hsieh 2015-12-25 09:51:34 -08:00
parent dfada43e90
commit 3de0b0417c
6 changed files with 164 additions and 11 deletions


@@ -37,7 +37,7 @@ public class HFileContextBuilder {
   /** Whether mvcc is to be included in the Read/Write **/
   private boolean includesMvcc = true;
   /** Whether tags are to be included in the Read/Write **/
-  private boolean includesTags;
+  private boolean includesTags = false;
   /** Compression algorithm used **/
   private Algorithm compression = Algorithm.NONE;
   /** Whether tags to be compressed or not **/
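The one-line change above matters because the builder's tags flag defaults to off: any HFileContext built without an explicit withIncludesTags(true) yields HFile writers that silently drop cell tags. A minimal standalone sketch of the distinction (the class name TagContextExample is made up for illustration; it assumes HFileContext.isIncludesTags() as the matching getter):

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

public class TagContextExample {
  public static void main(String[] args) {
    // Default builder: includesTags stays false, so writers built from this
    // context would strip tags from every appended cell.
    HFileContext noTags = new HFileContextBuilder().build();

    // Explicit opt-in, the same call the LoadIncrementalHFiles fix below adds
    // when it re-writes the top/bottom halves of a split HFile.
    HFileContext withTags = new HFileContextBuilder()
        .withIncludesTags(true)
        .build();

    System.out.println(noTags.isIncludesTags());   // false
    System.out.println(withTags.isIncludesTags()); // true
  }
}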


@@ -903,6 +903,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
             .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
             .withBlockSize(blocksize)
             .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
+            .withIncludesTags(true)
             .build();
         halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
             fs)


@@ -33,11 +33,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -50,8 +52,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;

 /**
  * Test cases for the "load" half of the HFileOutputFormat bulk load
@@ -60,6 +64,9 @@ import org.junit.experimental.categories.Category;
  */
 @Category({MapReduceTests.class, LargeTests.class})
 public class TestLoadIncrementalHFiles {
+  @Rule
+  public TestName tn = new TestName();
+
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
   private static final String NAMESPACE = "bulkNS";
@@ -80,6 +87,9 @@ public class TestLoadIncrementalHFiles {
     util.getConfiguration().setInt(
       LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
       MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());
     util.startMiniCluster();
     setupNamespace();
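The codec setting added above is what lets the new test actually observe tags on the client side: the default client RPC codec does not serialize tags back in scan results, so even correctly stored tags would look absent without KeyValueCodecWithTags. A hedged sketch of the same setting on a plain client Configuration (the class and method names here are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;

public class TagCodecConf {
  /** Returns a client Configuration whose RPC codec keeps cell tags in responses. */
  public static Configuration withTagCodec() {
    Configuration conf = HBaseConfiguration.create();
    // HConstants.RPC_CODEC_CONF_KEY is "hbase.client.rpc.codec".
    conf.set(HConstants.RPC_CODEC_CONF_KEY,
        KeyValueCodecWithTags.class.getCanonicalName());
    return conf;
  }
}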
@@ -224,6 +234,14 @@
     );
   }

+  private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    familyDesc.setBloomFilterType(bloomType);
+    htd.addFamily(familyDesc);
+    return htd;
+  }
+
   private void runTest(String testName, BloomType bloomType,
       byte[][][] hfileRanges) throws Exception {
     runTest(testName, bloomType, null, hfileRanges);
@@ -245,10 +263,7 @@
   private void runTest(String testName, TableName tableName, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    familyDesc.setBloomFilterType(bloomType);
-    htd.addFamily(familyDesc);
+    HTableDescriptor htd = buildHTD(tableName, bloomType);
     runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
   }
@@ -297,6 +312,51 @@
     util.deleteTable(tableName);
   }

+  /**
+   * Test that tags survive through a bulk load that needs to split hfiles.
+   *
+   * This test depends on the "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client
+   * can get tags in the responses.
+   */
+  @Test(timeout = 60000)
+  public void testTagsSurviveBulkLoadSplit() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    // table has these split points
+    byte [][] tableSplitKeys = new byte[][] {
+        Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
+    };
+
+    // creating an hfile that has values that span the split points.
+    byte[] from = Bytes.toBytes("ddd");
+    byte[] to = Bytes.toBytes("ooo");
+    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
+        new Path(familyDir, tn.getMethodName()+"_hfile"),
+        FAMILY, QUALIFIER, from, to, 1000);
+    int expectedRows = 1000;
+
+    TableName tableName = TableName.valueOf(tn.getMethodName());
+    HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
+    util.getAdmin().createTable(htd, tableSplitKeys);
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String [] args= {dir.toString(), tableName.toString()};
+    loader.run(args);
+
+    Table table = util.getConnection().getTable(tableName);
+    try {
+      assertEquals(expectedRows, util.countRows(table));
+      HFileTestUtil.verifyTags(table);
+    } finally {
+      table.close();
+    }
+
+    util.deleteTable(tableName);
+  }
+
   /**
    * Test loading into a column family that does not exist.
    */


@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.mapreduce;

+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.junit.BeforeClass;

@@ -33,6 +35,10 @@ public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles {
       MAX_FILES_PER_REGION_PER_FAMILY);
     util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());
+
     util.startMiniCluster();
     setupNamespace();
   }


@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;

+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.security.UserProvider;

@@ -52,6 +54,9 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
     util.getConfiguration().setInt(
       LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
       MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());

     util.startMiniCluster();


@@ -21,15 +21,26 @@ package org.apache.hadoop.hbase.util;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.StoreFile;

 import java.io.IOException;

+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
 /**
  * Utility class for HFile-related testing.
  */
@@ -37,15 +48,45 @@ public class HFileTestUtil {
   /**
    * Create an HFile with the given number of rows between a given
-   * start key and end key.
+   * start key and end key @ family:qualifier. The value will be the key value.
+   * This file will not have tags.
    */
   public static void createHFile(
       Configuration configuration,
       FileSystem fs, Path path,
       byte[] family, byte[] qualifier,
-      byte[] startKey, byte[] endKey, int numRows) throws IOException
-  {
-    HFileContext meta = new HFileContextBuilder().build();
+      byte[] startKey, byte[] endKey, int numRows) throws IOException {
+    createHFile(configuration, fs, path, family, qualifier, startKey, endKey,
+        numRows, false);
+  }
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key @ family:qualifier. The value will be the key value.
+   * This cells will also have a tag whose value is the key.
+   */
+  public static void createHFileWithTags(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException {
+    createHFile(configuration, fs, path, family, qualifier, startKey, endKey, numRows, true);
+  }
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key @ family:qualifier.
+   * If withTag is true, we add the rowKey as the tag value for
+   * tagtype MOB_TABLE_NAME_TAG_TYPE
+   */
+  public static void createHFile(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows, boolean withTag) throws IOException {
+    HFileContext meta = new HFileContextBuilder()
+        .withIncludesTags(withTag)
+        .build();
     HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
         .withPath(fs, path)
         .withFileContext(meta)
@@ -55,6 +96,20 @@ public class HFileTestUtil {
       // subtract 2 since iterateOnSplits doesn't include boundary keys
       for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows - 2)) {
         KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+        if (withTag) {
+          // add a tag. Arbitrarily chose mob tag since we have a helper already.
+          Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, key);
+          kv = MobUtils.createMobRefKeyValue(kv, key, tableNameTag);
+
+          // verify that the kv has the tag.
+          byte[] ta = kv.getTagsArray();
+          int toff = kv.getTagsOffset();
+          int tlen = kv.getTagsLength();
+          Tag t = Tag.getTag(ta, toff, tlen, TagType.MOB_TABLE_NAME_TAG_TYPE);
+          if (t == null) {
+            throw new IllegalStateException("Tag didn't stick to KV " + kv.toString());
+          }
+        }
         writer.append(kv);
       }
     } finally {
@@ -63,4 +118,30 @@
       writer.close();
     }
   }
+
+  /**
+   * This verifies that each cell has a tag that is equal to its rowkey name. For this to work
+   * the hbase instance must have HConstants.RPC_CODEC_CONF_KEY set to
+   * KeyValueCodecWithTags.class.getCanonicalName());
+   * @param table table containing tagged cells
+   * @throws IOException if problems reading table
+   */
+  public static void verifyTags(Table table) throws IOException {
+    ResultScanner s = table.getScanner(new Scan());
+    for (Result r : s) {
+      for (Cell c : r.listCells()) {
+        byte[] ta = c.getTagsArray();
+        int toff = c.getTagsOffset();
+        int tlen = c.getTagsLength();
+        Tag t = Tag.getTag(ta, toff, tlen, TagType.MOB_TABLE_NAME_TAG_TYPE);
+        if (t == null) {
+          fail(c.toString() + " has null tag");
+          continue;
+        }
+        byte[] tval = t.getValue();
+        assertArrayEquals(c.toString() + " has tag" + Bytes.toString(tval),
+            r.getRow(), tval);
+      }
+    }
+  }
 }