diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index b9a08676f8e..cfbdd648625 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2015,7 +2015,8 @@ public final class ProtobufUtil {
       kvbuilder.setTimestamp(kv.getTimestamp());
       kvbuilder.setValue(wrap(((ByteBufferExtendedCell) kv).getValueByteBuffer(),
         ((ByteBufferExtendedCell) kv).getValuePosition(), kv.getValueLength()));
-      // TODO : Once tags become first class then we may have to set tags to kvbuilder.
+      kvbuilder.setTags(wrap(((ByteBufferExtendedCell) kv).getTagsByteBuffer(),
+        ((ByteBufferExtendedCell) kv).getTagsPosition(), kv.getTagsLength()));
     } else {
       kvbuilder.setRow(
         UnsafeByteOperations.unsafeWrap(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
@@ -2027,6 +2028,8 @@ public final class ProtobufUtil {
       kvbuilder.setTimestamp(kv.getTimestamp());
       kvbuilder.setValue(UnsafeByteOperations.unsafeWrap(kv.getValueArray(), kv.getValueOffset(),
         kv.getValueLength()));
+      kvbuilder.setTags(UnsafeByteOperations.unsafeWrap(kv.getTagsArray(), kv.getTagsOffset(),
+        kv.getTagsLength()));
     }
     return kvbuilder.build();
   }
@@ -2039,14 +2042,17 @@ public final class ProtobufUtil {
   }
 
   public static Cell toCell(ExtendedCellBuilder cellBuilder, final CellProtos.Cell cell) {
-    return cellBuilder.clear()
-            .setRow(cell.getRow().toByteArray())
-            .setFamily(cell.getFamily().toByteArray())
-            .setQualifier(cell.getQualifier().toByteArray())
-            .setTimestamp(cell.getTimestamp())
-            .setType((byte) cell.getCellType().getNumber())
-            .setValue(cell.getValue().toByteArray())
-            .build();
+    ExtendedCellBuilder builder = cellBuilder.clear()
+            .setRow(cell.getRow().toByteArray())
+            .setFamily(cell.getFamily().toByteArray())
+            .setQualifier(cell.getQualifier().toByteArray())
+            .setTimestamp(cell.getTimestamp())
+            .setType((byte) cell.getCellType().getNumber())
+            .setValue(cell.getValue().toByteArray());
+    if (cell.hasTags()) {
+      builder.setTags(cell.getTags().toByteArray());
+    }
+    return builder.build();
   }
 
   public static HBaseProtos.NamespaceDescriptor toProtoNamespaceDescriptor(NamespaceDescriptor ns) {
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index 7d6eda817cf..791beb7ede5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -18,22 +18,30 @@
 package org.apache.hadoop.hbase.shaded.protobuf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.ExtendedCellBuilder;
 import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -479,4 +487,40 @@
         + "\"sharedLockCount\":0"
         + "}]", lockJson);
   }
+
+  /**
+   * Test {@link ProtobufUtil#toCell(Cell)} and
+   * {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell)} conversion
+   * methods when the cell contains tags.
+   */
+  @Test
+  public void testCellConversionWithTags() {
+    String tagStr = "tag-1";
+    byte tagType = (byte) 10;
+    Tag tag = new ArrayBackedTag(tagType, tagStr);
+
+    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(Bytes.toBytes("row1"));
+    cellBuilder.setFamily(Bytes.toBytes("f1"));
+    cellBuilder.setQualifier(Bytes.toBytes("q1"));
+    cellBuilder.setValue(Bytes.toBytes("value1"));
+    cellBuilder.setType(Cell.Type.Delete);
+    cellBuilder.setTags(Collections.singletonList(tag));
+    Cell cell = cellBuilder.build();
+
+    ClientProtos.Result protoResult =
+      ProtobufUtil.toResult(Result.create(Collections.singletonList(cell)));
+    assertNotNull(protoResult);
+    assertEquals(1, protoResult.getCellCount());
+
+    CellProtos.Cell protoCell = protoResult.getCell(0);
+    ExtendedCellBuilder decodedBuilder =
+      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    Cell decodedCell = ProtobufUtil.toCell(decodedBuilder, protoCell);
+    List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
+    assertEquals(1, decodedTags.size());
+    Tag decodedTag = decodedTags.get(0);
+    assertEquals(tagType, decodedTag.getType());
+    assertEquals(tagStr, Tag.getValueAsString(decodedTag));
+  }
 }
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 239a12bdc68..30071fdfd80 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -511,6 +512,7 @@ public class Import extends Configured implements Tool {
       // If there's a rename mapping for this CF, create a new KeyValue
       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
       if (newCfName != null) {
+        List<Tag> tags = PrivateCellUtil.getTags(kv);
         kv = new KeyValue(kv.getRowArray(), // row buffer
           kv.getRowOffset(), // row offset
           kv.getRowLength(), // row length
@@ -524,7 +526,8 @@ public class Import extends Configured implements Tool {
           KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
           kv.getValueArray(), // value buffer
           kv.getValueOffset(), // value offset
-          kv.getValueLength()); // value length
+          kv.getValueLength(), // value length
+          tags.size() == 0 ? null : tags);
       }
     }
     return kv;
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 12060a742a2..5a95fd8eecb 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -34,10 +34,13 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -46,10 +49,12 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
@@ -58,11 +63,18 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
@@ -117,6 +129,9 @@ public class TestImportExport {
   private static final long now = System.currentTimeMillis();
   private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
   private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
+  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
+  public static final String TEST_ATTR = "source_op";
+  public static final String TEST_TAG = "test_tag";
 
   @BeforeClass
   public static void beforeClass() throws Throwable {
@@ -801,4 +816,147 @@
       return isVisited;
     }
   }
