HBASE-11964 Improve spreading replication load from failed regionservers

This commit is contained in:
Andrew Purtell 2014-10-24 14:17:46 -07:00
parent b8ed37b88e
commit 54fdd96516
4 changed files with 10 additions and 4 deletions

View File

@ -161,7 +161,10 @@ public class ReplicationSource extends Thread
this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity = this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000); this.conf.getInt("replication.source.nb.capacity", 25000);
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queue = this.queue =
new PriorityBlockingQueue<Path>( new PriorityBlockingQueue<Path>(
this.conf.getInt("hbase.regionserver.maxlogs", 32), this.conf.getInt("hbase.regionserver.maxlogs", 32),
@ -171,8 +174,6 @@ public class ReplicationSource extends Thread
this.replicationQueues = replicationQueues; this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers; this.replicationPeers = replicationPeers;
this.manager = manager; this.manager = manager;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs; this.fs = fs;
this.metrics = metrics; this.metrics = metrics;
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);

View File

@ -137,7 +137,8 @@ public class ReplicationSourceManager implements ReplicationListener {
this.fs = fs; this.fs = fs;
this.logDir = logDir; this.logDir = logDir;
this.oldLogDir = oldLogDir; this.oldLogDir = oldLogDir;
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.sleepBeforeFailover =
conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
this.clusterId = clusterId; this.clusterId = clusterId;
this.replicationTracker.registerListener(this); this.replicationTracker.registerListener(this);
this.replicationPeers.getAllPeerIds(); this.replicationPeers.getAllPeerIds();

View File

@ -99,6 +99,8 @@ public class TestReplicationBase {
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5); conf1.setInt("replication.stats.thread.period.seconds", 5);
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf1.setLong("replication.sleep.before.failover", 2000);
conf1.setInt("replication.source.maxretriesmultiplier", 10);
utility1 = new HBaseTestingUtility(conf1); utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster(); utility1.startMiniZKCluster();

View File

@ -128,6 +128,8 @@ public class TestReplicationSourceManager {
ReplicationSourceDummy.class.getCanonicalName()); ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT); HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
utility = new HBaseTestingUtility(conf); utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster(); utility.startMiniZKCluster();