diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
new file mode 100644
index 00000000000..7878d0519c3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is used to extend AP to process single action request, like delete, get etc.
+ */
+@InterfaceAudience.Private
abstract class AbstractResponse {
+
+  public enum ResponseType {
+
+    SINGLE (0),
+    MULTI (1);
+
+    ResponseType(int value) {}
+  }
+
+  public abstract ResponseType type();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index c5745e9cf48..15312018a5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -756,14 +756,14 @@ class AsyncProcess {
 
     @Override
     public void run() {
-      MultiResponse res;
+      AbstractResponse res;
       CancellableRegionServerCallable callable = currentCallable;
       try {
         // setup the callable based on the actions, if we don't have one already from the request
         if (callable == null) {
           callable = createCallable(server, tableName, multiAction);
         }
-        RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+        RpcRetryingCaller<AbstractResponse> caller = createCaller(callable);
         try {
           if (callsInProgress != null) {
             callsInProgress.add(callable);
@@ -785,9 +785,16 @@ class AsyncProcess {
           receiveGlobalFailure(multiAction, server, numAttempt, t);
           return;
         }
-
-        // Normal case: we received an answer from the server, and it's not an exception.
-        receiveMultiAction(multiAction, server, res, numAttempt);
+        if (res.type() == AbstractResponse.ResponseType.MULTI) {
+          // Normal case: we received an answer from the server, and it's not an exception.
+          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
+        } else {
+          if (results != null) {
+            SingleResponse singleResponse = (SingleResponse) res;
+            results[0] = singleResponse.getEntry();
+          }
+          decActionCounter(1);
+        }
       } catch (Throwable t) {
         // Something really bad happened. We are on the send thread that will now die.
         LOG.error("Internal AsyncProcess #" + id + " error for "
@@ -1782,8 +1789,9 @@ class AsyncProcess {
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
-    return rpcCallerFactory.<MultiResponse> newCaller();
+  protected RpcRetryingCaller<AbstractResponse> createCaller(
+      CancellableRegionServerCallable callable) {
+    return rpcCallerFactory.<AbstractResponse> newCaller();
   }
 
   @VisibleForTesting
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 0d1b156c179..bcbb1dafaf1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -524,18 +523,25 @@ public class HTable implements Table {
   @Override
   public void delete(final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        this.rpcControllerFactory, getName(), delete.getRow()) {
+    CancellableRegionServerCallable<SingleResponse> callable =
+        new CancellableRegionServerCallable<SingleResponse>(
+            connection, getName(), delete.getRow(), this.rpcControllerFactory) {
       @Override
-      protected Boolean rpcCall() throws Exception {
+      protected SingleResponse rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), delete);
         MutateResponse response = getStub().mutate(getRpcController(), request);
-        return Boolean.valueOf(response.getProcessed());
+        return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
       }
     };
-    rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    List<Delete> rows = new ArrayList<Delete>();
+    rows.add(delete);
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+        null, null, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
   }
 
   /**
@@ -768,21 +774,30 @@ public class HTable implements Table {
   final byte [] qualifier, final CompareOp compareOp, final byte [] value,
   final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
-            getName(), row) {
+    CancellableRegionServerCallable<SingleResponse> callable =
+        new CancellableRegionServerCallable<SingleResponse>(
+            this.connection, getName(), row, this.rpcControllerFactory) {
       @Override
-      protected Boolean rpcCall() throws Exception {
+      protected SingleResponse rpcCall() throws Exception {
         CompareType compareType = CompareType.valueOf(compareOp.name());
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
           new BinaryComparator(value), compareType, delete);
         MutateResponse response = getStub().mutate(getRpcController(), request);
-        return Boolean.valueOf(response.getProcessed());
+        return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
       }
     };
-    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    List<Delete> rows = new ArrayList<Delete>();
+    rows.add(delete);
+
+    Object[] results = new Object[1];
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+        null, results, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
+    return ((SingleResponse.Entry)results[0]).isProcessed();
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 79a9ed34013..18376f480fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * A container for Result objects, grouped by regionName.
  */
 @InterfaceAudience.Private
-public class MultiResponse {
+public class MultiResponse extends AbstractResponse {
 
   // map of regionName to map of Results by the original index for that Result
   private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -101,6 +101,11 @@ public class MultiResponse {
     return this.results;
   }
 
+  @Override
+  public ResponseType type() {
+    return ResponseType.MULTI;
+  }
+
   static class RegionResult{
     Map<Integer, Object> result = new HashMap<>();
     ClientProtos.RegionLoadStats stat;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
new file mode 100644
index 00000000000..68897b58ba6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Class for single action response
+ */
+@InterfaceAudience.Private
+public class SingleResponse extends AbstractResponse {
+  private Entry entry = null;
+
+  @InterfaceAudience.Private
+  public static class Entry {
+    private Result result = null;
+    private boolean processed = false;
+
+    public Result getResult() {
+      return result;
+    }
+
+    public void setResult(Result result) {
+      this.result = result;
+    }
+
+    public boolean isProcessed() {
+      return processed;
+    }
+
+    public void setProcessed(boolean processed) {
+      this.processed = processed;
+    }
+
+  }
+
+  public Entry getEntry() {
+    return entry;
+  }
+
+  public void setEntry(Entry entry) {
+    this.entry = entry;
+  }
+  @Override
+  public ResponseType type() {
+    return ResponseType.SINGLE;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 76b4ccf8f47..e5deabd2fea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.SingleResponse;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@@ -149,6 +150,19 @@ public final class ResponseConverter {
     return results;
   }
 
+
+  public static SingleResponse getResult(final ClientProtos.MutateRequest request,
+                                         final ClientProtos.MutateResponse response,
+                                         final CellScanner cells)
+      throws IOException {
+    SingleResponse singleResponse = new SingleResponse();
+    SingleResponse.Entry entry = new SingleResponse.Entry();
+    entry.setResult(ProtobufUtil.toResult(response.getResult(), cells));
+    entry.setProcessed(response.getProcessed());
+    singleResponse.setEntry(entry);
+    return singleResponse;
+  }
+
   /**
    * Wrap a throwable to an action result.
    *
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e7366a90989..54552d96235 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -218,7 +218,7 @@ public class TestAsyncProcess {
       // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
     }
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
        CancellableRegionServerCallable callable) {
      callsCt.incrementAndGet();
      MultiServerCallable callable1 = (MultiServerCallable) callable;
@@ -234,9 +234,9 @@
         }
       });
 
-      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
         @Override
-        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
             int callTimeout)
         throws IOException, RuntimeException {
           try {
@@ -252,7 +252,7 @@
     }
   }
 
-  static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
+  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
 
     private final IOException e;
 
@@ -262,7 +262,7 @@
     }
 
     @Override
-    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
         int callTimeout)
     throws IOException, RuntimeException {
       throw e;
@@ -281,7 +281,7 @@
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
        CancellableRegionServerCallable callable) {
      callsCt.incrementAndGet();
      return new CallerWithFailure(ioe);
@@ -332,7 +332,7 @@
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
        CancellableRegionServerCallable payloadCallable) {
      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
      final MultiResponse mr = createMultiResponse(
@@ -362,9 +362,9 @@
         replicaCalls.incrementAndGet();
       }
 
-      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
         @Override
-        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
             int callTimeout)
         throws IOException, RuntimeException {
           long sleep = -1;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index bc94b024b8f..f46562540e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
@@ -1857,6 +1858,33 @@ public class TestFromClientSide {
     admin.close();
   }
 
+  @Test
+  public void testDeleteWithFailed() throws Exception {
+    TableName TABLE = TableName.valueOf("testDeleteWithFailed");
+
+    byte [][] ROWS = makeNAscii(ROW, 6);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] VALUES = makeN(VALUE, 5);
+    long [] ts = {1000, 2000, 3000, 4000, 5000};
+
+    Table ht = TEST_UTIL.createTable(TABLE, FAMILIES, 3);
+
+    Put put = new Put(ROW);
+    put.addColumn(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+    ht.put(put);
+
+    // delete wrong family
+    Delete delete = new Delete(ROW);
+    delete.addFamily(FAMILIES[1], ts[0]);
+    ht.delete(delete);
+
+    Get get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result result = ht.get(get);
+    assertTrue(Bytes.equals(result.getValue(FAMILIES[0], QUALIFIER), VALUES[0]));
+  }
+
   @Test
   public void testDeletes() throws Exception {
     TableName TABLE = TableName.valueOf("testDeletes");
@@ -4622,6 +4650,24 @@
     assertEquals(ok, true);
   }
 
+  @Test
+  public void testCheckAndDelete() throws IOException {
+    final byte [] value1 = Bytes.toBytes("aaaa");
+
+    Table table = TEST_UTIL.createTable(TableName.valueOf("testCheckAndDelete"),
+        FAMILY);
+
+    Put put = new Put(ROW);
+    put.addColumn(FAMILY, QUALIFIER, value1);
+    table.put(put);
+
+    Delete delete = new Delete(ROW);
+    delete.addColumns(FAMILY, QUALIFIER);
+
+    boolean ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, value1, delete);
+    assertEquals(ok, true);
+  }
+
   @Test
   public void testCheckAndDeleteWithCompareOp() throws IOException {
     final byte [] value1 = Bytes.toBytes("aaaa");