+
+  /**
+   * Add cell tags to delete mutations, run the export and import tools, and
+   * verify that the tags are present in the imported table as well.
+   * @throws Throwable throws Throwable.
+   */
+  @Test
+  public void testTagsAddition() throws Throwable {
+    final TableName exportTable = TableName.valueOf(name.getMethodName());
+    TableDescriptor desc = TableDescriptorBuilder
+      .newBuilder(exportTable)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+        .build())
+      .setCoprocessor(MetadataController.class.getName())
+      .build();
+    UTIL.getAdmin().createTable(desc);
+
+    Table exportT = UTIL.getConnection().getTable(exportTable);
+
+    // Add first version of QUAL
+    Put p = new Put(ROW1);
+    p.addColumn(FAMILYA, QUAL, now, QUAL);
+    exportT.put(p);
+
+    // Add Delete family marker
+    Delete d = new Delete(ROW1, now + 3);
+    // Add the test attribute to the delete mutation.
+    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
+    exportT.delete(d);
+
+    // Run the export tool with KeyValueCodecWithTags as the codec. This will ensure
+    // that the export tool uses KeyValueCodecWithTags.
+    String[] args = new String[] {
+      "-D" + ExportUtils.RAW_SCAN + "=true",
+      // This will make sure that the codec will encode and decode tags in the rpc call.
+      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
+      exportTable.getNameAsString(),
+      FQ_OUTPUT_DIR,
+      "1000", // max number of key versions per key to export
+    };
+    assertTrue(runExport(args));
+    // Assert that the tag exists in the export table.
+    assertTagExists(exportTable);
+
+    // Create an import table with MetadataController.
+    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
+    TableDescriptor importTableDesc = TableDescriptorBuilder
+      .newBuilder(importTable)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+        .build())
+      .setCoprocessor(MetadataController.class.getName())
+      .build();
+    UTIL.getAdmin().createTable(importTableDesc);
+
+    // Run the import tool.
+    args = new String[] {
+      // This will make sure that the codec will encode and decode tags in the rpc call.
+      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
+      importTable.getNameAsString(),
+      FQ_OUTPUT_DIR
+    };
+    assertTrue(runImport(args));
+    // Make sure that the tags exist in the imported table.
+    assertTagExists(importTable);
+  }
+
+  private void assertTagExists(TableName table) throws IOException {
+    List<Cell> values = new ArrayList<>();
+    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
+      Scan scan = new Scan();
+      // Make sure to set rawScan to true so that we will get the delete markers.
+      scan.setRaw(true);
+      scan.readAllVersions();
+      scan.withStartRow(ROW1);
+      // Need to use RegionScanner instead of table#getScanner since the latter will
+      // not return tags: it goes through the rpc layer, which strips tags intentionally.
+      RegionScanner scanner = region.getScanner(scan);
+      scanner.next(values);
+      if (!values.isEmpty()) {
+        break;
+      }
+    }
+    boolean deleteFound = false;
+    for (Cell cell : values) {
+      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
+        deleteFound = true;
+        List<Tag> tags = PrivateCellUtil.getTags(cell);
+        Assert.assertEquals(1, tags.size());
+        for (Tag tag : tags) {
+          Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
+        }
+      }
+    }
+    Assert.assertTrue(deleteFound);
+  }
+
+  /*
+   * This coprocessor adds a cell tag to every delete mutation that carries the
+   * test attribute.
+ */ + public static class MetadataController implements RegionCoprocessor, RegionObserver { + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preBatchMutate(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) + throws IOException { + if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) { + return; + } + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + if (!(m instanceof Delete)) { + continue; + } + byte[] sourceOpAttr = m.getAttribute(TEST_ATTR); + if (sourceOpAttr == null) { + continue; + } + Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr); + List updatedCells = new ArrayList<>(); + for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance(); ) { + Cell cell = cellScanner.current(); + List tags = PrivateCellUtil.getTags(cell); + tags.add(sourceOpTag); + Cell updatedCell = PrivateCellUtil.createCell(cell, tags); + updatedCells.add(updatedCell); + } + m.getFamilyCellMap().clear(); + // Clear and add new Cells to the Mutation. + for (Cell cell : updatedCells) { + Delete d = (Delete) m; + d.add(cell); + } + } + } + } }