diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 27fb9d62fdd..fb15ce43668 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -170,9 +170,9 @@ public class ReplicationSource extends Thread for (int i = 0; i < this.replicationQueueNbCapacity; i++) { this.entriesArray[i] = new HLog.Entry(); } - this.maxRetriesMultiplier = - this.conf.getInt("replication.source.maxretriesmultiplier", 10); - this.socketTimeoutMultiplier = maxRetriesMultiplier * maxRetriesMultiplier; + this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); + this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", + maxRetriesMultiplier * maxRetriesMultiplier); this.queue = new PriorityBlockingQueue( conf.getInt("hbase.regionserver.maxlogs", 32), @@ -452,14 +452,16 @@ public class ReplicationSource extends Thread } private void connectToPeers() { + int sleepMultiplier = 1; + // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { - try { - chooseSinks(); - Thread.sleep(this.sleepForRetries); - } catch (InterruptedException e) { - LOG.error("Interrupted while trying to connect to sinks", e); + chooseSinks(); + if (this.isActive() && this.currentPeers.size() == 0) { + if (sleepForRetries("Waiting for peers", sleepMultiplier)) { + sleepMultiplier++; + } } } }