HBASE-27207 ConnectionUtils.allOf should be moved to FutureUtils (#4627)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2022-07-15 17:04:03 +08:00 committed by GitHub
parent 01b45e285d
commit 02f26368e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 6 deletions

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;

View File

@ -306,11 +306,6 @@ public final class ConnectionUtils {
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
} }
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
public static ScanResultCache createScanResultCache(Scan scan) { public static ScanResultCache createScanResultCache(Scan scan) {
if (scan.getAllowPartialResults()) { if (scan.getAllowPartialResults()) {
return new AllowPartialScanResultCache(); return new AllowPartialScanResultCache();

View File

@ -17,8 +17,11 @@
*/ */
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import static java.util.stream.Collectors.toList;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -197,4 +200,16 @@ public final class FutureUtils {
future.completeExceptionally(e); future.completeExceptionally(e);
return future; return future;
} }
/**
* Returns a new CompletableFuture that is completed when all of the given CompletableFutures
* complete. If any of the given CompletableFutures complete exceptionally, then the returned
* CompletableFuture also does so, with a CompletionException holding this exception as its cause.
* Otherwise, the results of all given CompletableFutures could be obtained by the new returned
* CompletableFuture.
*/
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
} }