HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the Puts (Ram)
Ramkrishna 2016-06-24 15:26:51 +05:30
parent 83c0cc109e
commit a02c6da4d3
4 changed files with 178 additions and 44 deletions
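
In short: before this change, PutSortReducer turned each Put straight into KeyValues, so any TTL, ACL, or CellVisibility attribute set on the Put was silently dropped from the bulk-loaded HFiles. A minimal sketch of the kind of client-side Put this fix is about (row, column, label, and user names are made up for illustration):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.security.access.Permission;
    import org.apache.hadoop.hbase.security.visibility.CellVisibility;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutWithMetadata {
      // Builds a Put carrying the three attributes PutSortReducer used to lose.
      static Put build() {
        Put p = new Put(Bytes.toBytes("row1"));
        p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        p.setTTL(600000L);                                       // now kept as a TTL_TAG_TYPE tag
        p.setCellVisibility(new CellVisibility("secret"));       // now resolved into visibility tags
        p.setACL("bob", new Permission(Permission.Action.READ)); // now kept as an ACL_TAG_TYPE tag
        return p;
      }
    }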

DefaultVisibilityExpressionResolver.java

@@ -75,10 +75,6 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
connection = ConnectionFactory.createConnection(conf);
try {
labelsTable = connection.getTable(LABELS_TABLE_NAME);
} catch (TableNotFoundException e) {
// Just return with out doing any thing. When the VC is not used we wont be having 'labels'
// table in the cluster.
return;
} catch (IOException e) {
LOG.error("Error opening 'labels' table", e);
return;
@@ -95,6 +91,9 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
labels.put(Bytes.toString(value), Bytes.toInt(row));
}
} catch (TableNotFoundException e) {
// Table not found. So just return
return;
} catch (IOException e) {
LOG.error("Error scanning 'labels' table", e);
} finally {

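A note on the hunks above: Connection.getTable() does not contact the cluster, so a missing 'labels' table cannot be detected at getTable() time; the TableNotFoundException only surfaces once the scan actually runs, which is why the catch moves from the first block into the scanning block. A hedged sketch of the resulting pattern (class and helper names below are hypothetical, not the committed code):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.TableNotFoundException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class LabelsScanSketch {
      // getTable() is lazy, so a missing 'labels' table only shows up here.
      static Map<String, Integer> readLabels(Table labelsTable) throws IOException {
        Map<String, Integer> labels = new HashMap<String, Integer>();
        try (ResultScanner scanner = labelsTable.getScanner(new Scan())) {
          for (Result r : scanner) {
            // decode one label ordinal per row, as in the second hunk above
          }
        } catch (TableNotFoundException e) {
          // visibility labels not in use on this cluster; return the empty map
        }
        return labels;
      }
    }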
PutSortReducer.java

@@ -18,17 +18,25 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
@@ -44,7 +52,17 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceStability.Stable
public class PutSortReducer extends
Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
// the cell creator
private CellCreator kvCreator;
@Override
protected void
setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.kvCreator = new CellCreator(conf);
}
@Override
protected void reduce(
ImmutableBytesWritable row,
@@ -61,11 +79,48 @@ public class PutSortReducer extends
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
long curSize = 0;
// stop at the end or the RAM threshold
List<Tag> tags = new ArrayList<Tag>();
while (iter.hasNext() && curSize < threshold) {
// clear the tags
tags.clear();
Put p = iter.next();
long t = p.getTTL();
if (t != Long.MAX_VALUE) {
// add TTL tag if found
tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
}
byte[] acl = p.getACL();
if (acl != null) {
// add ACL tag if found
tags.add(new Tag(TagType.ACL_TAG_TYPE, acl));
}
try {
CellVisibility cellVisibility = p.getCellVisibility();
if (cellVisibility != null) {
// add the visibility labels if any
tags.addAll(kvCreator.getVisibilityExpressionResolver()
.createVisibilityExpTags(cellVisibility.getExpression()));
}
} catch (DeserializationException e) {
// We just throw exception here. Should we allow other mutations to proceed by
// just ignoring the bad one?
throw new IOException("Invalid visibility expression found in mutation " + p, e);
}
for (List<Cell> cells: p.getFamilyCellMap().values()) {
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
KeyValue kv = null;
Tag.carryForwardTags(tags, cell);
if (!tags.isEmpty()) {
kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength(), tags);
} else {
kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
}
if (map.add(kv)) {// don't count duplicated kv into size
curSize += kv.heapSize();
}

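With this, the reduce loop converts Put-level attributes into per-cell tags through the CellCreator facade, and Tag.carryForwardTags() keeps any tags already present on the cell. A hedged sketch of checking the outcome on a produced KeyValue (assumes the 1.x-era Tag API used in this diff; the class and method names below are made up):

    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagType;

    public class TagAssertions {
      // Returns true if the KeyValue kept the TTL tag derived from Put.setTTL().
      static boolean hasTtlTag(KeyValue kv) {
        List<Tag> tags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
        for (Tag t : tags) {
          if (t.getType() == TagType.TTL_TAG_TYPE) {
            return true;
          }
        }
        return false;
      }
    }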
TextSortReducer.java

@@ -99,9 +99,8 @@ public class TextSortReducer extends
*/
@Override
protected void setup(Context context) {
doSetup(context);
Configuration conf = context.getConfiguration();
doSetup(context, conf);
parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
if (parser.getRowKeyColumnIndex() == -1) {
@@ -113,10 +112,9 @@ public class TextSortReducer extends
/**
* Handles common parameter initialization that a subclass might want to leverage.
* @param context
* @param conf
*/
protected void doSetup(Context context) {
Configuration conf = context.getConfiguration();
protected void doSetup(Context context, Configuration conf) {
// If a custom separator has been used,
// decode it back from Base64 encoding.
separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);

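The signature change above lets setup() read the Configuration once and pass it down, so a subclass overriding doSetup() works with the same instance instead of fetching its own. A hedged sketch of a subclass using the new hook (class name and option key are invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TextSortReducer;

    public class MyTsvSortReducer extends TextSortReducer {
      private String customOption;

      @Override
      protected void doSetup(Context context, Configuration conf) {
        super.doSetup(context, conf);
        // an extra job parameter, read from the Configuration handed in by setup()
        customOption = conf.get("my.custom.option", "default");
      }
    }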
TestHFileOutputFormat2.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -183,11 +184,78 @@ public class TestHFileOutputFormat2 {
}
}
private void setupRandomGeneratorMapper(Job job) {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomKVGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
/**
* Simple mapper that makes Put output.
*/
static class RandomPutGeneratingMapper
extends Mapper<NullWritable, NullWritable,
ImmutableBytesWritable, Put> {
private int keyLength;
private static final int KEYLEN_DEFAULT=10;
private static final String KEYLEN_CONF="randomkv.key.length";
private int valLength;
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
}
@Override
protected void map(
NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable,
ImmutableBytesWritable,Put>.Context context)
throws java.io.IOException ,InterruptedException
{
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Put p = new Put(keyBytes);
p.addColumn(family, QUALIFIER, valBytes);
// set TTL to very low so that the scan does not return any value
p.setTTL(1l);
context.write(key, p);
}
}
}
}
private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
if (putSortReducer) {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomPutGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
} else {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomKVGeneratingMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
}
}
/**
@@ -325,7 +393,7 @@ public class TestHFileOutputFormat2 {
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
Job job = new Job(conf, "testWritingPEData");
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, false);
// This partitioner doesn't work well for number keys but using it anyways
// just to demonstrate how to configure it.
byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -438,13 +506,13 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoad() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoad\n");
doIncrementalLoadTest(false, false);
doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
}
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoadWithSplit() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
doIncrementalLoadTest(true, false);
doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
}
/**
@@ -458,12 +526,18 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testMRIncrementalLoadWithLocality() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
doIncrementalLoadTest(false, true);
doIncrementalLoadTest(true, true);
doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
throws Exception {
@Test
public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
}
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, String tableStr) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
@@ -491,7 +565,8 @@ public class TestHFileOutputFormat2 {
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
// Generate the bulk load files
runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir,
putSortReducer);
// This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
@@ -526,21 +601,28 @@ public class TestHFileOutputFormat2 {
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
assertEquals(FAMILIES.length, res.rawCells().length);
Cell first = res.rawCells()[0];
for (Cell kv : res.rawCells()) {
assertTrue(CellUtil.matchingRow(first, kv));
assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
int expectedRows = 0;
if (putSortReducer) {
// no rows should be extracted
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
} else {
// Ensure data shows up
expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
assertEquals(FAMILIES.length, res.rawCells().length);
Cell first = res.rawCells()[0];
for (Cell kv : res.rawCells()) {
assertTrue(CellUtil.matchingRow(first, kv));
assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
}
}
results.close();
}
results.close();
String tableDigestBefore = util.checksumRows(table);
// Check region locality
@@ -572,14 +654,14 @@ public class TestHFileOutputFormat2 {
}
private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
InterruptedException, ClassNotFoundException {
RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, putSortReducer);
HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
FileOutputFormat.setOutputPath(job, outDir);
@@ -931,7 +1013,7 @@ public class TestHFileOutputFormat2 {
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job);
setupRandomGeneratorMapper(job, false);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
@@ -1033,7 +1115,7 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
testDir);
testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
}
@@ -1125,7 +1207,7 @@ public class TestHFileOutputFormat2 {
true);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@@ -1196,7 +1278,7 @@ public class TestHFileOutputFormat2 {
Admin admin = c.getAdmin();
RegionLocator regionLocator = c.getRegionLocator(tname)) {
Path outDir = new Path("incremental-out");
runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
}
} else {
throw new RuntimeException(
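One closing note on the putSortReducer flag threaded through these tests: HFileOutputFormat2.configureIncrementalLoad() picks the sort reducer from the job's map output value class, so emitting Put instead of KeyValue is what routes a job through the fixed PutSortReducer. A hedged sketch of such a job setup, reusing the imports and the conf/table/conn/outDir names from the test above (the mapper class is hypothetical):

    Job job = new Job(conf, "bulkload-with-put-attributes");
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(MyPutEmittingMapper.class);   // hypothetical mapper that writes Puts
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);           // Put here is what selects PutSortReducer
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
        conn.getRegionLocator(TABLE_NAME));
    FileOutputFormat.setOutputPath(job, outDir);
    job.waitForCompletion(true);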