HBASE-13618 ReplicationSource is too eager to remove sinks.
This commit is contained in:
parent
e42e7ed110
commit
34327408c1
|
@ -163,6 +163,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
|
||||
// update metrics
|
||||
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
|
||||
replicationSinkMgr.reportSinkSuccess(sinkPeer);
|
||||
return true;
|
||||
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -140,6 +140,16 @@ public class ReplicationSinkManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report that a {@code SinkPeer} successfully replicated a chunk of data.
|
||||
*
|
||||
* @param sinkPeer
|
||||
* The SinkPeer that had a failed replication attempt on it
|
||||
*/
|
||||
public void reportSinkSuccess(SinkPeer sinkPeer) {
|
||||
badReportCounts.remove(sinkPeer.getServerName());
|
||||
}
|
||||
|
||||
void chooseSinks() {
|
||||
List<ServerName> slaveAddresses = endpoint.getRegionServers();
|
||||
Collections.shuffle(slaveAddresses, random);
|
||||
|
|
|
@ -110,7 +110,7 @@ public class TestReplicationSinkManager {
|
|||
@Test
|
||||
public void testReportBadSink_PastThreshold() {
|
||||
List<ServerName> serverNames = Lists.newArrayList();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
for (int i = 0; i < 30; i++) {
|
||||
serverNames.add(mock(ServerName.class));
|
||||
}
|
||||
when(replicationEndpoint.getRegionServers())
|
||||
|
@ -119,18 +119,44 @@ public class TestReplicationSinkManager {
|
|||
|
||||
sinkManager.chooseSinks();
|
||||
// Sanity check
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
assertEquals(3, sinkManager.getSinks().size());
|
||||
|
||||
ServerName serverName = sinkManager.getSinks().get(0);
|
||||
|
||||
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
|
||||
sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
}
|
||||
|
||||
// Reporting a bad sink more than the threshold count should remove it
|
||||
// from the list of potential sinks
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
|
||||
//
|
||||
// now try a sink that has some successes
|
||||
//
|
||||
serverName = sinkManager.getSinks().get(0);
|
||||
|
||||
sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
}
|
||||
sinkManager.reportSinkSuccess(sinkPeer); // one success
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
|
||||
// did not remove the sink, since we had one successful try
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
}
|
||||
// still not remove, since the success reset the counter
|
||||
assertEquals(2, sinkManager.getSinks().size());
|
||||
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
// but we exhausted the tries
|
||||
assertEquals(1, sinkManager.getSinks().size());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue