HBASE-16592 Unify Delete request with AP

This commit is contained in:
chenheng 2016-09-13 10:07:45 +08:00
parent 1cdc5acfd4
commit 831fb3ccb8
8 changed files with 222 additions and 31 deletions

View File

@ -0,0 +1,38 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * Base type for responses flowing through AsyncProcess (AP): either a
 * single-action response (e.g. a lone delete) or a multi-action batch
 * response. Callers inspect {@link #type()} to decide which concrete
 * subclass ({@code SingleResponse} or {@code MultiResponse}) to cast to.
 */
@InterfaceAudience.Private
abstract class AbstractResponse {

  public enum ResponseType {

    SINGLE(0),
    MULTI(1);

    private final int value;

    ResponseType(int value) {
      // Keep the declared code rather than discarding it; ordinal() should
      // not be relied on as a stable protocol value.
      this.value = value;
    }

    /** @return the numeric code declared for this response type */
    public int getValue() {
      return value;
    }
  }

  /** @return whether this is a single-action or a multi-action response */
  public abstract ResponseType type();
}

View File

@ -756,14 +756,14 @@ class AsyncProcess {
@Override
public void run() {
MultiResponse res;
AbstractResponse res;
CancellableRegionServerCallable callable = currentCallable;
try {
// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
RpcRetryingCaller<AbstractResponse> caller = createCaller(callable);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
@ -785,9 +785,16 @@ class AsyncProcess {
receiveGlobalFailure(multiAction, server, numAttempt, t);
return;
}
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, res, numAttempt);
if (res.type() == AbstractResponse.ResponseType.MULTI) {
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
} else {
if (results != null) {
SingleResponse singleResponse = (SingleResponse) res;
results[0] = singleResponse.getEntry();
}
decActionCounter(1);
}
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
@ -1782,8 +1789,9 @@ class AsyncProcess {
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
return rpcCallerFactory.<MultiResponse> newCaller();
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
return rpcCallerFactory.<AbstractResponse> newCaller();
}
@VisibleForTesting

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -524,18 +523,25 @@ public class HTable implements Table {
@Override
public void delete(final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
this.rpcControllerFactory, getName(), delete.getRow()) {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
connection, getName(), delete.getRow(), this.rpcControllerFactory) {
@Override
protected Boolean rpcCall() throws Exception {
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
return Boolean.valueOf(response.getProcessed());
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
List<Row> rows = new ArrayList<Row>();
rows.add(delete);
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
null, null, callable, operationTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
/**
@ -768,21 +774,30 @@ public class HTable implements Table {
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
getName(), row) {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory) {
@Override
protected Boolean rpcCall() throws Exception {
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
return Boolean.valueOf(response.getProcessed());
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
List<Row> rows = new ArrayList<Row>();
rows.add(delete);
Object[] results = new Object[1];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
null, results, callable, operationTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
return ((SingleResponse.Entry)results[0]).isProcessed();
}
/**

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* A container for Result objects, grouped by regionName.
*/
@InterfaceAudience.Private
public class MultiResponse {
public class MultiResponse extends AbstractResponse {
// map of regionName to map of Results by the original index for that Result
private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ -101,6 +101,11 @@ public class MultiResponse {
return this.results;
}
/** @return {@link ResponseType#MULTI}, marking this as a batched multi-action response. */
@Override
public ResponseType type() {
return ResponseType.MULTI;
}
static class RegionResult{
Map<Integer, Object> result = new HashMap<>();
ClientProtos.RegionLoadStats stat;

View File

@ -0,0 +1,65 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * Response for a single-action request (for example a lone delete sent
 * through AsyncProcess). Wraps an {@link Entry} carrying the server's
 * {@code Result} plus the check-and-mutate "processed" flag.
 */
@InterfaceAudience.Private
public class SingleResponse extends AbstractResponse {

  /**
   * Payload of a single-action response: the returned Result (may be absent)
   * and whether the server actually applied the mutation.
   */
  @InterfaceAudience.Private
  public static class Entry {

    private Result result;
    private boolean processed;

    /** @return the Result shipped back by the server, or null if none */
    public Result getResult() {
      return this.result;
    }

    /** @return true if the server applied the mutation (check passed) */
    public boolean isProcessed() {
      return this.processed;
    }

    public void setResult(Result result) {
      this.result = result;
    }

    public void setProcessed(boolean processed) {
      this.processed = processed;
    }
  }

  // Populated by the response converter; null until then.
  private Entry entry;

  /** @return the payload of this response, or null if not yet set */
  public Entry getEntry() {
    return this.entry;
  }

  public void setEntry(Entry entry) {
    this.entry = entry;
  }

  /** @return {@link ResponseType#SINGLE}, marking this as a single-action response. */
  @Override
  public ResponseType type() {
    return ResponseType.SINGLE;
  }
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SingleResponse;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@ -149,6 +150,19 @@ public final class ResponseConverter {
return results;
}
/**
 * Converts a protobuf {@code MutateResponse} into a {@link SingleResponse},
 * materializing the server's Result from the protobuf message plus any cells
 * shipped out-of-band via the rpc {@code CellScanner}, and carrying the
 * "processed" flag used by check-and-mutate style operations.
 *
 * @param request the originating mutate request
 *        (NOTE(review): not read in this method body — confirm whether it is
 *        kept only for signature parity with sibling converters)
 * @param response the protobuf response to convert
 * @param cells scanner over cells sent outside the protobuf message; may be
 *        consumed while decoding the Result
 * @return a populated single-action response
 * @throws IOException if the response cells cannot be decoded into a Result
 */
public static SingleResponse getResult(final ClientProtos.MutateRequest request,
final ClientProtos.MutateResponse response,
final CellScanner cells)
throws IOException {
SingleResponse singleResponse = new SingleResponse();
SingleResponse.Entry entry = new SingleResponse.Entry();
entry.setResult(ProtobufUtil.toResult(response.getResult(), cells));
entry.setProcessed(response.getProcessed());
singleResponse.setEntry(entry);
return singleResponse;
}
/**
* Wrap a throwable to an action result.
*

View File

@ -218,7 +218,7 @@ public class TestAsyncProcess {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
@ -234,9 +234,9 @@ public class TestAsyncProcess {
}
});
return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
try {
@ -252,7 +252,7 @@ public class TestAsyncProcess {
}
}
static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
private final IOException e;
@ -262,7 +262,7 @@ public class TestAsyncProcess {
}
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
throw e;
@ -281,7 +281,7 @@ public class TestAsyncProcess {
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
@ -332,7 +332,7 @@ public class TestAsyncProcess {
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable payloadCallable) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
@ -362,9 +362,9 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet();
}
return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
@ -1857,6 +1858,33 @@ public class TestFromClientSide {
admin.close();
}
/**
 * Verifies that deleting a column family the row has no cells in is a
 * harmless no-op: the delete RPC completes and the existing cell in another
 * family is left untouched.
 */
@Test
public void testDeleteWithFailed() throws Exception {
  TableName TABLE = TableName.valueOf("testDeleteWithFailed");
  // Removed unused local ROWS (makeNAscii(ROW, 6)) — it was never referenced.
  byte [][] FAMILIES = makeNAscii(FAMILY, 3);
  byte [][] VALUES = makeN(VALUE, 5);
  long [] ts = {1000, 2000, 3000, 4000, 5000};

  Table ht = TEST_UTIL.createTable(TABLE, FAMILIES, 3);

  // Seed a single cell in FAMILIES[0].
  Put put = new Put(ROW);
  put.addColumn(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
  ht.put(put);

  // Delete targets FAMILIES[1], which holds no cells for this row.
  Delete delete = new Delete(ROW);
  delete.addFamily(FAMILIES[1], ts[0]);
  ht.delete(delete);

  // The seeded cell in FAMILIES[0] must survive the no-op family delete.
  Get get = new Get(ROW);
  get.addFamily(FAMILIES[0]);
  get.setMaxVersions(Integer.MAX_VALUE);
  Result result = ht.get(get);
  assertTrue(Bytes.equals(result.getValue(FAMILIES[0], QUALIFIER), VALUES[0]));
}
@Test
public void testDeletes() throws Exception {
TableName TABLE = TableName.valueOf("testDeletes");
@ -4622,6 +4650,24 @@ public class TestFromClientSide {
assertEquals(ok, true);
}
/**
 * Verifies that checkAndDelete applies the delete when the stored cell value
 * matches the expected value.
 */
@Test
public void testCheckAndDelete() throws IOException {
  final byte [] expected = Bytes.toBytes("aaaa");
  Table table = TEST_UTIL.createTable(TableName.valueOf("testCheckAndDelete"),
    FAMILY);

  // Seed the row with the value the check will compare against.
  Put seed = new Put(ROW);
  seed.addColumn(FAMILY, QUALIFIER, expected);
  table.put(seed);

  // The check matches the stored value, so the delete must be applied.
  Delete del = new Delete(ROW);
  del.addColumns(FAMILY, QUALIFIER);
  boolean applied = table.checkAndDelete(ROW, FAMILY, QUALIFIER, expected, del);
  assertEquals(applied, true);
}
@Test
public void testCheckAndDeleteWithCompareOp() throws IOException {
final byte [] value1 = Bytes.toBytes("aaaa");