From d547feac6b673d59703e1a0ef46db38b26046e4c Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 30 May 2017 14:24:51 -0700 Subject: [PATCH] HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits --- .../apache/hadoop/hbase/ipc/RpcServer.java | 4 +- .../hbase/regionserver/wal/WALEdit.java | 8 + .../HBaseInterClusterReplicationEndpoint.java | 130 ++++-- .../org/apache/hadoop/hbase/wal/WALKey.java | 28 +- .../regionserver/TestReplicator.java | 427 ++++++++++++++++++ 5 files changed, 551 insertions(+), 46 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index d5536470dad..0a3db409454 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -171,7 +171,7 @@ public abstract class RpcServer implements RpcServerInterface, protected HBaseRPCErrorHandler errorHandler = null; - protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; + public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size"; protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION = new RequestTooBigException(); @@ -186,7 +186,7 @@ public abstract class RpcServer implements RpcServerInterface, protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; /** Default value for above params */ - protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M + public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index d5b95ee0c16..0f32a1d9bfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -171,6 +171,14 @@ public class WALEdit implements HeapSize { return ret; } + public long estimatedSerializedSizeOf() { + long ret = 0; + for (Cell cell: cells) { + ret += CellUtil.estimatedSerializedSizeOf(cell); + } + return ret; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 97f28b4655a..30cec5be9ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -25,7 +25,9 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -56,7 +59,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.ipc.RemoteException; -import javax.security.sasl.SaslException; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} @@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int socketTimeoutMultiplier; // Amount of time for shutdown to wait for all tasks to complete private long maxTerminationWait; + // Size limit for replication RPCs, in bytes + private int replicationRpcLimit; //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi new LinkedBlockingQueue<>()); this.exec.allowCoreThreadTimeOut(true); this.abortable = ctx.getAbortable(); + // Set the size limit for replication RPCs to 95% of the max request size. + // We could do with less slop if we have an accurate estimate of encoded size. Being + // conservative for now. + this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE, + RpcServer.DEFAULT_MAX_REQUEST_SIZE)); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return sleepMultiplier < maxRetriesMultiplier; } + private List> createBatches(final List entries) { + int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1); + int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks); + // Maintains the current batch for a given partition index + Map> entryMap = new HashMap<>(n); + List> entryLists = new ArrayList<>(); + int[] sizes = new int[n]; + + for (int i = 0; i < n; i++) { + entryMap.put(i, new ArrayList(entries.size()/n+1)); + } + + for (Entry e: entries) { + int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n); + int entrySize = (int)e.getKey().estimatedSerializedSizeOf() + + (int)e.getEdit().estimatedSerializedSizeOf(); + // If this batch is oversized, add it to final list and initialize a new empty batch + if (sizes[index] > 0 /* must include at least one entry */ && + sizes[index] + entrySize > replicationRpcLimit) { + entryLists.add(entryMap.get(index)); + entryMap.put(index, new ArrayList()); + sizes[index] = 0; + } + entryMap.get(index).add(e); + sizes[index] += entrySize; + } + + entryLists.addAll(entryMap.values()); + return entryLists; + } + /** * Do the shipping logic */ @Override public boolean replicate(ReplicateContext replicateContext) { CompletionService pool = new ExecutorCompletionService<>(this.exec); - List entries = replicateContext.getEntries(); + List> batches; String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; - int numReplicated = 0; if (!peersSelected && this.isRunning()) { connectToPeers(); @@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return false; } - // minimum of: configured threads, number of 100-waledit batches, - // and number of current sinks - int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks); + batches = createBatches(replicateContext.getEntries()); - List> entryLists = new ArrayList<>(n); - if (n == 1) { - entryLists.add(entries); - } else { - for (int i=0; i(entries.size()/n+1)); - } - // now group by region - for (Entry e : entries) { - entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); - } - } while (this.isRunning() && !exec.isShutdown()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi continue; } try { - if (LOG.isTraceEnabled()) { - LOG.trace("Replicating " + entries.size() + - " entries of total size " + replicateContext.getSize()); - } - int futures = 0; - for (int i=0; i entries = batches.get(i); + if (!entries.isEmpty()) { if (LOG.isTraceEnabled()) { - LOG.trace("Submitting " + entryLists.get(i).size() + + LOG.trace("Submitting " + entries.size() + " entries of total size " + replicateContext.getSize()); } // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource - pool.submit(createReplicator(entryLists.get(i), i)); + pool.submit(createReplicator(entries, i)); futures++; } } IOException iox = null; + long lastWriteTime = 0; for (int i=0; i f = pool.take(); int index = f.get().intValue(); - int batchSize = entryLists.get(index).size(); - entryLists.set(index, Collections.emptyList()); - // Now, we have marked the batch as done replicating, record its size - numReplicated += batchSize; + List batch = batches.get(index); + batches.set(index, Collections.emptyList()); // remove successful batch + // Find the most recent write time in the batch + long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime(); + if (writeTime > lastWriteTime) { + lastWriteTime = writeTime; + } } catch (InterruptedException ie) { iox = new IOException(ie); } catch (ExecutionException ee) { @@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // if we had any exceptions, try again throw iox; } - if (numReplicated != entries.size()) { - // Something went wrong here and we don't know what, let's just fail and retry. - LOG.warn("The number of edits replicated is different from the number received," - + " failing for now."); - return false; - } // update metrics - this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), - walGroupId); + if (lastWriteTime > 0) { + this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId); + } return true; } catch (IOException ioe) { @@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.ordinal = ordinal; } + protected void replicateEntries(BlockingInterface rrs, final List batch, + String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) + throws IOException { + if (LOG.isTraceEnabled()) { + long size = 0; + for (Entry e: entries) { + size += e.getKey().estimatedSerializedSizeOf(); + size += e.getEdit().estimatedSerializedSizeOf(); + } + LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " + + entries.size() + " entries with total size " + size + " bytes to " + + replicationClusterId); + } + try { + ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + if (LOG.isTraceEnabled()) { + LOG.trace("Completed replicating batch " + System.identityHashCode(entries)); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e); + } + throw e; + } + } + @Override public Integer call() throws IOException { SinkPeer sinkPeer = null; try { sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); - ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir); replicationSinkMgr.reportSinkSuccess(sinkPeer); return ordinal; - } catch (IOException ioe) { if (sinkPeer != null) { replicationSinkMgr.reportBadSink(sinkPeer); @@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi throw ioe; } } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index bd03e4d0430..1d84c4b7ead 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -618,4 +618,30 @@ public class WALKey implements SequenceId, Comparable { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } -} \ No newline at end of file + + public long estimatedSerializedSizeOf() { + long size = encodedRegionName != null ? encodedRegionName.length : 0; + size += tablename != null ? tablename.toBytes().length : 0; + if (clusterIds != null) { + size += 16 * clusterIds.size(); + } + if (nonceGroup != HConstants.NO_NONCE) { + size += Bytes.SIZEOF_LONG; // nonce group + } + if (nonce != HConstants.NO_NONCE) { + size += Bytes.SIZEOF_LONG; // nonce + } + if (replicationScope != null) { + for (Map.Entry scope: replicationScope.entrySet()) { + size += scope.getKey().length; + size += Bytes.SIZEOF_INT; + } + } + size += Bytes.SIZEOF_LONG; // sequence number + size += Bytes.SIZEOF_LONG; // write time + if (origLogSeqNum > 0) { + size += Bytes.SIZEOF_LONG; // original sequence number + } + return size; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java new file mode 100644 index 00000000000..7da56a3c69d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.*; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category(MediumTests.class) +public class TestReplicator extends TestReplicationBase { + + static final Log LOG = LogFactory.getLog(TestReplicator.class); + static final int NUM_ROWS = 10; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Set RPC size limit to 10kb (will be applied to both source and sink clusters) + conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); + TestReplicationBase.setUpBeforeClass(); + admin.removePeer("2"); // Remove the peer set up for us by base class + } + + @Test + public void testReplicatorBatching() throws Exception { + // Clear the tables + truncateTable(utility1, tableName); + truncateTable(utility2, tableName); + + // Replace the peer set up for us by the base class with a wrapper for this test + admin.addPeer("testReplicatorBatching", + new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + + ReplicationEndpointForTest.setBatchCount(0); + ReplicationEndpointForTest.setEntriesCount(0); + try { + ReplicationEndpointForTest.pause(); + try { + // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all + // have to be replicated separately. + final byte[] valueBytes = new byte[8 *1024]; + for (int i = 0; i < NUM_ROWS; i++) { + htable1.put(new Put(("row"+Integer.toString(i)).getBytes()) + .addColumn(famName, null, valueBytes) + ); + } + } finally { + ReplicationEndpointForTest.resume(); + } + + // Wait for replication to complete. + Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS; + } + + @Override + public String explainFailure() throws Exception { + return "We waited too long for expected replication of " + NUM_ROWS + " entries"; + } + }); + + assertEquals("We sent an incorrect number of batches", NUM_ROWS, + ReplicationEndpointForTest.getBatchCount()); + assertEquals("We did not replicate enough rows", NUM_ROWS, + utility2.countRows(htable2)); + } finally { + admin.removePeer("testReplicatorBatching"); + } + } + + @Test + public void testReplicatorWithErrors() throws Exception { + // Clear the tables + truncateTable(utility1, tableName); + truncateTable(utility2, tableName); + + // Replace the peer set up for us by the base class with a wrapper for this test + admin.addPeer("testReplicatorWithErrors", + new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) + .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), + null); + + FailureInjectingReplicationEndpointForTest.setBatchCount(0); + FailureInjectingReplicationEndpointForTest.setEntriesCount(0); + try { + FailureInjectingReplicationEndpointForTest.pause(); + try { + // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all + // have to be replicated separately. + final byte[] valueBytes = new byte[8 *1024]; + for (int i = 0; i < NUM_ROWS; i++) { + htable1.put(new Put(("row"+Integer.toString(i)).getBytes()) + .addColumn(famName, null, valueBytes) + ); + } + } finally { + FailureInjectingReplicationEndpointForTest.resume(); + } + + // Wait for replication to complete. + // We can expect 10 batches + Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS; + } + + @Override + public String explainFailure() throws Exception { + return "We waited too long for expected replication of " + NUM_ROWS + " entries"; + } + }); + + assertEquals("We did not replicate enough rows", NUM_ROWS, + utility2.countRows(htable2)); + } finally { + admin.removePeer("testReplicatorWithErrors"); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestReplicationBase.tearDownAfterClass(); + } + + private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException { + HBaseAdmin admin = util.getHBaseAdmin(); + admin.disableTable(tableName); + admin.truncateTable(tablename, false); + } + + public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { + + private static int batchCount; + private static int entriesCount; + private static final Object latch = new Object(); + private static AtomicBoolean useLatch = new AtomicBoolean(false); + + public static void resume() { + useLatch.set(false); + synchronized (latch) { + latch.notifyAll(); + } + } + + public static void pause() { + useLatch.set(true); + } + + public static void await() throws InterruptedException { + if (useLatch.get()) { + LOG.info("Waiting on latch"); + latch.wait(); + LOG.info("Waited on latch, now proceeding"); + } + } + + public static int getBatchCount() { + return batchCount; + } + + public static void setBatchCount(int i) { + batchCount = i; + } + + public static int getEntriesCount() { + return entriesCount; + } + + public static void setEntriesCount(int i) { + entriesCount = i; + } + + public class ReplicatorForTest extends Replicator { + + public ReplicatorForTest(List entries, int ordinal) { + super(entries, ordinal); + } + + @Override + protected void replicateEntries(BlockingInterface rrs, final List entries, + String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) + throws IOException { + try { + long size = 0; + for (Entry e: entries) { + size += e.getKey().estimatedSerializedSizeOf(); + size += e.getEdit().estimatedSerializedSizeOf(); + } + LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " + + entries.size() + " entries with total size " + size + " bytes to " + + replicationClusterId); + super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, + hfileArchiveDir); + entriesCount += entries.size(); + batchCount++; + LOG.info("Completed replicating batch " + System.identityHashCode(entries)); + } catch (IOException e) { + LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e); + throw e; + } + } + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + try { + await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for latch", e); + } + return super.replicate(replicateContext); + } + + @Override + protected Replicator createReplicator(List entries, int ordinal) { + return new ReplicatorForTest(entries, ordinal); + } + } + + public static class FailureInjectingReplicationEndpointForTest + extends ReplicationEndpointForTest { + + static class FailureInjectingBlockingInterface implements BlockingInterface { + + private final BlockingInterface delegate; + private volatile boolean failNext; + + public FailureInjectingBlockingInterface(BlockingInterface delegate) { + this.delegate = delegate; + } + + @Override + public GetRegionInfoResponse getRegionInfo(RpcController controller, + GetRegionInfoRequest request) throws ServiceException { + return delegate.getRegionInfo(controller, request); + } + + @Override + public GetStoreFileResponse getStoreFile(RpcController controller, + GetStoreFileRequest request) throws ServiceException { + return delegate.getStoreFile(controller, request); + } + + @Override + public GetOnlineRegionResponse getOnlineRegion(RpcController controller, + GetOnlineRegionRequest request) throws ServiceException { + return delegate.getOnlineRegion(controller, request); + } + + @Override + public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) + throws ServiceException { + return delegate.openRegion(controller, request); + } + + @Override + public WarmupRegionResponse warmupRegion(RpcController controller, + WarmupRegionRequest request) throws ServiceException { + return delegate.warmupRegion(controller, request); + } + + @Override + public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) + throws ServiceException { + return delegate.closeRegion(controller, request); + } + + @Override + public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) + throws ServiceException { + return delegate.flushRegion(controller, request); + } + + @Override + public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request) + throws ServiceException { + return delegate.splitRegion(controller, request); + } + + @Override + public CompactRegionResponse compactRegion(RpcController controller, + CompactRegionRequest request) throws ServiceException { + return delegate.compactRegion(controller, request); + } + + @Override + public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + if (!failNext) { + failNext = true; + return delegate.replicateWALEntry(controller, request); + } else { + failNext = false; + throw new ServiceException("Injected failure"); + } + } + + @Override + public ReplicateWALEntryResponse replay(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + return delegate.replay(controller, request); + } + + @Override + public RollWALWriterResponse rollWALWriter(RpcController controller, + RollWALWriterRequest request) throws ServiceException { + return delegate.rollWALWriter(controller, request); + } + + @Override + public GetServerInfoResponse getServerInfo(RpcController controller, + GetServerInfoRequest request) throws ServiceException { + return delegate.getServerInfo(controller, request); + } + + @Override + public StopServerResponse stopServer(RpcController controller, StopServerRequest request) + throws ServiceException { + return delegate.stopServer(controller, request); + } + + @Override + public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, + UpdateFavoredNodesRequest request) throws ServiceException { + return delegate.updateFavoredNodes(controller, request); + } + + @Override + public UpdateConfigurationResponse updateConfiguration(RpcController controller, + UpdateConfigurationRequest request) throws ServiceException { + return delegate.updateConfiguration(controller, request); + } + + @Override + public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(RpcController controller, + CloseRegionForSplitOrMergeRequest request) throws ServiceException { + return delegate.closeRegionForSplitOrMerge(controller, request); + } + + @Override + public GetRegionLoadResponse getRegionLoad(RpcController controller, + GetRegionLoadRequest request) throws ServiceException { + return delegate.getRegionLoad(controller, request); + } + + @Override + public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, + ClearCompactionQueuesRequest request) throws ServiceException { + return delegate.clearCompactionQueues(controller, request); + } + + @Override + public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, + GetSpaceQuotaSnapshotsRequest request) throws ServiceException { + return delegate.getSpaceQuotaSnapshots(controller, request); + } + } + + public class FailureInjectingReplicatorForTest extends ReplicatorForTest { + + public FailureInjectingReplicatorForTest(List entries, int ordinal) { + super(entries, ordinal); + } + + @Override + protected void replicateEntries(BlockingInterface rrs, List entries, + String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) + throws IOException { + super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries, + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + } + } + + @Override + protected Replicator createReplicator(List entries, int ordinal) { + return new FailureInjectingReplicatorForTest(entries, ordinal); + } + } + +}