HBASE-25246 Backup/Restore hbase cell tags (#2767)

Closes #2745

Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>

parent 8e4516536c
commit d8083e8599
ProtobufUtil.java

@@ -1506,6 +1506,21 @@ public final class ProtobufUtil {
    * @return the converted protocol buffer Result
    */
   public static ClientProtos.Result toResult(final Result result) {
+    return toResult(result, false);
+  }
+
+  /**
+   * Convert a client Result to a protocol buffer Result.
+   * @param result the client Result to convert
+   * @param encodeTags whether to include cell tags in the converted protobuf Result.
+   *   When encodeTags is set to true, all tags are returned in the response. These
+   *   tags may contain sensitive data such as ACL permissions, so only backup tools
+   *   like Export and Import should set it to true, so that cell tags are persisted
+   *   in the backup. Refer to HBASE-25246 for more context.
+   * @return the converted protocol buffer Result
+   */
+  public static ClientProtos.Result toResult(final Result result, boolean encodeTags) {
     if (result.getExists() != null) {
       return toResult(result.getExists(), result.isStale());
     }

@@ -1517,7 +1532,7 @@ public final class ProtobufUtil {

     ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
     for (Cell c : cells) {
-      builder.addCell(toCell(c));
+      builder.addCell(toCell(c, encodeTags));
     }

     builder.setStale(result.isStale());

@@ -1564,6 +1579,22 @@ public final class ProtobufUtil {
    * @return the converted client Result
    */
   public static Result toResult(final ClientProtos.Result proto) {
+    return toResult(proto, false);
+  }
+
+  /**
+   * Convert a protocol buffer Result to a client Result.
+   *
+   * @param proto the protocol buffer Result to convert
+   * @param decodeTags whether to decode cell tags into the converted client Result.
+   *   When decodeTags is set to true, all tags are decoded from the response. These
+   *   tags may contain sensitive data such as ACL permissions, so only backup tools
+   *   like Export and Import should set it to true, so that cell tags are persisted
+   *   in the backup. Refer to HBASE-25246 for more context.
+   * @return the converted client Result
+   */
+  public static Result toResult(final ClientProtos.Result proto, boolean decodeTags) {
     if (proto.hasExists()) {
       if (proto.getStale()) {
         return proto.getExists() ? EMPTY_RESULT_EXISTS_TRUE_STALE : EMPTY_RESULT_EXISTS_FALSE_STALE;

@@ -1578,7 +1609,7 @@ public final class ProtobufUtil {

     List<Cell> cells = new ArrayList<Cell>(values.size());
     for (CellProtos.Cell c : values) {
-      cells.add(toCell(c));
+      cells.add(toCell(c, decodeTags));
     }
     return Result.create(cells, null, proto.getStale(), proto.getPartial());
   }

@@ -1620,7 +1651,7 @@ public final class ProtobufUtil {
     if (!values.isEmpty()) {
       if (cells == null) cells = new ArrayList<Cell>(values.size());
       for (CellProtos.Cell c : values) {
-        cells.add(toCell(c));
+        cells.add(toCell(c, false));
       }
     }

@@ -2802,7 +2833,7 @@ public final class ProtobufUtil {
       throw new IOException(se);
     }

-  public static CellProtos.Cell toCell(final Cell kv) {
+  public static CellProtos.Cell toCell(final Cell kv, boolean encodeTags) {
     // Doing this is going to kill us if we do it for all data passed.
     // St.Ack 20121205
     CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder();

@@ -2816,18 +2847,27 @@ public final class ProtobufUtil {
     kvbuilder.setTimestamp(kv.getTimestamp());
     kvbuilder.setValue(ByteStringer.wrap(kv.getValueArray(), kv.getValueOffset(),
         kv.getValueLength()));
+    if (encodeTags && kv.getTagsLength() > 0) {
+      kvbuilder.setTags(ByteStringer.wrap(kv.getTagsArray(), kv.getTagsOffset(),
+          kv.getTagsLength()));
+    }
     return kvbuilder.build();
   }

-  public static Cell toCell(final CellProtos.Cell cell) {
+  public static Cell toCell(final CellProtos.Cell cell, boolean decodeTags) {
     // Doing this is going to kill us if we do it for all data passed.
     // St.Ack 20121205
+    byte[] tags = null;
+    if (decodeTags && cell.hasTags()) {
+      tags = cell.getTags().toByteArray();
+    }
     return CellUtil.createCell(cell.getRow().toByteArray(),
         cell.getFamily().toByteArray(),
         cell.getQualifier().toByteArray(),
         cell.getTimestamp(),
-        (byte) cell.getCellType().getNumber(),
-        cell.getValue().toByteArray());
+        KeyValue.Type.codeToType((byte) (cell.getCellType().getNumber())),
+        cell.getValue().toByteArray(),
+        tags);
   }

   public static HBaseProtos.NamespaceDescriptor toProtoNamespaceDescriptor(NamespaceDescriptor ns) {
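Taken together, the new boolean flags make tag preservation opt-in at the protobuf conversion layer. A minimal round-trip sketch, assuming the branch-1-era APIs this patch targets (`Tag`, a `KeyValue` constructor taking a tags array, and the two-argument `toCell` overloads added above):

----
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.util.Bytes;

public class TagRoundTripSketch {
  public static void main(String[] args) {
    // Build a cell carrying one tag (type 65 is an arbitrary test value).
    Tag tag = new Tag((byte) 65, Bytes.toBytes("test_tag"));
    Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), 10L, Bytes.toBytes("v"), new Tag[] { tag });

    // encodeTags=true copies the tags block into the protobuf Cell;
    // with false the protobuf tags field is simply left unset.
    CellProtos.Cell proto = ProtobufUtil.toCell(cell, true);

    // decodeTags=true restores the tags on the client-side Cell.
    Cell back = ProtobufUtil.toCell(proto, true);
    List<Tag> tags = Tag.asList(back.getTagsArray(), back.getTagsOffset(),
        back.getTagsLength());
    System.out.println(tags.size()); // 1
  }
}
----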
TestPBCell.java

@@ -41,13 +41,13 @@ public class TestPBCell {
   public void testRoundTrip() {
     final Cell cell = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
         Bytes.toBytes("qual"), Bytes.toBytes("val"));
-    CellProtos.Cell c = ProtobufUtil.toCell(cell), decoded;
+    CellProtos.Cell c = ProtobufUtil.toCell(cell, false), decoded;
     PositionedByteRange pbr = new SimplePositionedByteRange(c.getSerializedSize());
     pbr.setPosition(0);
     int encodedLength = CODEC.encode(pbr, c);
     pbr.setPosition(0);
     decoded = CODEC.decode(pbr);
     assertEquals(encodedLength, pbr.getPosition());
-    assertTrue(CellComparator.equals(cell, ProtobufUtil.toCell(decoded)));
+    assertTrue(CellComparator.equals(cell, ProtobufUtil.toCell(decoded, false)));
   }
 }
Import.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;

@@ -495,6 +496,10 @@ public class Import {
     // If there's a rename mapping for this CF, create a new KeyValue
     byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
     if (newCfName != null) {
+      List<Tag> tags = null;
+      if (kv.getTagsLength() > 0) {
+        tags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
+      }
       kv = new KeyValue(kv.getRowArray(), // row buffer
           kv.getRowOffset(),              // row offset
           kv.getRowLength(),              // row length

@@ -508,7 +513,8 @@ public class Import {
           KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
           kv.getValueArray(),   // value buffer
           kv.getValueOffset(),  // value offset
-          kv.getValueLength()); // value length
+          kv.getValueLength(),  // value length
+          tags);
     }
   }
   return kv;
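The replacement `KeyValue` above is assembled across two hunks, with the family and qualifier arguments elided between them. Pulled together as a sketch (not the literal patch), the column-family rename path now carries tags like this, using the long-form `KeyValue` constructor whose final `List<Tag>` argument the patch relies on:

----
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;

public class CfRenameSketch {
  /** Rebuild kv under a new column family, carrying any cell tags along. */
  static KeyValue renameFamily(KeyValue kv, byte[] newCfName) {
    List<Tag> tags = null;
    if (kv.getTagsLength() > 0) {
      tags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
    }
    // Without the trailing tags argument, the copy would silently drop them.
    return new KeyValue(
        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
        newCfName, 0, newCfName.length, // renamed family
        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
        kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
        tags);
  }
}
----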
ResultSerialization.java

@@ -132,7 +132,7 @@ public class ResultSerialization extends Configured implements Serialization<Result> {
       ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
       ProtobufUtil.mergeDelimitedFrom(builder, in);
       ClientProtos.Result proto = builder.build();
-      return ProtobufUtil.toResult(proto);
+      return ProtobufUtil.toResult(proto, true);
     }

     @Override

@@ -156,7 +156,7 @@ public class ResultSerialization extends Configured implements Serialization<Result> {

     @Override
     public void serialize(Result result) throws IOException {
-      ProtobufUtil.toResult(result).writeDelimitedTo(out);
+      ProtobufUtil.toResult(result, true).writeDelimitedTo(out);
     }
   }
 }
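The Export and Import MapReduce jobs persist `Result`s through this Hadoop `Serialization`, so flipping these two call sites to `true` is what actually lands cell tags in the backup files. A minimal round-trip sketch, assuming the branch-1-era APIs used here (`setConf` is supplied defensively, since the deserializer consults the job configuration for the import format version):

----
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;

public class ResultSerializationSketch {
  public static void main(String[] args) throws IOException {
    Tag tag = new Tag((byte) 65, Bytes.toBytes("test_tag"));
    Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), 10L, Bytes.toBytes("v"), new Tag[] { tag });
    Result result = Result.create(Arrays.asList(cell));

    ResultSerialization serialization = new ResultSerialization();
    serialization.setConf(new Configuration());

    // Serialize: with this patch, toResult(result, true) writes the tags block.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Serializer<Result> ser = serialization.getSerializer(Result.class);
    ser.open(out);
    ser.serialize(result);
    ser.close();

    // Deserialize: toResult(proto, true) restores the tags on each cell.
    Deserializer<Result> deser = serialization.getDeserializer(Result.class);
    deser.open(new ByteArrayInputStream(out.toByteArray()));
    Result copy = deser.deserialize(null);
    deser.close();
    System.out.println(copy.rawCells()[0].getTagsLength() > 0); // true
  }
}
----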
TestImportExport.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;

 import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
+import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

@@ -40,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;

@@ -49,6 +52,17 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagRewriteCell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;

@@ -106,6 +120,9 @@ public class TestImportExport {
   private static final String EXPORT_BATCH_SIZE = "100";

   private static long now = System.currentTimeMillis();
+  public static final byte TEST_TAG_TYPE = (byte) 65;
+  public static final String TEST_ATTR = "source_op";
+  public static final String TEST_TAG = "test_tag";

   @Rule
   public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).

@@ -733,4 +750,198 @@ public class TestImportExport {
     return isVisited;
   }
 }
+
+  /**
+   * Add cell tags to delete mutations, run the export and import tools, and
+   * verify that the tags are present in the imported table as well.
+   * @throws Throwable throws Throwable.
+   */
+  @Test
+  public void testTagsAddition() throws Throwable {
+    final TableName exportTable = TableName.valueOf("exportWithTestTagsAddition");
+    HTableDescriptor desc = new HTableDescriptor(exportTable)
+        .addCoprocessor(MetadataController.class.getName());
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(true));
+
+    UTIL.getHBaseAdmin().createTable(desc);
+    Table exportT = UTIL.getConnection().getTable(exportTable);
+
+    // Add first version of QUAL
+    Put p = new Put(ROW1);
+    p.addColumn(FAMILYA, QUAL, now, QUAL);
+    exportT.put(p);
+
+    // Add Delete family marker
+    Delete d = new Delete(ROW1, now + 3);
+    // Add test attribute to delete mutation.
+    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
+    exportT.delete(d);
+
+    // Run the export tool with KeyValueCodecWithTags as the codec, so that the
+    // export tool is guaranteed to use KeyValueCodecWithTags.
+    String[] args = new String[] {
+        "-D" + Export.RAW_SCAN + "=true",
+        // This makes sure the codec encodes and decodes tags in the RPC call.
+        "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
+        exportTable.getNameAsString(),
+        FQ_OUTPUT_DIR,
+        "1000", // max number of key versions per key to export
+    };
+    assertTrue(runExport(args));
+    // Assert that the tag exists in exportTable.
+    checkWhetherTagExists(exportTable, true);
+
+    // Create an import table with MetadataController.
+    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
+    HTableDescriptor importTableDesc = new HTableDescriptor(importTable)
+        .addCoprocessor(MetadataController.class.getName());
+    importTableDesc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(true));
+    UTIL.getHBaseAdmin().createTable(importTableDesc);
+
+    // Run the import tool.
+    args = new String[] {
+        // This makes sure the codec encodes and decodes tags in the RPC call.
+        "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
+        importTable.getNameAsString(),
+        FQ_OUTPUT_DIR
+    };
+    assertTrue(runImport(args));
+    // Make sure that the tags exist in the imported table.
+    checkWhetherTagExists(importTable, true);
+  }
+
+  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
+    List<Cell> values = new ArrayList<>();
+    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
+      Scan scan = new Scan();
+      // Make sure to set rawScan to true so that we will get Delete Markers.
+      scan.setRaw(true);
+      scan.setMaxVersions();
+      scan.withStartRow(ROW1);
+      // Need to use RegionScanner instead of table#getScanner, since the latter
+      // goes through the RPC layer, which removes tags intentionally.
+      RegionScanner scanner = region.getScanner(scan);
+      scanner.next(values);
+      if (!values.isEmpty()) {
+        break;
+      }
+    }
+    boolean deleteFound = false;
+    for (Cell cell : values) {
+      if (CellUtil.isDelete(cell)) {
+        deleteFound = true;
+        List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+            cell.getTagsLength());
+        // If the tagExists flag is true, validate that the tag contents are as expected.
+        if (tagExists) {
+          Assert.assertEquals(1, tags.size());
+          for (Tag tag : tags) {
+            Assert.assertEquals(TEST_TAG, Bytes.toStringBinary(tag.getValue()));
+          }
+        } else {
+          // If the tagExists flag is disabled, check for zero tags.
+          assertEquals(0, tags.size());
+        }
+      }
+    }
+    Assert.assertTrue(deleteFound);
+  }
+
+  /*
+   * This coprocessor adds a cell tag to delete mutations.
+   */
+  public static class MetadataController
+      extends BaseRegionObserver /*implements CoprocessorService*/ {
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
+        return;
+      }
+
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+        Mutation m = miniBatchOp.getOperation(i);
+        if (!(m instanceof Delete)) {
+          continue;
+        }
+        byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
+        if (sourceOpAttr == null) {
+          continue;
+        }
+        Tag sourceOpTag = new Tag(TEST_TAG_TYPE, sourceOpAttr);
+        List<Cell> updatedCells = new ArrayList<>();
+        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance(); ) {
+          Cell cell = cellScanner.current();
+          List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+              cell.getTagsLength());
+          tags.add(sourceOpTag);
+          Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
+          updatedCells.add(updatedCell);
+        }
+        // Clear and add the new Cells to the Mutation.
+        m.getFamilyCellMap().clear();
+        for (Cell cell : updatedCells) {
+          Delete d = (Delete) m;
+          d.addDeleteMarker(cell);
+        }
+      }
+    }
+  }
+
+  /**
+   * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to the
+   * empty string, which means no codec is used. Make sure that tags are not
+   * returned in the response in that case.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testTagsWithEmptyCodec() throws Exception {
+    final TableName tableName = TableName.valueOf("testTagsWithEmptyCodec");
+    HTableDescriptor desc = new HTableDescriptor(tableName)
+        .addCoprocessor(MetadataController.class.getName());
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(true));
+
+    UTIL.getHBaseAdmin().createTable(desc);
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    conf.set(RPC_CODEC_CONF_KEY, "");
+    conf.set(DEFAULT_CODEC_CLASS, "");
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+        Table table = connection.getTable(tableName)) {
+      // Add first version of QUAL
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      table.put(p);
+
+      // Add Delete family marker
+      Delete d = new Delete(ROW1, now + 3);
+      // Add test attribute to delete mutation.
+      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
+      table.delete(d);
+
+      // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS are set to empty, the
+      // empty codec is used and it shouldn't encode/decode tags.
+      Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
+      ResultScanner scanner = table.getScanner(scan);
+      int count = 0;
+      Result result;
+      while ((result = scanner.next()) != null) {
+        List<Cell> cells = result.listCells();
+        assertEquals(2, cells.size());
+        Cell cell = cells.get(0);
+        assertTrue(CellUtil.isDelete(cell));
+        List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+            cell.getTagsLength());
+        assertEquals(0, tags.size());
+        count++;
+      }
+      assertEquals(1, count);
+    } finally {
+      UTIL.deleteTable(tableName);
+    }
+  }
 }
TestProtobufUtil.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.protobuf;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;

 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;

@@ -29,12 +31,14 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;

@@ -55,6 +59,9 @@ import org.junit.experimental.categories.Category;
  */
 @Category(SmallTests.class)
 public class TestProtobufUtil {
+  private static final String TAG_STR = "tag-1";
+  private static final byte TAG_TYPE = (byte) 10;
+
   @Test
   public void testException() throws IOException {
     NameBytesPair.Builder builder = NameBytesPair.newBuilder();

@@ -379,4 +386,91 @@ public class TestProtobufUtil {
     assertEquals(serverName, rs.getServerName());
     assertEquals(rs.getState(), RegionState.State.OPEN);
   }
+
+  /**
+   * Test {@link ProtobufUtil#toCell(Cell, boolean)} and
+   * {@link ProtobufUtil#toCell(CellProtos.Cell, boolean)} conversion
+   * methods when the cell contains tags and encode/decode tags is set to true.
+   */
+  @Test
+  public void testCellConversionWithTags() {
+    Cell cell = getCellWithTags();
+    CellProtos.Cell protoCell = ProtobufUtil.toCell(cell, true);
+    assertNotNull(protoCell);
+
+    Cell decodedCell = getCellFromProtoResult(protoCell, true);
+    List<Tag> decodedTags = Tag.asList(decodedCell.getTagsArray(), decodedCell.getTagsOffset(),
+        decodedCell.getTagsLength());
+    assertEquals(1, decodedTags.size());
+    Tag decodedTag = decodedTags.get(0);
+    assertEquals(TAG_TYPE, decodedTag.getType());
+    assertEquals(TAG_STR, Bytes.toStringBinary(decodedTag.getValue()));
+  }
+
+  private Cell getCellWithTags() {
+    Tag tag = new Tag(TAG_TYPE, TAG_STR);
+    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f1"),
+        Bytes.toBytes("q1"), 10L, Bytes.toBytes("value1"), new Tag[] { tag });
+    return kv;
+  }
+
+  private Cell getCellFromProtoResult(CellProtos.Cell protoCell, boolean decodeTags) {
+    return ProtobufUtil.toCell(protoCell, decodeTags);
+  }
+
+  /**
+   * Test {@link ProtobufUtil#toCell(Cell, boolean)} and
+   * {@link ProtobufUtil#toCell(CellProtos.Cell, boolean)} conversion
+   * methods when the cell contains tags and encode/decode tags is set to false.
+   */
+  @Test
+  public void testCellConversionWithoutTags() {
+    Cell cell = getCellWithTags();
+    CellProtos.Cell protoCell =
+        ProtobufUtil.toCell(cell, false);
+    assertNotNull(protoCell);
+
+    Cell decodedCell = getCellFromProtoResult(protoCell, false);
+    List<Tag> decodedTags = Tag.asList(decodedCell.getTagsArray(), decodedCell.getTagsOffset(),
+        decodedCell.getTagsLength());
+    assertEquals(0, decodedTags.size());
+  }
+
+  /**
+   * Test {@link ProtobufUtil#toCell(Cell, boolean)} and
+   * {@link ProtobufUtil#toCell(CellProtos.Cell, boolean)} conversion
+   * methods when the cell contains tags, encoding of tags is set to false,
+   * and decoding of tags is set to true.
+   */
+  @Test
+  public void testTagEncodeFalseDecodeTrue() {
+    Cell cell = getCellWithTags();
+    CellProtos.Cell protoCell =
+        ProtobufUtil.toCell(cell, false);
+    assertNotNull(protoCell);
+
+    Cell decodedCell = getCellFromProtoResult(protoCell, true);
+    List<Tag> decodedTags = Tag.asList(decodedCell.getTagsArray(), decodedCell.getTagsOffset(),
+        decodedCell.getTagsLength());
+    assertEquals(0, decodedTags.size());
+  }
+
+  /**
+   * Test {@link ProtobufUtil#toCell(Cell, boolean)} and
+   * {@link ProtobufUtil#toCell(CellProtos.Cell, boolean)} conversion
+   * methods when the cell contains tags, encoding of tags is set to true,
+   * and decoding of tags is set to false.
+   */
+  @Test
+  public void testTagEncodeTrueDecodeFalse() {
+    Cell cell = getCellWithTags();
+    CellProtos.Cell protoCell =
+        ProtobufUtil.toCell(cell, true);
+    assertNotNull(protoCell);
+
+    Cell decodedCell = getCellFromProtoResult(protoCell, false);
+    List<Tag> decodedTags = Tag.asList(decodedCell.getTagsArray(), decodedCell.getTagsOffset(),
+        decodedCell.getTagsLength());
+    assertEquals(0, decodedTags.size());
+  }
 }
Reference guide (AsciiDoc)

@@ -591,6 +591,9 @@ $ bin/hbase org.apache.hadoop.hbase.mapreduce.Export <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]]

 By default, the `Export` tool only exports the newest version of a given cell, regardless of the number of versions stored. To export more than one version, replace *_<versions>_* with the desired number of versions.

+For the MapReduce-based Export, if you want to export cell tags, set the config property
+`hbase.client.rpc.codec` to `org.apache.hadoop.hbase.codec.KeyValueCodecWithTags`.
+
 Note: caching for the input Scan is configured via `hbase.client.scanner.caching` in the job configuration.

 === Import

@@ -608,6 +611,9 @@ To import 0.94 exported files in a 0.96 cluster or onwards, you need to set system property
 $ bin/hbase -Dhbase.import.version=0.94 org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>
 ----

+If you want to import cell tags, set the config property
+`hbase.client.rpc.codec` to `org.apache.hadoop.hbase.codec.KeyValueCodecWithTags`.
+
 === ImportTsv

 ImportTsv is a utility that will load data in TSV format into HBase.
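A short sketch of what the documented property does in practice, assuming the branch-1 `Export.createSubmittableJob` entry point used by the tests above; the table name and output directory are hypothetical placeholders:

----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.mapreduce.Job;

public class ExportWithTagsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ship cell tags through the scan RPCs; without this codec the RPC
    // layer strips tags before results reach the Export mappers.
    conf.set("hbase.client.rpc.codec",
        "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");
    // "mytable" and "/backup/mytable" are hypothetical placeholder names.
    Job job = Export.createSubmittableJob(conf,
        new String[] { "mytable", "/backup/mytable" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
----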