HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)

chenglei 2022-06-22 20:01:56 +08:00 committed by GitHub
parent 902bc3ed70
commit de804938b9
6 changed files with 154 additions and 136 deletions
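
At a glance: the endpoint used to submit one blocking Replicator task per batch to its own thread pool and drain an ExecutorCompletionService; with AsyncClusterConnection the replicate RPC itself returns a CompletableFuture, so the pool, and all of the shutdown and abort bookkeeping around it, can be deleted. A minimal, self-contained sketch contrasting the two styles, with plain stand-ins for the RPC (none of these names are HBase APIs):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolVsFutureSketch {

  // Before: a dedicated pool plus a CompletionService, one blocking task per batch.
  static void replicateWithPool(List<String> batches) throws Exception {
    ExecutorService exec = Executors.newFixedThreadPool(4);
    try {
      CompletionService<Integer> pool = new ExecutorCompletionService<>(exec);
      for (int i = 0; i < batches.size(); i++) {
        final int index = i;
        pool.submit(() -> index); // stand-in for a blocking replicate RPC
      }
      for (int i = 0; i < batches.size(); i++) {
        pool.take().get(); // ties up a caller thread per outstanding batch
      }
    } finally {
      exec.shutdown(); // the bookkeeping this commit removes
    }
  }

  // After: the async client hands back futures; the endpoint owns no threads.
  static void replicateWithFutures(List<String> batches) {
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < batches.size(); i++) {
      futures.add(CompletableFuture.completedFuture(i)); // stand-in for an async RPC
    }
    futures.forEach(CompletableFuture::join); // wait without owning worker threads
  }

  public static void main(String[] args) throws Exception {
    List<String> batches = Arrays.asList("b0", "b1", "b2");
    replicateWithPool(batches);
    replicateWithFutures(batches);
  }
}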

ReplicationProtobufUtil.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    */
-  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
-    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
-    int timeout) throws IOException {
+  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
+    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
   }

   /**
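
Callers feel this signature change: replicateWALEntry used to block internally (via FutureUtils.get) and now hands the future back. Call sites that still want synchronous semantics, like the tests further down, wrap the call in org.apache.hadoop.hbase.util.FutureUtils.get, which blocks and rethrows the failure cause as an IOException. A rough plain-Java approximation of that helper (the real FutureUtils differs in details):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncOverAsyncSketch {

  // Approximates FutureUtils.get: block on the future and surface failures as
  // IOException, which is what the replication retry loops expect to catch.
  static <T> T get(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
    }
  }

  public static void main(String[] args) throws IOException {
    CompletableFuture<String> rpc = CompletableFuture.completedFuture("replicated");
    System.out.println(get(rpc)); // prints "replicated"
  }
}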

HBaseInterClusterReplicationEndpoint.java

@@ -29,18 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -57,7 +50,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;

 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

-  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
-
   /** Drop edits for tables that been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
-  // Amount of time for shutdown to wait for all tasks to complete
-  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
-  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
-  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
+  private volatile boolean stopping = false;

   @Override
   public void init(Context context) throws IOException {
@@ -124,20 +113,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
-    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
-    // tasks to terminate when doStop() is called.
-    long maxTerminationWaitMultiplier = this.conf.getLong(
-      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
-    this.maxTerminationWait = maxTerminationWaitMultiplier
-      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
-    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
-    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
@@ -394,30 +374,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryList;
   }

-  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
-    List<List<Entry>> batches) throws IOException {
-    int futures = 0;
+  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
+    throws IOException {
+    List<CompletableFuture<Integer>> futures =
+      new ArrayList<CompletableFuture<Integer>>(batches.size());
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (!entries.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-            replicateContext.getSize());
-        }
-        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
-        futures++;
+      if (entries.isEmpty()) {
+        continue;
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+          replicateContext.getSize());
+      }
+      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }

     IOException iox = null;
     long lastWriteTime = 0;
-    for (int i = 0; i < futures; i++) {
+    for (CompletableFuture<Integer> f : futures) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        Future<Integer> f = pool.take();
-        int index = f.get();
+        int index = FutureUtils.get(f);
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -425,12 +406,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (InterruptedException ie) {
-        iox = new IOException(ie);
-      } catch (ExecutionException ee) {
-        iox = ee.getCause() instanceof IOException
-          ? (IOException) ee.getCause()
-          : new IOException(ee.getCause());
+      } catch (IOException e) {
+        iox = e;
+      } catch (RuntimeException e) {
+        iox = new IOException(e);
       }
     }
     if (iox != null) {
@@ -445,7 +424,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
-    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;

     if (!peersSelected && this.isRunning()) {
@@ -468,7 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     }

     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !exec.isShutdown()) {
+    while (this.isRunning() && !this.stopping) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -477,7 +455,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(pool, replicateContext, batches);
+        parallelReplicate(replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
@@ -532,82 +510,117 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }

-  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
+    } catch (IOException e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+      return resultCompletableFuture;
+    }
+    assert sinkPeer != null;
+    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+    final SinkPeer sinkPeerToUse = sinkPeer;
+    FutureUtils.addListener(
+      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
+      (response, exception) -> {
+        if (exception != null) {
+          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+          resultCompletableFuture.completeExceptionally(exception);
+          return;
         }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+        reportSinkSuccess(sinkPeerToUse);
+        resultCompletableFuture.complete(batchIndex);
+      });
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }

-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  /**
+   * Here, when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the
+   * WAL {@link Entry} list; once we reach a batch limit, we send the batch out, and in the
+   * callback we send the next batch, until all entries are sent.
+   */
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(
+        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
+        (currentResponse, currentException) -> {
+          if (currentException != null) {
+            resultCompletableFuture.completeExceptionally(currentException);
            return;
+          }
+          resultCompletableFuture.complete(batchIndex);
+        });
+    });
+    return resultCompletableFuture;
   }

-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+  /**
+   * Replicate entries to peer cluster by async API.
+   */
+  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
+    int timeout) {
     return isSerial
-      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
-      : () -> replicateEntries(entries, batchIndex, timeout);
+      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
+        timeout)
+      : replicateEntries(entries, batchIndex, timeout);
   }

   private String logPeerId() {
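
The serial path is the subtle part of this rewrite: the old code looped, calling the blocking replicateEntries once per size-limited batch, and got per-region ordering for free. Asynchronously, ordering is preserved by recursion instead: only the callback of batch N submits batch N+1, exactly as the new javadoc describes. A self-contained sketch of that pattern (plain CompletableFuture; whenComplete stands in for FutureUtils.addListener, all names illustrative, and the size check simplified):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SerialChainSketch {

  static final int RPC_LIMIT = 10; // stand-in for replicationRpcLimit

  // Stand-in for replicateEntries: ship one batch asynchronously.
  static CompletableFuture<Void> ship(List<String> batch) {
    System.out.println("shipping " + batch);
    return CompletableFuture.completedFuture(null);
  }

  // Fill a batch up to the size limit, ship it, and recurse from the callback
  // so batches for the same region stay strictly ordered.
  static CompletableFuture<Void> shipSerially(Iterator<String> entries) {
    List<String> batch = new ArrayList<>();
    int batchSize = 0;
    while (entries.hasNext() && batchSize < RPC_LIMIT) {
      String entry = entries.next();
      batch.add(entry);
      batchSize += entry.length();
    }
    if (batch.isEmpty()) {
      return CompletableFuture.completedFuture(null);
    }
    CompletableFuture<Void> result = new CompletableFuture<>();
    ship(batch).whenComplete((r, ex) -> {
      if (ex != null) {
        result.completeExceptionally(ex); // a failed batch fails the whole chain
      } else if (!entries.hasNext()) {
        result.complete(null); // everything has been sent
      } else {
        shipSerially(entries).whenComplete((r2, ex2) -> {
          if (ex2 != null) {
            result.completeExceptionally(ex2);
          } else {
            result.complete(null);
          }
        });
      }
    });
    return result;
  }

  public static void main(String[] args) {
    shipSerially(Arrays.asList("one", "two", "three", "four", "five").iterator()).join();
  }
}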

SyncReplicationTestBase.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -267,14 +268,14 @@ public class SyncReplicationTestBase {
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtobufUtil.replicateWALEntry(
+      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
     } else {
       try {
-        ReplicationProtobufUtil.replicateWALEntry(
+        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
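
Here the fix is mechanical: each now-async call is wrapped in FutureUtils.get so the test stays synchronous, including the branch that expects the sink to reject the entries. The shape of that expected-failure check, in plain Java with a stand-in for the rejected RPC:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExpectRejectionSketch {

  // Stand-in for an async replicate call that the sink rejects.
  static CompletableFuture<Void> replicate() {
    CompletableFuture<Void> f = new CompletableFuture<>();
    f.completeExceptionally(new IOException("rejected: sync-replication state is A or DA"));
    return f;
  }

  public static void main(String[] args) {
    try {
      replicate().join(); // blocking is what surfaces the failure to the test
      throw new AssertionError("Should throw when the sink rejects the entries");
    } catch (CompletionException e) {
      // join() wraps the original cause; FutureUtils.get would unwrap it instead.
      System.out.println("Got expected failure: " + e.getCause().getMessage());
    }
  }
}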

TestReplicationEndpoint.java

@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,15 +556,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return () -> ordinal;
+        return CompletableFuture.completedFuture(ordinal);
       } else {
         failedOnce = true;
-        return () -> {
-          throw new IOException("Sample Exception: Failed to replicate.");
-        };
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
+        return future;
       }
     }
   }
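
One idiom worth noting: a Callable could fail by simply throwing, but an async override has to return a future that is already completed exceptionally. On Java 8 that is the two-step dance the test uses above; Java 9 added a shorthand. A minimal sketch of both:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class FailedFutureSketch {

  // Java 8 style, as in the test above.
  static CompletableFuture<Integer> failJava8() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
    return future;
  }

  // Java 9+ shorthand for the same thing.
  static CompletableFuture<Integer> failJava9() {
    return CompletableFuture.failedFuture(new IOException("Sample Exception: Failed to replicate."));
  }

  public static void main(String[] args) {
    failJava8().whenComplete((v, ex) -> System.out.println("failed with: " + ex));
    failJava9().whenComplete((v, ex) -> System.out.println("failed with: " + ex));
  }
}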

TestReplicator.java

@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;

 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        return batchIndex;
-      };
+      });
     }
   }

@@ -245,20 +245,23 @@ public class TestReplicator extends TestReplicationBase {
     private final AtomicBoolean failNext = new AtomicBoolean(false);

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      if (failNext.compareAndSet(false, true)) {
+        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-          return batchIndex;
-        } else if (failNext.compareAndSet(true, false)) {
-          throw new ServiceException("Injected failure");
-        }
-        return ordinal;
-      };
+        });
+      } else if (failNext.compareAndSet(true, false)) {
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new ServiceException("Injected failure"));
+        return future;
+      }
+      return CompletableFuture.completedFuture(ordinal);
     }
   }
 }
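
These replicators lean on a property of whenComplete: it runs a side effect (bumping counters, logging) while passing the original value or failure through unchanged, so wrapping replicateEntries for bookkeeping cannot mask a replication error. A short demonstration:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class WhenCompleteSketch {
  public static void main(String[] args) {
    AtomicInteger batchCount = new AtomicInteger();

    // Success: the side effect runs and the value passes through untouched.
    int v = CompletableFuture.completedFuture(7)
      .whenComplete((r, ex) -> batchCount.incrementAndGet()).join();
    System.out.println("value=" + v + " batches=" + batchCount.get());

    // Failure: the side effect still runs and the original exception propagates.
    CompletableFuture<Integer> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("Injected failure"));
    try {
      failed.whenComplete((r, ex) -> batchCount.incrementAndGet()).join();
    } catch (CompletionException e) {
      System.out.println("propagated: " + e.getCause().getMessage()
        + " batches=" + batchCount.get());
    }
  }
}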

TestSerialReplicationEndpoint.java

@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,11 +165,10 @@ public class TestSerialReplicationEndpoint {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        entryQueue.addAll(entries);
-        return ordinal;
-      };
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      entryQueue.addAll(entries);
+      return CompletableFuture.completedFuture(ordinal);
     }

     @Override