HBASE-17314 Limit total buffered size for all replication sources
This commit is contained in:
parent
3599716dff
commit
3826e63967
|
@ -932,6 +932,10 @@ public final class HConstants {
|
||||||
public static final long
|
public static final long
|
||||||
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
|
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
|
||||||
|
|
||||||
|
/** Configuration key under which replication sources look up their total buffer quota. */
public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota";
|
||||||
|
/**
 * Value used when {@link #REPLICATION_SOURCE_TOTAL_BUFFER_KEY} is not configured:
 * 256 * 1024 * 1024 (presumably bytes, i.e. 256 MB -- TODO confirm the unit with the callers).
 * NOTE(review): "DFAULT" looks like a typo for "DEFAULT"; the name is left as-is because other
 * classes reference it.
 */
public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Directory where the source cluster file system client configuration are placed which is used by
|
* Directory where the source cluster file system client configuration are placed which is used by
|
||||||
* sink cluster to copy HFiles from source cluster file system
|
* sink cluster to copy HFiles from source cluster file system
|
||||||
|
|
|
@ -2340,7 +2340,8 @@ public class HRegionServer extends HasThread implements
|
||||||
* @return Return the object that implements the replication
|
* @return Return the object that implements the replication
|
||||||
* source service.
|
* source service.
|
||||||
*/
|
*/
|
||||||
ReplicationSourceService getReplicationSourceService() {
|
@VisibleForTesting
|
||||||
|
public ReplicationSourceService getReplicationSourceService() {
|
||||||
return replicationSourceHandler;
|
return replicationSourceHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
@ -150,6 +151,9 @@ public class ReplicationSource extends Thread
|
||||||
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
|
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
|
||||||
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
|
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
|
||||||
|
|
||||||
|
// Buffer-usage counter obtained from the ReplicationSourceManager during init; it is the
// manager's own AtomicInteger instance, not a private copy.
private AtomicInteger totalBufferUsed;
|
||||||
|
// Upper bound compared against totalBufferUsed; read from
// HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY during init.
private int totalBufferQuota;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiation method used by region servers
|
* Instantiation method used by region servers
|
||||||
*
|
*
|
||||||
|
@ -201,7 +205,9 @@ public class ReplicationSource extends Thread
|
||||||
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
|
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
|
||||||
currentBandwidth = getCurrentBandwidth();
|
currentBandwidth = getCurrentBandwidth();
|
||||||
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
|
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
|
||||||
|
this.totalBufferUsed = manager.getTotalBufferUsed();
|
||||||
|
this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
|
||||||
|
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
|
||||||
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
|
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
|
||||||
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
|
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
|
||||||
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
|
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
|
||||||
|
@ -534,7 +540,7 @@ public class ReplicationSource extends Thread
|
||||||
private boolean workerRunning = true;
|
private boolean workerRunning = true;
|
||||||
// Current number of hfiles that we need to replicate
|
// Current number of hfiles that we need to replicate
|
||||||
private long currentNbHFiles = 0;
|
private long currentNbHFiles = 0;
|
||||||
|
List<WAL.Entry> entries;
|
||||||
// Use guava cache to set ttl for each key
|
// Use guava cache to set ttl for each key
|
||||||
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
|
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
|
||||||
.expireAfterAccess(1, TimeUnit.DAYS).build(
|
.expireAfterAccess(1, TimeUnit.DAYS).build(
|
||||||
|
@ -554,6 +560,7 @@ public class ReplicationSource extends Thread
|
||||||
this.replicationQueueInfo = replicationQueueInfo;
|
this.replicationQueueInfo = replicationQueueInfo;
|
||||||
this.repLogReader = new ReplicationWALReaderManager(fs, conf);
|
this.repLogReader = new ReplicationWALReaderManager(fs, conf);
|
||||||
this.source = source;
|
this.source = source;
|
||||||
|
this.entries = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -626,8 +633,7 @@ public class ReplicationSource extends Thread
|
||||||
boolean gotIOE = false;
|
boolean gotIOE = false;
|
||||||
currentNbOperations = 0;
|
currentNbOperations = 0;
|
||||||
currentNbHFiles = 0;
|
currentNbHFiles = 0;
|
||||||
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
|
entries.clear();
|
||||||
|
|
||||||
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
|
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
|
||||||
currentSize = 0;
|
currentSize = 0;
|
||||||
try {
|
try {
|
||||||
|
@ -719,6 +725,7 @@ public class ReplicationSource extends Thread
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
|
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
|
||||||
|
releaseBufferQuota();
|
||||||
}
|
}
|
||||||
if (replicationQueueInfo.isQueueRecovered()) {
|
if (replicationQueueInfo.isQueueRecovered()) {
|
||||||
// use synchronize to make sure one last thread will clean the queue
|
// use synchronize to make sure one last thread will clean the queue
|
||||||
|
@ -808,7 +815,7 @@ public class ReplicationSource extends Thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
boolean totalBufferTooLarge = false;
|
||||||
// don't replicate if the log entries have already been consumed by the cluster
|
// don't replicate if the log entries have already been consumed by the cluster
|
||||||
if (replicationEndpoint.canReplicateToSameCluster()
|
if (replicationEndpoint.canReplicateToSameCluster()
|
||||||
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
|
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
|
||||||
|
@ -826,15 +833,16 @@ public class ReplicationSource extends Thread
|
||||||
logKey.addClusterId(clusterId);
|
logKey.addClusterId(clusterId);
|
||||||
currentNbOperations += countDistinctRowKeys(edit);
|
currentNbOperations += countDistinctRowKeys(edit);
|
||||||
entries.add(entry);
|
entries.add(entry);
|
||||||
currentSize += entry.getEdit().heapSize();
|
int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit);
|
||||||
currentSize += calculateTotalSizeOfStoreFiles(edit);
|
currentSize += delta;
|
||||||
|
totalBufferTooLarge = acquireBufferQuota(delta);
|
||||||
} else {
|
} else {
|
||||||
metrics.incrLogEditsFiltered();
|
metrics.incrLogEditsFiltered();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Stop if too many entries or too big
|
// Stop if too many entries or too big
|
||||||
// FIXME check the relationship between single wal group and overall
|
// FIXME check the relationship between single wal group and overall
|
||||||
if (currentSize >= replicationQueueSizeCapacity
|
if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity
|
||||||
|| entries.size() >= replicationQueueNbCapacity) {
|
|| entries.size() >= replicationQueueNbCapacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1315,5 +1323,19 @@ public class ReplicationSource extends Thread
|
||||||
public void setWorkerRunning(boolean workerRunning) {
|
public void setWorkerRunning(boolean workerRunning) {
|
||||||
this.workerRunning = workerRunning;
|
this.workerRunning = workerRunning;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param size delta size for grown buffer
|
||||||
|
* @return true if we should clear buffer and push all
|
||||||
|
*/
|
||||||
|
private boolean acquireBufferQuota(int size) {
|
||||||
|
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseBufferQuota() {
|
||||||
|
totalBufferUsed.addAndGet(-currentSize);
|
||||||
|
currentSize = 0;
|
||||||
|
entries.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -126,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private long replicationWaitTime;
|
private long replicationWaitTime;
|
||||||
|
|
||||||
|
// Running buffer-usage total handed out via getTotalBufferUsed(); starts at 0.
private AtomicInteger totalBufferUsed = new AtomicInteger();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a replication manager and sets the watch on all the other registered region servers
|
* Creates a replication manager and sets the watch on all the other registered region servers
|
||||||
* @param replicationQueues the interface for manipulating replication queues
|
* @param replicationQueues the interface for manipulating replication queues
|
||||||
|
@ -435,6 +438,11 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
AtomicInteger getTotalBufferUsed() {
|
||||||
|
return totalBufferUsed;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory method to create a replication source
|
* Factory method to create a replication source
|
||||||
* @param conf the configuration to use
|
* @param conf the configuration to use
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -361,7 +362,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
@Override
|
@Override
|
||||||
public boolean replicate(ReplicateContext replicateContext) {
|
public boolean replicate(ReplicateContext replicateContext) {
|
||||||
replicateCount.incrementAndGet();
|
replicateCount.incrementAndGet();
|
||||||
lastEntries = replicateContext.entries;
|
lastEntries = new ArrayList<>(replicateContext.entries);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,184 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTestConst;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestGlobalThrottler {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
|
||||||
|
private static Configuration conf1;
|
||||||
|
private static Configuration conf2;
|
||||||
|
|
||||||
|
private static HBaseTestingUtility utility1;
|
||||||
|
private static HBaseTestingUtility utility2;
|
||||||
|
|
||||||
|
private static final byte[] famName = Bytes.toBytes("f");
|
||||||
|
private static final byte[] VALUE = Bytes.toBytes("v");
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("r");
|
||||||
|
private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
conf1 = HBaseConfiguration.create();
|
||||||
|
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||||
|
conf1.setLong("replication.source.sleepforretries", 100);
|
||||||
|
// Each WAL is about 120 bytes
|
||||||
|
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
|
||||||
|
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
|
||||||
|
|
||||||
|
utility1 = new HBaseTestingUtility(conf1);
|
||||||
|
utility1.startMiniZKCluster();
|
||||||
|
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
|
||||||
|
new ZooKeeperWatcher(conf1, "cluster1", null, true);
|
||||||
|
|
||||||
|
conf2 = new Configuration(conf1);
|
||||||
|
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||||
|
|
||||||
|
utility2 = new HBaseTestingUtility(conf2);
|
||||||
|
utility2.setZkCluster(miniZK);
|
||||||
|
new ZooKeeperWatcher(conf2, "cluster2", null, true);
|
||||||
|
|
||||||
|
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
|
rpc.setClusterKey(utility2.getClusterKey());
|
||||||
|
admin1.addPeer("peer1", rpc, null);
|
||||||
|
admin1.addPeer("peer2", rpc, null);
|
||||||
|
admin1.addPeer("peer3", rpc, null);
|
||||||
|
|
||||||
|
utility1.startMiniCluster(1, 1);
|
||||||
|
utility2.startMiniCluster(1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void setDownAfterClass() throws Exception {
|
||||||
|
utility2.shutdownMiniCluster();
|
||||||
|
utility1.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
volatile private boolean testQuotaPass = false;
|
||||||
|
volatile private boolean testQuotaNonZero = false;
|
||||||
|
@Test
|
||||||
|
public void testQuota() throws IOException {
|
||||||
|
TableName tableName = TableName.valueOf("testQuota");
|
||||||
|
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||||
|
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||||
|
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||||
|
table.addFamily(fam);
|
||||||
|
utility1.getHBaseAdmin().createTable(table);
|
||||||
|
utility2.getHBaseAdmin().createTable(table);
|
||||||
|
|
||||||
|
Thread watcher = new Thread(()->{
|
||||||
|
Replication replication = (Replication)utility1.getMiniHBaseCluster()
|
||||||
|
.getRegionServer(0).getReplicationSourceService();
|
||||||
|
AtomicInteger bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
|
||||||
|
testQuotaPass = true;
|
||||||
|
while (!Thread.interrupted()) {
|
||||||
|
int size = bufferUsed.get();
|
||||||
|
if (size > 0) {
|
||||||
|
testQuotaNonZero = true;
|
||||||
|
}
|
||||||
|
if (size > 600) {
|
||||||
|
// We read logs first then check throttler, so if the buffer quota limiter doesn't
|
||||||
|
// take effect, it will push many logs and exceed the quota.
|
||||||
|
testQuotaPass = false;
|
||||||
|
}
|
||||||
|
Threads.sleep(50);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
watcher.start();
|
||||||
|
|
||||||
|
try(Table t1 = utility1.getConnection().getTable(tableName);
|
||||||
|
Table t2 = utility2.getConnection().getTable(tableName)) {
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
Put put = new Put(ROWS[i]);
|
||||||
|
put.addColumn(famName, VALUE, VALUE);
|
||||||
|
t1.put(put);
|
||||||
|
}
|
||||||
|
long start = EnvironmentEdgeManager.currentTime();
|
||||||
|
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setCaching(50);
|
||||||
|
int count = 0;
|
||||||
|
try (ResultScanner results = t2.getScanner(scan)) {
|
||||||
|
for (Result result : results) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count < 50) {
|
||||||
|
LOG.info("Waiting for all logs pushed to slave. Expected 50 , actual " + count);
|
||||||
|
Threads.sleep(200);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher.interrupt();
|
||||||
|
Assert.assertTrue(testQuotaPass);
|
||||||
|
Assert.assertTrue(testQuotaNonZero);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Integer> getRowNumbers(List<Cell> cells) {
|
||||||
|
List<Integer> listOfRowNumbers = new ArrayList<>();
|
||||||
|
for (Cell c : cells) {
|
||||||
|
listOfRowNumbers.add(Integer.parseInt(Bytes
|
||||||
|
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
|
||||||
|
c.getRowLength() - ROW.length)));
|
||||||
|
}
|
||||||
|
return listOfRowNumbers;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue