HBASE-14777 Fix Inter Cluster Replication Future ordering issues

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Ashu Pachauri 2015-11-17 16:45:14 -08:00 committed by Elliott Clark
parent b2c20cebb0
commit c8fbaf0c96
4 changed files with 124 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -202,15 +203,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
" entries of total size " + replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
futures.add(exec.submit(createReplicator(entryLists.get(i), i)));
}
}
IOException iox = null;
for (Future<Integer> f : futures) {
for (int index = futures.size() - 1; index >= 0; index--) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
entryLists.remove(f.get());
Future<Integer> f = futures.get(index);
entryLists.remove(f.get().intValue());
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
@ -289,7 +291,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return super.stopAndWait();
}
private class Replicator implements Callable<Integer> {
@VisibleForTesting
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
return new Replicator(entries, ordinal);
}
@VisibleForTesting
protected class Replicator implements Callable<Integer> {
private List<Entry> entries;
private int ordinal;
public Replicator(List<Entry> entries, int ordinal) {

View File

@ -83,9 +83,11 @@ public class TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller log roll size to trigger more events
conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf1.setInt("replication.source.size.capacity", 10240);
// We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
// sufficient number of events. But we don't want to go too low because
// HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
// more than one batch sent to the peer cluster for better testing.
conf1.setInt("replication.source.size.capacity", 102400);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
@ -98,6 +100,7 @@ public class TestReplicationBase {
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
@ -126,7 +129,9 @@ public class TestReplicationBase {
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
// Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
utility2.startMiniCluster(4);
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);

View File

@ -92,6 +92,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
LOG.info("testSimplePutDelete");
MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
int numRS = peerCluster.getRegionServerThreads().size();
doPutTest(Bytes.toBytes(1));
@ -100,14 +101,14 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
peerCluster.waitOnRegionServer(rsToStop);
// Sanity check
assertEquals(1, peerCluster.getRegionServerThreads().size());
assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
doPutTest(Bytes.toBytes(2));
peerCluster.startRegionServer();
// Sanity check
assertEquals(2, peerCluster.getRegionServerThreads().size());
assertEquals(numRS, peerCluster.getRegionServerThreads().size());
doPutTest(Bytes.toBytes(3));

View File

@ -29,11 +29,14 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -60,7 +63,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
utility2.shutdownMiniCluster(); // we don't need the second cluster
admin.removePeer("2");
numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
}
@ -184,6 +186,49 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
}
@Test (timeout=120000)
public void testInterClusterReplication() throws Exception {
final String id = "testInterClusterReplication";
List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
int totEdits = 0;
// Make sure edits are spread across regions because we do region based batching
// before shipping edits.
for(HRegion region: regions) {
HRegionInfo hri = region.getRegionInfo();
byte[] row = hri.getStartKey();
for (int i = 0; i < 100; i++) {
if (row.length > 0) {
doPut(row);
totEdits++;
}
}
}
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
null);
final int numEdits = totEdits;
Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
}
@Override
public String explainFailure() throws Exception {
String failure = "Failed to replicate all edits, expected = " + numEdits
+ " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
return failure;
}
});
admin.removePeer("testInterClusterReplication");
utility1.deleteTableData(tableName);
}
@Test (timeout=120000)
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
@ -270,6 +315,60 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint {
static AtomicInteger replicateCount = new AtomicInteger();
static boolean failedOnce;
@Override
public boolean replicate(ReplicateContext replicateContext) {
boolean success = super.replicate(replicateContext);
if (success) {
replicateCount.addAndGet(replicateContext.entries.size());
}
return success;
}
@Override
protected Replicator createReplicator(List<Entry> entries, int ordinal) {
// Fail only once, we don't want to slow down the test.
if (failedOnce) {
return new DummyReplicator(entries, ordinal);
} else {
failedOnce = true;
return new FailingDummyReplicator(entries, ordinal);
}
}
protected class DummyReplicator extends Replicator {
private int ordinal;
public DummyReplicator(List<Entry> entries, int ordinal) {
super(entries, ordinal);
this.ordinal = ordinal;
}
@Override
public Integer call() throws IOException {
return ordinal;
}
}
protected class FailingDummyReplicator extends DummyReplicator {
public FailingDummyReplicator(List<Entry> entries, int ordinal) {
super(entries, ordinal);
}
@Override
public Integer call() throws IOException {
throw new IOException("Sample Exception: Failed to replicate.");
}
}
}
public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
static int COUNT = 10;
static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);