HBASE-15035 bulkloading hfiles with tags that require splits do not preserve tags
parent 30361079c7
commit 03e1451070

HFileContextBuilder.java
@@ -37,7 +37,7 @@ public class HFileContextBuilder {
   /** Whether mvcc is to be included in the Read/Write **/
   private boolean includesMvcc = true;
   /** Whether tags are to be included in the Read/Write **/
-  private boolean includesTags;
+  private boolean includesTags = false;
   /** Compression algorithm used **/
   private Algorithm compression = Algorithm.NONE;
   /** Whether tags to be compressed or not **/

LoadIncrementalHFiles.java
@@ -944,6 +944,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
         .withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
+        .withIncludesTags(true)
         .build();
     halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
         fs)
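
The functional fix is the single added builder call above. When LoadIncrementalHFiles finds an incoming hfile that straddles a region boundary, it rewrites the file into two halves with a fresh writer; because that writer's HFileContext did not enable tags, the rewritten halves silently dropped them. A minimal sketch of the corrected pattern, reusing only the builder calls visible in this hunk (the variable name is illustrative):

    // Sketch: the HFileContext must opt in to tags, otherwise the writer that
    // copies each half of a split hfile serializes cells without their tag block.
    HFileContext hFileContext = new HFileContextBuilder()
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
        .withIncludesTags(true)   // the fix: carry tags through the rewrite
        .build();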

TestLoadIncrementalHFiles.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -52,8 +54,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 /**
  * Test cases for the "load" half of the HFileOutputFormat bulk load
@@ -62,6 +66,9 @@ import org.junit.experimental.categories.Category;
  */
 @Category(LargeTests.class)
 public class TestLoadIncrementalHFiles {
+  @Rule
+  public TestName tn = new TestName();
+
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
   private static final String NAMESPACE = "bulkNS";
@@ -82,6 +89,9 @@ public class TestLoadIncrementalHFiles {
     util.getConfiguration().setInt(
       LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
       MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());
     util.startMiniCluster();
 
     setupNamespace();
@@ -226,6 +236,14 @@ public class TestLoadIncrementalHFiles {
     );
   }
 
+  private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    familyDesc.setBloomFilterType(bloomType);
+    htd.addFamily(familyDesc);
+    return htd;
+  }
+
   private void runTest(String testName, BloomType bloomType,
       byte[][][] hfileRanges) throws Exception {
     runTest(testName, bloomType, null, hfileRanges);
@@ -247,10 +265,7 @@
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    familyDesc.setBloomFilterType(bloomType);
-    htd.addFamily(familyDesc);
+    HTableDescriptor htd = buildHTD(tableName, bloomType);
     runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
   }
 
@@ -308,6 +323,51 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  /**
+   * Test that tags survive through a bulk load that needs to split hfiles.
+   *
+   * This test depends on the "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client
+   * can get tags in the responses.
+   */
+  @Test(timeout = 60000)
+  public void htestTagsSurviveBulkLoadSplit() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    // table has these split points
+    byte [][] tableSplitKeys = new byte[][] {
+        Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
+    };
+
+    // creating an hfile that has values that span the split points.
+    byte[] from = Bytes.toBytes("ddd");
+    byte[] to = Bytes.toBytes("ooo");
+    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
+        new Path(familyDir, tn.getMethodName()+"_hfile"),
+        FAMILY, QUALIFIER, from, to, 1000);
+    int expectedRows = 1000;
+
+    TableName tableName = TableName.valueOf(tn.getMethodName());
+    HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
+    util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String [] args= {dir.toString(), tableName.toString()};
+    loader.run(args);
+
+    Table table = util.getConnection().getTable(tableName);
+    try {
+      assertEquals(expectedRows, util.countRows(table));
+      HFileTestUtil.verifyTags(table);
+    } finally {
+      table.close();
+    }
+
+    util.deleteTable(tableName);
+  }
+
   /**
    * Test loading into a column family that does not exist.
    */
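
The new test can only observe tags if the client RPC codec carries them: the default codec strips tags from results returned to clients, so setUpBeforeClass switches the codec before starting the mini cluster (the same change is repeated in the two secure bulk-load test variants below). A minimal sketch of that setting on a standalone client Configuration, using the key quoted in the test javadoc above:

    // "hbase.client.rpc.codec": return cell tags with normal RPC responses
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.RPC_CODEC_CONF_KEY,
        KeyValueCodecWithTags.class.getCanonicalName());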

TestLoadIncrementalHFilesUseSecurityEndPoint.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.junit.BeforeClass;
@@ -33,6 +35,10 @@ public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrem
       MAX_FILES_PER_REGION_PER_FAMILY);
     util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());
+
     util.startMiniCluster();
     setupNamespace();
   }

TestSecureLoadIncrementalHFiles.java
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
@@ -51,6 +53,9 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
     util.getConfiguration().setInt(
       LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
       MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodecWithTags.class.getCanonicalName());
 
     util.startMiniCluster();
 

HFileTestUtil.java
@@ -21,7 +21,14 @@ package org.apache.hadoop.hbase.util;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -29,6 +36,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Utility class for HFile-related testing.
@@ -37,15 +49,45 @@ public class HFileTestUtil {
 
   /**
    * Create an HFile with the given number of rows between a given
-   * start key and end key.
+   * start key and end key @ family:qualifier. The value will be the key value.
+   * This file will not have tags.
    */
   public static void createHFile(
       Configuration configuration,
       FileSystem fs, Path path,
       byte[] family, byte[] qualifier,
-      byte[] startKey, byte[] endKey, int numRows) throws IOException
-  {
-    HFileContext meta = new HFileContextBuilder().build();
+      byte[] startKey, byte[] endKey, int numRows) throws IOException {
+    createHFile(configuration, fs, path, family, qualifier, startKey, endKey,
+        numRows, false);
+  }
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key @ family:qualifier. The value will be the key value.
+   * This cells will also have a tag whose value is the key.
+   */
+  public static void createHFileWithTags(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException {
+    createHFile(configuration, fs, path, family, qualifier, startKey, endKey, numRows, true);
+  }
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key @ family:qualifier.
+   * If withTag is true, we add the rowKey as the tag value for
+   * tagtype ACL_TAG_TYPE
+   */
+  public static void createHFile(
+      Configuration configuration,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows, boolean withTag) throws IOException {
+    HFileContext meta = new HFileContextBuilder()
+        .withIncludesTags(withTag)
+        .build();
     HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
         .withPath(fs, path)
         .withFileContext(meta)
@@ -53,8 +95,27 @@ public class HFileTestUtil {
     long now = System.currentTimeMillis();
     try {
       // subtract 2 since iterateOnSplits doesn't include boundary keys
-      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows - 2)) {
         KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+        if (withTag) {
+          // add a tag. Arbitrarily chose mob tag since we have a helper already.
+          List<Tag> tags = new ArrayList<Tag>();
+          tags.add(new Tag(TagType.ACL_TAG_TYPE, key));
+          kv = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+              kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+              kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+              kv.getTimestamp(), KeyValue.Type.Put, kv.getValueArray(), kv.getValueOffset(),
+              kv.getValueLength(), tags);
+
+          // verify that the kv has the tag.
+          byte[] ta = kv.getTagsArray();
+          int toff = kv.getTagsOffset();
+          int tlen = kv.getTagsLength();
+          Tag t = Tag.getTag(ta, toff, tlen, TagType.ACL_TAG_TYPE);
+          if (t == null) {
+            throw new IllegalStateException("Tag didn't stick to KV " + kv.toString());
+          }
+        }
         writer.append(kv);
       }
     } finally {
@@ -63,4 +124,30 @@ public class HFileTestUtil {
       writer.close();
     }
   }
-}
+
+  /**
+   * This verifies that each cell has a tag that is equal to its rowkey name. For this to work
+   * the hbase instance must have HConstants.RPC_CODEC_CONF_KEY set to
+   * KeyValueCodecWithTags.class.getCanonicalName());
+   * @param table table containing tagged cells
+   * @throws IOException if problems reading table
+   */
+  public static void verifyTags(Table table) throws IOException {
+    ResultScanner s = table.getScanner(new Scan());
+    for (Result r : s) {
+      for (Cell c : r.listCells()) {
+        byte[] ta = c.getTagsArray();
+        int toff = c.getTagsOffset();
+        int tlen = c.getTagsLength();
+        Tag t = Tag.getTag(ta, toff, tlen, TagType.ACL_TAG_TYPE);
+        if (t == null) {
+          fail(c.toString() + " has null tag");
+          continue;
+        }
+        byte[] tval = t.getValue();
+        assertArrayEquals(c.toString() + " has tag" + Bytes.toString(tval),
+            r.getRow(), tval);
+      }
+    }
+  }
+}
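
The two new helpers are meant to be used together from bulk-load tests, as in the test added above: write an hfile whose cells each carry an ACL_TAG_TYPE tag equal to their row key, bulk load it, then scan and check that the tag round-tripped. A condensed usage sketch (the configuration, filesystem, path, and table handle are placeholders taken from the surrounding test, not new API):

    HFileTestUtil.createHFileWithTags(conf, fs,
        new Path(familyDir, "tagged_hfile"),
        FAMILY, QUALIFIER, Bytes.toBytes("ddd"), Bytes.toBytes("ooo"), 1000);
    // ... run LoadIncrementalHFiles against the directory, then:
    HFileTestUtil.verifyTags(table);   // fails if any returned cell lacks its row-key tag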