HBASE-9501 Provide throttling for replication (Feng Honghua via JD)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1566923 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2014-02-11 00:48:38 +00:00
parent 083a1cfd9c
commit 27cc3f0ed3
3 changed files with 249 additions and 0 deletions

View File

@ -131,6 +131,8 @@ public class ReplicationSource extends Thread
private ReplicationSinkManager replicationSinkMgr;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// throttler
private ReplicationThrottler throttler;
/**
* Instantiation method used by region servers
@ -164,6 +166,8 @@ public class ReplicationSource extends Thread
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = HConnectionManager.getConnection(this.conf);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@ -598,6 +602,7 @@ public class ReplicationSource extends Thread
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
@ -661,6 +666,22 @@ public class ReplicationSource extends Thread
}
SinkPeer sinkPeer = null;
try {
if (this.throttler.isEnabled()) {
long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
if (sleepTicks > 0) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
}
Thread.sleep(sleepTicks);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
}
// reset throttler's cycle start tick when sleep for throttling occurs
this.throttler.resetStartTick();
}
}
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
@ -675,6 +696,9 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (this.throttler.isEnabled()) {
this.throttler.addPushSize(currentSize);
}
this.totalReplicatedEdits += entries.size();
this.totalReplicatedOperations += currentNbOperations;
this.metrics.shipBatch(this.currentNbOperations);

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
 * Throttling controller used by a replication source to limit how much data
 * it pushes to a peer. Time is divided into cycles of 100ms and the
 * throttler tracks the number of bytes pushed in the current cycle; the
 * caller asks {@link #getNextSleepInterval(int)} how long to wait before the
 * next push and reports each completed push via {@link #addPushSize(int)}.
 * Throttling is active only when the configured bandwidth is greater than 0.
 * <p>
 * The class performs no synchronization of its own.
 */
@InterfaceAudience.Private
public class ReplicationThrottler {
  /** Length of a single throttling cycle, in milliseconds. */
  private static final long CYCLE_MILLIS = 100L;

  private final boolean enabled;
  private final double bandwidth;
  /** Bytes recorded through addPushSize since the counter was last cleared. */
  private long cyclePushSize;
  /** Timestamp (ms) at which the current cycle is considered to have begun. */
  private long cycleStartTick;

  /**
   * Creates a throttler for the given per-cycle budget. Any value that is
   * not greater than 0 yields a disabled throttler whose methods are no-ops.
   * @param bandwidth number of bytes allowed per cycle (100ms)
   */
  public ReplicationThrottler(final double bandwidth) {
    this.bandwidth = bandwidth;
    this.enabled = bandwidth > 0;
    if (!this.enabled) {
      return;
    }
    this.cyclePushSize = 0;
    this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
  }

  /**
   * Tells whether this throttler actually throttles.
   * @return true if throttling is enabled
   */
  public boolean isEnabled() {
    return this.enabled;
  }

  /**
   * Computes how long the caller should sleep before pushing {@code size}
   * bytes, based on what has already been pushed in the current cycle and
   * on when that cycle started. Calling this method may clear the cycle's
   * push counter and/or move the cycle start to the current time.
   * @param size is the size of edits to be pushed
   * @return sleep interval in milliseconds, 0 when no sleep is required
   */
  public long getNextSleepInterval(final int size) {
    if (!this.enabled) {
      return 0;
    }
    final long now = EnvironmentEdgeManager.currentTimeMillis();

    // Case 1: what was already pushed is more than one cycle's budget
    // (e.g. a single oversized push), so it is amortized over later cycles.
    if ((double) this.cyclePushSize > this.bandwidth) {
      return amortizeOversizedCycle(now);
    }

    final long cycleEnd = this.cycleStartTick + CYCLE_MILLIS;

    // Case 2: the current cycle is already over, start a fresh one.
    if (now >= cycleEnd) {
      this.cycleStartTick = now;
      this.cyclePushSize = 0;
      return 0;
    }

    // Case 3: this push would reach the budget, so defer it to the next
    // cycle. The first push of a cycle (counter still 0) is never deferred,
    // which avoids a pointless sleep when that push alone is over budget.
    final boolean reachesBudget =
        (double) (this.cyclePushSize + size) >= this.bandwidth;
    if (this.cyclePushSize > 0 && reachesBudget) {
      this.cyclePushSize = 0;
      return cycleEnd - now;
    }
    return 0;
  }

  /**
   * Handles the case where the bytes already pushed exceed the per-cycle
   * budget: works out how many whole cycles that amount is worth, clears the
   * counter and returns the time remaining until those cycles have elapsed.
   * @param now current time in milliseconds
   * @return remaining milliseconds to wait, 0 if enough time has passed
   */
  private long amortizeOversizedCycle(final long now) {
    final double cyclesNeeded =
        Math.ceil((double) this.cyclePushSize / this.bandwidth);
    final long resumeAt = this.cycleStartTick + (long) (cyclesNeeded * CYCLE_MILLIS);
    this.cyclePushSize = 0;
    if (resumeAt > now) {
      return resumeAt - now;
    }
    // No sleep is requested, so the caller will not reset the start tick;
    // the new cycle has to be started here instead.
    this.cycleStartTick = now;
    return 0;
  }

