HBASE-14953 Replication: retry on RejectedExecutionException
In HBaseInterClusterReplicationEndpoint, we fail the whole batch in case of a RejectedExecutionException on an individual sub-batch. We should let the submitted sub-batches finish and retry only for the remaining ones. Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
22b95aebcd
commit
453a66c3b9
|
@ -29,7 +29,7 @@ import java.util.concurrent.CompletionService;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -113,8 +113,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
// per sink thread pool
|
||||
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
||||
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
||||
this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>());
|
||||
this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>());
|
||||
this.exec.allowCoreThreadTimeOut(true);
|
||||
|
||||
this.replicationBulkLoadDataEnabled =
|
||||
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
||||
|
|
Loading…
Reference in New Issue