HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits
This commit is contained in:
parent 1a37f3be82
commit 140c559a3a
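For context: the two RpcServer hunks below widen the visibility of the hbase.ipc.max.request.size constant and its 256 MB default so the replication endpoint can size its batches against the server-side request cap. A minimal sketch of how that limit is derived, assuming an HBase branch-1 classpath (illustrative only, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcServer;

public class RpcLimitSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // hbase.ipc.max.request.size defaults to 256 MB; the endpoint caps each
    // replication RPC at 95% of it, leaving slop because the per-entry size
    // estimate is not the exact protobuf-encoded request size.
    int replicationRpcLimit = (int)(0.95 * (double)conf.getLong(
        RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    System.out.println("replication RPC limit: " + replicationRpcLimit + " bytes");
  }
}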
@@ -258,7 +258,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
-  static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
   private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
       new RequestTooBigException();
 
@@ -274,7 +274,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
 
   /** Default value for above params */
-  private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 
@@ -292,6 +292,14 @@ public class WALEdit implements Writable, HeapSize {
     return ret;
   }
 
+  public long estimatedSerializedSizeOf() {
+    long ret = 0;
+    for (Cell cell: cells) {
+      ret += CellUtil.estimatedSerializedSizeOf(cell);
+    }
+    return ret;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -23,7 +23,9 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -54,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
-import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
   private int socketTimeoutMultiplier;
   // Amount of time for shutdown to wait for all tasks to complete
   private long maxTerminationWait;
+  // Size limit for replication RPCs, in bytes
+  private int replicationRpcLimit;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
@@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
         new LinkedBlockingQueue<Runnable>());
     this.exec.allowCoreThreadTimeOut(true);
     this.abortable = ctx.getAbortable();
+    // Set the size limit for replication RPCs to 95% of the max request size.
+    // We could do with less slop if we have an accurate estimate of encoded size. Being
+    // conservative for now.
+    this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
+      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
+  private List<List<Entry>> createBatches(final List<Entry> entries) {
+    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+    // Maintains the current batch for a given partition index
+    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
+    List<List<Entry>> entryLists = new ArrayList<>();
+    int[] sizes = new int[n];
+
+    for (int i = 0; i < n; i++) {
+      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
+    }
+
+    for (Entry e: entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
+      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
+          (int)e.getEdit().estimatedSerializedSizeOf();
+      // If this batch is oversized, add it to final list and initialize a new empty batch
+      if (sizes[index] > 0 /* must include at least one entry */ &&
+          sizes[index] + entrySize > replicationRpcLimit) {
+        entryLists.add(entryMap.get(index));
+        entryMap.put(index, new ArrayList<Entry>());
+        sizes[index] = 0;
+      }
+      entryMap.get(index).add(e);
+      sizes[index] += entrySize;
+    }
+
+    entryLists.addAll(entryMap.values());
+    return entryLists;
+  }
+
   /**
    * Do the shipping logic
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
-    List<Entry> entries = replicateContext.getEntries();
+    List<List<Entry>> batches;
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
-    int numReplicated = 0;
 
     if (!peersSelected && this.isRunning()) {
       connectToPeers();
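The createBatches() hunk above partitions entries by region hash (preserving per-region ordering) and closes a partition's batch as soon as adding the next entry would push it past replicationRpcLimit, while guaranteeing every batch carries at least one entry so a single oversized edit still ships. A self-contained toy sketch of that invariant, with plain ints standing in for WAL entries (hypothetical names, not part of the commit):

import java.util.ArrayList;
import java.util.List;

public class SizeCappedBatching {
  // Split sizes into consecutive batches, each under 'limit' bytes when
  // possible; a batch always holds at least one element, so an element
  // larger than the limit still gets its own batch rather than being dropped.
  static List<List<Integer>> batch(int[] sizes, int limit) {
    List<List<Integer>> batches = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    int total = 0;
    for (int s : sizes) {
      if (total > 0 && total + s > limit) {
        batches.add(current);
        current = new ArrayList<>();
        total = 0;
      }
      current.add(s);
      total += s;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    // With an 8 KB limit, three 6 KB edits become three single-entry batches,
    // and a 20 KB edit still ships alone in its own batch.
    System.out.println(batch(new int[] {6144, 6144, 6144, 20480}, 8192));
  }
}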
@@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
       return false;
     }
 
-    // minimum of: configured threads, number of 100-waledit batches,
-    //  and number of current sinks
-    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
-
-    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
-    if (n == 1) {
-      entryLists.add(entries);
-    } else {
-      for (int i=0; i<n; i++) {
-        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
-      }
-      // now group by region
-      for (Entry e : entries) {
-        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
-      }
-    }
+    batches = createBatches(replicateContext.getEntries());
+
     while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
         continue;
       }
       try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + replicateContext.getSize());
-        }
-
         int futures = 0;
-        for (int i=0; i<entryLists.size(); i++) {
-          if (!entryLists.get(i).isEmpty()) {
+        for (int i=0; i<batches.size(); i++) {
+          List<Entry> entries = batches.get(i);
+          if (!entries.isEmpty()) {
             if (LOG.isTraceEnabled()) {
-              LOG.trace("Submitting " + entryLists.get(i).size() +
+              LOG.trace("Submitting " + entries.size() +
                   " entries of total size " + replicateContext.getSize());
             }
             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            pool.submit(createReplicator(entryLists.get(i), i));
+            pool.submit(createReplicator(entries, i));
             futures++;
           }
         }
         IOException iox = null;
-
+        long lastWriteTime = 0;
         for (int i=0; i<futures; i++) {
           try {
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
             Future<Integer> f = pool.take();
             int index = f.get().intValue();
-            int batchSize = entryLists.get(index).size();
-            entryLists.set(index, Collections.<Entry>emptyList());
-            // Now, we have marked the batch as done replicating, record its size
-            numReplicated += batchSize;
+            List<Entry> batch = batches.get(index);
+            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
+            // Find the most recent write time in the batch
+            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+            if (writeTime > lastWriteTime) {
+              lastWriteTime = writeTime;
+            }
           } catch (InterruptedException ie) {
             iox = new IOException(ie);
           } catch (ExecutionException ee) {
@@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
           // if we had any exceptions, try again
           throw iox;
         }
-        if (numReplicated != entries.size()) {
-          // Something went wrong here and we don't know what, let's just fail and retry.
-          LOG.warn("The number of edits replicated is different from the number received,"
-              + " failing for now.");
-          return false;
-        }
         // update metrics
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
-          walGroupId);
+        if (lastWriteTime > 0) {
+          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+        }
         return true;
 
       } catch (IOException ioe) {
@@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
       this.ordinal = ordinal;
     }
 
+    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
+        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+        throws IOException {
+      if (LOG.isTraceEnabled()) {
+        long size = 0;
+        for (Entry e: entries) {
+          size += e.getKey().estimatedSerializedSizeOf();
+          size += e.getEdit().estimatedSerializedSizeOf();
+        }
+        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
+            entries.size() + " entries with total size " + size + " bytes to " +
+            replicationClusterId);
+      }
+      try {
+        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
+        }
+        throw e;
+      }
+    }
+
     @Override
     public Integer call() throws IOException {
       SinkPeer sinkPeer = null;
       try {
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
 
         replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return ordinal;
 
       } catch (IOException ioe) {
         if (sinkPeer != null) {
           replicationSinkMgr.reportBadSink(sinkPeer);
@@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
         throw ioe;
       }
     }
-
   }
 }
@@ -614,4 +614,29 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     }
   }
 
+  public long estimatedSerializedSizeOf() {
+    long size = encodedRegionName != null ? encodedRegionName.length : 0;
+    size += tablename != null ? tablename.toBytes().length : 0;
+    if (clusterIds != null) {
+      size += 16 * clusterIds.size();
+    }
+    if (nonceGroup != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce group
+    }
+    if (nonce != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce
+    }
+    if (scopes != null) {
+      for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
+        size += scope.getKey().length;
+        size += Bytes.SIZEOF_INT;
+      }
+    }
+    size += Bytes.SIZEOF_LONG; // sequence number
+    size += Bytes.SIZEOF_LONG; // write time
+    if (origLogSeqNum > 0) {
+      size += Bytes.SIZEOF_LONG; // original sequence number
+    }
+    return size;
+  }
 }
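Taken together with the WALEdit.estimatedSerializedSizeOf() added earlier, the WALKey method above lets the endpoint cost an entry as the sum of its key and edit estimates, which is what createBatches() and the trace logging compute. A small helper sketch of that per-entry costing (assumes the patched branch-1 classes on the classpath; illustrative only, not part of the commit):

import org.apache.hadoop.hbase.wal.WAL.Entry;

public class EntrySizing {
  // Mirrors how the endpoint estimates an entry's on-the-wire cost:
  // the key estimate (region/table names, cluster ids, nonces, timestamps)
  // plus the sum of the edit's cell sizes. It is an estimate, not the exact
  // encoded protobuf size, hence the 5% slop built into replicationRpcLimit.
  static long estimatedEntrySize(Entry e) {
    return e.getKey().estimatedSerializedSizeOf()
        + e.getEdit().estimatedSerializedSizeOf();
  }
}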
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.*;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestReplicator extends TestReplicationBase {
+
+  static final Log LOG = LogFactory.getLog(TestReplicator.class);
+  static final int NUM_ROWS = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+    TestReplicationBase.setUpBeforeClass();
+    admin.removePeer("2"); // Remove the peer set up for us by base class
+  }
+
+  @Test
+  public void testReplicatorBatching() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorBatching",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    ReplicationEndpointForTest.setBatchCount(0);
+    ReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      ReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 * 1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        ReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      // We can expect 10 batches, 1 row each
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+        ReplicationEndpointForTest.getBatchCount());
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorBatching");
+    }
+  }
+
+  @Test
+  public void testReplicatorWithErrors() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorWithErrors",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
+          null);
+
+    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      FailureInjectingReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 * 1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        FailureInjectingReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      // We can expect 10 batches
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorWithErrors");
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    admin.disableTable(tableName);
+    admin.truncateTable(tablename, false);
+  }
+
+  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+
+    private static int batchCount;
+    private static int entriesCount;
+    private static final Object latch = new Object();
+    private static AtomicBoolean useLatch = new AtomicBoolean(false);
+
+    public static void resume() {
+      useLatch.set(false);
+      synchronized (latch) {
+        latch.notifyAll();
+      }
+    }
+
+    public static void pause() {
+      useLatch.set(true);
+    }
+
+    public static void await() throws InterruptedException {
+      if (useLatch.get()) {
+        LOG.info("Waiting on latch");
+        latch.wait();
+        LOG.info("Waited on latch, now proceeding");
+      }
+    }
+
+    public static int getBatchCount() {
+      return batchCount;
+    }
+
+    public static void setBatchCount(int i) {
+      batchCount = i;
+    }
+
+    public static int getEntriesCount() {
+      return entriesCount;
+    }
+
+    public static void setEntriesCount(int i) {
+      entriesCount = i;
+    }
+
+    public class ReplicatorForTest extends Replicator {
+
+      public ReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        try {
+          long size = 0;
+          for (Entry e: entries) {
+            size += e.getKey().estimatedSerializedSizeOf();
+            size += e.getEdit().estimatedSerializedSizeOf();
+          }
+          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+              entries.size() + " entries with total size " + size + " bytes to " +
+              replicationClusterId);
+          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+            hfileArchiveDir);
+          entriesCount += entries.size();
+          batchCount++;
+          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+        } catch (IOException e) {
+          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+          throw e;
+        }
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted waiting for latch", e);
+      }
+      return super.replicate(replicateContext);
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new ReplicatorForTest(entries, ordinal);
+    }
+  }
+
+  public static class FailureInjectingReplicationEndpointForTest
+      extends ReplicationEndpointForTest {
+
+    static class FailureInjectingBlockingInterface implements BlockingInterface {
+
+      private final BlockingInterface delegate;
+      private volatile boolean failNext;
+
+      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+        this.delegate = delegate;
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        if (!failNext) {
+          failNext = true;
+          return delegate.replicateWALEntry(controller, request);
+        } else {
+          failNext = false;
+          throw new ServiceException("Injected failure");
+        }
+      }
+
+      @Override
+      public GetRegionInfoResponse getRegionInfo(RpcController controller,
+          GetRegionInfoRequest request) throws ServiceException {
+        return delegate.getRegionInfo(controller, request);
+      }
+
+      @Override
+      public GetStoreFileResponse getStoreFile(RpcController controller,
+          GetStoreFileRequest request) throws ServiceException {
+        return delegate.getStoreFile(controller, request);
+      }
+
+      @Override
+      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+          GetOnlineRegionRequest request) throws ServiceException {
+        return delegate.getOnlineRegion(controller, request);
+      }
+
+      @Override
+      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+          throws ServiceException {
+        return delegate.openRegion(controller, request);
+      }
+
+      @Override
+      public WarmupRegionResponse warmupRegion(RpcController controller,
+          WarmupRegionRequest request) throws ServiceException {
+        return delegate.warmupRegion(controller, request);
+      }
+
+      @Override
+      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+          throws ServiceException {
+        return delegate.closeRegion(controller, request);
+      }
+
+      @Override
+      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+          throws ServiceException {
+        return delegate.flushRegion(controller, request);
+      }
+
+      @Override
+      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+          throws ServiceException {
+        return delegate.splitRegion(controller, request);
+      }
+
+      @Override
+      public CompactRegionResponse compactRegion(RpcController controller,
+          CompactRegionRequest request) throws ServiceException {
+        return delegate.compactRegion(controller, request);
+      }
+
+      @Override
+      public MergeRegionsResponse mergeRegions(RpcController controller,
+          MergeRegionsRequest request) throws ServiceException {
+        return delegate.mergeRegions(controller, request);
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replay(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        return delegate.replay(controller, request);
+      }
+
+      @Override
+      public RollWALWriterResponse rollWALWriter(RpcController controller,
+          RollWALWriterRequest request) throws ServiceException {
+        return delegate.rollWALWriter(controller, request);
+      }
+
+      @Override
+      public GetServerInfoResponse getServerInfo(RpcController controller,
+          GetServerInfoRequest request) throws ServiceException {
+        return delegate.getServerInfo(controller, request);
+      }
+
+      @Override
+      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+          throws ServiceException {
+        return delegate.stopServer(controller, request);
+      }
+
+      @Override
+      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+          UpdateFavoredNodesRequest request) throws ServiceException {
+        return delegate.updateFavoredNodes(controller, request);
+      }
+
+      @Override
+      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+          UpdateConfigurationRequest request) throws ServiceException {
+        return delegate.updateConfiguration(controller, request);
+      }
+    }
+
+    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+
+      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      }
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new FailureInjectingReplicatorForTest(entries, ordinal);
+    }
+  }
+}
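A note on the arithmetic behind the test's "one row per batch" expectation: with hbase.ipc.max.request.size forced to 10 KB, the endpoint's limit becomes 95% of that, so two 8 KB cells can never share a batch while one still fits. A quick check of that arithmetic (illustrative only, not part of the commit):

public class TestMathSketch {
  public static void main(String[] args) {
    int maxRequestSize = 1024 * 10;                    // the test's configured cap
    int limit = (int)(0.95 * (double)maxRequestSize);  // 9728 bytes
    int cell = 8 * 1024;                               // one 8 KB value per row
    // One cell fits under the limit, two do not, so each of the
    // NUM_ROWS = 10 rows must ship in its own batch.
    System.out.println(limit + " -> one cell fits: " + (cell <= limit)
        + ", two cells fit: " + (2 * cell <= limit));
  }
}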