diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java index 597a7641e82..10e34d2c259 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java @@ -75,10 +75,6 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression connection = ConnectionFactory.createConnection(conf); try { labelsTable = connection.getTable(LABELS_TABLE_NAME); - } catch (TableNotFoundException e) { - // Just return with out doing any thing. When the VC is not used we wont be having 'labels' - // table in the cluster. - return; } catch (IOException e) { LOG.error("Error opening 'labels' table", e); return; @@ -95,6 +91,9 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); labels.put(Bytes.toString(value), Bytes.toInt(row)); } + } catch (TableNotFoundException e) { + // Table not found. So just return + return; } catch (IOException e) { LOG.error("Error scanning 'labels' table", e); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index b302d586417..6657c99ccf4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -18,18 +18,28 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.TreeSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.StringUtils; @@ -45,7 +55,17 @@ import org.apache.hadoop.util.StringUtils; @InterfaceStability.Stable public class PutSortReducer extends Reducer { - + // the cell creator + private CellCreator kvCreator; + + @Override + protected void + setup(Reducer.Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.kvCreator = new CellCreator(conf); + } + @Override protected void reduce( ImmutableBytesWritable row, @@ -62,11 +82,48 @@ public class PutSortReducer extends TreeSet map = new TreeSet(CellComparator.COMPARATOR); long curSize = 0; // stop at the end or the RAM threshold + List tags = new ArrayList(); while (iter.hasNext() && curSize < threshold) { + // clear the tags + tags.clear(); Put p = iter.next(); + long t = p.getTTL(); + if (t != Long.MAX_VALUE) { + // add TTL tag if found + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); + } + byte[] acl = p.getACL(); + if (acl != null) { + // add ACL tag if found + tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); + } + try { + CellVisibility cellVisibility = p.getCellVisibility(); + if (cellVisibility != null) { + // add the visibility labels if any + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibility.getExpression())); + } + } catch (DeserializationException e) { + // We just throw exception here. Should we allow other mutations to proceed by + // just ignoring the bad one? + throw new IOException("Invalid visibility expression found in mutation " + p, e); + } for (List cells: p.getFamilyCellMap().values()) { for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. + KeyValue kv = null; + TagUtil.carryForwardTags(tags, cell); + if (!tags.isEmpty()) { + kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength(), tags); + } else { + kv = KeyValueUtil.ensureKeyValue(cell); + } if (map.add(kv)) {// don't count duplicated kv into size curSize += kv.heapSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index d2adbd4a2cd..1e09f037a5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -101,9 +101,8 @@ public class TextSortReducer extends */ @Override protected void setup(Context context) { - doSetup(context); - Configuration conf = context.getConfiguration(); + doSetup(context, conf); parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); if (parser.getRowKeyColumnIndex() == -1) { @@ -115,10 +114,9 @@ public class TextSortReducer extends /** * Handles common parameter initialization that a subclass might want to leverage. * @param context + * @param conf */ - protected void doSetup(Context context) { - Configuration conf = context.getConfiguration(); - + protected void doSetup(Context context, Configuration conf) { // If a custom separator has been used, // decode it back from Base64 encoding. separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 12761d33ab9..486c961e115 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -183,11 +184,78 @@ public class TestHFileOutputFormat2 { } } - private void setupRandomGeneratorMapper(Job job) { - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(RandomKVGeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + /** + * Simple mapper that makes Put output. + */ + static class RandomPutGeneratingMapper + extends Mapper { + + private int keyLength; + private static final int KEYLEN_DEFAULT=10; + private static final String KEYLEN_CONF="randomkv.key.length"; + + private int valLength; + private static final int VALLEN_DEFAULT=10; + private static final String VALLEN_CONF="randomkv.val.length"; + private static final byte [] QUALIFIER = Bytes.toBytes("data"); + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + } + + @Override + protected void map( + NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException ,InterruptedException + { + + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + + Random random = new Random(); + for (int i = 0; i < ROWSPERSPLIT; i++) { + + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); + random.nextBytes(valBytes); + ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); + + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + Put p = new Put(keyBytes); + p.addColumn(family, QUALIFIER, valBytes); + // set TTL to very low so that the scan does not return any value + p.setTTL(1l); + context.write(key, p); + } + } + } + } + + private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { + if (putSortReducer) { + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(RandomPutGeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + } else { + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(RandomKVGeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + } } /** @@ -325,7 +393,7 @@ public class TestHFileOutputFormat2 { conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); Job job = new Job(conf, "testWritingPEData"); - setupRandomGeneratorMapper(job); + setupRandomGeneratorMapper(job, false); // This partitioner doesn't work well for number keys but using it anyways // just to demonstrate how to configure it. byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; @@ -441,13 +509,13 @@ public class TestHFileOutputFormat2 { @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test public void testMRIncrementalLoad() throws Exception { LOG.info("\nStarting test testMRIncrementalLoad\n"); - doIncrementalLoadTest(false, false); + doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); } @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test public void testMRIncrementalLoadWithSplit() throws Exception { LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); - doIncrementalLoadTest(true, false); + doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); } /** @@ -461,12 +529,18 @@ public class TestHFileOutputFormat2 { @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test public void testMRIncrementalLoadWithLocality() throws Exception { LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); - doIncrementalLoadTest(false, true); - doIncrementalLoadTest(true, true); + doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); + doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); } - private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality) - throws Exception { + @Test + public void testMRIncrementalLoadWithPutSortReducer() throws Exception { + LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); + doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); + } + + private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, + boolean putSortReducer, String tableStr) throws Exception { util = new HBaseTestingUtility(); Configuration conf = util.getConfiguration(); conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); @@ -485,17 +559,18 @@ public class TestHFileOutputFormat2 { hostnames[i] = "datanode_" + i; } util.startMiniCluster(1, hostCount, hostnames); - - Table table = util.createTable(TABLE_NAME, FAMILIES, splitKeys); + TableName tableName = TableName.valueOf(tableStr); + Table table = util.createTable(tableName, FAMILIES, splitKeys); Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); - try (RegionLocator r = util.getConnection().getRegionLocator(TABLE_NAME); Admin admin = + FileSystem fs = testDir.getFileSystem(conf); + try (RegionLocator r = util.getConnection().getRegionLocator(tableName); Admin admin = util.getConnection().getAdmin();) { assertEquals("Should start with empty table", 0, util.countRows(table)); int numRegions = r.getStartKeys().length; assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); // Generate the bulk load files - runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir); + runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir, putSortReducer); // This doesn't write into the table, just makes files assertEquals("HFOF should not touch actual table", 0, util.countRows(table)); @@ -518,9 +593,9 @@ public class TestHFileOutputFormat2 { util.deleteTable(table.getName()); byte[][] newSplitKeys = generateRandomSplitKeys(14); - table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys); + table = util.createTable(tableName, FAMILIES, newSplitKeys); - while (util.getConnection().getRegionLocator(TABLE_NAME) + while (util.getConnection().getRegionLocator(tableName) .getAllRegionLocations().size() != 15 || !admin.isTableAvailable(table.getName())) { Thread.sleep(200); @@ -532,25 +607,31 @@ public class TestHFileOutputFormat2 { new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, r); // Ensure data shows up - int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; - assertEquals("LoadIncrementalHFiles should put expected data in table", - expectedRows, util.countRows(table)); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - for (Result res : results) { - assertEquals(FAMILIES.length, res.rawCells().length); - Cell first = res.rawCells()[0]; - for (Cell kv : res.rawCells()) { - assertTrue(CellUtil.matchingRow(first, kv)); - assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + int expectedRows = 0; + if (putSortReducer) { + // no rows should be extracted + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(table)); + } else { + expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, + util.countRows(table)); + Scan scan = new Scan(); + ResultScanner results = table.getScanner(scan); + for (Result res : results) { + assertEquals(FAMILIES.length, res.rawCells().length); + Cell first = res.rawCells()[0]; + for (Cell kv : res.rawCells()) { + assertTrue(CellUtil.matchingRow(first, kv)); + assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + } } + results.close(); } - results.close(); String tableDigestBefore = util.checksumRows(table); - // Check region locality HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); - for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) { + for (HRegion region : util.getHBaseCluster().getRegions(tableName)) { hbd.add(region.getHDFSBlocksDistribution()); } for (String hostname : hostnames) { @@ -560,31 +641,31 @@ public class TestHFileOutputFormat2 { } // Cause regions to reopen - admin.disableTable(TABLE_NAME); - while (!admin.isTableDisabled(TABLE_NAME)) { + admin.disableTable(tableName); + while (!admin.isTableDisabled(tableName)) { Thread.sleep(200); LOG.info("Waiting for table to disable"); } - admin.enableTable(TABLE_NAME); - util.waitTableAvailable(TABLE_NAME); + admin.enableTable(tableName); + util.waitTableAvailable(tableName); assertEquals("Data should remain after reopening of regions", tableDigestBefore, util.checksumRows(table)); } finally { testDir.getFileSystem(conf).delete(testDir, true); - util.deleteTable(TABLE_NAME); + util.deleteTable(tableName); util.shutdownMiniCluster(); } } private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor, - RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException, - InterruptedException, ClassNotFoundException { + RegionLocator regionLocator, Path outDir, boolean putSortReducer) throws IOException, + UnsupportedEncodingException, InterruptedException, ClassNotFoundException { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); - setupRandomGeneratorMapper(job); + setupRandomGeneratorMapper(job, putSortReducer); HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator); FileOutputFormat.setOutputPath(job, outDir); @@ -937,7 +1018,7 @@ public class TestHFileOutputFormat2 { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); - setupRandomGeneratorMapper(job); + setupRandomGeneratorMapper(job, false); HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); FileOutputFormat.setOutputPath(job, dir); context = createTestTaskAttemptContext(job); @@ -1040,7 +1121,7 @@ public class TestHFileOutputFormat2 { for (int i = 0; i < 2; i++) { Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME), - testDir); + testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); } @@ -1132,7 +1213,7 @@ public class TestHFileOutputFormat2 { true); RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME); - runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir); + runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir, false); // Perform the actual load new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); @@ -1203,7 +1284,7 @@ public class TestHFileOutputFormat2 { Admin admin = c.getAdmin(); RegionLocator regionLocator = c.getRegionLocator(tname)) { Path outDir = new Path("incremental-out"); - runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir); + runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir, false); } } else { throw new RuntimeException(