HBASE-14777 second addendum, better fix using CompletionService.

This commit is contained in:
Lars Hofhansl 2015-11-25 21:17:56 -08:00
parent e73a9594c2
commit 6531b465a7
1 changed files with 11 additions and 5 deletions

View File

@ -22,9 +22,12 @@ import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -154,6 +157,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
*/ */
@Override @Override
public boolean replicate(ReplicateContext replicateContext) { public boolean replicate(ReplicateContext replicateContext) {
CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
List<Entry> entries = replicateContext.getEntries(); List<Entry> entries = replicateContext.getEntries();
String walGroupId = replicateContext.getWalGroupId(); String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1; int sleepMultiplier = 1;
@ -195,7 +199,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
" entries of total size " + replicateContext.getSize()); " entries of total size " + replicateContext.getSize());
} }
List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size()); int futures = 0;
for (int i=0; i<entryLists.size(); i++) { for (int i=0; i<entryLists.size(); i++) {
if (!entryLists.get(i).isEmpty()) { if (!entryLists.get(i).isEmpty()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -203,16 +207,18 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
" entries of total size " + replicateContext.getSize()); " entries of total size " + replicateContext.getSize());
} }
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
futures.add(exec.submit(createReplicator(entryLists.get(i), i))); pool.submit(createReplicator(entryLists.get(i), i));
futures++;
} }
} }
IOException iox = null; IOException iox = null;
for (int index = futures.size() - 1; index >= 0; index--) {
for (int i=0; i<futures; i++) {
try { try {
// wait for all futures, remove successful parts // wait for all futures, remove successful parts
// (only the remaining parts will be retried) // (only the remaining parts will be retried)
Future<Integer> f = futures.get(index); Future<Integer> f = pool.take();
entryLists.remove(f.get().intValue()); entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
iox = new IOException(ie); iox = new IOException(ie);
} catch (ExecutionException ee) { } catch (ExecutionException ee) {