HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)
* HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection
This commit is contained in:
parent
9da2a3fead
commit
34ba2c51cf
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
||||||
import org.apache.hadoop.hbase.io.SizedCellScanner;
|
import org.apache.hadoop.hbase.io.SizedCellScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||||
|
|
||||||
|
@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
|
||||||
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
|
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
|
||||||
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
|
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
|
||||||
*/
|
*/
|
||||||
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
|
public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
|
||||||
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
|
AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
|
||||||
int timeout) throws IOException {
|
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
|
||||||
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
|
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
|
||||||
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
|
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
|
||||||
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
|
return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,18 +29,11 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionService;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -57,7 +50,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -65,7 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
|
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
|
||||||
|
@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
|
LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
|
||||||
|
|
||||||
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
|
|
||||||
|
|
||||||
/** Drop edits for tables that been deleted from the replication source and target */
|
/** Drop edits for tables that been deleted from the replication source and target */
|
||||||
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
|
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
|
||||||
"hbase.replication.drop.on.deleted.table";
|
"hbase.replication.drop.on.deleted.table";
|
||||||
|
@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
private int maxRetriesMultiplier;
|
private int maxRetriesMultiplier;
|
||||||
// Socket timeouts require even bolder actions since we don't want to DDOS
|
// Socket timeouts require even bolder actions since we don't want to DDOS
|
||||||
private int socketTimeoutMultiplier;
|
private int socketTimeoutMultiplier;
|
||||||
// Amount of time for shutdown to wait for all tasks to complete
|
|
||||||
private long maxTerminationWait;
|
|
||||||
// Size limit for replication RPCs, in bytes
|
// Size limit for replication RPCs, in bytes
|
||||||
private int replicationRpcLimit;
|
private int replicationRpcLimit;
|
||||||
// Metrics for this source
|
// Metrics for this source
|
||||||
private MetricsSource metrics;
|
private MetricsSource metrics;
|
||||||
private boolean peersSelected = false;
|
private boolean peersSelected = false;
|
||||||
private String replicationClusterId = "";
|
private String replicationClusterId = "";
|
||||||
private ThreadPoolExecutor exec;
|
|
||||||
private int maxThreads;
|
private int maxThreads;
|
||||||
private Path baseNamespaceDir;
|
private Path baseNamespaceDir;
|
||||||
private Path hfileArchiveDir;
|
private Path hfileArchiveDir;
|
||||||
private boolean replicationBulkLoadDataEnabled;
|
private boolean replicationBulkLoadDataEnabled;
|
||||||
private Abortable abortable;
|
|
||||||
private boolean dropOnDeletedTables;
|
private boolean dropOnDeletedTables;
|
||||||
private boolean dropOnDeletedColumnFamilies;
|
private boolean dropOnDeletedColumnFamilies;
|
||||||
private boolean isSerial = false;
|
private boolean isSerial = false;
|
||||||
// Initialising as 0 to guarantee at least one logging message
|
// Initialising as 0 to guarantee at least one logging message
|
||||||
private long lastSinkFetchTime = 0;
|
private long lastSinkFetchTime = 0;
|
||||||
|
private volatile boolean stopping = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Context context) throws IOException {
|
public void init(Context context) throws IOException {
|
||||||
|
@ -124,20 +113,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
|
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
|
||||||
this.socketTimeoutMultiplier =
|
this.socketTimeoutMultiplier =
|
||||||
this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
|
this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
|
||||||
// A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
|
|
||||||
// tasks to terminate when doStop() is called.
|
|
||||||
long maxTerminationWaitMultiplier = this.conf.getLong(
|
|
||||||
"replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
|
|
||||||
this.maxTerminationWait = maxTerminationWaitMultiplier
|
|
||||||
* this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
|
||||||
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
|
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
|
||||||
this.metrics = context.getMetrics();
|
this.metrics = context.getMetrics();
|
||||||
// per sink thread pool
|
// per sink thread pool
|
||||||
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
||||||
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
||||||
this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
|
|
||||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
|
|
||||||
this.abortable = ctx.getAbortable();
|
|
||||||
// Set the size limit for replication RPCs to 95% of the max request size.
|
// Set the size limit for replication RPCs to 95% of the max request size.
|
||||||
// We could do with less slop if we have an accurate estimate of encoded size. Being
|
// We could do with less slop if we have an accurate estimate of encoded size. Being
|
||||||
// conservative for now.
|
// conservative for now.
|
||||||
|
@ -394,30 +374,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
return entryList;
|
return entryList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
|
private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
|
||||||
List<List<Entry>> batches) throws IOException {
|
throws IOException {
|
||||||
int futures = 0;
|
List<CompletableFuture<Integer>> futures =
|
||||||
|
new ArrayList<CompletableFuture<Integer>>(batches.size());
|
||||||
for (int i = 0; i < batches.size(); i++) {
|
for (int i = 0; i < batches.size(); i++) {
|
||||||
List<Entry> entries = batches.get(i);
|
List<Entry> entries = batches.get(i);
|
||||||
if (!entries.isEmpty()) {
|
if (entries.isEmpty()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
continue;
|
||||||
LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
|
|
||||||
replicateContext.getSize());
|
|
||||||
}
|
|
||||||
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
|
|
||||||
pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
|
|
||||||
futures++;
|
|
||||||
}
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
|
||||||
|
replicateContext.getSize());
|
||||||
|
}
|
||||||
|
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
|
||||||
|
futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
|
||||||
}
|
}
|
||||||
|
|
||||||
IOException iox = null;
|
IOException iox = null;
|
||||||
long lastWriteTime = 0;
|
long lastWriteTime = 0;
|
||||||
for (int i = 0; i < futures; i++) {
|
|
||||||
|
for (CompletableFuture<Integer> f : futures) {
|
||||||
try {
|
try {
|
||||||
// wait for all futures, remove successful parts
|
// wait for all futures, remove successful parts
|
||||||
// (only the remaining parts will be retried)
|
// (only the remaining parts will be retried)
|
||||||
Future<Integer> f = pool.take();
|
int index = FutureUtils.get(f);
|
||||||
int index = f.get();
|
|
||||||
List<Entry> batch = batches.get(index);
|
List<Entry> batch = batches.get(index);
|
||||||
batches.set(index, Collections.emptyList()); // remove successful batch
|
batches.set(index, Collections.emptyList()); // remove successful batch
|
||||||
// Find the most recent write time in the batch
|
// Find the most recent write time in the batch
|
||||||
|
@ -425,12 +406,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
if (writeTime > lastWriteTime) {
|
if (writeTime > lastWriteTime) {
|
||||||
lastWriteTime = writeTime;
|
lastWriteTime = writeTime;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {
|
} catch (IOException e) {
|
||||||
iox = new IOException(ie);
|
iox = e;
|
||||||
} catch (ExecutionException ee) {
|
} catch (RuntimeException e) {
|
||||||
iox = ee.getCause() instanceof IOException
|
iox = new IOException(e);
|
||||||
? (IOException) ee.getCause()
|
|
||||||
: new IOException(ee.getCause());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (iox != null) {
|
if (iox != null) {
|
||||||
|
@ -445,7 +424,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean replicate(ReplicateContext replicateContext) {
|
public boolean replicate(ReplicateContext replicateContext) {
|
||||||
CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
|
|
||||||
int sleepMultiplier = 1;
|
int sleepMultiplier = 1;
|
||||||
|
|
||||||
if (!peersSelected && this.isRunning()) {
|
if (!peersSelected && this.isRunning()) {
|
||||||
|
@ -468,7 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
}
|
}
|
||||||
|
|
||||||
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
|
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
|
||||||
while (this.isRunning() && !exec.isShutdown()) {
|
while (this.isRunning() && !this.stopping) {
|
||||||
if (!isPeerEnabled()) {
|
if (!isPeerEnabled()) {
|
||||||
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
|
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
|
||||||
sleepMultiplier++;
|
sleepMultiplier++;
|
||||||
|
@ -477,7 +455,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// replicate the batches to sink side.
|
// replicate the batches to sink side.
|
||||||
parallelReplicate(pool, replicateContext, batches);
|
parallelReplicate(replicateContext, batches);
|
||||||
return true;
|
return true;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (ioe instanceof RemoteException) {
|
if (ioe instanceof RemoteException) {
|
||||||
|
@ -532,82 +510,117 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() {
|
protected void doStop() {
|
||||||
disconnect(); // don't call super.doStop()
|
|
||||||
// Allow currently running replication tasks to finish
|
// Allow currently running replication tasks to finish
|
||||||
exec.shutdown();
|
this.stopping = true;
|
||||||
try {
|
disconnect(); // don't call super.doStop()
|
||||||
exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
}
|
|
||||||
// Abort if the tasks did not terminate in time
|
|
||||||
if (!exec.isTerminated()) {
|
|
||||||
String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
|
|
||||||
+ "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
|
|
||||||
+ "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
|
|
||||||
abortable.abort(errMsg, new IOException(errMsg));
|
|
||||||
}
|
|
||||||
notifyStopped();
|
notifyStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
|
protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
|
||||||
throws IOException {
|
int timeout) {
|
||||||
|
int entriesHashCode = System.identityHashCode(entries);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
|
||||||
|
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
|
||||||
|
entriesHashCode, entries.size(), size, replicationClusterId);
|
||||||
|
}
|
||||||
SinkPeer sinkPeer = null;
|
SinkPeer sinkPeer = null;
|
||||||
|
final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
|
||||||
try {
|
try {
|
||||||
int entriesHashCode = System.identityHashCode(entries);
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
|
|
||||||
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
|
|
||||||
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
|
|
||||||
}
|
|
||||||
sinkPeer = getReplicationSink();
|
sinkPeer = getReplicationSink();
|
||||||
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
|
} catch (IOException e) {
|
||||||
try {
|
this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
|
||||||
ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
|
resultCompletableFuture.completeExceptionally(e);
|
||||||
entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
|
return resultCompletableFuture;
|
||||||
hfileArchiveDir, timeout);
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
assert sinkPeer != null;
|
||||||
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
|
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
|
||||||
|
final SinkPeer sinkPeerToUse = sinkPeer;
|
||||||
|
FutureUtils.addListener(
|
||||||
|
ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
|
||||||
|
replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
|
||||||
|
(response, exception) -> {
|
||||||
|
if (exception != null) {
|
||||||
|
onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
|
||||||
|
resultCompletableFuture.completeExceptionally(exception);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
reportSinkSuccess(sinkPeerToUse);
|
||||||
if (LOG.isTraceEnabled()) {
|
resultCompletableFuture.complete(batchIndex);
|
||||||
LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
|
});
|
||||||
}
|
return resultCompletableFuture;
|
||||||
throw e;
|
}
|
||||||
}
|
|
||||||
reportSinkSuccess(sinkPeer);
|
private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
|
||||||
} catch (IOException ioe) {
|
final SinkPeer sinkPeer) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
|
||||||
|
}
|
||||||
|
if (exception instanceof IOException) {
|
||||||
if (sinkPeer != null) {
|
if (sinkPeer != null) {
|
||||||
reportBadSink(sinkPeer);
|
reportBadSink(sinkPeer);
|
||||||
}
|
}
|
||||||
throw ioe;
|
|
||||||
}
|
}
|
||||||
return batchIndex;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
|
/**
|
||||||
throws IOException {
|
* Here for {@link HBaseInterClusterReplicationEndpoint#isSerialis} is true, we iterator over the
|
||||||
int batchSize = 0, index = 0;
|
* WAL {@link Entry} list, once we reached a batch limit, we send it out, and in the callback, we
|
||||||
|
* send the next batch, until we send all entries out.
|
||||||
|
*/
|
||||||
|
private CompletableFuture<Integer> serialReplicateRegionEntries(
|
||||||
|
PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
|
||||||
|
if (!walEntryPeekingIterator.hasNext()) {
|
||||||
|
return CompletableFuture.completedFuture(batchIndex);
|
||||||
|
}
|
||||||
|
int batchSize = 0;
|
||||||
List<Entry> batch = new ArrayList<>();
|
List<Entry> batch = new ArrayList<>();
|
||||||
for (Entry entry : entries) {
|
while (walEntryPeekingIterator.hasNext()) {
|
||||||
|
Entry entry = walEntryPeekingIterator.peek();
|
||||||
int entrySize = getEstimatedEntrySize(entry);
|
int entrySize = getEstimatedEntrySize(entry);
|
||||||
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
|
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
|
||||||
replicateEntries(batch, index++, timeout);
|
break;
|
||||||
batch.clear();
|
|
||||||
batchSize = 0;
|
|
||||||
}
|
}
|
||||||
|
walEntryPeekingIterator.next();
|
||||||
batch.add(entry);
|
batch.add(entry);
|
||||||
batchSize += entrySize;
|
batchSize += entrySize;
|
||||||
}
|
}
|
||||||
if (batchSize > 0) {
|
|
||||||
replicateEntries(batch, index, timeout);
|
if (batchSize <= 0) {
|
||||||
|
return CompletableFuture.completedFuture(batchIndex);
|
||||||
}
|
}
|
||||||
return batchIndex;
|
final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
|
||||||
|
FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
|
||||||
|
if (exception != null) {
|
||||||
|
resultCompletableFuture.completeExceptionally(exception);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!walEntryPeekingIterator.hasNext()) {
|
||||||
|
resultCompletableFuture.complete(batchIndex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
FutureUtils.addListener(
|
||||||
|
serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
|
||||||
|
(currentResponse, currentException) -> {
|
||||||
|
if (currentException != null) {
|
||||||
|
resultCompletableFuture.completeExceptionally(currentException);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resultCompletableFuture.complete(batchIndex);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return resultCompletableFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
|
/**
|
||||||
|
* Replicate entries to peer cluster by async API.
|
||||||
|
*/
|
||||||
|
protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
|
||||||
|
int timeout) {
|
||||||
return isSerial
|
return isSerial
|
||||||
? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
|
? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
|
||||||
: () -> replicateEntries(entries, batchIndex, timeout);
|
timeout)
|
||||||
|
: replicateEntries(entries, batchIndex, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String logPeerId() {
|
private String logPeerId() {
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
@ -267,14 +268,14 @@ public class SyncReplicationTestBase {
|
||||||
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
|
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
|
||||||
}
|
}
|
||||||
if (!expectedRejection) {
|
if (!expectedRejection) {
|
||||||
ReplicationProtobufUtil.replicateWALEntry(
|
FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
|
||||||
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
|
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
|
||||||
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
|
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
ReplicationProtobufUtil.replicateWALEntry(
|
FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
|
||||||
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
|
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
|
||||||
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
|
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
|
||||||
fail("Should throw IOException when sync-replication state is in A or DA");
|
fail("Should throw IOException when sync-replication state is in A or DA");
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
assertRejection(e.unwrapRemoteException());
|
assertRejection(e.unwrapRemoteException());
|
||||||
|
|
|
@ -30,7 +30,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -556,15 +556,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
|
||||||
|
int timeout) {
|
||||||
// Fail only once, we don't want to slow down the test.
|
// Fail only once, we don't want to slow down the test.
|
||||||
if (failedOnce) {
|
if (failedOnce) {
|
||||||
return () -> ordinal;
|
return CompletableFuture.completedFuture(ordinal);
|
||||||
} else {
|
} else {
|
||||||
failedOnce = true;
|
failedOnce = true;
|
||||||
return () -> {
|
CompletableFuture<Integer> future = new CompletableFuture<Integer>();
|
||||||
throw new IOException("Sample Exception: Failed to replicate.");
|
future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
|
||||||
};
|
return future;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
|
||||||
return () -> {
|
int timeout) {
|
||||||
int batchIndex = replicateEntries(entries, ordinal, timeout);
|
return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
|
||||||
entriesCount += entries.size();
|
entriesCount += entries.size();
|
||||||
int count = batchCount.incrementAndGet();
|
int count = batchCount.incrementAndGet();
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
||||||
return batchIndex;
|
});
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,20 +245,23 @@ public class TestReplicator extends TestReplicationBase {
|
||||||
private final AtomicBoolean failNext = new AtomicBoolean(false);
|
private final AtomicBoolean failNext = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
|
||||||
return () -> {
|
int timeout) {
|
||||||
if (failNext.compareAndSet(false, true)) {
|
|
||||||
int batchIndex = replicateEntries(entries, ordinal, timeout);
|
if (failNext.compareAndSet(false, true)) {
|
||||||
|
return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
|
||||||
entriesCount += entries.size();
|
entriesCount += entries.size();
|
||||||
int count = batchCount.incrementAndGet();
|
int count = batchCount.incrementAndGet();
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
||||||
return batchIndex;
|
});
|
||||||
} else if (failNext.compareAndSet(true, false)) {
|
} else if (failNext.compareAndSet(true, false)) {
|
||||||
throw new ServiceException("Injected failure");
|
CompletableFuture<Integer> future = new CompletableFuture<Integer>();
|
||||||
}
|
future.completeExceptionally(new ServiceException("Injected failure"));
|
||||||
return ordinal;
|
return future;
|
||||||
};
|
}
|
||||||
|
return CompletableFuture.completedFuture(ordinal);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
@ -165,11 +165,10 @@ public class TestSerialReplicationEndpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
|
||||||
return () -> {
|
int timeout) {
|
||||||
entryQueue.addAll(entries);
|
entryQueue.addAll(entries);
|
||||||
return ordinal;
|
return CompletableFuture.completedFuture(ordinal);
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue