diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
index 597a7641e82..10e34d2c259 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
@@ -75,10 +75,6 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
     connection = ConnectionFactory.createConnection(conf);
     try {
       labelsTable = connection.getTable(LABELS_TABLE_NAME);
-    } catch (TableNotFoundException e) {
-      // Just return with out doing any thing. When the VC is not used we wont be having 'labels'
-      // table in the cluster.
-      return;
     } catch (IOException e) {
       LOG.error("Error opening 'labels' table", e);
       return;
@@ -95,6 +91,9 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
         byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
         labels.put(Bytes.toString(value), Bytes.toInt(row));
       }
+    } catch (TableNotFoundException e) {
+      // Table not found. So just return
+      return;
     } catch (IOException e) {
       LOG.error("Error scanning 'labels' table", e);
     } finally {
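Note on the relocated catch: Connection.getTable() is a lightweight local call
that does not verify the table exists, so a missing 'labels' table only
surfaces on the first RPC, i.e. when the scan runs. A minimal sketch of the
resulting control flow inside init(), assuming this class's fields and
constants:

    try {
      ResultScanner scanner = labelsTable.getScanner(new Scan());
      for (Result next : scanner) {
        // cache each label-to-ordinal mapping, as in the hunk above
      }
    } catch (TableNotFoundException e) {
      // Visibility labels are not in use on this cluster; nothing to load.
      return;
    } catch (IOException e) {
      LOG.error("Error scanning 'labels' table", e);
    }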
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index a71b66a2f53..ec93ba434c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -18,17 +18,25 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 
@@ -44,7 +52,17 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceStability.Stable
 public class PutSortReducer extends
     Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
-
+  // the cell creator
+  private CellCreator kvCreator;
+
+  @Override
+  protected void
+      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+          throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.kvCreator = new CellCreator(conf);
+  }
+
   @Override
   protected void reduce(
       ImmutableBytesWritable row,
@@ -61,11 +79,48 @@ public class PutSortReducer extends
       TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
       long curSize = 0;
       // stop at the end or the RAM threshold
+      List<Tag> tags = new ArrayList<Tag>();
       while (iter.hasNext() && curSize < threshold) {
+        // clear the tags
+        tags.clear();
         Put p = iter.next();
+        long t = p.getTTL();
+        if (t != Long.MAX_VALUE) {
+          // add TTL tag if found
+          tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
+        }
+        byte[] acl = p.getACL();
+        if (acl != null) {
+          // add ACL tag if found
+          tags.add(new Tag(TagType.ACL_TAG_TYPE, acl));
+        }
+        try {
+          CellVisibility cellVisibility = p.getCellVisibility();
+          if (cellVisibility != null) {
+            // add the visibility labels if any
+            tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibility.getExpression()));
+          }
+        } catch (DeserializationException e) {
+          // We just throw exception here. Should we allow other mutations to proceed by
+          // just ignoring the bad one?
+          throw new IOException("Invalid visibility expression found in mutation " + p, e);
+        }
         for (List<Cell> cells: p.getFamilyCellMap().values()) {
           for (Cell cell: cells) {
-            KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
+            // Creating the KV which needs to be directly written to HFiles. Using the Facade
+            // KVCreator for creation of kvs.
+            KeyValue kv = null;
+            Tag.carryForwardTags(tags, cell);
+            if (!tags.isEmpty()) {
+              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
+                  cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
+                  cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
+                  cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
+                  cell.getValueOffset(), cell.getValueLength(), tags);
+            } else {
+              kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
+            }
             if (map.add(kv)) {// don't count duplicated kv into size
               curSize += kv.heapSize();
             }
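For orientation, a hedged mapper-side sketch (not part of this patch) showing
a Put carrying the three attributes the reducer now converts into cell tags;
FAMILY and QUALIFIER are placeholder constants:

    Put p = new Put(Bytes.toBytes("row1"));
    p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("value"));
    p.setTTL(600000L);                                 // becomes a TagType.TTL_TAG_TYPE tag
    p.setACL("bob", new Permission(Permission.Action.READ)); // becomes a TagType.ACL_TAG_TYPE tag
    p.setCellVisibility(new CellVisibility("secret")); // resolved into visibility tags
    context.write(new ImmutableBytesWritable(p.getRow()), p);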
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index c367645875b..56b58c4cbc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -99,9 +99,8 @@ public class TextSortReducer extends
    */
   @Override
   protected void setup(Context context) {
-    doSetup(context);
-
     Configuration conf = context.getConfiguration();
+    doSetup(context, conf);
 
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
     if (parser.getRowKeyColumnIndex() == -1) {
@@ -113,10 +112,9 @@ public class TextSortReducer extends
   /**
    * Handles common parameter initialization that a subclass might want to leverage.
    * @param context
+   * @param conf
    */
-  protected void doSetup(Context context) {
-    Configuration conf = context.getConfiguration();
-
+  protected void doSetup(Context context, Configuration conf) {
     // If a custom separator has been used,
     // decode it back from Base64 encoding.
     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
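The two-argument doSetup() is source-incompatible for subclasses overriding
the old one-argument form; overriders now receive the Configuration the
caller already resolved instead of fetching it again. A hedged sketch of an
adjusted subclass, with a hypothetical option key:

    @Override
    protected void doSetup(Context context, Configuration conf) {
      super.doSetup(context, conf);
      // subclass-specific options come from the same Configuration instance
      String skipMode = conf.get("my.custom.skip.key"); // hypothetical key
    }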
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index aefe8cf6da0..c50d34d9426 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -183,11 +184,78 @@ public class TestHFileOutputFormat2 {
     }
   }
 
-  private void setupRandomGeneratorMapper(Job job) {
-    job.setInputFormatClass(NMapInputFormat.class);
-    job.setMapperClass(RandomKVGeneratingMapper.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    job.setMapOutputValueClass(KeyValue.class);
+  /**
+   * Simple mapper that makes Put output.
+   */
+  static class RandomPutGeneratingMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
+
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT=10;
+    private static final String KEYLEN_CONF="randomkv.key.length";
+
+    private int valLength;
+    private static final int VALLEN_DEFAULT=10;
+    private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
+        throws java.io.IOException ,InterruptedException
+    {
+
+      byte keyBytes[] = new byte[keyLength];
+      byte valBytes[] = new byte[valLength];
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      Random random = new Random();
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+        random.nextBytes(keyBytes);
+        // Ensure that unique tasks generate unique keys
+        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+        random.nextBytes(valBytes);
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+          Put p = new Put(keyBytes);
+          p.addColumn(family, QUALIFIER, valBytes);
+          // set TTL to very low so that the scan does not return any value
+          p.setTTL(1l);
+          context.write(key, p);
+        }
+      }
+    }
+  }
+
+  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
+    if (putSortReducer) {
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomPutGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(Put.class);
+    } else {
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomKVGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+    }
   }
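The mapper above sets a 1 ms TTL on every Put, so each cell has already
expired when the verification scan runs; this is why the putSortReducer branch
of doIncrementalLoadTest (below) expects zero rows. TTL, ACL, and visibility
tags are only persisted by HFile format version 3 or later (the default in
this branch); a hedged configuration sketch for an environment where it is
not:

    // Assumption: "hfile.format.version" is the key controlling the HFile
    // write format; cell tags (TTL/ACL/visibility) require version >= 3.
    conf.setInt("hfile.format.version", 3);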
@@ -325,7 +393,7 @@ public class TestHFileOutputFormat2 {
     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
 
     Job job = new Job(conf, "testWritingPEData");
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, false);
     // This partitioner doesn't work well for number keys but using it anyways
     // just to demonstrate how to configure it.
     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -438,13 +506,13 @@ public class TestHFileOutputFormat2 {
   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
   public void testMRIncrementalLoad() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoad\n");
-    doIncrementalLoadTest(false, false);
+    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
   }
 
   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
   public void testMRIncrementalLoadWithSplit() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true, false);
+    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
   }
 
   /**
@@ -458,12 +526,18 @@ public class TestHFileOutputFormat2 {
   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
   public void testMRIncrementalLoadWithLocality() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
-    doIncrementalLoadTest(false, true);
-    doIncrementalLoadTest(true, true);
+    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
+    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
   }
 
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
-      throws Exception {
+  @Test
+  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
+    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+      boolean putSortReducer, String tableStr) throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
@@ -491,7 +565,8 @@ public class TestHFileOutputFormat2 {
     assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
 
     // Generate the bulk load files
-    runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
+    runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir,
+        putSortReducer);
     // This doesn't write into the table, just makes files
     assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
@@ -526,21 +601,28 @@ public class TestHFileOutputFormat2 {
     // Perform the actual load
     new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
 
-    // Ensure data shows up
-    int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-    assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-      util.countRows(table));
-    Scan scan = new Scan();
-    ResultScanner results = table.getScanner(scan);
-    for (Result res : results) {
-      assertEquals(FAMILIES.length, res.rawCells().length);
-      Cell first = res.rawCells()[0];
-      for (Cell kv : res.rawCells()) {
-        assertTrue(CellUtil.matchingRow(first, kv));
-        assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+    int expectedRows = 0;
+    if (putSortReducer) {
+      // no rows should be extracted
+      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+        util.countRows(table));
+    } else {
+      // Ensure data shows up
+      expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+        util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      for (Result res : results) {
+        assertEquals(FAMILIES.length, res.rawCells().length);
+        Cell first = res.rawCells()[0];
+        for (Cell kv : res.rawCells()) {
+          assertTrue(CellUtil.matchingRow(first, kv));
+          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+        }
       }
+      results.close();
     }
-    results.close();
     String tableDigestBefore = util.checksumRows(table);
 
     // Check region locality
@@ -572,14 +654,14 @@ public class TestHFileOutputFormat2 {
   }
 
   private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
-      RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
-      InterruptedException, ClassNotFoundException {
+      RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException,
+      UnsupportedEncodingException, InterruptedException, ClassNotFoundException {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, putSortReducer);
     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
     FileOutputFormat.setOutputPath(job, outDir);
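runIncrementalPELoad only switches the mapper and the map output value class;
HFileOutputFormat2.configureIncrementalLoad chooses the matching sort reducer
from that value class, roughly as follows (a paraphrased sketch, not the exact
source):

    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    }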
@@ -931,7 +1013,7 @@ public class TestHFileOutputFormat2 {
     conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, false);
     HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
     FileOutputFormat.setOutputPath(job, dir);
     context = createTestTaskAttemptContext(job);
@@ -1033,7 +1115,7 @@ public class TestHFileOutputFormat2 {
       for (int i = 0; i < 2; i++) {
         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
         runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
-            testDir);
+            testDir, false);
         // Perform the actual load
         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
       }
@@ -1125,7 +1207,7 @@ public class TestHFileOutputFormat2 {
           true);
 
       RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
-      runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
+      runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false);
 
       // Perform the actual load
       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
@@ -1196,7 +1278,7 @@ public class TestHFileOutputFormat2 {
           Admin admin = c.getAdmin();
           RegionLocator regionLocator = c.getRegionLocator(tname)) {
         Path outDir = new Path("incremental-out");
-        runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
+        runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false);
       }
     } else {
       throw new RuntimeException(