HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the Puts (Ram)
Ramkrishna 2016-06-24 15:09:53 +05:30
parent 518faa735b
commit 99dc300d37
4 changed files with 190 additions and 55 deletions
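For context, the attributes named in the commit title are set per mutation on the client side before the MapReduce job serializes the Puts. A minimal sketch of a Put carrying all three attributes (row, family, qualifier, label, and user names are illustrative, not taken from the patch):

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWithAttributes {
  public static void main(String[] args) {
    Put p = new Put(Bytes.toBytes("row1"));
    p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    // The three per-mutation attributes that PutSortReducer previously dropped
    // when converting Puts into KeyValues for bulk load:
    p.setCellVisibility(new CellVisibility("secret"));             // visibility expression
    p.setACL("some_user", new Permission(Permission.Action.READ)); // cell-level ACL
    p.setTTL(600000L);                                             // TTL in milliseconds
  }
}
```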

DefaultVisibilityExpressionResolver.java

@@ -75,10 +75,6 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
connection = ConnectionFactory.createConnection(conf);
try {
labelsTable = connection.getTable(LABELS_TABLE_NAME);
} catch (TableNotFoundException e) {
// Just return with out doing any thing. When the VC is not used we wont be having 'labels'
// table in the cluster.
return;
} catch (IOException e) {
LOG.error("Error opening 'labels' table", e);
return;
@@ -95,6 +91,9 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
labels.put(Bytes.toString(value), Bytes.toInt(row));
}
} catch (TableNotFoundException e) {
// Table not found. So just return
return;
} catch (IOException e) {
LOG.error("Error scanning 'labels' table", e);
} finally {

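The two hunks above move the TableNotFoundException handling from the getTable() call down to the scan: Connection.getTable() does not verify that the table exists, so when visibility labels are not in use, the missing 'labels' table only surfaces once the scan is executed. A condensed sketch of the resulting flow, using the field and constant names visible in the hunks (not the verbatim patched method, which closes its resources in the finally block shown above):

```java
Table labelsTable;
try {
  // getTable() does not contact the server, so a missing table cannot be detected here
  labelsTable = connection.getTable(LABELS_TABLE_NAME);
} catch (IOException e) {
  LOG.error("Error opening 'labels' table", e);
  return;
}
try (ResultScanner scanner = labelsTable.getScanner(new Scan())) {
  for (Result next : scanner) {
    byte[] row = next.getRow();
    byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
    labels.put(Bytes.toString(value), Bytes.toInt(row));
  }
} catch (TableNotFoundException e) {
  // No 'labels' table: visibility labels are not in use on this cluster, so just return.
  return;
} catch (IOException e) {
  LOG.error("Error scanning 'labels' table", e);
}
```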
PutSortReducer.java

@@ -18,18 +18,28 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
@@ -45,6 +55,16 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceStability.Stable
public class PutSortReducer extends
Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
// the cell creator
private CellCreator kvCreator;
@Override
protected void
setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.kvCreator = new CellCreator(conf);
}
@Override
protected void reduce(
@@ -62,11 +82,48 @@ public class PutSortReducer extends
TreeSet<KeyValue> map = new TreeSet<KeyValue>(CellComparator.COMPARATOR);
long curSize = 0;
// stop at the end or the RAM threshold
List<Tag> tags = new ArrayList<Tag>();
while (iter.hasNext() && curSize < threshold) {
// clear the tags
tags.clear();
Put p = iter.next();
long t = p.getTTL();
if (t != Long.MAX_VALUE) {
// add TTL tag if found
tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
}
byte[] acl = p.getACL();
if (acl != null) {
// add ACL tag if found
tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
}
try {
CellVisibility cellVisibility = p.getCellVisibility();
if (cellVisibility != null) {
// add the visibility labels if any
tags.addAll(kvCreator.getVisibilityExpressionResolver()
.createVisibilityExpTags(cellVisibility.getExpression()));
}
} catch (DeserializationException e) {
// We just throw exception here. Should we allow other mutations to proceed by
// just ignoring the bad one?
throw new IOException("Invalid visibility expression found in mutation " + p, e);
}
for (List<Cell> cells: p.getFamilyCellMap().values()) {
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
KeyValue kv = null;
TagUtil.carryForwardTags(tags, cell);
if (!tags.isEmpty()) {
kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength(), tags);
} else {
kv = KeyValueUtil.ensureKeyValue(cell);
}
if (map.add(kv)) {// don't count duplicated kv into size
curSize += kv.heapSize();
}

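Net effect of the hunk above: reduce() now gathers the Put's TTL, ACL, and visibility attributes into a tag list, carries forward any tags already present on the cell, and rebuilds the cell through the CellCreator facade whenever tags exist, so the tags travel with the KeyValue written to the HFile; tag-less cells still go through KeyValueUtil.ensureKeyValue(). A standalone, per-cell restatement of that conversion (the helper name is illustrative; in the patch this logic stays inline in reduce() and the tag list is reused across the cells of a Put):

```java
private KeyValue toKeyValueWithTags(Put p, Cell cell, CellCreator kvCreator) throws IOException {
  List<Tag> tags = new ArrayList<Tag>();
  long ttl = p.getTTL();
  if (ttl != Long.MAX_VALUE) {
    tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
  }
  byte[] acl = p.getACL();
  if (acl != null) {
    tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
  }
  try {
    CellVisibility cellVisibility = p.getCellVisibility();
    if (cellVisibility != null) {
      tags.addAll(kvCreator.getVisibilityExpressionResolver()
          .createVisibilityExpTags(cellVisibility.getExpression()));
    }
  } catch (DeserializationException e) {
    throw new IOException("Invalid visibility expression found in mutation " + p, e);
  }
  TagUtil.carryForwardTags(tags, cell); // keep any tags the cell already has
  if (tags.isEmpty()) {
    return KeyValueUtil.ensureKeyValue(cell); // nothing to add, cheap conversion
  }
  return (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
      cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
      cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
      cell.getTimestamp(), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
      tags);
}
```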
TextSortReducer.java

@@ -101,9 +101,8 @@ public class TextSortReducer extends
*/
@Override
protected void setup(Context context) {
doSetup(context);
Configuration conf = context.getConfiguration();
doSetup(context, conf);
parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
if (parser.getRowKeyColumnIndex() == -1) {
@@ -115,10 +114,9 @@ public class TextSortReducer extends
/**
* Handles common parameter initialization that a subclass might want to leverage.
* @param context
* @param conf
*/
protected void doSetup(Context context) {
Configuration conf = context.getConfiguration();
protected void doSetup(Context context, Configuration conf) {
// If a custom separator has been used,
// decode it back from Base64 encoding.
separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);

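The change above threads the Configuration that setup() already fetched into doSetup() instead of fetching it a second time, and exposes it as a parameter that subclasses can use. The resulting call pattern, abbreviated (parser and separator are existing fields of TextSortReducer; the bodies are trimmed to the lines relevant to the signature change):

```java
@Override
protected void setup(Context context) {
  Configuration conf = context.getConfiguration();
  doSetup(context, conf); // conf is fetched once here and passed down
  parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
}

protected void doSetup(Context context, Configuration conf) {
  // If a custom separator has been used, decode it back from Base64 encoding.
  separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
}
```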
TestHFileOutputFormat2.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -183,12 +184,79 @@ public class TestHFileOutputFormat2 {
}
}
private void setupRandomGeneratorMapper(Job job) {
/**
* Simple mapper that makes Put output.
*/
static class RandomPutGeneratingMapper
extends Mapper<NullWritable, NullWritable,
ImmutableBytesWritable, Put> {
private int keyLength;
private static final int KEYLEN_DEFAULT=10;
private static final String KEYLEN_CONF="randomkv.key.length";
private int valLength;
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}
@Override
protected void map(
NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable,
ImmutableBytesWritable,Put>.Context context)
throws java.io.IOException ,InterruptedException
{
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Put p = new Put(keyBytes);
p.addColumn(family, QUALIFIER, valBytes);
// set TTL to very low so that the scan does not return any value
p.setTTL(1l);
context.write(key, p);
}
}
}
}
private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
if (putSortReducer) {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomPutGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
} else {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomKVGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
}
}
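setupRandomGeneratorMapper() now chooses between the existing KeyValue-emitting mapper and the new Put-emitting one; because HFileOutputFormat2.configureIncrementalLoad() selects the sort reducer from the map output value class, emitting Put is what routes the job through PutSortReducer and exercises the tag handling added in this commit. A condensed illustration of the wiring for that path (variable names follow the test; this is not the verbatim test body):

```java
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomPutGeneratingMapper.class);    // emits Puts that carry a 1 ms TTL
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);                  // Put => PutSortReducer is configured
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, testDir);
// Since every generated Put has a 1 ms TTL, the cells are already expired when the
// bulk-loaded table is scanned, which is why the test expects zero rows on that path.
```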
/**
* Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
@@ -325,7 +393,7 @@ public class TestHFileOutputFormat2 {
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
Job job = new Job(conf, "testWritingPEData");
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, false);
// This partitioner doesn't work well for number keys but using it anyways
// just to demonstrate how to configure it.
byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -441,13 +509,13 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoad() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoad\n");
doIncrementalLoadTest(false, false);
doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
}
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoadWithSplit() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
doIncrementalLoadTest(true, false);
doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
}
/**
@@ -461,12 +529,18 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoadWithLocality() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
doIncrementalLoadTest(false, true);
doIncrementalLoadTest(true, true);
doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
throws Exception {
@Test
public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, String tableStr) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
@@ -485,17 +559,18 @@ public class TestHFileOutputFormat2 {
hostnames[i] = "datanode_" + i;
}
util.startMiniCluster(1, hostCount, hostnames);
Table table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
TableName tableName = TableName.valueOf(tableStr);
Table table = util.createTable(tableName, FAMILIES, splitKeys);
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
try (RegionLocator r = util.getConnection().getRegionLocator(TABLE_NAME); Admin admin =
FileSystem fs = testDir.getFileSystem(conf);
try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin =
util.getConnection().getAdmin();) {
assertEquals("Should start with empty table", 0, util.countRows(table));
int numRegions = r.getStartKeys().length;
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
// Generate the bulk load files
runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir);
runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer);
// This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
@@ -518,9 +593,9 @@ public class TestHFileOutputFormat2 {
util.deleteTable(table.getName());
byte[][] newSplitKeys = generateRandomSplitKeys(14);
table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
table = util.createTable(tableName, FAMILIES, newSplitKeys);
while (util.getConnection().getRegionLocator(TABLE_NAME)
while (util.getConnection().getRegionLocator(tableName)
.getAllRegionLocations().size() != 15 ||
!admin.isTableAvailable(table.getName())) {
Thread.sleep(200);
@@ -532,9 +607,15 @@ public class TestHFileOutputFormat2 {
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r);
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table",
expectedRows, util.countRows(table));
int expectedRows = 0;
if (putSortReducer) {
// no rows should be extracted
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
} else {
expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
@@ -546,11 +627,11 @@ public class TestHFileOutputFormat2 {
}
}
results.close();
}
String tableDigestBefore = util.checksumRows(table);
// Check region locality
HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
for (HRegion region : util.getHBaseCluster().getRegions(tableName)) {
hbd.add(region.getHDFSBlocksDistribution());
}
for (String hostname : hostnames) {
@@ -560,31 +641,31 @@ public class TestHFileOutputFormat2 {
}
// Cause regions to reopen
admin.disableTable(TABLE_NAME);
while (!admin.isTableDisabled(TABLE_NAME)) {
admin.disableTable(tableName);
while (!admin.isTableDisabled(tableName)) {
Thread.sleep(200);
LOG.info("Waiting for table to disable");
}
admin.enableTable(TABLE_NAME);
util.waitTableAvailable(TABLE_NAME);
admin.enableTable(tableName);
util.waitTableAvailable(tableName);
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table));
} finally {
testDir.getFileSystem(conf).delete(testDir, true);
util.deleteTable(TABLE_NAME);
util.deleteTable(tableName);
util.shutdownMiniCluster();
}
}
private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
InterruptedException, ClassNotFoundException {
RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, putSortReducer);
HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
FileOutputFormat.setOutputPath(job, outDir);
@@ -937,7 +1018,7 @@ public class TestHFileOutputFormat2 {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, false);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
@@ -1040,7 +1121,7 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
testDir);
testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
}
@@ -1132,7 +1213,7 @@ public class TestHFileOutputFormat2 {
true);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@@ -1203,7 +1284,7 @@ public class TestHFileOutputFormat2 {
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out");
runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
}
} else {
throw new RuntimeException(