HBASE-6942 Endpoint implementation for bulk delete rows (Anoop)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1401330 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2012-10-23 15:59:57 +00:00
parent 871618c19b
commit 940b37dc24
4 changed files with 2285 additions and 0 deletions
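For anyone trying the example out: the sketch below shows one way the endpoint could be loaded on a single table through its table descriptor; the test in this commit instead loads it cluster-wide via the hbase.coprocessor.user.region.classes key. This is illustrative only and not part of the commit; the table and family names are made up, and the admin/descriptor calls are the usual client API of this HBase version.

// Illustrative sketch (not part of this commit): register BulkDeleteEndpoint on one table only,
// instead of cluster-wide via hbase.coprocessor.user.region.classes in hbase-site.xml.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadBulkDeleteEndpoint {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    // Hypothetical table and column family names.
    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("myTable"));
    htd.addFamily(new HColumnDescriptor(Bytes.toBytes("cf1")));
    // Load the endpoint as a table coprocessor; the class must be on the region server classpath.
    htd.addCoprocessor("org.apache.hadoop.hbase.coprocessor.example.BulkDeleteEndpoint");
    admin.createTable(htd);
    admin.close();
  }
}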


@@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or a
* scan with conditions (filters). It can be used to delete rows, column family(s), column
* qualifier(s) or version(s) of columns. When the delete type is FAMILY or COLUMN, the families
* or columns to be deleted are determined by the Scan: the Scan must select all the
* families/qualifiers that need to be deleted. When the delete type is VERSION, the columns and
* versions to be deleted are determined by the Scan: the Scan must select all the qualifiers and
* the versions that need to be deleted. When a timestamp is passed, only the one version at that
* timestamp will be deleted (even if the Scan fetches many versions). When the timestamp is
* passed as null, all the versions which the Scan selects will get deleted.
*
* <br/> Example: <pre><code>
* Scan scan = new Scan();
* // set scan properties(rowkey range, filters, timerange etc).
* HTable ht = ...;
* long noOfDeletedRows = 0L;
* Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt; callable =
* new Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt;() {
* ServerRpcController controller = new ServerRpcController();
* BlockingRpcCallback&lt;BulkDeleteResponse&gt; rpcCallback =
* new BlockingRpcCallback&lt;BulkDeleteResponse&gt;();
*
* public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
* Builder builder = BulkDeleteRequest.newBuilder();
* builder.setScan(ProtobufUtil.toScan(scan));
* builder.setDeleteType(DeleteType.VERSION);
* builder.setRowBatchSize(rowBatchSize);
* // Set optional timestamp if needed
* builder.setTimestamp(timeStamp);
* service.delete(controller, builder.build(), rpcCallback);
* return rpcCallback.get();
* }
* };
* Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorService(BulkDeleteService.class, scan
* .getStartRow(), scan.getStopRow(), callable);
* for (BulkDeleteResponse response : result.values()) {
* noOfDeletedRows += response.getRowsDeleted();
* }
* </code></pre>
*/
public class BulkDeleteEndpoint extends BulkDeleteService implements CoprocessorService,
Coprocessor {
private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
private RegionCoprocessorEnvironment env;
@Override
public Service getService() {
return this;
}
@Override
public void delete(RpcController controller, BulkDeleteRequest request,
RpcCallback<BulkDeleteResponse> done) {
long totalRowsDeleted = 0L;
long totalVersionsDeleted = 0L;
HRegion region = env.getRegion();
int rowBatchSize = request.getRowBatchSize();
Long timestamp = null;
if (request.hasTimestamp()) {
timestamp = request.getTimestamp();
}
DeleteType deleteType = request.getDeleteType();
boolean hasMore = true;
RegionScanner scanner = null;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
// We only need the rowkeys, so the 1st KV from each row is enough.
// This filter can be applied only for a row delete; for the other delete types
// we rely on the Scan to tell us which columns are to be deleted.
scan.setFilter(new FirstKeyOnlyFilter());
}
// From here on we assume the Scan is set up correctly, with the appropriate
// filter(s) and the necessary column(s) selected.
scanner = region.getScanner(scan);
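// Work through the region in batches of rowBatchSize rows; each batch is converted into
// Delete mutations and applied with a single batchMutate() call below.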
while (hasMore) {
List<List<KeyValue>> deleteRows = new ArrayList<List<KeyValue>>(rowBatchSize);
for (int i = 0; i < rowBatchSize; i++) {
List<KeyValue> results = new ArrayList<KeyValue>();
hasMore = scanner.next(results);
if (results.size() > 0) {
deleteRows.add(results);
}
if (!hasMore) {
// There are no more rows.
break;
}
}
if (deleteRows.size() > 0) {
Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
int i = 0;
for (List<KeyValue> deleteRow : deleteRows) {
Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
}
OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
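// Count only the deletes that were applied successfully; stop at the first failed status
// so the totals reported back to the client reflect what actually got deleted.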
for (i = 0; i < opStatus.length; i++) {
if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
break;
}
totalRowsDeleted++;
if (deleteType == DeleteType.VERSION) {
byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
NO_OF_VERSIONS_TO_DELETE);
if (versionsDeleted != null) {
totalVersionsDeleted += Bytes.toInt(versionsDeleted);
}
}
}
}
}
} catch (IOException ioe) {
LOG.error(ioe);
// Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ioe) {
LOG.error(ioe);
}
}
}
Builder responseBuilder = BulkDeleteResponse.newBuilder();
responseBuilder.setRowsDeleted(totalRowsDeleted);
if (deleteType == DeleteType.VERSION) {
responseBuilder.setVersionsDeleted(totalVersionsDeleted);
}
BulkDeleteResponse result = responseBuilder.build();
done.run(result);
}
private Delete createDeleteMutation(List<KeyValue> deleteRow, DeleteType deleteType,
Long timestamp) {
long ts;
if (timestamp == null) {
ts = HConstants.LATEST_TIMESTAMP;
} else {
ts = timestamp;
}
// We just need the rowkey. Get it from 1st KV.
byte[] row = deleteRow.get(0).getRow();
Delete delete = new Delete(row, ts, null);
if (deleteType == DeleteType.FAMILY) {
Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
for (KeyValue kv : deleteRow) {
if (families.add(kv.getFamily())) {
delete.deleteFamily(kv.getFamily(), ts);
}
}
} else if (deleteType == DeleteType.COLUMN) {
Set<Column> columns = new HashSet<Column>();
for (KeyValue kv : deleteRow) {
Column column = new Column(kv.getFamily(), kv.getQualifier());
if (columns.add(column)) {
// Calling deleteColumns() more than once for the same cf:qualifier is not correct.
// Every call to deleteColumns() adds a new KV to the family map, which finally
// gets written to the memstore as part of delete().
delete.deleteColumns(column.family, column.qualifier, ts);
}
}
} else if (deleteType == DeleteType.VERSION) {
// When a timestamp is passed to the delete() call, only the one version of the column with
// that timestamp will be deleted. If no timestamp is passed, N versions are deleted; how
// many depends on the Scan being passed: all the KVs that the Scan fetched will get deleted.
int noOfVersionsToDelete = 0;
if (timestamp == null) {
for (KeyValue kv : deleteRow) {
delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
noOfVersionsToDelete++;
}
} else {
Set<Column> columns = new HashSet<Column>();
for (KeyValue kv : deleteRow) {
Column column = new Column(kv.getFamily(), kv.getQualifier());
// Only one version of each column gets deleted (the one at the given timestamp).
if (columns.add(column)) {
delete.deleteColumn(column.family, column.qualifier, ts);
noOfVersionsToDelete++;
}
}
}
delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
}
return delete;
}
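/**
* Simple (family, qualifier) key used to avoid creating duplicate delete markers for the
* same column within a row.
*/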
private static class Column {
private byte[] family;
private byte[] qualifier;
public Column(byte[] family, byte[] qualifier) {
this.family = family;
this.qualifier = qualifier;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof Column)) {
return false;
}
Column column = (Column) other;
return Bytes.equals(this.family, column.family)
&& Bytes.equals(this.qualifier, column.qualifier);
}
@Override
public int hashCode() {
int h = 31;
h = h + 13 * Bytes.hashCode(this.family);
h = h + 13 * Bytes.hashCode(this.qualifier);
return h;
}
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
}


@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
option java_outer_classname = "BulkDeleteProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "Client.proto";
message BulkDeleteRequest {
required Scan scan = 1;
required DeleteType deleteType = 2;
optional uint64 timestamp = 3;
required uint32 rowBatchSize = 4;
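// Granularity of the delete: whole rows, column families, columns, or specific version(s).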
enum DeleteType {
ROW = 0;
FAMILY = 1;
COLUMN = 2;
VERSION = 3;
}
}
message BulkDeleteResponse {
required uint64 rowsDeleted = 1;
optional uint64 versionsDeleted = 2;
}
service BulkDeleteService {
rpc delete(BulkDeleteRequest)
returns (BulkDeleteResponse);
}


@@ -0,0 +1,434 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestBulkDeleteProtocol {
private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setupBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
BulkDeleteEndpoint.class.getName());
TEST_UTIL.startMiniCluster(2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testBulkDeleteEndpoint() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
byte[] rowkey = Bytes.toBytes(j);
puts.add(createPut(rowkey, "v1"));
}
ht.put(puts);
// Deleting all the rows.
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
for (Result result : ht.getScanner(new Scan())) {
rows++;
}
assertEquals(0, rows);
}
@Test
public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
throws Throwable {
byte[] tableName = Bytes
.toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
byte[] rowkey = Bytes.toBytes(j);
puts.add(createPut(rowkey, "v1"));
}
ht.put(puts);
// Deleting all the rows.
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
for (Result result : ht.getScanner(new Scan())) {
rows++;
}
assertEquals(0, rows);
}
private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
final DeleteType deleteType, final Long timeStamp) throws Throwable {
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
long noOfDeletedRows = 0L;
Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
new BlockingRpcCallback<BulkDeleteResponse>();
public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
Builder builder = BulkDeleteRequest.newBuilder();
builder.setScan(ProtobufUtil.toScan(scan));
builder.setDeleteType(deleteType);
builder.setRowBatchSize(rowBatchSize);
if (timeStamp != null) {
builder.setTimestamp(timeStamp);
}
service.delete(controller, builder.build(), rpcCallback);
return rpcCallback.get();
}
};
Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
.getStartRow(), scan.getStopRow(), callable);
for (BulkDeleteResponse response : result.values()) {
noOfDeletedRows += response.getRowsDeleted();
}
return noOfDeletedRows;
}
@Test
public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
byte[] rowkey = Bytes.toBytes(j);
String value = (j % 10 == 0) ? "v1" : "v2";
puts.add(createPut(rowkey, value));
}
ht.put(puts);
Scan scan = new Scan();
FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
CompareOp.EQUAL, Bytes.toBytes("v1"));
// fl.addFilter(new FirstKeyOnlyFilter());
fl.addFilter(scvf);
scan.setFilter(fl);
// Deleting all the rows where cf1:c3=v1
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
assertEquals(10, noOfRowsDeleted);
int rows = 0;
for (Result result : ht.getScanner(new Scan())) {
rows++;
}
assertEquals(90, rows);
}
@Test
public void testBulkDeleteColumn() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
byte[] rowkey = Bytes.toBytes(j);
String value = (j % 10 == 0) ? "v1" : "v2";
puts.add(createPut(rowkey, value));
}
ht.put(puts);
Scan scan = new Scan();
scan.addColumn(FAMILY1, QUALIFIER2);
// Delete the column cf1:c2
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
for (Result result : ht.getScanner(new Scan())) {
assertEquals(2, result.getFamilyMap(FAMILY1).size());
assertTrue(result.getColumn(FAMILY1, QUALIFIER2).isEmpty());
assertEquals(1, result.getColumn(FAMILY1, QUALIFIER1).size());
assertEquals(1, result.getColumn(FAMILY1, QUALIFIER3).size());
rows++;
}
assertEquals(100, rows);
}
@Test
public void testBulkDeleteFamily() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteFamily");
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(FAMILY1));
htd.addFamily(new HColumnDescriptor(FAMILY2));
TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
Put put = new Put(Bytes.toBytes(j));
put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
puts.add(put);
}
ht.put(puts);
Scan scan = new Scan();
scan.addFamily(FAMILY1);
// Delete the column family cf1
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
for (Result result : ht.getScanner(new Scan())) {
assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
assertEquals(1, result.getColumn(FAMILY2, QUALIFIER2).size());
rows++;
}
assertEquals(100, rows);
}
@Test
public void testBulkDeleteColumnVersion() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
Put put = new Put(Bytes.toBytes(j));
byte[] value = "v1".getBytes();
put.add(FAMILY1, QUALIFIER1, 1234L, value);
put.add(FAMILY1, QUALIFIER2, 1234L, value);
put.add(FAMILY1, QUALIFIER3, 1234L, value);
// Latest version values
value = "v2".getBytes();
put.add(FAMILY1, QUALIFIER1, value);
put.add(FAMILY1, QUALIFIER2, value);
put.add(FAMILY1, QUALIFIER3, value);
put.add(FAMILY1, null, value);
puts.add(put);
}
ht.put(puts);
Scan scan = new Scan();
scan.addFamily(FAMILY1);
// Delete the latest version values of all the columns in family cf1.
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
HConstants.LATEST_TIMESTAMP);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
scan = new Scan();
scan.setMaxVersions();
for (Result result : ht.getScanner(scan)) {
assertEquals(3, result.getFamilyMap(FAMILY1).size());
List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER1);
assertEquals(1, column.size());
assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
column = result.getColumn(FAMILY1, QUALIFIER2);
assertEquals(1, column.size());
assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
column = result.getColumn(FAMILY1, QUALIFIER3);
assertEquals(1, column.size());
assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
rows++;
}
assertEquals(100, rows);
}
@Test
public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
Put put = new Put(Bytes.toBytes(j));
// TS = 1000L
byte[] value = "v1".getBytes();
put.add(FAMILY1, QUALIFIER1, 1000L, value);
put.add(FAMILY1, QUALIFIER2, 1000L, value);
put.add(FAMILY1, QUALIFIER3, 1000L, value);
// TS = 1234L
value = "v2".getBytes();
put.add(FAMILY1, QUALIFIER1, 1234L, value);
put.add(FAMILY1, QUALIFIER2, 1234L, value);
put.add(FAMILY1, QUALIFIER3, 1234L, value);
// Latest version values
value = "v3".getBytes();
put.add(FAMILY1, QUALIFIER1, value);
put.add(FAMILY1, QUALIFIER2, value);
put.add(FAMILY1, QUALIFIER3, value);
puts.add(put);
}
ht.put(puts);
Scan scan = new Scan();
scan.addColumn(FAMILY1, QUALIFIER3);
// Delete the column cf1:c3's one version at TS=1234
long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
assertEquals(100, noOfRowsDeleted);
int rows = 0;
scan = new Scan();
scan.setMaxVersions();
for (Result result : ht.getScanner(scan)) {
assertEquals(3, result.getFamilyMap(FAMILY1).size());
assertEquals(3, result.getColumn(FAMILY1, QUALIFIER1).size());
assertEquals(3, result.getColumn(FAMILY1, QUALIFIER2).size());
List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER3);
assertEquals(2, column.size());
assertTrue(Bytes.equals("v3".getBytes(), column.get(0).getValue()));
assertTrue(Bytes.equals("v1".getBytes(), column.get(1).getValue()));
rows++;
}
assertEquals(100, rows);
}
@Test
public void testBulkDeleteWithNumberOfVersions() throws Throwable {
byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
HTable ht = createTable(tableName);
List<Put> puts = new ArrayList<Put>(100);
for (int j = 0; j < 100; j++) {
Put put = new Put(Bytes.toBytes(j));
// TS = 1000L
byte[] value = "v1".getBytes();
put.add(FAMILY1, QUALIFIER1, 1000L, value);
put.add(FAMILY1, QUALIFIER2, 1000L, value);
put.add(FAMILY1, QUALIFIER3, 1000L, value);
// TS = 1234L
value = "v2".getBytes();
put.add(FAMILY1, QUALIFIER1, 1234L, value);
put.add(FAMILY1, QUALIFIER2, 1234L, value);
put.add(FAMILY1, QUALIFIER3, 1234L, value);
// TS = 2000L
value = "v3".getBytes();
put.add(FAMILY1, QUALIFIER1, 2000L, value);
put.add(FAMILY1, QUALIFIER2, 2000L, value);
put.add(FAMILY1, QUALIFIER3, 2000L, value);
// Latest version values
value = "v4".getBytes();
put.add(FAMILY1, QUALIFIER1, value);
put.add(FAMILY1, QUALIFIER2, value);
put.add(FAMILY1, QUALIFIER3, value);
puts.add(put);
}
ht.put(puts);
// Delete all the versions of columns cf1:c1 and cf1:c2 falling within the time range
// [1000, 2000)
final Scan scan = new Scan();
scan.addColumn(FAMILY1, QUALIFIER1);
scan.addColumn(FAMILY1, QUALIFIER2);
scan.setTimeRange(1000L, 2000L);
scan.setMaxVersions();
long noOfDeletedRows = 0L;
long noOfVersionsDeleted = 0L;
Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
new BlockingRpcCallback<BulkDeleteResponse>();
public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
Builder builder = BulkDeleteRequest.newBuilder();
builder.setScan(ProtobufUtil.toScan(scan));
builder.setDeleteType(DeleteType.VERSION);
builder.setRowBatchSize(500);
service.delete(controller, builder.build(), rpcCallback);
return rpcCallback.get();
}
};
Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
.getStartRow(), scan.getStopRow(), callable);
for (BulkDeleteResponse response : result.values()) {
noOfDeletedRows += response.getRowsDeleted();
noOfVersionsDeleted += response.getVersionsDeleted();
}
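// 100 rows touched; each row loses 2 columns (c1, c2) x 2 versions (ts=1000 and ts=1234)
// within [1000, 2000), i.e. 400 versions in total.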
assertEquals(100, noOfDeletedRows);
assertEquals(400, noOfVersionsDeleted);
int rows = 0;
Scan scan1 = new Scan();
scan1.setMaxVersions();
for (Result res : ht.getScanner(scan1)) {
assertEquals(3, res.getFamilyMap(FAMILY1).size());
List<KeyValue> column = res.getColumn(FAMILY1, QUALIFIER1);
assertEquals(2, column.size());
assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
column = res.getColumn(FAMILY1, QUALIFIER2);
assertEquals(2, column.size());
assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
assertEquals(4, res.getColumn(FAMILY1, QUALIFIER3).size());
rows++;
}
assertEquals(100, rows);
}
private HTable createTable(byte[] tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
hcd.setMaxVersions(10); // Just 10 versions; none of these tests use more than that.
htd.addFamily(hcd);
TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
return ht;
}
private Put createPut(byte[] rowkey, String value) throws IOException {
Put put = new Put(rowkey);
put.add(FAMILY1, QUALIFIER1, value.getBytes());
put.add(FAMILY1, QUALIFIER2, value.getBytes());
put.add(FAMILY1, QUALIFIER3, value.getBytes());
return put;
}
}