diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 4c719a9717b..624ded6899f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -202,15 +203,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi " entries of total size " + replicateContext.getSize()); } // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource - futures.add(exec.submit(new Replicator(entryLists.get(i), i))); + futures.add(exec.submit(createReplicator(entryLists.get(i), i))); } } IOException iox = null; - for (Future f : futures) { + for (int index = futures.size() - 1; index >= 0; index--) { try { // wait for all futures, remove successful parts // (only the remaining parts will be retried) - entryLists.remove(f.get()); + Future f = futures.get(index); + entryLists.remove(f.get().intValue()); } catch (InterruptedException ie) { iox = new IOException(ie); } catch (ExecutionException ee) { @@ -289,7 +291,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return super.stopAndWait(); } - private class Replicator implements Callable { + @VisibleForTesting + protected Replicator createReplicator(List entries, int ordinal) { + return new Replicator(entries, ordinal); + } + + @VisibleForTesting + protected class Replicator implements Callable { private List entries; private int ordinal; public Replicator(List entries, int ordinal) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index ad9b227ff35..ac87269110d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -83,9 +83,11 @@ public class TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); - // smaller log roll size to trigger more events - conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); - conf1.setInt("replication.source.size.capacity", 10240); + // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger + // sufficient number of events. But we don't want to go too low because + // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want + // more than one batch sent to the peer cluster for better testing. + conf1.setInt("replication.source.size.capacity", 102400); conf1.setLong("replication.source.sleepforretries", 100); conf1.setInt("hbase.regionserver.maxlogs", 10); conf1.setLong("hbase.master.logcleaner.ttl", 10); @@ -98,6 +100,7 @@ public class TestReplicationBase { conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf1.setLong("replication.sleep.before.failover", 2000); conf1.setInt("replication.source.maxretriesmultiplier", 10); + conf1.setFloat("replication.source.ratio", 1.0f); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); @@ -126,7 +129,9 @@ public class TestReplicationBase { LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); utility1.startMiniCluster(2); - utility2.startMiniCluster(2); + // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks + // as a component in deciding maximum number of parallel batches to send to the peer cluster. + utility2.startMiniCluster(4); HTableDescriptor table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index ba2a7c18104..53aabfed80f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -92,6 +92,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas LOG.info("testSimplePutDelete"); MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster(); + int numRS = peerCluster.getRegionServerThreads().size(); doPutTest(Bytes.toBytes(1)); @@ -100,14 +101,14 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas peerCluster.waitOnRegionServer(rsToStop); // Sanity check - assertEquals(1, peerCluster.getRegionServerThreads().size()); + assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size()); doPutTest(Bytes.toBytes(2)); peerCluster.startRegionServer(); // Sanity check - assertEquals(2, peerCluster.getRegionServerThreads().size()); + assertEquals(numRS, peerCluster.getRegionServerThreads().size()); doPutTest(Bytes.toBytes(3)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 66adf70c0c3..bf0cc1a7a4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -29,11 +29,14 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -60,7 +63,6 @@ public class TestReplicationEndpoint extends TestReplicationBase { @BeforeClass public static void setUpBeforeClass() throws Exception { TestReplicationBase.setUpBeforeClass(); - utility2.shutdownMiniCluster(); // we don't need the second cluster admin.removePeer("2"); numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size(); } @@ -184,6 +186,49 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); } + @Test (timeout=120000) + public void testInterClusterReplication() throws Exception { + final String id = "testInterClusterReplication"; + + List regions = utility1.getHBaseCluster().getRegions(tableName); + int totEdits = 0; + + // Make sure edits are spread across regions because we do region based batching + // before shipping edits. + for(HRegion region: regions) { + HRegionInfo hri = region.getRegionInfo(); + byte[] row = hri.getStartKey(); + for (int i = 0; i < 100; i++) { + if (row.length > 0) { + doPut(row); + totEdits++; + } + } + } + + admin.addPeer(id, + new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2)) + .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), + null); + + final int numEdits = totEdits; + Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; + } + @Override + public String explainFailure() throws Exception { + String failure = "Failed to replicate all edits, expected = " + numEdits + + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get(); + return failure; + } + }); + + admin.removePeer("testInterClusterReplication"); + utility1.deleteTableData(tableName); + } + @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", @@ -270,6 +315,60 @@ public class TestReplicationEndpoint extends TestReplicationBase { } } + public static class InterClusterReplicationEndpointForTest + extends HBaseInterClusterReplicationEndpoint { + + static AtomicInteger replicateCount = new AtomicInteger(); + static boolean failedOnce; + + @Override + public boolean replicate(ReplicateContext replicateContext) { + boolean success = super.replicate(replicateContext); + if (success) { + replicateCount.addAndGet(replicateContext.entries.size()); + } + return success; + } + + @Override + protected Replicator createReplicator(List entries, int ordinal) { + // Fail only once, we don't want to slow down the test. + if (failedOnce) { + return new DummyReplicator(entries, ordinal); + } else { + failedOnce = true; + return new FailingDummyReplicator(entries, ordinal); + } + } + + protected class DummyReplicator extends Replicator { + + private int ordinal; + + public DummyReplicator(List entries, int ordinal) { + super(entries, ordinal); + this.ordinal = ordinal; + } + + @Override + public Integer call() throws IOException { + return ordinal; + } + } + + protected class FailingDummyReplicator extends DummyReplicator { + + public FailingDummyReplicator(List entries, int ordinal) { + super(entries, ordinal); + } + + @Override + public Integer call() throws IOException { + throw new IOException("Sample Exception: Failed to replicate."); + } + } + } + public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { static int COUNT = 10; static AtomicReference ex = new AtomicReference(null);