From 2b1b79f08bf4b8fab84d7e7eb7b46580de60169c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 2 Feb 2019 21:24:18 +0800 Subject: [PATCH] HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly --- .../hadoop/hbase/AsyncMetaTableAccessor.java | 90 +++++++++---------- .../hadoop/hbase/client/AsyncAdmin.java | 12 +-- .../AsyncAdminRequestRetryingCaller.java | 4 +- .../client/AsyncBatchRpcRetryingCaller.java | 3 +- .../client/AsyncBufferedMutatorImpl.java | 5 +- .../hbase/client/AsyncConnectionImpl.java | 2 +- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 11 +-- .../AsyncMasterRequestRpcRetryingCaller.java | 8 +- .../hbase/client/AsyncMetaRegionLocator.java | 3 +- .../AsyncServerRequestRpcRetryingCaller.java | 4 +- .../AsyncSingleRequestRpcRetryingCaller.java | 2 +- .../hadoop/hbase/client/AsyncTableImpl.java | 11 +-- .../hbase/client/ConnectionFactory.java | 12 +-- .../MasterCoprocessorRpcChannelImpl.java | 36 ++++---- .../hbase/client/RawAsyncHBaseAdmin.java | 21 ++--- .../hbase/client/RawAsyncTableImpl.java | 2 +- .../RegionCoprocessorRpcChannelImpl.java | 32 +++---- ...RegionServerCoprocessorRpcChannelImpl.java | 36 ++++---- .../hadoop/hbase/client/ZKAsyncRegistry.java | 21 ++--- .../apache/hadoop/hbase/util/FutureUtils.java | 57 +++++++++++- .../coprocessor/AsyncAggregationClient.java | 11 ++- .../client/example/AsyncClientExample.java | 62 ++++++------- .../client/example/HttpProxyExample.java | 48 +++++----- .../hbase/regionserver/wal/AsyncFSWAL.java | 4 +- .../wal/AsyncProtobufLogWriter.java | 6 +- .../regionserver/wal/CombinedAsyncWriter.java | 4 +- 26 files changed, 285 insertions(+), 222 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index 5d38179fabb..4a886d13e97 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -81,7 +83,7 @@ public class AsyncMetaTableAccessor { long time = EnvironmentEdgeManager.currentTime(); try { get.setTimeRange(0, time); - metaTable.get(get).whenComplete((result, error) -> { + addListener(metaTable.get(get), (result, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -109,16 +111,14 @@ public class AsyncMetaTableAccessor { CompletableFuture> future = new CompletableFuture<>(); try { RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); - metaTable.get( - new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo)) - .addFamily(HConstants.CATALOG_FAMILY)).whenComplete( - (r, err) -> { + addListener(metaTable.get(new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo)) + .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> { if (err != null) { future.completeExceptionally(err); return; } - future.complete(getRegionLocations(r).map( - locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); + future.complete(getRegionLocations(r) + .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); }); } catch (IOException parseEx) { LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName)); @@ -136,34 +136,29 @@ public class AsyncMetaTableAccessor { public static CompletableFuture> getRegionLocationWithEncodedName( AsyncTable metaTable, byte[] encodedRegionName) { CompletableFuture> future = new CompletableFuture<>(); - metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) - .whenComplete( - (results, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - String encodedRegionNameStr = Bytes.toString(encodedRegionName); - results - .stream() - .filter(result -> !result.isEmpty()) - .filter(result -> MetaTableAccessor.getRegionInfo(result) != null) - .forEach( - result -> { - getRegionLocations(result).ifPresent( - locations -> { - for (HRegionLocation location : locations.getRegionLocations()) { - if (location != null - && encodedRegionNameStr.equals(location.getRegion() - .getEncodedName())) { - future.complete(Optional.of(location)); - return; - } - } - }); - }); - future.complete(Optional.empty()); + addListener( + metaTable + .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)), + (results, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + String encodedRegionNameStr = Bytes.toString(encodedRegionName); + results.stream().filter(result -> !result.isEmpty()) + .filter(result -> MetaTableAccessor.getRegionInfo(result) != null).forEach(result -> { + getRegionLocations(result).ifPresent(locations -> { + for (HRegionLocation location : locations.getRegionLocations()) { + if (location != null && + encodedRegionNameStr.equals(location.getRegion().getEncodedName())) { + future.complete(Optional.of(location)); + return; + } + } + }); }); + future.complete(Optional.empty()); + }); return future; } @@ -190,19 +185,18 @@ public class AsyncMetaTableAccessor { public static CompletableFuture> getTableHRegionLocations( AsyncTable metaTable, Optional tableName) { CompletableFuture> future = new CompletableFuture<>(); - getTableRegionsAndLocations(metaTable, tableName, true).whenComplete( - (locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (locations == null || locations.isEmpty()) { - future.complete(Collections.emptyList()); - } else { - List regionLocations = locations.stream() - .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) - .collect(Collectors.toList()); - future.complete(regionLocations); - } - }); + addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (locations == null || locations.isEmpty()) { + future.complete(Collections.emptyList()); + } else { + List regionLocations = + locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) + .collect(Collectors.toList()); + future.complete(regionLocations); + } + }); return future; } @@ -254,7 +248,7 @@ public class AsyncMetaTableAccessor { } }; - scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> { + addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> { if (error != null) { future.completeExceptionally(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 3a5aef14b17..9abfe23763b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import com.google.protobuf.RpcChannel; import java.io.IOException; import java.util.Collection; @@ -614,15 +616,15 @@ public interface AsyncAdmin { * @param peerId a short name that identifies the peer * @return the current cluster state wrapped by a {@link CompletableFuture}. */ - default CompletableFuture - getReplicationPeerSyncReplicationState(String peerId) { + default CompletableFuture getReplicationPeerSyncReplicationState( + String peerId) { CompletableFuture future = new CompletableFuture<>(); - listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> { + addListener(listReplicationPeers(Pattern.compile(peerId)), (peers, error) -> { if (error != null) { future.completeExceptionally(error); } else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) { - future.completeExceptionally( - new IOException("Replication peer " + peerId + " does not exist")); + future + .completeExceptionally(new IOException("Replication peer " + peerId + " does not exist")); } else { future.complete(peers.get(0).getSyncReplicationState()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index cf31d79d8e9..02e22c052ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; @@ -61,7 +63,7 @@ public class AsyncAdminRequestRetryingCaller extends AsyncRpcRetryingCaller { + addListener(callable.call(controller, adminStub), (result, error) -> { if (error != null) { onError(error, () -> "Call to admin stub failed", err -> { }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 33e636647a9..4051e1d34f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; import java.io.IOException; import java.util.ArrayList; @@ -409,7 +410,7 @@ class AsyncBatchRpcRetryingCaller { .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { if (error != null) { - error = translateException(error); + error = unwrapCompletionException(translateException(error)); if (error instanceof DoNotRetryIOException) { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index 318c6c94c4d..61d49af3103 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -25,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -96,7 +97,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { Iterator> toCompleteIter = toComplete.iterator(); for (CompletableFuture future : table.batch(toSend)) { CompletableFuture toCompleteFuture = toCompleteIter.next(); - future.whenComplete((r, e) -> { + addListener(future, (r, e) -> { if (e != null) { toCompleteFuture.completeExceptionally(e); } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 18286503393..4a325461609 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -188,7 +188,7 @@ class AsyncConnectionImpl implements AsyncConnection { } private void makeMasterStub(CompletableFuture future) { - registry.getMasterAddress().whenComplete((sn, error) -> { + addListener(registry.getMasterAddress(), (sn, error) -> { if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 356a42532b0..f39fe36768e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.security.access.UserPermission; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -68,15 +69,7 @@ class AsyncHBaseAdmin implements AsyncAdmin { } private CompletableFuture wrap(CompletableFuture future) { - CompletableFuture asyncFuture = new CompletableFuture<>(); - future.whenCompleteAsync((r, e) -> { - if (e != null) { - asyncFuture.completeExceptionally(e); - } else { - asyncFuture.complete(r); - } - }, pool); - return asyncFuture; + return FutureUtils.wrapFuture(future, pool); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index a52e7997974..7ed44e269b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.yetus.audience.InterfaceAudience; @@ -43,20 +45,20 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall Callable callable, long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + startLogErrorsCnt); this.callable = callable; } @Override protected void doCall() { - conn.getMasterStub().whenComplete((stub, error) -> { + addListener(conn.getMasterStub(), (stub, error) -> { if (error != null) { onError(error, () -> "Get async master stub failed", err -> { }); return; } resetCallTimeout(); - callable.call(controller, stub).whenComplete((result, error2) -> { + addListener(callable.call(controller, stub), (result, error2) -> { if (error2 != null) { onError(error2, () -> "Call to master failed", err -> { }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 9fef15dbfc8..ce3a2ddd81d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; @@ -71,7 +72,7 @@ class AsyncMetaRegionLocator { if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { LOG.debug("Start fetching meta region location from registry."); CompletableFuture future = metaRelocateFuture.get(); - registry.getMetaRegionLocation().whenComplete((locs, error) -> { + addListener(registry.getMetaRegionLocation(), (locs, error) -> { if (error != null) { LOG.debug("Failed to fetch meta region location from registry", error); metaRelocateFuture.getAndSet(null).completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 54b055a9ed4..f114eff5bac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; @@ -62,7 +64,7 @@ public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCall return; } resetCallTimeout(); - callable.call(controller, stub).whenComplete((result, error) -> { + addListener(callable.call(controller, stub), (result, error) -> { if (error != null) { onError(error, () -> "Call to admin stub failed", err -> { }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index a552e403a02..9490d0f0b6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -79,7 +79,7 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { return; } resetCallTimeout(); - callable.call(controller, loc, stub).whenComplete((result, error) -> { + addListener(callable.call(controller, loc, stub), (result, error) -> { if (error != null) { onError(error, () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 9747d0665d6..426b18496b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -87,15 +88,7 @@ class AsyncTableImpl implements AsyncTable { } private CompletableFuture wrap(CompletableFuture future) { - CompletableFuture asyncFuture = new CompletableFuture<>(); - future.whenCompleteAsync((r, e) -> { - if (e != null) { - asyncFuture.completeExceptionally(e); - } else { - asyncFuture.complete(r); - } - }, pool); - return asyncFuture; + return FutureUtils.wrapFuture(future, pool); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index e24af7411d2..e3e87f624aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -18,19 +18,20 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.lang.reflect.Constructor; import java.security.PrivilegedExceptionAction; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of @@ -282,7 +283,7 @@ public class ConnectionFactory { final User user) { CompletableFuture future = new CompletableFuture<>(); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); - registry.getClusterId().whenComplete((clusterId, error) -> { + addListener(registry.getClusterId(), (clusterId, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -295,9 +296,8 @@ public class ConnectionFactory { AsyncConnectionImpl.class, AsyncConnection.class); try { future.complete( - user.runAs((PrivilegedExceptionAction)() -> - ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user)) - ); + user.runAs((PrivilegedExceptionAction) () -> ReflectionUtils + .newInstance(clazz, conf, registry, clusterId, user))); } catch (Exception e) { future.completeExceptionally(e); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java index 9176c87d106..9e68a16bc30 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java @@ -17,22 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; /** * The implementation of a master based coprocessor rpc channel. @@ -75,12 +76,13 @@ class MasterCoprocessorRpcChannelImpl implements RpcChannel { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { - callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() - .whenComplete(((r, e) -> { - if (e != null) { - ((ClientCoprocessorRpcController) controller).setFailed(e); - } - done.run(r); - })); + addListener( + callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(), + ((r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + })); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 73efe32157b..d4b60fb8a37 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; @@ -415,12 +416,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture procedureCall(PREQ preq, MasterRpcCall rpcCall, Converter respConverter, ProcedureBiConsumer consumer) { - CompletableFuture procFuture = this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, stub, preq, rpcCall, - respConverter)).call(); - return waitProcedureResult(procFuture).whenComplete(consumer); + CompletableFuture procFuture = + this. newMasterCaller().action((controller, stub) -> this + . call(controller, stub, preq, rpcCall, respConverter)).call(); + CompletableFuture future = waitProcedureResult(procFuture); + addListener(future, consumer); + return future; } @FunctionalInterface @@ -2892,7 +2893,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { // If any region compaction state is MAJOR_AND_MINOR // the table compaction state is MAJOR_AND_MINOR, too. if (err2 != null) { - future.completeExceptionally(err2); + future.completeExceptionally(unwrapCompletionException(err2)); } else if (regionState == CompactionState.MAJOR_AND_MINOR) { future.complete(regionState); } else { @@ -3039,7 +3040,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { serverNames.stream().forEach(serverName -> { futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { if (err2 != null) { - future.completeExceptionally(err2); + future.completeExceptionally(unwrapCompletionException(err2)); } else { serverStates.put(serverName, serverState); } @@ -3558,7 +3559,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { futures .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { if (err2 != null) { - future.completeExceptionally(err2); + future.completeExceptionally(unwrapCompletionException(err2)); } else { aggregator.append(stats); } @@ -3567,7 +3568,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), (ret, err3) -> { if (err3 != null) { - future.completeExceptionally(err3); + future.completeExceptionally(unwrapCompletionException(err3)); } else { future.complete(aggregator.sum()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 3a945667880..be94ca45e54 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -583,7 +583,7 @@ class RawAsyncTableImpl implements AsyncTable { (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, locateFinished, unfinishedRequest, l, e)); } - coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> { + addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { if (e != null) { callback.onRegionError(region, e); } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java index 4417c7e30f3..94e7d9a019c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java @@ -17,10 +17,16 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -33,12 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - /** * The implementation of a region based coprocessor rpc channel. */ @@ -102,16 +102,16 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { - conn.callerFactory. single().table(tableName).row(row) + addListener( + conn.callerFactory. single().table(tableName).row(row) .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call() - .whenComplete((r, e) -> { - if (e != null) { - ((ClientCoprocessorRpcController) controller).setFailed(e); - } - done.run(r); - }); + .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(), + (r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + }); } - } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java index 372dd4a80db..38512d55baf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java @@ -17,22 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; /** * The implementation of a region server based coprocessor rpc channel. @@ -75,12 +76,13 @@ public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { - callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() - .whenComplete(((r, e) -> { - if (e != null) { - ((ClientCoprocessorRpcController) controller).setFailed(e); - } - done.run(r); - })); + addListener( + callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(), + ((r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + })); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java index c7ae32c0c70..c02643ff3f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java @@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; import java.io.IOException; import java.util.concurrent.CompletableFuture; - import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; @@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; @@ -68,7 +70,7 @@ class ZKAsyncRegistry implements AsyncRegistry { private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - zk.get(path).whenComplete((data, error) -> { + addListener(zk.get(path), (data, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -139,7 +141,7 @@ class ZKAsyncRegistry implements AsyncRegistry { MutableInt remaining = new MutableInt(locs.length); znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> { if (replicaId == DEFAULT_REPLICA_ID) { - getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { + addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -154,13 +156,12 @@ class ZKAsyncRegistry implements AsyncRegistry { new IOException("Meta region is in state " + stateAndServerName.getFirst())); return; } - locs[DEFAULT_REPLICA_ID] = - new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), - stateAndServerName.getSecond()); + locs[DEFAULT_REPLICA_ID] = new HRegionLocation( + getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond()); tryComplete(remaining, locs, future); }); } else { - getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { + addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { if (future.isDone()) { return; } @@ -174,12 +175,12 @@ class ZKAsyncRegistry implements AsyncRegistry { Pair stateAndServerName = getStateAndServerName(proto); if (stateAndServerName.getFirst() != RegionState.State.OPEN) { LOG.warn("Meta region for replica " + replicaId + " is in state " + - stateAndServerName.getFirst()); + stateAndServerName.getFirst()); locs[replicaId] = null; } else { locs[replicaId] = - new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), - stateAndServerName.getSecond()); + new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), + stateAndServerName.getSecond()); } } tryComplete(remaining, locs, future); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 02ce655ed2a..861dacbc325 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.function.BiConsumer; import org.apache.yetus.audience.InterfaceAudience; @@ -57,13 +59,66 @@ public final class FutureUtils { BiConsumer action) { future.whenComplete((resp, error) -> { try { - action.accept(resp, error); + // See this post on stack overflow(shorten since the url is too long), + // https://s.apache.org/completionexception + // For a chain of CompleableFuture, only the first child CompletableFuture can get the + // original exception, others will get a CompletionException, which wraps the original + // exception. So here we unwrap it before passing it to the callback action. + action.accept(resp, unwrapCompletionException(error)); } catch (Throwable t) { LOG.error("Unexpected error caught when processing CompletableFuture", t); } }); } + /** + * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only + * exception is that we will call + * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}. + * @see #addListener(CompletableFuture, BiConsumer) + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static void addListener(CompletableFuture future, + BiConsumer action, Executor executor) { + future.whenCompleteAsync((resp, error) -> { + try { + action.accept(resp, unwrapCompletionException(error)); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }, executor); + } + + /** + * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all + * the callbacks in the given {@code executor}. + */ + public static CompletableFuture wrapFuture(CompletableFuture future, + Executor executor) { + CompletableFuture wrappedFuture = new CompletableFuture<>(); + addListener(future, (r, e) -> { + if (e != null) { + wrappedFuture.completeExceptionally(e); + } else { + wrappedFuture.complete(r); + } + }, executor); + return wrappedFuture; + } + + /** + * Get the cause of the {@link Throwable} if it is a {@link CompletionException}. + */ + public static Throwable unwrapCompletionException(Throwable error) { + if (error instanceof CompletionException) { + Throwable cause = error.getCause(); + if (cause != null) { + return cause; + } + } + return error; + } + /** * A helper class for getting the result of a Future, and convert the error to an * {@link IOException}. diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index 3b3e8d9dfab..b3003c4e141 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.client.coprocessor; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.Message; - import java.io.IOException; import java.util.Map; import java.util.NavigableMap; @@ -29,7 +29,6 @@ import java.util.NavigableSet; import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; @@ -455,10 +454,10 @@ public final class AsyncAggregationClient { } public static - CompletableFuture median(AsyncTable table, - ColumnInterpreter ci, Scan scan) { + CompletableFuture median(AsyncTable table, + ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); - sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { + addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> { if (error != null) { future.completeExceptionally(error); } else if (sumByRegion.isEmpty()) { diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java index bcc9c0aed1a..b8b321395cf 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client.example; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -78,7 +80,7 @@ public class AsyncClientExample extends Configured implements Tool { for (;;) { if (future.compareAndSet(null, new CompletableFuture<>())) { CompletableFuture toComplete = future.get(); - ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> { + addListener(ConnectionFactory.createAsyncConnection(getConf()),(conn, error) -> { if (error != null) { toComplete.completeExceptionally(error); // we need to reset the future holder so we will get a chance to recreate an async @@ -98,15 +100,15 @@ public class AsyncClientExample extends Configured implements Tool { } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NONNULL_PARAM_VIOLATION", - justification="it is valid to pass NULL to CompletableFuture#completedFuture") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION", + justification = "it is valid to pass NULL to CompletableFuture#completedFuture") private CompletableFuture closeConn() { CompletableFuture f = future.get(); if (f == null) { return CompletableFuture.completedFuture(null); } CompletableFuture closeFuture = new CompletableFuture<>(); - f.whenComplete((conn, error) -> { + addListener(f, (conn, error) -> { if (error == null) { IOUtils.closeQuietly(conn); } @@ -136,44 +138,44 @@ public class AsyncClientExample extends Configured implements Tool { CountDownLatch latch = new CountDownLatch(numOps); IntStream.range(0, numOps).forEach(i -> { CompletableFuture future = getConn(); - future.whenComplete((conn, error) -> { + addListener(future, (conn, error) -> { if (error != null) { LOG.warn("failed to get async connection for " + i, error); latch.countDown(); return; } AsyncTable table = conn.getTable(tableName, threadPool); - table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))) - .whenComplete((putResp, putErr) -> { - if (putErr != null) { - LOG.warn("put failed for " + i, putErr); + addListener(table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))), + (putResp, putErr) -> { + if (putErr != null) { + LOG.warn("put failed for " + i, putErr); + latch.countDown(); + return; + } + LOG.info("put for " + i + " succeeded, try getting"); + addListener(table.get(new Get(getKey(i))), (result, getErr) -> { + if (getErr != null) { + LOG.warn("get failed for " + i); latch.countDown(); return; } - LOG.info("put for " + i + " succeeded, try getting"); - table.get(new Get(getKey(i))).whenComplete((result, getErr) -> { - if (getErr != null) { - LOG.warn("get failed for " + i); - latch.countDown(); - return; - } - if (result.isEmpty()) { - LOG.warn("get failed for " + i + ", server returns empty result"); - } else if (!result.containsColumn(FAMILY, QUAL)) { - LOG.warn("get failed for " + i + ", the result does not contain " + - Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL)); + if (result.isEmpty()) { + LOG.warn("get failed for " + i + ", server returns empty result"); + } else if (!result.containsColumn(FAMILY, QUAL)) { + LOG.warn("get failed for " + i + ", the result does not contain " + + Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL)); + } else { + int v = Bytes.toInt(result.getValue(FAMILY, QUAL)); + if (v != i) { + LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) + + ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i); } else { - int v = Bytes.toInt(result.getValue(FAMILY, QUAL)); - if (v != i) { - LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) + - ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i); - } else { - LOG.info("get for " + i + " succeeded"); - } + LOG.info("get for " + i + " succeeded"); } - latch.countDown(); - }); + } + latch.countDown(); }); + }); }); }); latch.await(); diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java index f9caf2b3329..668bf7ac36f 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client.example; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.Optional; @@ -159,36 +161,38 @@ public class HttpProxyExample { private void get(ChannelHandlerContext ctx, FullHttpRequest req) { Params params = parse(req); - conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) - .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))) - .whenComplete((r, e) -> { - if (e != null) { - exceptionCaught(ctx, e); + addListener( + conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) + .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))), + (r, e) -> { + if (e != null) { + exceptionCaught(ctx, e); + } else { + byte[] value = + r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)); + if (value != null) { + write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value))); } else { - byte[] value = - r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)); - if (value != null) { - write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value))); - } else { - write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty()); - } + write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty()); } - }); + } + }); } private void put(ChannelHandlerContext ctx, FullHttpRequest req) { Params params = parse(req); byte[] value = new byte[req.content().readableBytes()]; req.content().readBytes(value); - conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) - .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)) - .whenComplete((r, e) -> { - if (e != null) { - exceptionCaught(ctx, e); - } else { - write(ctx, HttpResponseStatus.OK, Optional.empty()); - } - }); + addListener( + conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) + .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)), + (r, e) -> { + if (e != null) { + exceptionCaught(ctx, e); + } else { + write(ctx, HttpResponseStatus.OK, Optional.empty()); + } + }); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 81308ad530e..553ff3d1668 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequencer; @@ -348,7 +350,7 @@ public class AsyncFSWAL extends AbstractFSWAL { highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; final long startTimeNs = System.nanoTime(); final long epoch = (long) epochAndState >>> 2L; - writer.sync().whenCompleteAsync((result, error) -> { + addListener(writer.sync(), (result, error) -> { if (error != null) { syncFailed(epoch, error); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 6368fb745f2..37c6f004913 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; @@ -194,7 +196,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter // should not happen throw new AssertionError(e); } - output.flush(false).whenComplete((len, error) -> { + addListener(output.flush(false), (len, error) -> { if (error != null) { future.completeExceptionally(error); } else { @@ -215,7 +217,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter } output.writeInt(trailer.getSerializedSize()); output.write(magic); - output.flush(false).whenComplete((len, error) -> { + addListener(output.flush(false), (len, error) -> { if (error != null) { future.completeExceptionally(error); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java index 4301ae79359..4e3fa61a60d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -75,7 +77,7 @@ public final class CombinedAsyncWriter implements AsyncWriter { public CompletableFuture sync() { CompletableFuture future = new CompletableFuture<>(); AtomicInteger remaining = new AtomicInteger(writers.size()); - writers.forEach(w -> w.sync().whenComplete((length, error) -> { + writers.forEach(w -> addListener(w.sync(), (length, error) -> { if (error != null) { future.completeExceptionally(error); return;