HBASE-25246 Backup/Restore hbase cell tags
Closes #2745 Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
c327680125
commit
74d68180e6
|
@ -1436,6 +1436,21 @@ public final class ProtobufUtil {
|
|||
* @return the converted protocol buffer Result
|
||||
*/
|
||||
public static ClientProtos.Result toResult(final Result result) {
|
||||
return toResult(result, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a client Result to a protocol buffer Result
|
||||
* @param result the client Result to convert
|
||||
* @param encodeTags whether to includeTags in converted protobuf result or not
|
||||
* When @encodeTags is set to true, it will return all the tags in the response.
|
||||
* These tags may contain some sensitive data like acl permissions, etc.
|
||||
* Only the tools like Export, Import which needs to take backup needs to set
|
||||
* it to true so that cell tags are persisted in backup.
|
||||
* Refer to HBASE-25246 for more context.
|
||||
* @return the converted protocol buffer Result
|
||||
*/
|
||||
public static ClientProtos.Result toResult(final Result result, boolean encodeTags) {
|
||||
if (result.getExists() != null) {
|
||||
return toResult(result.getExists(), result.isStale());
|
||||
}
|
||||
|
@ -1447,7 +1462,7 @@ public final class ProtobufUtil {
|
|||
|
||||
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
|
||||
for (Cell c : cells) {
|
||||
builder.addCell(toCell(c));
|
||||
builder.addCell(toCell(c, encodeTags));
|
||||
}
|
||||
|
||||
builder.setStale(result.isStale());
|
||||
|
@ -1494,6 +1509,22 @@ public final class ProtobufUtil {
|
|||
* @return the converted client Result
|
||||
*/
|
||||
public static Result toResult(final ClientProtos.Result proto) {
|
||||
return toResult(proto, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a protocol buffer Result to a client Result
|
||||
*
|
||||
* @param proto the protocol buffer Result to convert
|
||||
* @param decodeTags whether to decode tags into converted client Result
|
||||
* When @decodeTags is set to true, it will decode all the tags from the
|
||||
* response. These tags may contain some sensitive data like acl permissions,
|
||||
* etc. Only the tools like Export, Import which needs to take backup needs to
|
||||
* set it to true so that cell tags are persisted in backup.
|
||||
* Refer to HBASE-25246 for more context.
|
||||
* @return the converted client Result
|
||||
*/
|
||||
public static Result toResult(final ClientProtos.Result proto, boolean decodeTags) {
|
||||
if (proto.hasExists()) {
|
||||
if (proto.getStale()) {
|
||||
return proto.getExists() ? EMPTY_RESULT_EXISTS_TRUE_STALE :EMPTY_RESULT_EXISTS_FALSE_STALE;
|
||||
|
@ -1509,7 +1540,7 @@ public final class ProtobufUtil {
|
|||
List<Cell> cells = new ArrayList<>(values.size());
|
||||
ExtendedCellBuilder builder = ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
|
||||
for (CellProtos.Cell c : values) {
|
||||
cells.add(toCell(builder, c));
|
||||
cells.add(toCell(builder, c, decodeTags));
|
||||
}
|
||||
return Result.create(cells, null, proto.getStale(), proto.getPartial());
|
||||
}
|
||||
|
@ -1552,7 +1583,7 @@ public final class ProtobufUtil {
|
|||
if (cells == null) cells = new ArrayList<>(values.size());
|
||||
ExtendedCellBuilder builder = ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
|
||||
for (CellProtos.Cell c: values) {
|
||||
cells.add(toCell(builder, c));
|
||||
cells.add(toCell(builder, c, false));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2000,7 +2031,7 @@ public final class ProtobufUtil {
|
|||
throw new IOException(se);
|
||||
}
|
||||
|
||||
public static CellProtos.Cell toCell(final Cell kv) {
|
||||
public static CellProtos.Cell toCell(final Cell kv, boolean encodeTags) {
|
||||
// Doing this is going to kill us if we do it for all data passed.
|
||||
// St.Ack 20121205
|
||||
CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder();
|
||||
|
@ -2015,7 +2046,10 @@ public final class ProtobufUtil {
|
|||
kvbuilder.setTimestamp(kv.getTimestamp());
|
||||
kvbuilder.setValue(wrap(((ByteBufferExtendedCell) kv).getValueByteBuffer(),
|
||||
((ByteBufferExtendedCell) kv).getValuePosition(), kv.getValueLength()));
|
||||
// TODO : Once tags become first class then we may have to set tags to kvbuilder.
|
||||
if (encodeTags) {
|
||||
kvbuilder.setTags(wrap(((ByteBufferExtendedCell) kv).getTagsByteBuffer(),
|
||||
((ByteBufferExtendedCell) kv).getTagsPosition(), kv.getTagsLength()));
|
||||
}
|
||||
} else {
|
||||
kvbuilder.setRow(
|
||||
UnsafeByteOperations.unsafeWrap(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
|
||||
|
@ -2027,6 +2061,10 @@ public final class ProtobufUtil {
|
|||
kvbuilder.setTimestamp(kv.getTimestamp());
|
||||
kvbuilder.setValue(UnsafeByteOperations.unsafeWrap(kv.getValueArray(), kv.getValueOffset(),
|
||||
kv.getValueLength()));
|
||||
if (encodeTags) {
|
||||
kvbuilder.setTags(UnsafeByteOperations.unsafeWrap(kv.getTagsArray(), kv.getTagsOffset(),
|
||||
kv.getTagsLength()));
|
||||
}
|
||||
}
|
||||
return kvbuilder.build();
|
||||
}
|
||||
|
@ -2038,15 +2076,19 @@ public final class ProtobufUtil {
|
|||
return UnsafeByteOperations.unsafeWrap(dup);
|
||||
}
|
||||
|
||||
public static Cell toCell(ExtendedCellBuilder cellBuilder, final CellProtos.Cell cell) {
|
||||
return cellBuilder.clear()
|
||||
.setRow(cell.getRow().toByteArray())
|
||||
.setFamily(cell.getFamily().toByteArray())
|
||||
.setQualifier(cell.getQualifier().toByteArray())
|
||||
.setTimestamp(cell.getTimestamp())
|
||||
.setType((byte) cell.getCellType().getNumber())
|
||||
.setValue(cell.getValue().toByteArray())
|
||||
.build();
|
||||
public static Cell toCell(ExtendedCellBuilder cellBuilder, final CellProtos.Cell cell,
|
||||
boolean decodeTags) {
|
||||
ExtendedCellBuilder builder = cellBuilder.clear()
|
||||
.setRow(cell.getRow().toByteArray())
|
||||
.setFamily(cell.getFamily().toByteArray())
|
||||
.setQualifier(cell.getQualifier().toByteArray())
|
||||
.setTimestamp(cell.getTimestamp())
|
||||
.setType((byte) cell.getCellType().getNumber())
|
||||
.setValue(cell.getValue().toByteArray());
|
||||
if (decodeTags && cell.hasTags()) {
|
||||
builder.setTags(cell.getTags().toByteArray());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static HBaseProtos.NamespaceDescriptor toProtoNamespaceDescriptor(NamespaceDescriptor ns) {
|
||||
|
|
|
@ -18,17 +18,24 @@
|
|||
package org.apache.hadoop.hbase.shaded.protobuf;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilder;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -63,7 +70,8 @@ public class TestProtobufUtil {
|
|||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProtobufUtil.class);
|
||||
|
||||
private static final String TAG_STR = "tag-1";
|
||||
private static final byte TAG_TYPE = (byte)10;
|
||||
public TestProtobufUtil() {
|
||||
}
|
||||
|
||||
|
@ -271,9 +279,10 @@ public class TestProtobufUtil {
|
|||
ByteBuffer dbb = ByteBuffer.allocateDirect(arr.length);
|
||||
dbb.put(arr);
|
||||
ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(dbb, kv1.getLength(), kv2.getLength());
|
||||
CellProtos.Cell cell = ProtobufUtil.toCell(offheapKV);
|
||||
CellProtos.Cell cell = ProtobufUtil.toCell(offheapKV, false);
|
||||
Cell newOffheapKV =
|
||||
ProtobufUtil.toCell(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell);
|
||||
ProtobufUtil.toCell(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell,
|
||||
false);
|
||||
assertTrue(CellComparatorImpl.COMPARATOR.compare(offheapKV, newOffheapKV) == 0);
|
||||
}
|
||||
|
||||
|
@ -479,4 +488,92 @@ public class TestProtobufUtil {
|
|||
+ "\"sharedLockCount\":0"
|
||||
+ "}]", lockJson);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link ProtobufUtil#toCell(Cell, boolean)} and
|
||||
* {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell, boolean)} conversion
|
||||
* methods when it contains tags and encode/decode tags is set to true.
|
||||
*/
|
||||
@Test
|
||||
public void testCellConversionWithTags() {
|
||||
|
||||
Cell cell = getCellWithTags();
|
||||
CellProtos.Cell protoCell = ProtobufUtil.toCell(cell, true);
|
||||
assertNotNull(protoCell);
|
||||
|
||||
Cell decodedCell = getCellFromProtoResult(protoCell, true);
|
||||
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
|
||||
assertEquals(1, decodedTags.size());
|
||||
Tag decodedTag = decodedTags.get(0);
|
||||
assertEquals(TAG_TYPE, decodedTag.getType());
|
||||
assertEquals(TAG_STR, Tag.getValueAsString(decodedTag));
|
||||
}
|
||||
|
||||
private Cell getCellWithTags() {
|
||||
Tag tag = new ArrayBackedTag(TAG_TYPE, TAG_STR);
|
||||
ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
|
||||
cellBuilder.setRow(Bytes.toBytes("row1"));
|
||||
cellBuilder.setFamily(Bytes.toBytes("f1"));
|
||||
cellBuilder.setQualifier(Bytes.toBytes("q1"));
|
||||
cellBuilder.setValue(Bytes.toBytes("value1"));
|
||||
cellBuilder.setType(Cell.Type.Delete);
|
||||
cellBuilder.setTags(Collections.singletonList(tag));
|
||||
return cellBuilder.build();
|
||||
}
|
||||
|
||||
private Cell getCellFromProtoResult(CellProtos.Cell protoCell, boolean decodeTags) {
|
||||
ExtendedCellBuilder decodedBuilder =
|
||||
ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
|
||||
return ProtobufUtil.toCell(decodedBuilder, protoCell, decodeTags);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link ProtobufUtil#toCell(Cell, boolean)} and
|
||||
* {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell, boolean)} conversion
|
||||
* methods when it contains tags and encode/decode tags is set to false.
|
||||
*/
|
||||
@Test
|
||||
public void testCellConversionWithoutTags() {
|
||||
Cell cell = getCellWithTags();
|
||||
CellProtos.Cell protoCell = ProtobufUtil.toCell(cell, false);
|
||||
assertNotNull(protoCell);
|
||||
|
||||
Cell decodedCell = getCellFromProtoResult(protoCell, false);
|
||||
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
|
||||
assertEquals(0, decodedTags.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link ProtobufUtil#toCell(Cell, boolean)} and
|
||||
* {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell, boolean)} conversion
|
||||
* methods when it contains tags and encoding of tags is set to false
|
||||
* and decoding of tags is set to true.
|
||||
*/
|
||||
@Test
|
||||
public void testTagEncodeFalseDecodeTrue() {
|
||||
Cell cell = getCellWithTags();
|
||||
CellProtos.Cell protoCell = ProtobufUtil.toCell(cell, false);
|
||||
assertNotNull(protoCell);
|
||||
|
||||
Cell decodedCell = getCellFromProtoResult(protoCell, true);
|
||||
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
|
||||
assertEquals(0, decodedTags.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link ProtobufUtil#toCell(Cell, boolean)} and
|
||||
* {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell, boolean)} conversion
|
||||
* methods when it contains tags and encoding of tags is set to true
|
||||
* and decoding of tags is set to false.
|
||||
*/
|
||||
@Test
|
||||
public void testTagEncodeTrueDecodeFalse() {
|
||||
Cell cell = getCellWithTags();
|
||||
CellProtos.Cell protoCell = ProtobufUtil.toCell(cell, true);
|
||||
assertNotNull(protoCell);
|
||||
|
||||
Cell decodedCell = getCellFromProtoResult(protoCell, false);
|
||||
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
|
||||
assertEquals(0, decodedTags.size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -511,6 +512,7 @@ public class Import extends Configured implements Tool {
|
|||
// If there's a rename mapping for this CF, create a new KeyValue
|
||||
byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
|
||||
if (newCfName != null) {
|
||||
List<Tag> tags = PrivateCellUtil.getTags(kv);
|
||||
kv = new KeyValue(kv.getRowArray(), // row buffer
|
||||
kv.getRowOffset(), // row offset
|
||||
kv.getRowLength(), // row length
|
||||
|
@ -524,7 +526,8 @@ public class Import extends Configured implements Tool {
|
|||
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
|
||||
kv.getValueArray(), // value buffer
|
||||
kv.getValueOffset(), // value offset
|
||||
kv.getValueLength()); // value length
|
||||
kv.getValueLength(), // value length
|
||||
tags.size() == 0 ? null: tags);
|
||||
}
|
||||
}
|
||||
return kv;
|
||||
|
|
|
@ -128,7 +128,7 @@ public class ResultSerialization extends Configured implements Serialization<Res
|
|||
@Override
|
||||
public Result deserialize(Result mutation) throws IOException {
|
||||
ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
|
||||
return ProtobufUtil.toResult(proto);
|
||||
return ProtobufUtil.toResult(proto, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -152,7 +152,7 @@ public class ResultSerialization extends Configured implements Serialization<Res
|
|||
|
||||
@Override
|
||||
public void serialize(Result result) throws IOException {
|
||||
ProtobufUtil.toResult(result).writeDelimitedTo(out);
|
||||
ProtobufUtil.toResult(result, true).writeDelimitedTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
|
||||
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -34,10 +36,13 @@ import java.net.URL;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -46,10 +51,14 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -58,11 +67,18 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
||||
|
@ -117,6 +133,9 @@ public class TestImportExport {
|
|||
private static final long now = System.currentTimeMillis();
|
||||
private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
|
||||
private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
|
||||
public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
|
||||
public static final String TEST_ATTR = "source_op";
|
||||
public static final String TEST_TAG = "test_tag";
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Throwable {
|
||||
|
@ -801,4 +820,207 @@ public class TestImportExport {
|
|||
return isVisited;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add cell tags to delete mutations, run export and import tool and
|
||||
* verify that tags are present in import table also.
|
||||
* @throws Throwable throws Throwable.
|
||||
*/
|
||||
@Test
|
||||
public void testTagsAddition() throws Throwable {
|
||||
final TableName exportTable = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor desc = TableDescriptorBuilder
|
||||
.newBuilder(exportTable)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.setCoprocessor(MetadataController.class.getName())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(desc);
|
||||
|
||||
Table exportT = UTIL.getConnection().getTable(exportTable);
|
||||
|
||||
//Add first version of QUAL
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
exportT.put(p);
|
||||
|
||||
//Add Delete family marker
|
||||
Delete d = new Delete(ROW1, now+3);
|
||||
// Add test attribute to delete mutation.
|
||||
d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
|
||||
exportT.delete(d);
|
||||
|
||||
// Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool
|
||||
// will use KeyValueCodecWithTags.
|
||||
String[] args = new String[] {
|
||||
"-D" + ExportUtils.RAW_SCAN + "=true",
|
||||
// This will make sure that codec will encode and decode tags in rpc call.
|
||||
"-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
|
||||
exportTable.getNameAsString(),
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000", // max number of key versions per key to export
|
||||
};
|
||||
assertTrue(runExport(args));
|
||||
// Assert tag exists in exportTable
|
||||
checkWhetherTagExists(exportTable, true);
|
||||
|
||||
// Create an import table with MetadataController.
|
||||
final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
|
||||
TableDescriptor importTableDesc = TableDescriptorBuilder
|
||||
.newBuilder(importTable)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.setCoprocessor(MetadataController.class.getName())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(importTableDesc);
|
||||
|
||||
// Run import tool.
|
||||
args = new String[] {
|
||||
// This will make sure that codec will encode and decode tags in rpc call.
|
||||
"-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
|
||||
importTable.getNameAsString(),
|
||||
FQ_OUTPUT_DIR
|
||||
};
|
||||
assertTrue(runImport(args));
|
||||
// Make sure that tags exists in imported table.
|
||||
checkWhetherTagExists(importTable, true);
|
||||
}
|
||||
|
||||
private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
|
||||
List<Cell> values = new ArrayList<>();
|
||||
for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
|
||||
Scan scan = new Scan();
|
||||
// Make sure to set rawScan to true so that we will get Delete Markers.
|
||||
scan.setRaw(true);
|
||||
scan.readAllVersions();
|
||||
scan.withStartRow(ROW1);
|
||||
// Need to use RegionScanner instead of table#getScanner since the latter will
|
||||
// not return tags since it will go through rpc layer and remove tags intentionally.
|
||||
RegionScanner scanner = region.getScanner(scan);
|
||||
scanner.next(values);
|
||||
if (!values.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
boolean deleteFound = false;
|
||||
for (Cell cell: values) {
|
||||
if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
|
||||
deleteFound = true;
|
||||
List<Tag> tags = PrivateCellUtil.getTags(cell);
|
||||
// If tagExists flag is true then validate whether tag contents are as expected.
|
||||
if (tagExists) {
|
||||
Assert.assertEquals(1, tags.size());
|
||||
for (Tag tag : tags) {
|
||||
Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
|
||||
}
|
||||
} else {
|
||||
// If tagExists flag is disabled then check for 0 size tags.
|
||||
assertEquals(0, tags.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(deleteFound);
|
||||
}
|
||||
|
||||
/*
|
||||
This co-proc will add a cell tag to delete mutation.
|
||||
*/
|
||||
public static class MetadataController implements RegionCoprocessor, RegionObserver {
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
MiniBatchOperationInProgress<Mutation> miniBatchOp)
|
||||
throws IOException {
|
||||
if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < miniBatchOp.size(); i++) {
|
||||
Mutation m = miniBatchOp.getOperation(i);
|
||||
if (!(m instanceof Delete)) {
|
||||
continue;
|
||||
}
|
||||
byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
|
||||
if (sourceOpAttr == null) {
|
||||
continue;
|
||||
}
|
||||
Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
|
||||
List<Cell> updatedCells = new ArrayList<>();
|
||||
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance(); ) {
|
||||
Cell cell = cellScanner.current();
|
||||
List<Tag> tags = PrivateCellUtil.getTags(cell);
|
||||
tags.add(sourceOpTag);
|
||||
Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
|
||||
updatedCells.add(updatedCell);
|
||||
}
|
||||
m.getFamilyCellMap().clear();
|
||||
// Clear and add new Cells to the Mutation.
|
||||
for (Cell cell : updatedCells) {
|
||||
Delete d = (Delete) m;
|
||||
d.add(cell);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string
|
||||
* This means it will use no Codec. Make sure that we don't return Tags in response.
|
||||
* @throws Exception Exception
|
||||
*/
|
||||
@Test
|
||||
public void testTagsWithEmptyCodec() throws Exception {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder
|
||||
.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||
.setMaxVersions(5)
|
||||
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||
.build())
|
||||
.setCoprocessor(MetadataController.class.getName())
|
||||
.build();
|
||||
UTIL.getAdmin().createTable(tableDesc);
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
conf.set(RPC_CODEC_CONF_KEY, "");
|
||||
conf.set(DEFAULT_CODEC_CLASS, "");
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||
Table table = connection.getTable(tableName)) {
|
||||
//Add first version of QUAL
|
||||
Put p = new Put(ROW1);
|
||||
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||
table.put(p);
|
||||
|
||||
//Add Delete family marker
|
||||
Delete d = new Delete(ROW1, now+3);
|
||||
// Add test attribute to delete mutation.
|
||||
d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
|
||||
table.delete(d);
|
||||
|
||||
// Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use
|
||||
// empty Codec and it shouldn't encode/decode tags.
|
||||
Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
int count = 0;
|
||||
Result result;
|
||||
while ((result = scanner.next()) != null) {
|
||||
List<Cell> cells = result.listCells();
|
||||
assertEquals(2, cells.size());
|
||||
Cell cell = cells.get(0);
|
||||
assertTrue(CellUtil.isDelete(cell));
|
||||
List<Tag> tags = PrivateCellUtil.getTags(cell);
|
||||
assertEquals(0, tags.size());
|
||||
count++;
|
||||
}
|
||||
assertEquals(1, count);
|
||||
} finally {
|
||||
UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ public class MessageCodec implements Codec {
|
|||
|
||||
@Override
|
||||
protected Cell parseCell() throws IOException {
|
||||
return ProtobufUtil.toCell(cellBuilder, CellProtos.Cell.parseDelimitedFrom(this.in));
|
||||
return ProtobufUtil.toCell(cellBuilder, CellProtos.Cell.parseDelimitedFrom(this.in), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -735,6 +735,9 @@ specifying column families and applying filters during the export.
|
|||
|
||||
By default, the `Export` tool only exports the newest version of a given cell, regardless of the number of versions stored. To export more than one version, replace *_<versions>_* with the desired number of versions.
|
||||
|
||||
For mapreduce based Export, if you want to export cell tags then set the following config property
|
||||
`hbase.client.rpc.codec` to `org.apache.hadoop.hbase.codec.KeyValueCodecWithTags`
|
||||
|
||||
Note: caching for the input Scan is configured via `hbase.client.scanner.caching` in the job configuration.
|
||||
|
||||
[[import]]
|
||||
|
@ -755,6 +758,9 @@ To import 0.94 exported files in a 0.96 cluster or onwards, you need to set syst
|
|||
$ bin/hbase -Dhbase.import.version=0.94 org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>
|
||||
----
|
||||
|
||||
If you want to import cell tags then set the following config property
|
||||
`hbase.client.rpc.codec` to `org.apache.hadoop.hbase.codec.KeyValueCodecWithTags`
|
||||
|
||||
[[importtsv]]
|
||||
=== ImportTsv
|
||||
|
||||
|
|
Loading…
Reference in New Issue