  /**
   * Records a completed push by adding its size to the current cycle's
   * total push size. Does nothing when throttling is disabled.
   * @param size is the current size added to the current cycle's
   *   total push size
   */
  public void addPushSize(final int size) {
    if (!this.enabled) {
      return;
    }
    this.cyclePushSize += size;
  }

  /**
   * Restarts the current cycle at the present time. Does nothing when
   * throttling is disabled.
   */
  public void resetStartTick() {
    if (!this.enabled) {
      return;
    }
    this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis();
  }
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestReplicationThrottler {
  private static final Log LOG = LogFactory.getLog(TestReplicationThrottler.class);

  /**
   * Asserts that a sleep interval returned by the throttler is close to the
   * nominal value. The throttler reads the wall clock on every call, so the
   * returned interval shrinks by however long the test thread took between
   * calls; demanding the nominal value (or nominal - 1) makes the test fail
   * on any short pause. The interval can never be larger than the nominal
   * value, and up to 25% below it is tolerated.
   * @param what label used in the failure message
   * @param nominal sleep interval in ms expected if no time elapsed
   * @param actual sleep interval in ms returned by the throttler
   */
  private static void assertSleepTicks(String what, long nominal, long actual) {
    long lowest = nominal - nominal / 4;
    assertTrue(what + ": expected sleep within [" + lowest + ", " + nominal
        + "] ms but was " + actual, actual >= lowest && actual <= nominal);
  }

  /**
   * unit test for throttling
   */
  @Test(timeout=10000)
  public void testThrottling() {
    LOG.info("testThrottling");

    // throttle bandwidth is 100 and 10 bytes/cycle respectively
    ReplicationThrottler throttler1 = new ReplicationThrottler(100);
    ReplicationThrottler throttler2 = new ReplicationThrottler(10);

    long ticks1 = throttler1.getNextSleepInterval(1000);
    long ticks2 = throttler2.getNextSleepInterval(1000);

    // 1. the first push size is 1000; although 1000 bytes exceeds both the
    //    100 and 10 byte bandwidths there is no sleep since it is the first
    //    push of the current cycle, amortizing occurs when next push arrives
    assertEquals(0, ticks1);
    assertEquals(0, ticks2);

    throttler1.addPushSize(1000);
    throttler2.addPushSize(1000);

    ticks1 = throttler1.getNextSleepInterval(5);
    ticks2 = throttler2.getNextSleepInterval(5);

    // 2. when the second push(5) arrives, the current cyclePushSize is 1000
    //    bytes; this should make throttler1 sleep 1000/100 = 10 cycles = 1s
    //    and throttler2 sleep 1000/10 = 100 cycles = 10s before the second
    //    push occurs -- amortize case. After amortizing, cyclePushSize is
    //    reset by the throttler and the start tick is reset below.
    assertSleepTicks("throttler1 amortizing 1000 bytes", 1000, ticks1);
    assertSleepTicks("throttler2 amortizing 1000 bytes", 10000, ticks2);

    throttler1.resetStartTick();
    throttler2.resetStartTick();

    throttler1.addPushSize(5);
    throttler2.addPushSize(5);

    ticks1 = throttler1.getNextSleepInterval(45);
    ticks2 = throttler2.getNextSleepInterval(45);

    // 3. when the third push(45) arrives, the current cyclePushSize is 5
    //    bytes; 5+45 = 50 bytes stays below throttler1's budget so it does
    //    not sleep, but reaches throttler2's budget so throttler2 delays the
    //    push to the next cycle
    //    note: in real case, sleep time should cover time elapses during push
    //    operation
    assertEquals(0, ticks1);
    assertSleepTicks("throttler2 delaying to next cycle", 100, ticks2);

    throttler2.resetStartTick();

    throttler1.addPushSize(45);
    throttler2.addPushSize(45);

    ticks1 = throttler1.getNextSleepInterval(60);
    ticks2 = throttler2.getNextSleepInterval(60);

    // 4. when the fourth push(60) arrives, throttler1 has 5+45 = 50 bytes in
    //    its cycle and 50+60 = 110 reaches its budget of 100, so it delays to
    //    the next cycle; throttler2 has 45 bytes in its cycle and must first
    //    sleep ceiling(45/10) = 5 cycles = 500ms to amortize the previous push
    //    note: in real case, sleep time should cover time elapses during push
    //    operation
    assertSleepTicks("throttler1 delaying to next cycle", 100, ticks1);
    assertSleepTicks("throttler2 amortizing 45 bytes", 500, ticks2);
  }
}