HBASE-15001 Fix thread-safety issues with replication
ReplicationSinkManager and HBaseInterClusterReplicationEndpoint perform certain unsafe operations which might lead to undesirable behavior with multiwal enabled. Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
d1c412ae8f
commit
b3300602ed
|
@ -144,9 +144,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
int sleepMultiplier = 1;
|
||||
|
||||
// Connect to peer cluster first, unless we have to stop
|
||||
while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
|
||||
while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
|
||||
replicationSinkMgr.chooseSinks();
|
||||
if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
|
||||
if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
|
||||
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
|
||||
sleepMultiplier++;
|
||||
}
|
||||
|
@ -181,19 +181,24 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
List<Entry> entries = replicateContext.getEntries();
|
||||
String walGroupId = replicateContext.getWalGroupId();
|
||||
int sleepMultiplier = 1;
|
||||
int numReplicated = 0;
|
||||
|
||||
if (!peersSelected && this.isRunning()) {
|
||||
connectToPeers();
|
||||
peersSelected = true;
|
||||
}
|
||||
|
||||
if (replicationSinkMgr.getSinks().size() == 0) {
|
||||
int numSinks = replicationSinkMgr.getNumSinks();
|
||||
if (numSinks == 0) {
|
||||
LOG.warn("No replication sinks found, returning without replicating. The source should retry"
|
||||
+ " with the same set of edits.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// minimum of: configured threads, number of 100-waledit batches,
|
||||
// and number of current sinks
|
||||
int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
|
||||
replicationSinkMgr.getSinks().size());
|
||||
int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
|
||||
|
||||
List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
|
||||
if (n == 1) {
|
||||
entryLists.add(entries);
|
||||
|
@ -238,7 +243,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
// wait for all futures, remove successful parts
|
||||
// (only the remaining parts will be retried)
|
||||
Future<Integer> f = pool.take();
|
||||
entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
|
||||
int index = f.get().intValue();
|
||||
int batchSize = entryLists.get(index).size();
|
||||
entryLists.set(index, Collections.<Entry>emptyList());
|
||||
// Now, we have marked the batch as done replicating, record its size
|
||||
numReplicated += batchSize;
|
||||
} catch (InterruptedException ie) {
|
||||
iox = new IOException(ie);
|
||||
} catch (ExecutionException ee) {
|
||||
|
@ -250,6 +259,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
// if we had any exceptions, try again
|
||||
throw iox;
|
||||
}
|
||||
if (numReplicated != entries.size()) {
|
||||
// Something went wrong here and we don't know what, let's just fail and retry.
|
||||
LOG.warn("The number of edits replicated is different from the number received,"
|
||||
+ " failing for now.");
|
||||
return false;
|
||||
}
|
||||
// update metrics
|
||||
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
|
||||
walGroupId);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -105,7 +106,7 @@ public class ReplicationSinkManager {
|
|||
*
|
||||
* @return a replication sink to replicate to
|
||||
*/
|
||||
public SinkPeer getReplicationSink() throws IOException {
|
||||
public synchronized SinkPeer getReplicationSink() throws IOException {
|
||||
if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
|
||||
LOG.info("Current list of sinks is out of date or empty, updating");
|
||||
chooseSinks();
|
||||
|
@ -127,7 +128,7 @@ public class ReplicationSinkManager {
|
|||
* @param sinkPeer
|
||||
* The SinkPeer that had a failed replication attempt on it
|
||||
*/
|
||||
public void reportBadSink(SinkPeer sinkPeer) {
|
||||
public synchronized void reportBadSink(SinkPeer sinkPeer) {
|
||||
ServerName serverName = sinkPeer.getServerName();
|
||||
int badReportCount = (badReportCounts.containsKey(serverName)
|
||||
? badReportCounts.get(serverName) : 0) + 1;
|
||||
|
@ -146,11 +147,14 @@ public class ReplicationSinkManager {
|
|||
* @param sinkPeer
|
||||
* The SinkPeer that had a failed replication attempt on it
|
||||
*/
|
||||
public void reportSinkSuccess(SinkPeer sinkPeer) {
|
||||
public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
|
||||
badReportCounts.remove(sinkPeer.getServerName());
|
||||
}
|
||||
|
||||
void chooseSinks() {
|
||||
/**
|
||||
* Refresh the list of sinks.
|
||||
*/
|
||||
public synchronized void chooseSinks() {
|
||||
List<ServerName> slaveAddresses = endpoint.getRegionServers();
|
||||
Collections.shuffle(slaveAddresses, random);
|
||||
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
|
||||
|
@ -159,8 +163,13 @@ public class ReplicationSinkManager {
|
|||
badReportCounts.clear();
|
||||
}
|
||||
|
||||
List<ServerName> getSinks() {
|
||||
return sinks;
|
||||
public synchronized int getNumSinks() {
|
||||
return sinks.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected List<ServerName> getSinksForTesting() {
|
||||
return Collections.unmodifiableList(sinks);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,7 +66,7 @@ public class TestReplicationSinkManager {
|
|||
|
||||
sinkManager.chooseSinks();
|
||||
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(2, sinkManager.getNumSinks());
|
||||
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,7 @@ public class TestReplicationSinkManager {
|
|||
|
||||
sinkManager.chooseSinks();
|
||||
|
||||
assertEquals(1, sinkManager.getSinks().size());
|
||||
assertEquals(1, sinkManager.getNumSinks());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -92,14 +92,14 @@ public class TestReplicationSinkManager {
|
|||
|
||||
sinkManager.chooseSinks();
|
||||
// Sanity check
|
||||
assertEquals(1, sinkManager.getSinks().size());
|
||||
assertEquals(1, sinkManager.getNumSinks());
|
||||
|
||||
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
|
||||
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
|
||||
// Just reporting a bad sink once shouldn't have an effect
|
||||
assertEquals(1, sinkManager.getSinks().size());
|
||||
assertEquals(1, sinkManager.getNumSinks());
|
||||
|
||||
}
|
||||
|
||||
|
@ -119,9 +119,9 @@ public class TestReplicationSinkManager {
|
|||
|
||||
sinkManager.chooseSinks();
|
||||
// Sanity check
|
||||
assertEquals(3, sinkManager.getSinks().size());
|
||||
assertEquals(3, sinkManager.getNumSinks());
|
||||
|
||||
ServerName serverName = sinkManager.getSinks().get(0);
|
||||
ServerName serverName = sinkManager.getSinksForTesting().get(0);
|
||||
|
||||
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
|
||||
|
@ -132,12 +132,12 @@ public class TestReplicationSinkManager {
|
|||
|
||||
// Reporting a bad sink more than the threshold count should remove it
|
||||
// from the list of potential sinks
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(2, sinkManager.getNumSinks());
|
||||
|
||||
//
|
||||
// now try a sink that has some successes
|
||||
//
|
||||
serverName = sinkManager.getSinks().get(0);
|
||||
serverName = sinkManager.getSinksForTesting().get(0);
|
||||
|
||||
sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
|
||||
|
@ -147,17 +147,17 @@ public class TestReplicationSinkManager {
|
|||
sinkManager.reportBadSink(sinkPeer);
|
||||
|
||||
// did not remove the sink, since we had one successful try
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(2, sinkManager.getNumSinks());
|
||||
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
}
|
||||
// still not remove, since the success reset the counter
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(2, sinkManager.getNumSinks());
|
||||
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
// but we exhausted the tries
|
||||
assertEquals(1, sinkManager.getSinks().size());
|
||||
assertEquals(1, sinkManager.getNumSinks());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -173,7 +173,7 @@ public class TestReplicationSinkManager {
|
|||
sinkManager.chooseSinks();
|
||||
// Sanity check
|
||||
|
||||
List<ServerName> sinkList = sinkManager.getSinks();
|
||||
List<ServerName> sinkList = sinkManager.getSinksForTesting();
|
||||
assertEquals(2, sinkList.size());
|
||||
|
||||
ServerName serverNameA = sinkList.get(0);
|
||||
|
@ -189,7 +189,7 @@ public class TestReplicationSinkManager {
|
|||
|
||||
// We've gone down to 0 good sinks, so the replication sinks
|
||||
// should have been refreshed now
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(2, sinkManager.getNumSinks());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue