HBASE-20481 Replicate entries from same region serially in ReplicationEndpoint for serial replication
This commit is contained in:
parent
5f260451d5
commit
de71cb5915
@ -117,7 +117,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
|
||||
/**
|
||||
* Initialize the replication endpoint with the given context.
|
||||
* @param context replication context
|
||||
* @throws IOException
|
||||
* @throws IOException error occur when initialize the endpoint.
|
||||
*/
|
||||
void init(Context context) throws IOException;
|
||||
|
||||
|
@ -24,9 +24,9 @@ import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
@ -37,6 +37,9 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -108,6 +111,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
private boolean replicationBulkLoadDataEnabled;
|
||||
private Abortable abortable;
|
||||
private boolean dropOnDeletedTables;
|
||||
private boolean isSerial = false;
|
||||
|
||||
@Override
|
||||
public void init(Context context) throws IOException {
|
||||
@ -160,6 +164,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
|
||||
baseNamespaceDir = new Path(rootDir, baseNSDir);
|
||||
hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
|
||||
isSerial = context.getPeerConfig().isSerial();
|
||||
}
|
||||
|
||||
private void decorateConf() {
|
||||
@ -203,40 +208,60 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
return sleepMultiplier < maxRetriesMultiplier;
|
||||
}
|
||||
|
||||
private List<List<Entry>> createBatches(final List<Entry> entries) {
|
||||
private int getEstimatedEntrySize(Entry e) {
|
||||
long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
|
||||
return (int) size;
|
||||
}
|
||||
|
||||
private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
|
||||
int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
|
||||
int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
|
||||
// Maintains the current batch for a given partition index
|
||||
Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
|
||||
List<List<Entry>> entryLists = new ArrayList<>();
|
||||
int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
|
||||
List<List<Entry>> entryLists =
|
||||
Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
|
||||
int[] sizes = new int[n];
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
|
||||
}
|
||||
|
||||
for (Entry e: entries) {
|
||||
int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
|
||||
int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
|
||||
(int)e.getEdit().estimatedSerializedSizeOf();
|
||||
// If this batch is oversized, add it to final list and initialize a new empty batch
|
||||
if (sizes[index] > 0 /* must include at least one entry */ &&
|
||||
sizes[index] + entrySize > replicationRpcLimit) {
|
||||
entryLists.add(entryMap.get(index));
|
||||
entryMap.put(index, new ArrayList<Entry>());
|
||||
for (Entry e : entries) {
|
||||
int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
|
||||
int entrySize = getEstimatedEntrySize(e);
|
||||
// If this batch has at least one entry and is over sized, move it to the tail of list and
|
||||
// initialize the entryLists[index] to be a empty list.
|
||||
if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
|
||||
entryLists.add(entryLists.get(index));
|
||||
entryLists.set(index, new ArrayList<>());
|
||||
sizes[index] = 0;
|
||||
}
|
||||
entryMap.get(index).add(e);
|
||||
entryLists.get(index).add(e);
|
||||
sizes[index] += entrySize;
|
||||
}
|
||||
|
||||
entryLists.addAll(entryMap.values());
|
||||
return entryLists;
|
||||
}
|
||||
|
||||
private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
|
||||
Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (Entry e : entries) {
|
||||
regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
|
||||
.add(e);
|
||||
}
|
||||
return new ArrayList<>(regionEntries.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
|
||||
* concurrently. Note that, for serial replication, we need to make sure that entries from the
|
||||
* same region to be replicated serially, so entries from the same region consist of a batch, and
|
||||
* we will divide a batch into several batches by replicationRpcLimit in method
|
||||
* serialReplicateRegionEntries()
|
||||
*/
|
||||
private List<List<Entry>> createBatches(final List<Entry> entries) {
|
||||
if (isSerial) {
|
||||
return createSerialBatches(entries);
|
||||
} else {
|
||||
return createParallelBatches(entries);
|
||||
}
|
||||
}
|
||||
|
||||
private TableName parseTable(String msg) {
|
||||
// ... TableNotFoundException: '<table>'/n...
|
||||
Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
|
||||
Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
|
||||
Matcher m = p.matcher(msg);
|
||||
if (m.find()) {
|
||||
String table = m.group(1);
|
||||
@ -252,17 +277,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
|
||||
// Filter a set of batches by TableName
|
||||
private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
|
||||
List<List<Entry>> entryLists = new ArrayList<>();
|
||||
for (List<Entry> entries : oldEntryList) {
|
||||
ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
|
||||
entryLists.add(thisList);
|
||||
for (Entry e : entries) {
|
||||
if (!e.getKey().getTableName().equals(table)) {
|
||||
thisList.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return entryLists;
|
||||
return oldEntryList
|
||||
.stream().map(entries -> entries.stream()
|
||||
.filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void reconnectToPeerCluster() {
|
||||
@ -277,13 +295,55 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
}
|
||||
}
|
||||
|
||||
private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
|
||||
List<List<Entry>> batches) throws IOException {
|
||||
int futures = 0;
|
||||
for (int i = 0; i < batches.size(); i++) {
|
||||
List<Entry> entries = batches.get(i);
|
||||
if (!entries.isEmpty()) {
|
||||
LOG.trace("Submitting {} entries of total size {}", entries.size(),
|
||||
replicateContext.getSize());
|
||||
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
|
||||
pool.submit(createReplicator(entries, i));
|
||||
futures++;
|
||||
}
|
||||
}
|
||||
|
||||
IOException iox = null;
|
||||
long lastWriteTime = 0;
|
||||
for (int i = 0; i < futures; i++) {
|
||||
try {
|
||||
// wait for all futures, remove successful parts
|
||||
// (only the remaining parts will be retried)
|
||||
Future<Integer> f = pool.take();
|
||||
int index = f.get();
|
||||
List<Entry> batch = batches.get(index);
|
||||
batches.set(index, Collections.emptyList()); // remove successful batch
|
||||
// Find the most recent write time in the batch
|
||||
long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
|
||||
if (writeTime > lastWriteTime) {
|
||||
lastWriteTime = writeTime;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
iox = new IOException(ie);
|
||||
} catch (ExecutionException ee) {
|
||||
// cause must be an IOException
|
||||
iox = (IOException) ee.getCause();
|
||||
}
|
||||
}
|
||||
if (iox != null) {
|
||||
// if we had any exceptions, try again
|
||||
throw iox;
|
||||
}
|
||||
return lastWriteTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do the shipping logic
|
||||
*/
|
||||
@Override
|
||||
public boolean replicate(ReplicateContext replicateContext) {
|
||||
CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
|
||||
List<List<Entry>> batches;
|
||||
String walGroupId = replicateContext.getWalGroupId();
|
||||
int sleepMultiplier = 1;
|
||||
|
||||
@ -294,13 +354,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
|
||||
int numSinks = replicationSinkMgr.getNumSinks();
|
||||
if (numSinks == 0) {
|
||||
LOG.warn("No replication sinks found, returning without replicating. The source should retry"
|
||||
+ " with the same set of edits.");
|
||||
LOG.warn("No replication sinks found, returning without replicating. The source should " +
|
||||
"retry with the same set of edits.");
|
||||
return false;
|
||||
}
|
||||
|
||||
batches = createBatches(replicateContext.getEntries());
|
||||
|
||||
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
|
||||
while (this.isRunning() && !exec.isShutdown()) {
|
||||
if (!isPeerEnabled()) {
|
||||
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
|
||||
@ -312,52 +371,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
reconnectToPeerCluster();
|
||||
}
|
||||
try {
|
||||
int futures = 0;
|
||||
for (int i=0; i<batches.size(); i++) {
|
||||
List<Entry> entries = batches.get(i);
|
||||
if (!entries.isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Submitting " + entries.size() +
|
||||
" entries of total size " + replicateContext.getSize());
|
||||
}
|
||||
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
|
||||
pool.submit(createReplicator(entries, i));
|
||||
futures++;
|
||||
}
|
||||
}
|
||||
IOException iox = null;
|
||||
long lastWriteTime;
|
||||
|
||||
// replicate the batches to sink side.
|
||||
lastWriteTime = parallelReplicate(pool, replicateContext, batches);
|
||||
|
||||
long lastWriteTime = 0;
|
||||
for (int i=0; i<futures; i++) {
|
||||
try {
|
||||
// wait for all futures, remove successful parts
|
||||
// (only the remaining parts will be retried)
|
||||
Future<Integer> f = pool.take();
|
||||
int index = f.get().intValue();
|
||||
List<Entry> batch = batches.get(index);
|
||||
batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
|
||||
// Find the most recent write time in the batch
|
||||
long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
|
||||
if (writeTime > lastWriteTime) {
|
||||
lastWriteTime = writeTime;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
iox = new IOException(ie);
|
||||
} catch (ExecutionException ee) {
|
||||
// cause must be an IOException
|
||||
iox = (IOException)ee.getCause();
|
||||
}
|
||||
}
|
||||
if (iox != null) {
|
||||
// if we had any exceptions, try again
|
||||
throw iox;
|
||||
}
|
||||
// update metrics
|
||||
if (lastWriteTime > 0) {
|
||||
this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
|
||||
}
|
||||
return true;
|
||||
|
||||
} catch (IOException ioe) {
|
||||
// Didn't ship anything, but must still age the last time we did
|
||||
this.metrics.refreshAgeOfLastShippedOp(walGroupId);
|
||||
@ -376,7 +399,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
// Would potentially be better to retry in one of the outer loops
|
||||
// and add a table filter there; but that would break the encapsulation,
|
||||
// so we're doing the filtering here.
|
||||
LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
|
||||
LOG.info("Missing table detected at sink, local table also does not exist, " +
|
||||
"filtering edits for '" + table + "'");
|
||||
batches = filterBatches(batches, table);
|
||||
continue;
|
||||
}
|
||||
@ -396,8 +420,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
// happened, the cluster is alive and calling it right away
|
||||
// even for a test just makes things worse.
|
||||
sleepForRetries("Encountered a SocketTimeoutException. Since the " +
|
||||
"call to the remote cluster timed out, which is usually " +
|
||||
"caused by a machine failure or a massive slowdown",
|
||||
"call to the remote cluster timed out, which is usually " +
|
||||
"caused by a machine failure or a massive slowdown",
|
||||
this.socketTimeoutMultiplier);
|
||||
} else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
|
||||
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
|
||||
@ -420,7 +444,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
disconnect(); //don't call super.doStop()
|
||||
disconnect(); // don't call super.doStop()
|
||||
if (this.conn != null) {
|
||||
try {
|
||||
this.conn.close();
|
||||
@ -446,61 +470,58 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
|
||||
return new Replicator(entries, ordinal);
|
||||
protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
|
||||
SinkPeer sinkPeer = null;
|
||||
try {
|
||||
int entriesHashCode = System.identityHashCode(entries);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
|
||||
LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}",
|
||||
entriesHashCode, entries.size(), size, replicationClusterId);
|
||||
}
|
||||
sinkPeer = replicationSinkMgr.getReplicationSink();
|
||||
BlockingInterface rrs = sinkPeer.getRegionServer();
|
||||
try {
|
||||
ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
LOG.trace("Completed replicating batch {}", entriesHashCode);
|
||||
} catch (IOException e) {
|
||||
LOG.trace("Failed replicating batch {}", entriesHashCode, e);
|
||||
throw e;
|
||||
}
|
||||
replicationSinkMgr.reportSinkSuccess(sinkPeer);
|
||||
} catch (IOException ioe) {
|
||||
if (sinkPeer != null) {
|
||||
replicationSinkMgr.reportBadSink(sinkPeer);
|
||||
}
|
||||
throw ioe;
|
||||
}
|
||||
return batchIndex;
|
||||
}
|
||||
|
||||
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
|
||||
throws IOException {
|
||||
int batchSize = 0, index = 0;
|
||||
List<Entry> batch = new ArrayList<>();
|
||||
for (Entry entry : entries) {
|
||||
int entrySize = getEstimatedEntrySize(entry);
|
||||
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
|
||||
replicateEntries(batch, index++);
|
||||
batch.clear();
|
||||
batchSize = 0;
|
||||
}
|
||||
batch.add(entry);
|
||||
batchSize += entrySize;
|
||||
}
|
||||
if (batchSize > 0) {
|
||||
replicateEntries(batch, index);
|
||||
}
|
||||
return batchIndex;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected class Replicator implements Callable<Integer> {
|
||||
private List<Entry> entries;
|
||||
private int ordinal;
|
||||
public Replicator(List<Entry> entries, int ordinal) {
|
||||
this.entries = entries;
|
||||
this.ordinal = ordinal;
|
||||
}
|
||||
|
||||
protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
|
||||
String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
|
||||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long size = 0;
|
||||
for (Entry e: entries) {
|
||||
size += e.getKey().estimatedSerializedSizeOf();
|
||||
size += e.getEdit().estimatedSerializedSizeOf();
|
||||
}
|
||||
LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
|
||||
entries.size() + " entries with total size " + size + " bytes to " +
|
||||
replicationClusterId);
|
||||
}
|
||||
try {
|
||||
ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws IOException {
|
||||
SinkPeer sinkPeer = null;
|
||||
try {
|
||||
sinkPeer = replicationSinkMgr.getReplicationSink();
|
||||
BlockingInterface rrs = sinkPeer.getRegionServer();
|
||||
replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
replicationSinkMgr.reportSinkSuccess(sinkPeer);
|
||||
return ordinal;
|
||||
} catch (IOException ioe) {
|
||||
if (sinkPeer != null) {
|
||||
replicationSinkMgr.reportBadSink(sinkPeer);
|
||||
}
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
|
||||
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
|
||||
: () -> replicateEntries(entries, batchIndex);
|
||||
}
|
||||
}
|
||||
|
@ -291,7 +291,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||
notifyStopped();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public UUID getPeerUUID() {
|
||||
return UUID.randomUUID();
|
||||
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -443,40 +444,15 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
// Fail only once, we don't want to slow down the test.
|
||||
if (failedOnce) {
|
||||
return new DummyReplicator(entries, ordinal);
|
||||
return () -> ordinal;
|
||||
} else {
|
||||
failedOnce = true;
|
||||
return new FailingDummyReplicator(entries, ordinal);
|
||||
}
|
||||
}
|
||||
|
||||
protected class DummyReplicator extends Replicator {
|
||||
|
||||
private int ordinal;
|
||||
|
||||
public DummyReplicator(List<Entry> entries, int ordinal) {
|
||||
super(entries, ordinal);
|
||||
this.ordinal = ordinal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws IOException {
|
||||
return ordinal;
|
||||
}
|
||||
}
|
||||
|
||||
protected class FailingDummyReplicator extends DummyReplicator {
|
||||
|
||||
public FailingDummyReplicator(List<Entry> entries, int ordinal) {
|
||||
super(entries, ordinal);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws IOException {
|
||||
throw new IOException("Sample Exception: Failed to replicate.");
|
||||
return () -> {
|
||||
throw new IOException("Sample Exception: Failed to replicate.");
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,9 +21,9 @@ import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
@ -39,57 +39,14 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
@Ignore("Flaky, needs to be rewritten, see HBASE-19125")
|
||||
public class TestReplicator extends TestReplicationBase {
|
||||
|
||||
@ClassRule
|
||||
@ -104,7 +61,6 @@ public class TestReplicator extends TestReplicationBase {
|
||||
// Set RPC size limit to 10kb (will be applied to both source and sink clusters)
|
||||
conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
|
||||
TestReplicationBase.setUpBeforeClass();
|
||||
admin.removePeer("2"); // Remove the peer set up for us by base class
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -116,7 +72,8 @@ public class TestReplicator extends TestReplicationBase {
|
||||
// Replace the peer set up for us by the base class with a wrapper for this test
|
||||
admin.addPeer("testReplicatorBatching",
|
||||
new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
|
||||
null);
|
||||
|
||||
ReplicationEndpointForTest.setBatchCount(0);
|
||||
ReplicationEndpointForTest.setEntriesCount(0);
|
||||
@ -125,11 +82,10 @@ public class TestReplicator extends TestReplicationBase {
|
||||
try {
|
||||
// Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
|
||||
// have to be replicated separately.
|
||||
final byte[] valueBytes = new byte[8 *1024];
|
||||
final byte[] valueBytes = new byte[8 * 1024];
|
||||
for (int i = 0; i < NUM_ROWS; i++) {
|
||||
htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i)))
|
||||
.addColumn(famName, null, valueBytes)
|
||||
);
|
||||
htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
|
||||
valueBytes));
|
||||
}
|
||||
} finally {
|
||||
ReplicationEndpointForTest.resume();
|
||||
@ -151,8 +107,7 @@ public class TestReplicator extends TestReplicationBase {
|
||||
|
||||
assertEquals("We sent an incorrect number of batches", NUM_ROWS,
|
||||
ReplicationEndpointForTest.getBatchCount());
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS,
|
||||
utility2.countRows(htable2));
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
|
||||
} finally {
|
||||
admin.removePeer("testReplicatorBatching");
|
||||
}
|
||||
@ -168,7 +123,7 @@ public class TestReplicator extends TestReplicationBase {
|
||||
admin.addPeer("testReplicatorWithErrors",
|
||||
new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
|
||||
.setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
|
||||
null);
|
||||
null);
|
||||
|
||||
FailureInjectingReplicationEndpointForTest.setBatchCount(0);
|
||||
FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
|
||||
@ -177,11 +132,10 @@ public class TestReplicator extends TestReplicationBase {
|
||||
try {
|
||||
// Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
|
||||
// have to be replicated separately.
|
||||
final byte[] valueBytes = new byte[8 *1024];
|
||||
final byte[] valueBytes = new byte[8 * 1024];
|
||||
for (int i = 0; i < NUM_ROWS; i++) {
|
||||
htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i)))
|
||||
.addColumn(famName, null, valueBytes)
|
||||
);
|
||||
htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
|
||||
valueBytes));
|
||||
}
|
||||
} finally {
|
||||
FailureInjectingReplicationEndpointForTest.resume();
|
||||
@ -201,8 +155,7 @@ public class TestReplicator extends TestReplicationBase {
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS,
|
||||
utility2.countRows(htable2));
|
||||
assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
|
||||
} finally {
|
||||
admin.removePeer("testReplicatorWithErrors");
|
||||
}
|
||||
@ -221,8 +174,8 @@ public class TestReplicator extends TestReplicationBase {
|
||||
|
||||
public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
|
||||
|
||||
private static AtomicInteger batchCount = new AtomicInteger(0);
|
||||
private static int entriesCount;
|
||||
protected static AtomicInteger batchCount = new AtomicInteger(0);
|
||||
protected static int entriesCount;
|
||||
private static final Object latch = new Object();
|
||||
private static AtomicBoolean useLatch = new AtomicBoolean(false);
|
||||
|
||||
@ -240,7 +193,7 @@ public class TestReplicator extends TestReplicationBase {
|
||||
public static void await() throws InterruptedException {
|
||||
if (useLatch.get()) {
|
||||
LOG.info("Waiting on latch");
|
||||
synchronized(latch) {
|
||||
synchronized (latch) {
|
||||
latch.wait();
|
||||
}
|
||||
LOG.info("Waited on latch, now proceeding");
|
||||
@ -265,38 +218,6 @@ public class TestReplicator extends TestReplicationBase {
|
||||
entriesCount = i;
|
||||
}
|
||||
|
||||
public class ReplicatorForTest extends Replicator {
|
||||
|
||||
public ReplicatorForTest(List<Entry> entries, int ordinal) {
|
||||
super(entries, ordinal);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
|
||||
String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
|
||||
throws IOException {
|
||||
try {
|
||||
long size = 0;
|
||||
for (Entry e: entries) {
|
||||
size += e.getKey().estimatedSerializedSizeOf();
|
||||
size += e.getEdit().estimatedSerializedSizeOf();
|
||||
}
|
||||
LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
|
||||
entries.size() + " entries with total size " + size + " bytes to " +
|
||||
replicationClusterId);
|
||||
super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
|
||||
hfileArchiveDir);
|
||||
entriesCount += entries.size();
|
||||
int count = batchCount.incrementAndGet();
|
||||
LOG.info("Completed replicating batch " + System.identityHashCode(entries) +
|
||||
" count=" + count);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicate(ReplicateContext replicateContext) {
|
||||
try {
|
||||
@ -308,170 +229,37 @@ public class TestReplicator extends TestReplicationBase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
|
||||
return new ReplicatorForTest(entries, ordinal);
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
return () -> {
|
||||
int batchIndex = replicateEntries(entries, ordinal);
|
||||
entriesCount += entries.size();
|
||||
int count = batchCount.incrementAndGet();
|
||||
LOG.info(
|
||||
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
||||
return batchIndex;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public static class FailureInjectingReplicationEndpointForTest
|
||||
extends ReplicationEndpointForTest {
|
||||
|
||||
static class FailureInjectingBlockingInterface implements BlockingInterface {
|
||||
|
||||
private final BlockingInterface delegate;
|
||||
private volatile boolean failNext;
|
||||
|
||||
public FailureInjectingBlockingInterface(BlockingInterface delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetRegionInfoResponse getRegionInfo(RpcController controller,
|
||||
GetRegionInfoRequest request) throws ServiceException {
|
||||
return delegate.getRegionInfo(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetStoreFileResponse getStoreFile(RpcController controller,
|
||||
GetStoreFileRequest request) throws ServiceException {
|
||||
return delegate.getStoreFile(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
|
||||
GetOnlineRegionRequest request) throws ServiceException {
|
||||
return delegate.getOnlineRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
|
||||
throws ServiceException {
|
||||
return delegate.openRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WarmupRegionResponse warmupRegion(RpcController controller,
|
||||
WarmupRegionRequest request) throws ServiceException {
|
||||
return delegate.warmupRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
|
||||
throws ServiceException {
|
||||
return delegate.closeRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
|
||||
throws ServiceException {
|
||||
return delegate.flushRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactRegionResponse compactRegion(RpcController controller,
|
||||
CompactRegionRequest request) throws ServiceException {
|
||||
return delegate.compactRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
|
||||
ReplicateWALEntryRequest request) throws ServiceException {
|
||||
if (!failNext) {
|
||||
failNext = true;
|
||||
return delegate.replicateWALEntry(controller, request);
|
||||
} else {
|
||||
failNext = false;
|
||||
throw new ServiceException("Injected failure");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicateWALEntryResponse replay(RpcController controller,
|
||||
ReplicateWALEntryRequest request) throws ServiceException {
|
||||
return delegate.replay(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RollWALWriterResponse rollWALWriter(RpcController controller,
|
||||
RollWALWriterRequest request) throws ServiceException {
|
||||
return delegate.rollWALWriter(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetServerInfoResponse getServerInfo(RpcController controller,
|
||||
GetServerInfoRequest request) throws ServiceException {
|
||||
return delegate.getServerInfo(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
|
||||
throws ServiceException {
|
||||
return delegate.stopServer(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
|
||||
UpdateFavoredNodesRequest request) throws ServiceException {
|
||||
return delegate.updateFavoredNodes(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateConfigurationResponse updateConfiguration(RpcController controller,
|
||||
UpdateConfigurationRequest request) throws ServiceException {
|
||||
return delegate.updateConfiguration(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetRegionLoadResponse getRegionLoad(RpcController controller,
|
||||
GetRegionLoadRequest request) throws ServiceException {
|
||||
return delegate.getRegionLoad(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
|
||||
ClearCompactionQueuesRequest request) throws ServiceException {
|
||||
return delegate.clearCompactionQueues(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
|
||||
GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
|
||||
return delegate.getSpaceQuotaSnapshots(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse executeProcedures(RpcController controller,
|
||||
ExecuteProceduresRequest request)
|
||||
throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
|
||||
ClearRegionBlockCacheRequest request) throws ServiceException {
|
||||
return delegate.clearRegionBlockCache(controller, request);
|
||||
}
|
||||
}
|
||||
|
||||
public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
|
||||
|
||||
public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
|
||||
super(entries, ordinal);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
|
||||
String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
|
||||
throws IOException {
|
||||
super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
}
|
||||
}
|
||||
private final AtomicBoolean failNext = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
|
||||
return new FailureInjectingReplicatorForTest(entries, ordinal);
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
return () -> {
|
||||
if (failNext.compareAndSet(false, true)) {
|
||||
int batchIndex = replicateEntries(entries, ordinal);
|
||||
entriesCount += entries.size();
|
||||
int count = batchCount.incrementAndGet();
|
||||
LOG.info(
|
||||
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
|
||||
return batchIndex;
|
||||
} else if (failNext.compareAndSet(true, false)) {
|
||||
throw new ServiceException("Injected failure");
|
||||
}
|
||||
return ordinal;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,188 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestSerialReplicationEndpoint {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
|
||||
|
||||
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static Configuration CONF;
|
||||
private static Connection CONN;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster();
|
||||
CONF = UTIL.getConfiguration();
|
||||
CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
|
||||
CONN = UTIL.getConnection();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
IOUtils.closeQuietly(CONN);
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private String getZKClusterKey() {
|
||||
return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
|
||||
CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
||||
}
|
||||
|
||||
private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
|
||||
throws IOException {
|
||||
TestEndpoint.reset();
|
||||
int cellNum = 10000;
|
||||
|
||||
TableName tableName = TableName.valueOf(tableNameStr);
|
||||
byte[] family = Bytes.toBytes("f");
|
||||
byte[] qualifier = Bytes.toBytes("q");
|
||||
TableDescriptor td =
|
||||
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
|
||||
UTIL.createTable(td, null);
|
||||
|
||||
try (Admin admin = CONN.getAdmin()) {
|
||||
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
|
||||
.setReplicateAllUserTables(false).setSerial(isSerial)
|
||||
.setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
|
||||
admin.addReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
|
||||
try (Table table = CONN.getTable(tableName)) {
|
||||
for (int i = 0; i < cellNum; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier, System.currentTimeMillis(),
|
||||
Bytes.toBytes(i));
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
|
||||
|
||||
int index = 0;
|
||||
Assert.assertEquals(TestEndpoint.getEntries().size(), cellNum);
|
||||
if (!isSerial) {
|
||||
Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
|
||||
long seqA = a.getKey().getSequenceId();
|
||||
long seqB = b.getKey().getSequenceId();
|
||||
return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
|
||||
});
|
||||
}
|
||||
for (Entry entry : TestEndpoint.getEntries()) {
|
||||
Assert.assertEquals(entry.getKey().getTableName(), tableName);
|
||||
Assert.assertEquals(entry.getEdit().getCells().size(), 1);
|
||||
Cell cell = entry.getEdit().getCells().get(0);
|
||||
Assert.assertArrayEquals(
|
||||
Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()),
|
||||
Bytes.toBytes(index));
|
||||
index++;
|
||||
}
|
||||
Assert.assertEquals(index, cellNum);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerialReplicate() throws Exception {
|
||||
testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParallelReplicate() throws Exception {
|
||||
testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
|
||||
}
|
||||
|
||||
public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
|
||||
|
||||
private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
public static void reset() {
|
||||
entryQueue.clear();
|
||||
}
|
||||
|
||||
public static List<Entry> getEntries() {
|
||||
return new ArrayList<>(entryQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canReplicateToSameCluster() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
return () -> {
|
||||
entryQueue.addAll(entries);
|
||||
return ordinal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<ServerName> getRegionServers() {
|
||||
// Return multiple server names for endpoint parallel replication.
|
||||
return new ArrayList<>(
|
||||
ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
|
||||
ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
|
||||
ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
|
||||
ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
|
||||
ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user