diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 624ded6899f..7c07ecc3ada 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -22,9 +22,12 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -154,6 +157,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi */ @Override public boolean replicate(ReplicateContext replicateContext) { + CompletionService pool = new ExecutorCompletionService(this.exec); List entries = replicateContext.getEntries(); String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; @@ -195,7 +199,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi " entries of total size " + replicateContext.getSize()); } - List> futures = new ArrayList>(entryLists.size()); + int futures = 0; for (int i=0; i= 0; index--) { + + for (int i=0; i f = futures.get(index); - entryLists.remove(f.get().intValue()); + Future f = pool.take(); + entryLists.set(f.get().intValue(), Collections.emptyList()); } catch (InterruptedException ie) { iox = new IOException(ie); } catch (ExecutionException ee) {