HBASE-14777 Fix Inter Cluster Replication Future ordering issues

Signed-off-by: Elliott Clark <eclark@apache.org>
Author: Ashu Pachauri, 2015-11-17 16:45:14 -08:00 (committed by Elliott Clark)
parent 98f7dcc389
commit de4f235bf4
4 changed files with 124 additions and 12 deletions

File: HBaseInterClusterReplicationEndpoint.java

@@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -202,15 +203,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
             " entries of total size " + replicateContext.getSize());
         }
         // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+        futures.add(exec.submit(createReplicator(entryLists.get(i), i)));
       }
     }
     IOException iox = null;
-    for (Future<Integer> f : futures) {
+    for (int index = futures.size() - 1; index >= 0; index--) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        entryLists.remove(f.get());
+        Future<Integer> f = futures.get(index);
+        entryLists.remove(f.get().intValue());
       } catch (InterruptedException ie) {
         iox = new IOException(ie);
       } catch (ExecutionException ee) {
@@ -289,7 +291,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return super.stopAndWait();
   }

-  private class Replicator implements Callable<Integer> {
+  @VisibleForTesting
+  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+    return new Replicator(entries, ordinal);
+  }
+
+  @VisibleForTesting
+  protected class Replicator implements Callable<Integer> {
     private List<Entry> entries;
     private int ordinal;
     public Replicator(List<Entry> entries, int ordinal) {
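
Note: the hunks above contain the actual HBASE-14777 fix, and the bug is subtle enough to spell out. `entryLists` holds the per-ordinal batches still pending; each `Future<Integer>` yields the ordinal of a batch that shipped successfully. The old `entryLists.remove(f.get())` passes an `Integer`, and Java overload resolution binds that to `List.remove(Object)` rather than `remove(int)`, so the call searches for an element equal to the boxed ordinal and removes nothing. And once removal by index does happen, removing a low index shifts the later batches left and invalidates the ordinals of futures not yet processed; iterating the futures from the highest index down keeps the remaining indices stable. A minimal, self-contained sketch of both points (illustrative names, not HBase code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoveByOrdinalDemo {
  public static void main(String[] args) {
    List<List<String>> batches = new ArrayList<>(Arrays.asList(
        Arrays.asList("a"), Arrays.asList("b"), Arrays.asList("c")));

    Integer ordinal = 1; // what a completed Future<Integer> would return

    // Binds to remove(Object): searches for an element equal to Integer(1)
    // and removes nothing, since the elements are Lists.
    batches.remove(ordinal);
    System.out.println(batches.size()); // 3 -- the batch was never removed

    // Unboxing selects remove(int): removes the batch at position 1.
    batches.remove(ordinal.intValue());
    System.out.println(batches.size()); // 2

    // Removing by index shifts later elements left, so ordinals of batches
    // not yet processed go stale; iterating from the highest ordinal down
    // keeps every remaining index valid, which is what the new loop does.
    List<Integer> completed = Arrays.asList(0, 1);
    for (int i = completed.size() - 1; i >= 0; i--) {
      batches.remove(completed.get(i).intValue());
    }
    System.out.println(batches.size()); // 0
  }
}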

File: TestReplicationBase.java

@@ -83,9 +83,11 @@ public class TestReplicationBase {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    // smaller log roll size to trigger more events
-    conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf1.setInt("replication.source.size.capacity", 10240);
+    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
+    // sufficient number of events. But we don't want to go too low because
+    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
+    // more than one batch sent to the peer cluster for better testing.
+    conf1.setInt("replication.source.size.capacity", 102400);
     conf1.setLong("replication.source.sleepforretries", 100);
     conf1.setInt("hbase.regionserver.maxlogs", 10);
     conf1.setLong("hbase.master.logcleaner.ttl", 10);
@@ -98,6 +100,7 @@ public class TestReplicationBase {
     conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
+    conf1.setFloat("replication.source.ratio", 1.0f);

     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
@@ -126,7 +129,9 @@ public class TestReplicationBase {
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
     utility1.startMiniCluster(2);
-    utility2.startMiniCluster(2);
+    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
+    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
+    utility2.startMiniCluster(4);

     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
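
Note: the two test changes above work together with the `replication.source.ratio` setting added earlier. A larger `replication.source.size.capacity` keeps enough edits per shipment that `HBaseInterClusterReplicationEndpoint` splits it into multiple batches, and the diff's own comment notes that the number of sinks bounds how many batches ship in parallel, hence the 4-server peer cluster (with ratio 1.0, every peer region server is a usable sink). A rough sketch of that bound, with hypothetical names and a simplified formula (not the actual HBase code):

public class ParallelBatchSketch {
  // Hypothetical stand-in for the endpoint's sizing decision: concurrent
  // shipments are bounded by the worker pool and by the visible sinks.
  static int parallelBatches(int numBatches, int maxThreads, int numSinks) {
    return Math.min(numBatches, Math.min(maxThreads, numSinks));
  }

  public static void main(String[] args) {
    // A 2-RS peer caps the test at 2 concurrent batches; 4 RS let more
    // Futures be in flight at once, which is what this patch stresses.
    System.out.println(parallelBatches(10, 8, 2)); // prints 2
    System.out.println(parallelBatches(10, 8, 4)); // prints 4
  }
}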

File: TestReplicationChangingPeerRegionservers.java

@@ -91,6 +91,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
     LOG.info("testSimplePutDelete");

     MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+    int numRS = peerCluster.getRegionServerThreads().size();

     doPutTest(Bytes.toBytes(1));
@@ -99,14 +100,14 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
     peerCluster.waitOnRegionServer(rsToStop);

     // Sanity check
-    assertEquals(1, peerCluster.getRegionServerThreads().size());
+    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());

     doPutTest(Bytes.toBytes(2));

     peerCluster.startRegionServer();

     // Sanity check
-    assertEquals(2, peerCluster.getRegionServerThreads().size());
+    assertEquals(numRS, peerCluster.getRegionServerThreads().size());

     doPutTest(Bytes.toBytes(3));

File: TestReplicationEndpoint.java

@@ -30,13 +30,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
@@ -60,7 +62,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TestReplicationBase.setUpBeforeClass();
-    utility2.shutdownMiniCluster(); // we don't need the second cluster
     admin.removePeer("2");
     numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
   }
@@ -184,6 +185,49 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
   }

+  @Test (timeout=120000)
+  public void testInterClusterReplication() throws Exception {
+    final String id = "testInterClusterReplication";
+    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
+    int totEdits = 0;
+
+    // Make sure edits are spread across regions because we do region based batching
+    // before shipping edits.
+    for (HRegion region: regions) {
+      HRegionInfo hri = region.getRegionInfo();
+      byte[] row = hri.getStartKey();
+      for (int i = 0; i < 100; i++) {
+        if (row.length > 0) {
+          doPut(row);
+          totEdits++;
+        }
+      }
+    }
+
+    admin.addPeer(id,
+        new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
+            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
+        null);
+
+    final int numEdits = totEdits;
+    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        String failure = "Failed to replicate all edits, expected = " + numEdits
+            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
+        return failure;
+      }
+    });
+
+    admin.removePeer("testInterClusterReplication");
+    utility1.deleteTableData(tableName);
+  }
+
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
@@ -270,6 +314,60 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
   }

+  public static class InterClusterReplicationEndpointForTest
+      extends HBaseInterClusterReplicationEndpoint {
+
+    static AtomicInteger replicateCount = new AtomicInteger();
+    static boolean failedOnce;
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      boolean success = super.replicate(replicateContext);
+      if (success) {
+        replicateCount.addAndGet(replicateContext.entries.size());
+      }
+      return success;
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      // Fail only once, we don't want to slow down the test.
+      if (failedOnce) {
+        return new DummyReplicator(entries, ordinal);
+      } else {
+        failedOnce = true;
+        return new FailingDummyReplicator(entries, ordinal);
+      }
+    }
+
+    protected class DummyReplicator extends Replicator {
+      private int ordinal;
+
+      public DummyReplicator(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+        this.ordinal = ordinal;
+      }
+
+      @Override
+      public Integer call() throws IOException {
+        return ordinal;
+      }
+    }
+
+    protected class FailingDummyReplicator extends DummyReplicator {
+      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      public Integer call() throws IOException {
+        throw new IOException("Sample Exception: Failed to replicate.");
+      }
+    }
+  }
+
 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
   static int COUNT = 10;
   static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
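
Note: the new test class above depends on the `createReplicator` seam introduced in the first file: the endpoint never constructs `Replicator` directly, so a test subclass can inject a replicator that fails exactly once and then assert that every edit is still replicated, the failed batch staying in `entryLists` for retry. The same factory-method seam in miniature, with illustrative names (not HBase code):

import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative stand-ins for HBaseInterClusterReplicationEndpoint and the
// test subclass; names and logic are simplified.
class Shipper {
  // The seam: workers come from a protected factory, never from `new`,
  // so subclasses can substitute their own implementations.
  protected Callable<Integer> createWorker(int ordinal) {
    return () -> ordinal; // real shipping elided
  }
}

class FailOnceShipper extends Shipper {
  private boolean failedOnce;

  @Override
  protected Callable<Integer> createWorker(int ordinal) {
    if (!failedOnce) {
      failedOnce = true;
      // The first worker fails, exercising the caller's retry path once
      // without slowing the rest of the test down.
      return () -> { throw new IOException("injected failure"); };
    }
    return super.createWorker(ordinal);
  }
}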