HBASE-15001 Fix thread-safety issues with replication

ReplicationSinkManager and HBaseInterClusterReplicationEndpoint
perform certain unsafe operations which might lead to undesirable
behavior with multiwal enabled.

Signed-off-by: Elliott Clark <eclark@apache.org>
Author: Ashu Pachauri <2015-12-17 13:25:39 -08:00>
Committed by: Elliott Clark
parent ba04e0372d
commit 48113d7572
3 changed files with 49 additions and 25 deletions

View File

@ -143,9 +143,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
int sleepMultiplier = 1; int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop // Connect to peer cluster first, unless we have to stop
while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
replicationSinkMgr.chooseSinks(); replicationSinkMgr.chooseSinks();
if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) { if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} }
@ -180,19 +180,24 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
List<Entry> entries = replicateContext.getEntries(); List<Entry> entries = replicateContext.getEntries();
String walGroupId = replicateContext.getWalGroupId(); String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1; int sleepMultiplier = 1;
int numReplicated = 0;
if (!peersSelected && this.isRunning()) { if (!peersSelected && this.isRunning()) {
connectToPeers(); connectToPeers();
peersSelected = true; peersSelected = true;
} }
if (replicationSinkMgr.getSinks().size() == 0) { int numSinks = replicationSinkMgr.getNumSinks();
if (numSinks == 0) {
LOG.warn("No replication sinks found, returning without replicating. The source should retry"
+ " with the same set of edits.");
return false; return false;
} }
// minimum of: configured threads, number of 100-waledit batches, // minimum of: configured threads, number of 100-waledit batches,
// and number of current sinks // and number of current sinks
int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
replicationSinkMgr.getSinks().size());
List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n); List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
if (n == 1) { if (n == 1) {
entryLists.add(entries); entryLists.add(entries);
@ -237,7 +242,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// wait for all futures, remove successful parts // wait for all futures, remove successful parts
// (only the remaining parts will be retried) // (only the remaining parts will be retried)
Future<Integer> f = pool.take(); Future<Integer> f = pool.take();
entryLists.set(f.get().intValue(), Collections.<Entry>emptyList()); int index = f.get().intValue();
int batchSize = entryLists.get(index).size();
entryLists.set(index, Collections.<Entry>emptyList());
// Now, we have marked the batch as done replicating, record its size
numReplicated += batchSize;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
iox = new IOException(ie); iox = new IOException(ie);
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
@ -249,6 +258,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// if we had any exceptions, try again // if we had any exceptions, try again
throw iox; throw iox;
} }
if (numReplicated != entries.size()) {
// Something went wrong here and we don't know what, let's just fail and retry.
LOG.warn("The number of edits replicated is different from the number received,"
+ " failing for now.");
return false;
}
// update metrics // update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId); walGroupId);

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -105,7 +106,7 @@ public class ReplicationSinkManager {
* *
* @return a replication sink to replicate to * @return a replication sink to replicate to
*/ */
public SinkPeer getReplicationSink() throws IOException { public synchronized SinkPeer getReplicationSink() throws IOException {
if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
LOG.info("Current list of sinks is out of date or empty, updating"); LOG.info("Current list of sinks is out of date or empty, updating");
chooseSinks(); chooseSinks();
@ -127,7 +128,7 @@ public class ReplicationSinkManager {
* @param sinkPeer * @param sinkPeer
* The SinkPeer that had a failed replication attempt on it * The SinkPeer that had a failed replication attempt on it
*/ */
public void reportBadSink(SinkPeer sinkPeer) { public synchronized void reportBadSink(SinkPeer sinkPeer) {
ServerName serverName = sinkPeer.getServerName(); ServerName serverName = sinkPeer.getServerName();
int badReportCount = (badReportCounts.containsKey(serverName) int badReportCount = (badReportCounts.containsKey(serverName)
? badReportCounts.get(serverName) : 0) + 1; ? badReportCounts.get(serverName) : 0) + 1;
@ -146,11 +147,14 @@ public class ReplicationSinkManager {
* @param sinkPeer * @param sinkPeer
* The SinkPeer that had a failed replication attempt on it * The SinkPeer that had a failed replication attempt on it
*/ */
public void reportSinkSuccess(SinkPeer sinkPeer) { public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
badReportCounts.remove(sinkPeer.getServerName()); badReportCounts.remove(sinkPeer.getServerName());
} }
void chooseSinks() { /**
* Refresh the list of sinks.
*/
public synchronized void chooseSinks() {
List<ServerName> slaveAddresses = endpoint.getRegionServers(); List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, random); Collections.shuffle(slaveAddresses, random);
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
@ -159,8 +163,13 @@ public class ReplicationSinkManager {
badReportCounts.clear(); badReportCounts.clear();
} }
List<ServerName> getSinks() { public synchronized int getNumSinks() {
return sinks; return sinks.size();
}
@VisibleForTesting
protected List<ServerName> getSinksForTesting() {
return Collections.unmodifiableList(sinks);
} }
/** /**

View File

@ -67,7 +67,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks(); sinkManager.chooseSinks();
assertEquals(2, sinkManager.getSinks().size()); assertEquals(2, sinkManager.getNumSinks());
} }
@ -81,7 +81,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks(); sinkManager.chooseSinks();
assertEquals(1, sinkManager.getSinks().size()); assertEquals(1, sinkManager.getNumSinks());
} }
@Test @Test
@ -93,14 +93,14 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks(); sinkManager.chooseSinks();
// Sanity check // Sanity check
assertEquals(1, sinkManager.getSinks().size()); assertEquals(1, sinkManager.getNumSinks());
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
sinkManager.reportBadSink(sinkPeer); sinkManager.reportBadSink(sinkPeer);
// Just reporting a bad sink once shouldn't have an effect // Just reporting a bad sink once shouldn't have an effect
assertEquals(1, sinkManager.getSinks().size()); assertEquals(1, sinkManager.getNumSinks());
} }
@ -120,9 +120,9 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks(); sinkManager.chooseSinks();
// Sanity check // Sanity check
assertEquals(3, sinkManager.getSinks().size()); assertEquals(3, sinkManager.getNumSinks());
ServerName serverName = sinkManager.getSinks().get(0); ServerName serverName = sinkManager.getSinksForTesting().get(0);
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
@ -133,12 +133,12 @@ public class TestReplicationSinkManager {
// Reporting a bad sink more than the threshold count should remove it // Reporting a bad sink more than the threshold count should remove it
// from the list of potential sinks // from the list of potential sinks
assertEquals(2, sinkManager.getSinks().size()); assertEquals(2, sinkManager.getNumSinks());
// //
// now try a sink that has some successes // now try a sink that has some successes
// //
serverName = sinkManager.getSinks().get(0); serverName = sinkManager.getSinksForTesting().get(0);
sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
@ -148,17 +148,17 @@ public class TestReplicationSinkManager {
sinkManager.reportBadSink(sinkPeer); sinkManager.reportBadSink(sinkPeer);
// did not remove the sink, since we had one successful try // did not remove the sink, since we had one successful try
assertEquals(2, sinkManager.getSinks().size()); assertEquals(2, sinkManager.getNumSinks());
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) { for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
sinkManager.reportBadSink(sinkPeer); sinkManager.reportBadSink(sinkPeer);
} }
// still not remove, since the success reset the counter // still not remove, since the success reset the counter
assertEquals(2, sinkManager.getSinks().size()); assertEquals(2, sinkManager.getNumSinks());
sinkManager.reportBadSink(sinkPeer); sinkManager.reportBadSink(sinkPeer);
// but we exhausted the tries // but we exhausted the tries
assertEquals(1, sinkManager.getSinks().size()); assertEquals(1, sinkManager.getNumSinks());
} }
@Test @Test
@ -174,7 +174,7 @@ public class TestReplicationSinkManager {
sinkManager.chooseSinks(); sinkManager.chooseSinks();
// Sanity check // Sanity check
List<ServerName> sinkList = sinkManager.getSinks(); List<ServerName> sinkList = sinkManager.getSinksForTesting();
assertEquals(2, sinkList.size()); assertEquals(2, sinkList.size());
ServerName serverNameA = sinkList.get(0); ServerName serverNameA = sinkList.get(0);
@ -190,7 +190,7 @@ public class TestReplicationSinkManager {
// We've gone down to 0 good sinks, so the replication sinks // We've gone down to 0 good sinks, so the replication sinks
// should have been refreshed now // should have been refreshed now
assertEquals(2, sinkManager.getSinks().size()); assertEquals(2, sinkManager.getNumSinks());
} }
} }