HBASE-25246 Backup/Restore hbase cell tags
Closes #2706 Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
90e96d1a6e
commit
34721c42ec
|
@ -1971,7 +1971,8 @@ public final class ProtobufUtil {
|
||||||
kvbuilder.setTimestamp(kv.getTimestamp());
|
kvbuilder.setTimestamp(kv.getTimestamp());
|
||||||
kvbuilder.setValue(wrap(((ByteBufferExtendedCell) kv).getValueByteBuffer(),
|
kvbuilder.setValue(wrap(((ByteBufferExtendedCell) kv).getValueByteBuffer(),
|
||||||
((ByteBufferExtendedCell) kv).getValuePosition(), kv.getValueLength()));
|
((ByteBufferExtendedCell) kv).getValuePosition(), kv.getValueLength()));
|
||||||
// TODO : Once tags become first class then we may have to set tags to kvbuilder.
|
kvbuilder.setTags(wrap(((ByteBufferExtendedCell) kv).getTagsByteBuffer(),
|
||||||
|
((ByteBufferExtendedCell) kv).getTagsPosition(), kv.getTagsLength()));
|
||||||
} else {
|
} else {
|
||||||
kvbuilder.setRow(
|
kvbuilder.setRow(
|
||||||
UnsafeByteOperations.unsafeWrap(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
|
UnsafeByteOperations.unsafeWrap(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
|
||||||
|
@ -1983,6 +1984,8 @@ public final class ProtobufUtil {
|
||||||
kvbuilder.setTimestamp(kv.getTimestamp());
|
kvbuilder.setTimestamp(kv.getTimestamp());
|
||||||
kvbuilder.setValue(UnsafeByteOperations.unsafeWrap(kv.getValueArray(), kv.getValueOffset(),
|
kvbuilder.setValue(UnsafeByteOperations.unsafeWrap(kv.getValueArray(), kv.getValueOffset(),
|
||||||
kv.getValueLength()));
|
kv.getValueLength()));
|
||||||
|
kvbuilder.setTags(UnsafeByteOperations.unsafeWrap(kv.getTagsArray(), kv.getTagsOffset(),
|
||||||
|
kv.getTagsLength()));
|
||||||
}
|
}
|
||||||
return kvbuilder.build();
|
return kvbuilder.build();
|
||||||
}
|
}
|
||||||
|
@ -1995,14 +1998,17 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Cell toCell(ExtendedCellBuilder cellBuilder, final CellProtos.Cell cell) {
|
public static Cell toCell(ExtendedCellBuilder cellBuilder, final CellProtos.Cell cell) {
|
||||||
return cellBuilder.clear()
|
ExtendedCellBuilder builder = cellBuilder.clear()
|
||||||
.setRow(cell.getRow().toByteArray())
|
.setRow(cell.getRow().toByteArray())
|
||||||
.setFamily(cell.getFamily().toByteArray())
|
.setFamily(cell.getFamily().toByteArray())
|
||||||
.setQualifier(cell.getQualifier().toByteArray())
|
.setQualifier(cell.getQualifier().toByteArray())
|
||||||
.setTimestamp(cell.getTimestamp())
|
.setTimestamp(cell.getTimestamp())
|
||||||
.setType((byte) cell.getCellType().getNumber())
|
.setType((byte) cell.getCellType().getNumber())
|
||||||
.setValue(cell.getValue().toByteArray())
|
.setValue(cell.getValue().toByteArray());
|
||||||
.build();
|
if (cell.hasTags()) {
|
||||||
|
builder.setTags(cell.getTags().toByteArray());
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HBaseProtos.NamespaceDescriptor toProtoNamespaceDescriptor(NamespaceDescriptor ns) {
|
public static HBaseProtos.NamespaceDescriptor toProtoNamespaceDescriptor(NamespaceDescriptor ns) {
|
||||||
|
|
|
@ -18,22 +18,30 @@
|
||||||
package org.apache.hadoop.hbase.shaded.protobuf;
|
package org.apache.hadoop.hbase.shaded.protobuf;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||||
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellBuilderType;
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||||
|
import org.apache.hadoop.hbase.ExtendedCellBuilder;
|
||||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.io.TimeRange;
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -479,4 +487,40 @@ public class TestProtobufUtil {
|
||||||
+ "\"sharedLockCount\":0"
|
+ "\"sharedLockCount\":0"
|
||||||
+ "}]", lockJson);
|
+ "}]", lockJson);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test {@link ProtobufUtil#toCell(Cell)} and
|
||||||
|
* {@link ProtobufUtil#toCell(ExtendedCellBuilder, CellProtos.Cell)} conversion
|
||||||
|
* methods when it contains tags.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCellConversionWithTags() {
|
||||||
|
String tagStr = "tag-1";
|
||||||
|
byte tagType = (byte)10;
|
||||||
|
Tag tag = new ArrayBackedTag(tagType, tagStr);
|
||||||
|
|
||||||
|
ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
|
||||||
|
cellBuilder.setRow(Bytes.toBytes("row1"));
|
||||||
|
cellBuilder.setFamily(Bytes.toBytes("f1"));
|
||||||
|
cellBuilder.setQualifier(Bytes.toBytes("q1"));
|
||||||
|
cellBuilder.setValue(Bytes.toBytes("value1"));
|
||||||
|
cellBuilder.setType(Cell.Type.Delete);
|
||||||
|
cellBuilder.setTags(Collections.singletonList(tag));
|
||||||
|
Cell cell = cellBuilder.build();
|
||||||
|
|
||||||
|
ClientProtos.Result protoResult =
|
||||||
|
ProtobufUtil.toResult(Result.create(Collections.singletonList(cell)));
|
||||||
|
assertNotNull(protoResult);
|
||||||
|
assertEquals(1, protoResult.getCellCount());
|
||||||
|
|
||||||
|
CellProtos.Cell protoCell = protoResult.getCell(0);
|
||||||
|
ExtendedCellBuilder decodedBuilder =
|
||||||
|
ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
|
||||||
|
Cell decodedCell = ProtobufUtil.toCell(decodedBuilder, protoCell);
|
||||||
|
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
|
||||||
|
assertEquals(1, decodedTags.size());
|
||||||
|
Tag decodedTag = decodedTags.get(0);
|
||||||
|
assertEquals(tagType, decodedTag.getType());
|
||||||
|
assertEquals(tagStr, Tag.getValueAsString(decodedTag));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
|
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
@ -710,6 +711,7 @@ public class Import extends Configured implements Tool {
|
||||||
// If there's a rename mapping for this CF, create a new KeyValue
|
// If there's a rename mapping for this CF, create a new KeyValue
|
||||||
byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
|
byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
|
||||||
if (newCfName != null) {
|
if (newCfName != null) {
|
||||||
|
List<Tag> tags = PrivateCellUtil.getTags(kv);
|
||||||
kv = new KeyValue(kv.getRowArray(), // row buffer
|
kv = new KeyValue(kv.getRowArray(), // row buffer
|
||||||
kv.getRowOffset(), // row offset
|
kv.getRowOffset(), // row offset
|
||||||
kv.getRowLength(), // row length
|
kv.getRowLength(), // row length
|
||||||
|
@ -723,7 +725,8 @@ public class Import extends Configured implements Tool {
|
||||||
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
|
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
|
||||||
kv.getValueArray(), // value buffer
|
kv.getValueArray(), // value buffer
|
||||||
kv.getValueOffset(), // value offset
|
kv.getValueOffset(), // value offset
|
||||||
kv.getValueLength()); // value length
|
kv.getValueLength(), // value length
|
||||||
|
tags.size() == 0 ? null: tags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return kv;
|
return kv;
|
||||||
|
|
|
@ -34,10 +34,13 @@ import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
@ -46,10 +49,12 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -58,11 +63,19 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
|
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
|
||||||
|
@ -117,6 +130,9 @@ public class TestImportExport {
|
||||||
private static final long now = System.currentTimeMillis();
|
private static final long now = System.currentTimeMillis();
|
||||||
private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
|
private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
|
||||||
private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
|
private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
|
||||||
|
public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
|
||||||
|
public static final String TEST_ATTR = "source_op";
|
||||||
|
public static final String TEST_TAG = "test_tag";
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Throwable {
|
public static void beforeClass() throws Throwable {
|
||||||
|
@ -804,4 +820,147 @@ public class TestImportExport {
|
||||||
return isVisited;
|
return isVisited;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add cell tags to delete mutations, run export and import tool and
|
||||||
|
* verify that tags are present in import table also.
|
||||||
|
* @throws Throwable throws Throwable.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTagsAddition() throws Throwable {
|
||||||
|
final TableName exportTable = TableName.valueOf(name.getMethodName());
|
||||||
|
TableDescriptor desc = TableDescriptorBuilder
|
||||||
|
.newBuilder(exportTable)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||||
|
.setMaxVersions(5)
|
||||||
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
|
.build())
|
||||||
|
.setCoprocessor(MetadataController.class.getName())
|
||||||
|
.build();
|
||||||
|
UTIL.getAdmin().createTable(desc);
|
||||||
|
|
||||||
|
Table exportT = UTIL.getConnection().getTable(exportTable);
|
||||||
|
|
||||||
|
//Add first version of QUAL
|
||||||
|
Put p = new Put(ROW1);
|
||||||
|
p.addColumn(FAMILYA, QUAL, now, QUAL);
|
||||||
|
exportT.put(p);
|
||||||
|
|
||||||
|
//Add Delete family marker
|
||||||
|
Delete d = new Delete(ROW1, now+3);
|
||||||
|
// Add test attribute to delete mutation.
|
||||||
|
d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
|
||||||
|
exportT.delete(d);
|
||||||
|
|
||||||
|
// Run export too with KeyValueCodecWithTags as Codec. This will ensure that export tool
|
||||||
|
// will use KeyValueCodecWithTags.
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-D" + ExportUtils.RAW_SCAN + "=true",
|
||||||
|
// This will make sure that codec will encode and decode tags in rpc call.
|
||||||
|
"-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
|
||||||
|
exportTable.getNameAsString(),
|
||||||
|
FQ_OUTPUT_DIR,
|
||||||
|
"1000", // max number of key versions per key to export
|
||||||
|
};
|
||||||
|
assertTrue(runExport(args));
|
||||||
|
// Assert tag exists in exportTable
|
||||||
|
assertTagExists(exportTable);
|
||||||
|
|
||||||
|
// Create an import table with MetadataController.
|
||||||
|
final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
|
||||||
|
TableDescriptor importTableDesc = TableDescriptorBuilder
|
||||||
|
.newBuilder(importTable)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
|
||||||
|
.setMaxVersions(5)
|
||||||
|
.setKeepDeletedCells(KeepDeletedCells.TRUE)
|
||||||
|
.build())
|
||||||
|
.setCoprocessor(MetadataController.class.getName())
|
||||||
|
.build();
|
||||||
|
UTIL.getAdmin().createTable(importTableDesc);
|
||||||
|
|
||||||
|
// Run import tool.
|
||||||
|
args = new String[] {
|
||||||
|
// This will make sure that codec will encode and decode tags in rpc call.
|
||||||
|
"-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
|
||||||
|
importTable.getNameAsString(),
|
||||||
|
FQ_OUTPUT_DIR
|
||||||
|
};
|
||||||
|
assertTrue(runImport(args));
|
||||||
|
// Make sure that tags exists in imported table.
|
||||||
|
assertTagExists(importTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTagExists(TableName table) throws IOException {
|
||||||
|
List<Cell> values = new ArrayList<>();
|
||||||
|
for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
// Make sure to set rawScan to true so that we will get Delete Markers.
|
||||||
|
scan.setRaw(true);
|
||||||
|
scan.readAllVersions();
|
||||||
|
scan.withStartRow(ROW1);
|
||||||
|
// Need to use RegionScanner instead of table#getScanner since the latter will
|
||||||
|
// not return tags since it will go through rpc layer and remove tags intentionally.
|
||||||
|
RegionScanner scanner = region.getScanner(scan);
|
||||||
|
scanner.next(values);
|
||||||
|
if (!values.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
boolean deleteFound = false;
|
||||||
|
for (Cell cell: values) {
|
||||||
|
if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
|
||||||
|
deleteFound = true;
|
||||||
|
List<Tag> tags = PrivateCellUtil.getTags(cell);
|
||||||
|
Assert.assertEquals(1, tags.size());
|
||||||
|
for (Tag tag : tags) {
|
||||||
|
Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue(deleteFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
This co-proc will add a cell tag to delete mutation.
|
||||||
|
*/
|
||||||
|
public static class MetadataController implements RegionCoprocessor, RegionObserver {
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||||
|
MiniBatchOperationInProgress<Mutation> miniBatchOp)
|
||||||
|
throws IOException {
|
||||||
|
if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < miniBatchOp.size(); i++) {
|
||||||
|
Mutation m = miniBatchOp.getOperation(i);
|
||||||
|
if (!(m instanceof Delete)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
|
||||||
|
if (sourceOpAttr == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
|
||||||
|
List<Cell> updatedCells = new ArrayList<>();
|
||||||
|
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance(); ) {
|
||||||
|
Cell cell = cellScanner.current();
|
||||||
|
List<Tag> tags = PrivateCellUtil.getTags(cell);
|
||||||
|
tags.add(sourceOpTag);
|
||||||
|
Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
|
||||||
|
updatedCells.add(updatedCell);
|
||||||
|
}
|
||||||
|
m.getFamilyCellMap().clear();
|
||||||
|
// Clear and add new Cells to the Mutation.
|
||||||
|
for (Cell cell : updatedCells) {
|
||||||
|
Delete d = (Delete) m;
|
||||||
|
d.add(cell);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue