HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly

This commit is contained in:
zhangduo 2019-02-02 21:24:18 +08:00
parent f26e1bb588
commit 2b1b79f08b
26 changed files with 285 additions and 222 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -81,7 +83,7 @@ public class AsyncMetaTableAccessor {
long time = EnvironmentEdgeManager.currentTime();
try {
get.setTimeRange(0, time);
metaTable.get(get).whenComplete((result, error) -> {
addListener(metaTable.get(get), (result, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@ -109,16 +111,14 @@ public class AsyncMetaTableAccessor {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
try {
RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
metaTable.get(
new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
.addFamily(HConstants.CATALOG_FAMILY)).whenComplete(
(r, err) -> {
addListener(metaTable.get(new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
.addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
future.complete(getRegionLocations(r).map(
locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
future.complete(getRegionLocations(r)
.map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
});
} catch (IOException parseEx) {
LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
@ -136,34 +136,29 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
AsyncTable<?> metaTable, byte[] encodedRegionName) {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
.whenComplete(
(results, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
String encodedRegionNameStr = Bytes.toString(encodedRegionName);
results
.stream()
.filter(result -> !result.isEmpty())
.filter(result -> MetaTableAccessor.getRegionInfo(result) != null)
.forEach(
result -> {
getRegionLocations(result).ifPresent(
locations -> {
for (HRegionLocation location : locations.getRegionLocations()) {
if (location != null
&& encodedRegionNameStr.equals(location.getRegion()
.getEncodedName())) {
future.complete(Optional.of(location));
return;
}
}
});
});
future.complete(Optional.empty());
addListener(
metaTable
.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
(results, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
String encodedRegionNameStr = Bytes.toString(encodedRegionName);
results.stream().filter(result -> !result.isEmpty())
.filter(result -> MetaTableAccessor.getRegionInfo(result) != null).forEach(result -> {
getRegionLocations(result).ifPresent(locations -> {
for (HRegionLocation location : locations.getRegionLocations()) {
if (location != null &&
encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
future.complete(Optional.of(location));
return;
}
}
});
});
future.complete(Optional.empty());
});
return future;
}
@ -190,19 +185,18 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
getTableRegionsAndLocations(metaTable, tableName, true).whenComplete(
(locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (locations == null || locations.isEmpty()) {
future.complete(Collections.emptyList());
} else {
List<HRegionLocation> regionLocations = locations.stream()
.map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
.collect(Collectors.toList());
future.complete(regionLocations);
}
});
addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (locations == null || locations.isEmpty()) {
future.complete(Collections.emptyList());
} else {
List<HRegionLocation> regionLocations =
locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
.collect(Collectors.toList());
future.complete(regionLocations);
}
});
return future;
}
@ -254,7 +248,7 @@ public class AsyncMetaTableAccessor {
}
};
scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> {
addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.Collection;
@ -614,15 +616,15 @@ public interface AsyncAdmin {
* @param peerId a short name that identifies the peer
* @return the current cluster state wrapped by a {@link CompletableFuture}.
*/
default CompletableFuture<SyncReplicationState>
getReplicationPeerSyncReplicationState(String peerId) {
default CompletableFuture<SyncReplicationState> getReplicationPeerSyncReplicationState(
String peerId) {
CompletableFuture<SyncReplicationState> future = new CompletableFuture<>();
listReplicationPeers(Pattern.compile(peerId)).whenComplete((peers, error) -> {
addListener(listReplicationPeers(Pattern.compile(peerId)), (peers, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (peers.isEmpty() || !peers.get(0).getPeerId().equals(peerId)) {
future.completeExceptionally(
new IOException("Replication peer " + peerId + " does not exist"));
future
.completeExceptionally(new IOException("Replication peer " + peerId + " does not exist"));
} else {
future.complete(peers.get(0).getSyncReplicationState());
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
@ -61,7 +63,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
return;
}
resetCallTimeout();
callable.call(controller, adminStub).whenComplete((result, error) -> {
addListener(callable.call(controller, adminStub), (result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import java.io.IOException;
import java.util.ArrayList;
@ -409,7 +410,7 @@ class AsyncBatchRpcRetryingCaller<T> {
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) {
error = translateException(error);
error = unwrapCompletionException(translateException(error));
if (error instanceof DoNotRetryIOException) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
return;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@ -25,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@ -96,7 +97,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
future.whenComplete((r, e) -> {
addListener(future, (r, e) -> {
if (e != null) {
toCompleteFuture.completeExceptionally(e);
} else {

View File

@ -188,7 +188,7 @@ class AsyncConnectionImpl implements AsyncConnection {
}
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
registry.getMasterAddress().whenComplete((sn, error) -> {
addListener(registry.getMasterAddress(), (sn, error) -> {
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -68,15 +69,7 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
CompletableFuture<T> asyncFuture = new CompletableFuture<>();
future.whenCompleteAsync((r, e) -> {
if (e != null) {
asyncFuture.completeExceptionally(e);
} else {
asyncFuture.complete(r);
}
}, pool);
return asyncFuture;
return FutureUtils.wrapFuture(future, pool);
}
@Override

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@ -43,20 +45,20 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
startLogErrorsCnt);
this.callable = callable;
}
@Override
protected void doCall() {
conn.getMasterStub().whenComplete((stub, error) -> {
addListener(conn.getMasterStub(), (stub, error) -> {
if (error != null) {
onError(error, () -> "Get async master stub failed", err -> {
});
return;
}
resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error2) -> {
addListener(callable.call(controller, stub), (result, error2) -> {
if (error2 != null) {
onError(error2, () -> "Call to master failed", err -> {
});

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@ -71,7 +72,7 @@ class AsyncMetaRegionLocator {
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
LOG.debug("Start fetching meta region location from registry.");
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
registry.getMetaRegionLocation().whenComplete((locs, error) -> {
addListener(registry.getMetaRegionLocation(), (locs, error) -> {
if (error != null) {
LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error);

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
@ -62,7 +64,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
return;
}
resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error) -> {
addListener(callable.call(controller, stub), (result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});

View File

@ -79,7 +79,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
return;
}
resetCallTimeout();
callable.call(controller, loc, stub).whenComplete((result, error) -> {
addListener(callable.call(controller, loc, stub), (result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -87,15 +88,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
CompletableFuture<T> asyncFuture = new CompletableFuture<>();
future.whenCompleteAsync((r, e) -> {
if (e != null) {
asyncFuture.completeExceptionally(e);
} else {
asyncFuture.complete(r);
}
}, pool);
return asyncFuture;
return FutureUtils.wrapFuture(future, pool);
}
@Override

View File

@ -18,19 +18,20 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
@ -282,7 +283,7 @@ public class ConnectionFactory {
final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
registry.getClusterId().whenComplete((clusterId, error) -> {
addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@ -295,9 +296,8 @@ public class ConnectionFactory {
AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
);
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
.newInstance(clazz, conf, registry, clusterId, user)));
} catch (Exception e) {
future.completeExceptionally(e);
}

View File

@ -17,22 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* The implementation of a master based coprocessor rpc channel.
@ -75,12 +76,13 @@ class MasterCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
.whenComplete(((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
addListener(
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
@ -415,12 +416,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
CompletableFuture<Long> procFuture = this
.<Long> newMasterCaller()
.action(
(controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
respConverter)).call();
return waitProcedureResult(procFuture).whenComplete(consumer);
CompletableFuture<Long> procFuture =
this.<Long> newMasterCaller().action((controller, stub) -> this
.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
CompletableFuture<Void> future = waitProcedureResult(procFuture);
addListener(future, consumer);
return future;
}
@FunctionalInterface
@ -2892,7 +2893,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
// If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) {
future.completeExceptionally(err2);
future.completeExceptionally(unwrapCompletionException(err2));
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState);
} else {
@ -3039,7 +3040,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
serverNames.stream().forEach(serverName -> {
futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
future.completeExceptionally(unwrapCompletionException(err2));
} else {
serverStates.put(serverName, serverState);
}
@ -3558,7 +3559,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
futures
.add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
future.completeExceptionally(unwrapCompletionException(err2));
} else {
aggregator.append(stats);
}
@ -3567,7 +3568,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
future.completeExceptionally(unwrapCompletionException(err3));
} else {
future.complete(aggregator.sum());
}

View File

@ -583,7 +583,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e));
}
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
if (e != null) {
callback.onRegionError(region, e);
} else {

View File

@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@ -33,12 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/**
* The implementation of a region based coprocessor rpc channel.
*/
@ -102,16 +102,16 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
conn.callerFactory.<Message> single().table(tableName).row(row)
addListener(
conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
.whenComplete((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
});
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
(r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
});
}
}

View File

@ -17,22 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/**
* The implementation of a region server based coprocessor rpc channel.
@ -75,12 +76,13 @@ public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
.whenComplete(((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
addListener(
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
}
}

View File

@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@ -68,7 +70,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
zk.get(path).whenComplete((data, error) -> {
addListener(zk.get(path), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@ -139,7 +141,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@ -154,13 +156,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return;
}
locs[DEFAULT_REPLICA_ID] =
new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) {
return;
}
@ -174,12 +175,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state " +
stateAndServerName.getFirst());
stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] =
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
@ -57,13 +59,66 @@ public final class FutureUtils {
BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> {
try {
action.accept(resp, error);
// See this post on stack overflow(shorten since the url is too long),
// https://s.apache.org/completionexception
// For a chain of CompleableFuture, only the first child CompletableFuture can get the
// original exception, others will get a CompletionException, which wraps the original
// exception. So here we unwrap it before passing it to the callback action.
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
});
}
/**
* Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
* exception is that we will call
* {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
* @see #addListener(CompletableFuture, BiConsumer)
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
future.whenCompleteAsync((resp, error) -> {
try {
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
}, executor);
}
/**
* Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
* the callbacks in the given {@code executor}.
*/
public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
Executor executor) {
CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
addListener(future, (r, e) -> {
if (e != null) {
wrappedFuture.completeExceptionally(e);
} else {
wrappedFuture.complete(r);
}
}, executor);
return wrappedFuture;
}
/**
* Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
*/
public static Throwable unwrapCompletionException(Throwable error) {
if (error instanceof CompletionException) {
Throwable cause = error.getCause();
if (cause != null) {
return cause;
}
}
return error;
}
/**
* A helper class for getting the result of a Future, and convert the error to an
* {@link IOException}.

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
@ -29,7 +29,6 @@ import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
@ -455,10 +454,10 @@ public final class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (sumByRegion.isEmpty()) {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client.example;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -78,7 +80,7 @@ public class AsyncClientExample extends Configured implements Tool {
for (;;) {
if (future.compareAndSet(null, new CompletableFuture<>())) {
CompletableFuture<AsyncConnection> toComplete = future.get();
ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> {
addListener(ConnectionFactory.createAsyncConnection(getConf()),(conn, error) -> {
if (error != null) {
toComplete.completeExceptionally(error);
// we need to reset the future holder so we will get a chance to recreate an async
@ -98,15 +100,15 @@ public class AsyncClientExample extends Configured implements Tool {
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NONNULL_PARAM_VIOLATION",
justification="it is valid to pass NULL to CompletableFuture#completedFuture")
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
justification = "it is valid to pass NULL to CompletableFuture#completedFuture")
private CompletableFuture<Void> closeConn() {
CompletableFuture<AsyncConnection> f = future.get();
if (f == null) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
f.whenComplete((conn, error) -> {
addListener(f, (conn, error) -> {
if (error == null) {
IOUtils.closeQuietly(conn);
}
@ -136,44 +138,44 @@ public class AsyncClientExample extends Configured implements Tool {
CountDownLatch latch = new CountDownLatch(numOps);
IntStream.range(0, numOps).forEach(i -> {
CompletableFuture<AsyncConnection> future = getConn();
future.whenComplete((conn, error) -> {
addListener(future, (conn, error) -> {
if (error != null) {
LOG.warn("failed to get async connection for " + i, error);
latch.countDown();
return;
}
AsyncTable<?> table = conn.getTable(tableName, threadPool);
table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)))
.whenComplete((putResp, putErr) -> {
if (putErr != null) {
LOG.warn("put failed for " + i, putErr);
addListener(table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))),
(putResp, putErr) -> {
if (putErr != null) {
LOG.warn("put failed for " + i, putErr);
latch.countDown();
return;
}
LOG.info("put for " + i + " succeeded, try getting");
addListener(table.get(new Get(getKey(i))), (result, getErr) -> {
if (getErr != null) {
LOG.warn("get failed for " + i);
latch.countDown();
return;
}
LOG.info("put for " + i + " succeeded, try getting");
table.get(new Get(getKey(i))).whenComplete((result, getErr) -> {
if (getErr != null) {
LOG.warn("get failed for " + i);
latch.countDown();
return;
}
if (result.isEmpty()) {
LOG.warn("get failed for " + i + ", server returns empty result");
} else if (!result.containsColumn(FAMILY, QUAL)) {
LOG.warn("get failed for " + i + ", the result does not contain " +
Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
if (result.isEmpty()) {
LOG.warn("get failed for " + i + ", server returns empty result");
} else if (!result.containsColumn(FAMILY, QUAL)) {
LOG.warn("get failed for " + i + ", the result does not contain " +
Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
} else {
int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
if (v != i) {
LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
} else {
int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
if (v != i) {
LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
} else {
LOG.info("get for " + i + " succeeded");
}
LOG.info("get for " + i + " succeeded");
}
latch.countDown();
});
}
latch.countDown();
});
});
});
});
latch.await();

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client.example;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
@ -159,36 +161,38 @@ public class HttpProxyExample {
private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req);
conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)))
.whenComplete((r, e) -> {
if (e != null) {
exceptionCaught(ctx, e);
addListener(
conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
(r, e) -> {
if (e != null) {
exceptionCaught(ctx, e);
} else {
byte[] value =
r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
if (value != null) {
write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
} else {
byte[] value =
r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
if (value != null) {
write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
} else {
write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
}
write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
}
});
}
});
}
private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req);
byte[] value = new byte[req.content().readableBytes()];
req.content().readBytes(value);
conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value))
.whenComplete((r, e) -> {
if (e != null) {
exceptionCaught(ctx, e);
} else {
write(ctx, HttpResponseStatus.OK, Optional.empty());
}
});
addListener(
conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
(r, e) -> {
if (e != null) {
exceptionCaught(ctx, e);
} else {
write(ctx, HttpResponseStatus.OK, Optional.empty());
}
});
}
@Override

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
@ -348,7 +350,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
writer.sync().whenCompleteAsync((result, error) -> {
addListener(writer.sync(), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
@ -194,7 +196,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
// should not happen
throw new AssertionError(e);
}
output.flush(false).whenComplete((len, error) -> {
addListener(output.flush(false), (len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
@ -215,7 +217,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
output.writeInt(trailer.getSerializedSize());
output.write(magic);
output.flush(false).whenComplete((len, error) -> {
addListener(output.flush(false), (len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@ -75,7 +77,7 @@ public final class CombinedAsyncWriter implements AsyncWriter {
public CompletableFuture<Long> sync() {
CompletableFuture<Long> future = new CompletableFuture<>();
AtomicInteger remaining = new AtomicInteger(writers.size());
writers.forEach(w -> w.sync().whenComplete((length, error) -> {
writers.forEach(w -> addListener(w.sync(), (length, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;