HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly

This commit is contained in:
zhangduo 2019-02-02 21:24:18 +08:00
parent ce8214ca36
commit 54a8cc87c6
24 changed files with 275 additions and 216 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -81,7 +83,7 @@ public class AsyncMetaTableAccessor {
long time = EnvironmentEdgeManager.currentTime(); long time = EnvironmentEdgeManager.currentTime();
try { try {
get.setTimeRange(0, time); get.setTimeRange(0, time);
metaTable.get(get).whenComplete((result, error) -> { addListener(metaTable.get(get), (result, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
@ -109,16 +111,14 @@ public class AsyncMetaTableAccessor {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
try { try {
RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
metaTable.get( addListener(metaTable.get(new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo)) .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
.addFamily(HConstants.CATALOG_FAMILY)).whenComplete(
(r, err) -> {
if (err != null) { if (err != null) {
future.completeExceptionally(err); future.completeExceptionally(err);
return; return;
} }
future.complete(getRegionLocations(r).map( future.complete(getRegionLocations(r)
locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
}); });
} catch (IOException parseEx) { } catch (IOException parseEx) {
LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName)); LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
@ -136,34 +136,29 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName( public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
AsyncTable<?> metaTable, byte[] encodedRegionName) { AsyncTable<?> metaTable, byte[] encodedRegionName) {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) addListener(
.whenComplete( metaTable
(results, err) -> { .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
if (err != null) { (results, err) -> {
future.completeExceptionally(err); if (err != null) {
return; future.completeExceptionally(err);
} return;
String encodedRegionNameStr = Bytes.toString(encodedRegionName); }
results String encodedRegionNameStr = Bytes.toString(encodedRegionName);
.stream() results.stream().filter(result -> !result.isEmpty())
.filter(result -> !result.isEmpty()) .filter(result -> MetaTableAccessor.getRegionInfo(result) != null).forEach(result -> {
.filter(result -> MetaTableAccessor.getRegionInfo(result) != null) getRegionLocations(result).ifPresent(locations -> {
.forEach( for (HRegionLocation location : locations.getRegionLocations()) {
result -> { if (location != null &&
getRegionLocations(result).ifPresent( encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
locations -> { future.complete(Optional.of(location));
for (HRegionLocation location : locations.getRegionLocations()) { return;
if (location != null }
&& encodedRegionNameStr.equals(location.getRegion() }
.getEncodedName())) { });
future.complete(Optional.of(location));
return;
}
}
});
});
future.complete(Optional.empty());
}); });
future.complete(Optional.empty());
});
return future; return future;
} }
@ -190,19 +185,18 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) { AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
getTableRegionsAndLocations(metaTable, tableName, true).whenComplete( addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
(locations, err) -> { if (err != null) {
if (err != null) { future.completeExceptionally(err);
future.completeExceptionally(err); } else if (locations == null || locations.isEmpty()) {
} else if (locations == null || locations.isEmpty()) { future.complete(Collections.emptyList());
future.complete(Collections.emptyList()); } else {
} else { List<HRegionLocation> regionLocations =
List<HRegionLocation> regionLocations = locations.stream() locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
.map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) .collect(Collectors.toList());
.collect(Collectors.toList()); future.complete(regionLocations);
future.complete(regionLocations); }
} });
});
return future; return future;
} }
@ -254,7 +248,7 @@ public class AsyncMetaTableAccessor {
} }
}; };
scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> { addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -61,7 +63,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
return; return;
} }
resetCallTimeout(); resetCallTimeout();
callable.call(controller, adminStub).whenComplete((result, error) -> { addListener(callable.call(controller, adminStub), (result, error) -> {
if (error != null) { if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> { onError(error, () -> "Call to admin stub failed", err -> {
}); });

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -409,7 +410,7 @@ class AsyncBatchRpcRetryingCaller<T> {
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) { if (error != null) {
error = translateException(error); error = unwrapCompletionException(translateException(error));
if (error instanceof DoNotRetryIOException) { if (error instanceof DoNotRetryIOException) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
return; return;

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -25,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -96,7 +97,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator(); Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) { for (CompletableFuture<?> future : table.batch(toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next(); CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
future.whenComplete((r, e) -> { addListener(future, (r, e) -> {
if (e != null) { if (e != null) {
toCompleteFuture.completeExceptionally(e); toCompleteFuture.completeExceptionally(e);
} else { } else {

View File

@ -188,7 +188,7 @@ class AsyncConnectionImpl implements AsyncConnection {
} }
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) { private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
registry.getMasterAddress().whenComplete((sn, error) -> { addListener(registry.getMasterAddress(), (sn, error) -> {
if (sn == null) { if (sn == null) {
String msg = "ZooKeeper available but no active master location found"; String msg = "ZooKeeper available but no active master location found";
LOG.info(msg); LOG.info(msg);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.security.access.UserPermission; import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -67,15 +68,7 @@ class AsyncHBaseAdmin implements AsyncAdmin {
} }
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
CompletableFuture<T> asyncFuture = new CompletableFuture<>(); return FutureUtils.wrapFuture(future, pool);
future.whenCompleteAsync((r, e) -> {
if (e != null) {
asyncFuture.completeExceptionally(e);
} else {
asyncFuture.complete(r);
}
}, pool);
return asyncFuture;
} }
@Override @Override

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -43,20 +45,20 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs, Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) { long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt); startLogErrorsCnt);
this.callable = callable; this.callable = callable;
} }
@Override @Override
protected void doCall() { protected void doCall() {
conn.getMasterStub().whenComplete((stub, error) -> { addListener(conn.getMasterStub(), (stub, error) -> {
if (error != null) { if (error != null) {
onError(error, () -> "Get async master stub failed", err -> { onError(error, () -> "Get async master stub failed", err -> {
}); });
return; return;
} }
resetCallTimeout(); resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error2) -> { addListener(callable.call(controller, stub), (result, error2) -> {
if (error2 != null) { if (error2 != null) {
onError(error2, () -> "Call to master failed", err -> { onError(error2, () -> "Call to master failed", err -> {
}); });

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -71,7 +72,7 @@ class AsyncMetaRegionLocator {
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
LOG.debug("Start fetching meta region location from registry."); LOG.debug("Start fetching meta region location from registry.");
CompletableFuture<RegionLocations> future = metaRelocateFuture.get(); CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
registry.getMetaRegionLocation().whenComplete((locs, error) -> { addListener(registry.getMetaRegionLocation(), (locs, error) -> {
if (error != null) { if (error != null) {
LOG.debug("Failed to fetch meta region location from registry", error); LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error); metaRelocateFuture.getAndSet(null).completeExceptionally(error);

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -62,7 +64,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
return; return;
} }
resetCallTimeout(); resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error) -> { addListener(callable.call(controller, stub), (result, error) -> {
if (error != null) { if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> { onError(error, () -> "Call to admin stub failed", err -> {
}); });

View File

@ -79,7 +79,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
return; return;
} }
resetCallTimeout(); resetCallTimeout();
callable.call(controller, loc, stub).whenComplete((result, error) -> { addListener(callable.call(controller, loc, stub), (result, error) -> {
if (error != null) { if (error != null) {
onError(error, onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -87,15 +88,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
} }
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
CompletableFuture<T> asyncFuture = new CompletableFuture<>(); return FutureUtils.wrapFuture(future, pool);
future.whenCompleteAsync((r, e) -> {
if (e != null) {
asyncFuture.completeExceptionally(e);
} else {
asyncFuture.complete(r);
}
}, pool);
return asyncFuture;
} }
@Override @Override

View File

@ -18,19 +18,20 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
@ -282,7 +283,7 @@ public class ConnectionFactory {
final User user) { final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
registry.getClusterId().whenComplete((clusterId, error) -> { addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
@ -295,9 +296,8 @@ public class ConnectionFactory {
AsyncConnectionImpl.class, AsyncConnection.class); AsyncConnectionImpl.class, AsyncConnection.class);
try { try {
future.complete( future.complete(
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() -> user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user)) .newInstance(clazz, conf, registry, clusterId, user)));
);
} catch (Exception e) { } catch (Exception e) {
future.completeExceptionally(e); future.completeExceptionally(e);
} }

View File

@ -17,22 +17,23 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/** /**
* The implementation of a master based coprocessor rpc channel. * The implementation of a master based coprocessor rpc channel.
@ -75,12 +76,13 @@ class MasterCoprocessorRpcChannelImpl implements RpcChannel {
@Override @Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request, public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) { Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() addListener(
.whenComplete(((r, e) -> { callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
if (e != null) { ((r, e) -> {
((ClientCoprocessorRpcController) controller).setFailed(e); if (e != null) {
} ((ClientCoprocessorRpcController) controller).setFailed(e);
done.run(r); }
})); done.run(r);
}));
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcChannel;
@ -412,12 +413,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) { ProcedureBiConsumer consumer) {
CompletableFuture<Long> procFuture = this CompletableFuture<Long> procFuture =
.<Long> newMasterCaller() this.<Long> newMasterCaller().action((controller, stub) -> this
.action( .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
(controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, CompletableFuture<Void> future = waitProcedureResult(procFuture);
respConverter)).call(); addListener(future, consumer);
return waitProcedureResult(procFuture).whenComplete(consumer); return future;
} }
@FunctionalInterface @FunctionalInterface
@ -2879,7 +2880,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
// If any region compaction state is MAJOR_AND_MINOR // If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too. // the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(unwrapCompletionException(err2));
} else if (regionState == CompactionState.MAJOR_AND_MINOR) { } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState); future.complete(regionState);
} else { } else {
@ -3026,7 +3027,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
serverNames.stream().forEach(serverName -> { serverNames.stream().forEach(serverName -> {
futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(unwrapCompletionException(err2));
} else { } else {
serverStates.put(serverName, serverState); serverStates.put(serverName, serverState);
} }
@ -3545,7 +3546,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
futures futures
.add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(unwrapCompletionException(err2));
} else { } else {
aggregator.append(stats); aggregator.append(stats);
} }
@ -3554,7 +3555,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(ret, err3) -> { (ret, err3) -> {
if (err3 != null) { if (err3 != null) {
future.completeExceptionally(err3); future.completeExceptionally(unwrapCompletionException(err3));
} else { } else {
future.complete(aggregator.sum()); future.complete(aggregator.sum());
} }

View File

@ -583,7 +583,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e)); locateFinished, unfinishedRequest, l, e));
} }
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> { addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
if (e != null) { if (e != null) {
callback.onRegionError(region, e); callback.onRegionError(region, e);
} else { } else {

View File

@ -17,10 +17,16 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -33,12 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/** /**
* The implementation of a region based coprocessor rpc channel. * The implementation of a region based coprocessor rpc channel.
*/ */
@ -102,16 +102,16 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
@Override @Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request, public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) { Message responsePrototype, RpcCallback<Message> done) {
conn.callerFactory.<Message> single().table(tableName).row(row) addListener(
conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call() .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
.whenComplete((r, e) -> { (r, e) -> {
if (e != null) { if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e); ((ClientCoprocessorRpcController) controller).setFailed(e);
} }
done.run(r); done.run(r);
}); });
} }
} }

View File

@ -17,22 +17,23 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
/** /**
* The implementation of a region server based coprocessor rpc channel. * The implementation of a region server based coprocessor rpc channel.
@ -75,12 +76,13 @@ public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
@Override @Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request, public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) { Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() addListener(
.whenComplete(((r, e) -> { callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
if (e != null) { ((r, e) -> {
((ClientCoprocessorRpcController) controller).setFailed(e); if (e != null) {
} ((ClientCoprocessorRpcController) controller).setFailed(e);
done.run(r); }
})); done.run(r);
}));
} }
} }

View File

@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica; import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic; import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterId;
@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@ -68,7 +70,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) { private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>(); CompletableFuture<T> future = new CompletableFuture<>();
zk.get(path).whenComplete((data, error) -> { addListener(zk.get(path), (data, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
@ -139,7 +141,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
MutableInt remaining = new MutableInt(locs.length); MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> { znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) { if (replicaId == DEFAULT_REPLICA_ID) {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
return; return;
@ -154,13 +156,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
new IOException("Meta region is in state " + stateAndServerName.getFirst())); new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return; return;
} }
locs[DEFAULT_REPLICA_ID] = locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
stateAndServerName.getSecond());
tryComplete(remaining, locs, future); tryComplete(remaining, locs, future);
}); });
} else { } else {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> { addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) { if (future.isDone()) {
return; return;
} }
@ -174,12 +175,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto); Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) { if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state " + LOG.warn("Meta region for replica " + replicaId + " is in state " +
stateAndServerName.getFirst()); stateAndServerName.getFirst());
locs[replicaId] = null; locs[replicaId] = null;
} else { } else {
locs[replicaId] = locs[replicaId] =
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId), new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond()); stateAndServerName.getSecond());
} }
} }
tryComplete(remaining, locs, future); tryComplete(remaining, locs, future);

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -57,13 +59,66 @@ public final class FutureUtils {
BiConsumer<? super T, ? super Throwable> action) { BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> { future.whenComplete((resp, error) -> {
try { try {
action.accept(resp, error); // See this post on stack overflow(shorten since the url is too long),
// https://s.apache.org/completionexception
// For a chain of CompleableFuture, only the first child CompletableFuture can get the
// original exception, others will get a CompletionException, which wraps the original
// exception. So here we unwrap it before passing it to the callback action.
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t); LOG.error("Unexpected error caught when processing CompletableFuture", t);
} }
}); });
} }
/**
* Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
* exception is that we will call
* {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
* @see #addListener(CompletableFuture, BiConsumer)
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
future.whenCompleteAsync((resp, error) -> {
try {
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
}, executor);
}
/**
* Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
* the callbacks in the given {@code executor}.
*/
public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
Executor executor) {
CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
addListener(future, (r, e) -> {
if (e != null) {
wrappedFuture.completeExceptionally(e);
} else {
wrappedFuture.complete(r);
}
}, executor);
return wrappedFuture;
}
/**
* Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
*/
public static Throwable unwrapCompletionException(Throwable error) {
if (error instanceof CompletionException) {
Throwable cause = error.getCause();
if (cause != null) {
return cause;
}
}
return error;
}
/** /**
* A helper class for getting the result of a Future, and convert the error to an * A helper class for getting the result of a Future, and convert the error to an
* {@link IOException}. * {@link IOException}.

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
@ -29,7 +29,6 @@ import java.util.NavigableSet;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
@ -455,10 +454,10 @@ public final class AsyncAggregationClient {
} }
public static <R, S, P extends Message, Q extends Message, T extends Message> public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table, CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>(); CompletableFuture<R> future = new CompletableFuture<>();
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else if (sumByRegion.isEmpty()) { } else if (sumByRegion.isEmpty()) {

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client.example; package org.apache.hadoop.hbase.client.example;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -78,7 +80,7 @@ public class AsyncClientExample extends Configured implements Tool {
for (;;) { for (;;) {
if (future.compareAndSet(null, new CompletableFuture<>())) { if (future.compareAndSet(null, new CompletableFuture<>())) {
CompletableFuture<AsyncConnection> toComplete = future.get(); CompletableFuture<AsyncConnection> toComplete = future.get();
ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> { addListener(ConnectionFactory.createAsyncConnection(getConf()),(conn, error) -> {
if (error != null) { if (error != null) {
toComplete.completeExceptionally(error); toComplete.completeExceptionally(error);
// we need to reset the future holder so we will get a chance to recreate an async // we need to reset the future holder so we will get a chance to recreate an async
@ -98,15 +100,15 @@ public class AsyncClientExample extends Configured implements Tool {
} }
} }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NONNULL_PARAM_VIOLATION", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
justification="it is valid to pass NULL to CompletableFuture#completedFuture") justification = "it is valid to pass NULL to CompletableFuture#completedFuture")
private CompletableFuture<Void> closeConn() { private CompletableFuture<Void> closeConn() {
CompletableFuture<AsyncConnection> f = future.get(); CompletableFuture<AsyncConnection> f = future.get();
if (f == null) { if (f == null) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
CompletableFuture<Void> closeFuture = new CompletableFuture<>(); CompletableFuture<Void> closeFuture = new CompletableFuture<>();
f.whenComplete((conn, error) -> { addListener(f, (conn, error) -> {
if (error == null) { if (error == null) {
IOUtils.closeQuietly(conn); IOUtils.closeQuietly(conn);
} }
@ -136,44 +138,44 @@ public class AsyncClientExample extends Configured implements Tool {
CountDownLatch latch = new CountDownLatch(numOps); CountDownLatch latch = new CountDownLatch(numOps);
IntStream.range(0, numOps).forEach(i -> { IntStream.range(0, numOps).forEach(i -> {
CompletableFuture<AsyncConnection> future = getConn(); CompletableFuture<AsyncConnection> future = getConn();
future.whenComplete((conn, error) -> { addListener(future, (conn, error) -> {
if (error != null) { if (error != null) {
LOG.warn("failed to get async connection for " + i, error); LOG.warn("failed to get async connection for " + i, error);
latch.countDown(); latch.countDown();
return; return;
} }
AsyncTable<?> table = conn.getTable(tableName, threadPool); AsyncTable<?> table = conn.getTable(tableName, threadPool);
table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))) addListener(table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))),
.whenComplete((putResp, putErr) -> { (putResp, putErr) -> {
if (putErr != null) { if (putErr != null) {
LOG.warn("put failed for " + i, putErr); LOG.warn("put failed for " + i, putErr);
latch.countDown();
return;
}
LOG.info("put for " + i + " succeeded, try getting");
addListener(table.get(new Get(getKey(i))), (result, getErr) -> {
if (getErr != null) {
LOG.warn("get failed for " + i);
latch.countDown(); latch.countDown();
return; return;
} }
LOG.info("put for " + i + " succeeded, try getting"); if (result.isEmpty()) {
table.get(new Get(getKey(i))).whenComplete((result, getErr) -> { LOG.warn("get failed for " + i + ", server returns empty result");
if (getErr != null) { } else if (!result.containsColumn(FAMILY, QUAL)) {
LOG.warn("get failed for " + i); LOG.warn("get failed for " + i + ", the result does not contain " +
latch.countDown(); Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
return; } else {
} int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
if (result.isEmpty()) { if (v != i) {
LOG.warn("get failed for " + i + ", server returns empty result"); LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
} else if (!result.containsColumn(FAMILY, QUAL)) { ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
LOG.warn("get failed for " + i + ", the result does not contain " +
Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
} else { } else {
int v = Bytes.toInt(result.getValue(FAMILY, QUAL)); LOG.info("get for " + i + " succeeded");
if (v != i) {
LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
} else {
LOG.info("get for " + i + " succeeded");
}
} }
latch.countDown(); }
}); latch.countDown();
}); });
});
}); });
}); });
latch.await(); latch.await();

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client.example; package org.apache.hadoop.hbase.client.example;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Optional; import java.util.Optional;
@ -159,36 +161,38 @@ public class HttpProxyExample {
private void get(ChannelHandlerContext ctx, FullHttpRequest req) { private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req); Params params = parse(req);
conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) addListener(
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))) conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
.whenComplete((r, e) -> { .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
if (e != null) { (r, e) -> {
exceptionCaught(ctx, e); if (e != null) {
exceptionCaught(ctx, e);
} else {
byte[] value =
r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
if (value != null) {
write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
} else { } else {
byte[] value = write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
if (value != null) {
write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
} else {
write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
}
} }
}); }
});
} }
private void put(ChannelHandlerContext ctx, FullHttpRequest req) { private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req); Params params = parse(req);
byte[] value = new byte[req.content().readableBytes()]; byte[] value = new byte[req.content().readableBytes()];
req.content().readBytes(value); req.content().readBytes(value);
conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) addListener(
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)) conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
.whenComplete((r, e) -> { .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
if (e != null) { (r, e) -> {
exceptionCaught(ctx, e); if (e != null) {
} else { exceptionCaught(ctx, e);
write(ctx, HttpResponseStatus.OK, Optional.empty()); } else {
} write(ctx, HttpResponseStatus.OK, Optional.empty());
}); }
});
} }
@Override @Override

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.Sequencer;
@ -348,7 +350,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime(); final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L; final long epoch = (long) epochAndState >>> 2L;
writer.sync().whenCompleteAsync((result, error) -> { addListener(writer.sync(), (result, error) -> {
if (error != null) { if (error != null) {
syncFailed(epoch, error); syncFailed(epoch, error);
} else { } else {

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -194,7 +196,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
// should not happen // should not happen
throw new AssertionError(e); throw new AssertionError(e);
} }
output.flush(false).whenComplete((len, error) -> { addListener(output.flush(false), (len, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else { } else {
@ -215,7 +217,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
} }
output.writeInt(trailer.getSerializedSize()); output.writeInt(trailer.getSerializedSize());
output.write(magic); output.write(magic);
output.flush(false).whenComplete((len, error) -> { addListener(output.flush(false), (len, error) -> {
if (error != null) { if (error != null) {
future.completeExceptionally(error); future.completeExceptionally(error);
} else { } else {