From 3826e639672eea11d73da333e6c15f6b7c23a46c Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 20 Dec 2016 16:05:18 +0800 Subject: [PATCH] HBASE-17314 Limit total buffered size for all replication sources --- .../org/apache/hadoop/hbase/HConstants.java | 4 + .../hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSource.java | 38 +++- .../ReplicationSourceManager.java | 8 + .../replication/TestReplicationEndpoint.java | 3 +- .../regionserver/TestGlobalThrottler.java | 184 ++++++++++++++++++ 6 files changed, 230 insertions(+), 10 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 48d97780249..dc96c2aec78 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -932,6 +932,10 @@ public final class HConstants { public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; + public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota"; + public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; + + /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5bc0a66a9eb..853d699342a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2340,7 +2340,8 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. */ - ReplicationSourceService getReplicationSourceService() { + @VisibleForTesting + public ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c988f8766ce..97368e658da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; @@ -150,6 +151,9 @@ public class ReplicationSource extends Thread private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); + private AtomicInteger totalBufferUsed; + private int totalBufferQuota; + /** * Instantiation method used by region servers * @@ -201,7 +205,9 @@ public class ReplicationSource extends Thread defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - + this.totalBufferUsed = manager.getTotalBufferUsed(); + this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" @@ -534,7 +540,7 @@ public class ReplicationSource extends Thread private boolean workerRunning = true; // Current number of hfiles that we need to replicate private long currentNbHFiles = 0; - + List entries; // Use guava cache to set ttl for each key private LoadingCache canSkipWaitingSet = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.DAYS).build( @@ -554,6 +560,7 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = replicationQueueInfo; this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.source = source; + this.entries = new ArrayList<>(); } @Override @@ -626,8 +633,7 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; currentNbHFiles = 0; - List entries = new ArrayList(1); - + entries.clear(); Map lastPositionsForSerialScope = new HashMap<>(); currentSize = 0; try { @@ -719,6 +725,7 @@ public class ReplicationSource extends Thread continue; } shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); + releaseBufferQuota(); } if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue @@ -808,7 +815,7 @@ public class ReplicationSource extends Thread } } } - + boolean totalBufferTooLarge = false; // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { @@ -826,15 +833,16 @@ public class ReplicationSource extends Thread logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); entries.add(entry); - currentSize += entry.getEdit().heapSize(); - currentSize += calculateTotalSizeOfStoreFiles(edit); + int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); + currentSize += delta; + totalBufferTooLarge = acquireBufferQuota(delta); } else { metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big // FIXME check the relationship between single wal group and overall - if (currentSize >= replicationQueueSizeCapacity + if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity || entries.size() >= replicationQueueNbCapacity) { break; } @@ -1315,5 +1323,19 @@ public class ReplicationSource extends Thread public void setWorkerRunning(boolean workerRunning) { this.workerRunning = workerRunning; } + + /** + * @param size delta size for grown buffer + * @return true if we should clear buffer and push all + */ + private boolean acquireBufferQuota(int size) { + return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + } + + private void releaseBufferQuota() { + totalBufferUsed.addAndGet(-currentSize); + currentSize = 0; + entries.clear(); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2c9fdcc1fbb..2634a5262ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -42,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -126,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener { private Connection connection; private long replicationWaitTime; + private AtomicInteger totalBufferUsed = new AtomicInteger(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -435,6 +438,11 @@ public class ReplicationSourceManager implements ReplicationListener { } } + @VisibleForTesting + AtomicInteger getTotalBufferUsed() { + return totalBufferUsed; + } + /** * Factory method to create a replication source * @param conf the configuration to use diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 002b8c9863d..f9c467e2609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -361,7 +362,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Override public boolean replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); - lastEntries = replicateContext.entries; + lastEntries = new ArrayList<>(replicateContext.entries); return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java new file mode 100644 index 00000000000..a40d7ed3834 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestGlobalThrottler { + private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class); + private static Configuration conf1; + private static Configuration conf2; + + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + + private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] VALUE = Bytes.toBytes("v"); + private static final byte[] ROW = Bytes.toBytes("r"); + private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf1.setLong("replication.source.sleepforretries", 100); + // Each WAL is about 120 bytes + conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); + conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster2", null, true); + + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin1.addPeer("peer1", rpc, null); + admin1.addPeer("peer2", rpc, null); + admin1.addPeer("peer3", rpc, null); + + utility1.startMiniCluster(1, 1); + utility2.startMiniCluster(1, 1); + } + + @AfterClass + public static void setDownAfterClass() throws Exception { + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + + volatile private boolean testQuotaPass = false; + volatile private boolean testQuotaNonZero = false; + @Test + public void testQuota() throws IOException { + TableName tableName = TableName.valueOf("testQuota"); + HTableDescriptor table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL); + table.addFamily(fam); + utility1.getHBaseAdmin().createTable(table); + utility2.getHBaseAdmin().createTable(table); + + Thread watcher = new Thread(()->{ + Replication replication = (Replication)utility1.getMiniHBaseCluster() + .getRegionServer(0).getReplicationSourceService(); + AtomicInteger bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); + testQuotaPass = true; + while (!Thread.interrupted()) { + int size = bufferUsed.get(); + if (size > 0) { + testQuotaNonZero = true; + } + if (size > 600) { + // We read logs first then check throttler, so if the buffer quota limiter doesn't + // take effect, it will push many logs and exceed the quota. + testQuotaPass = false; + } + Threads.sleep(50); + } + }); + watcher.start(); + + try(Table t1 = utility1.getConnection().getTable(tableName); + Table t2 = utility2.getConnection().getTable(tableName)) { + for (int i = 0; i < 50; i++) { + Put put = new Put(ROWS[i]); + put.addColumn(famName, VALUE, VALUE); + t1.put(put); + } + long start = EnvironmentEdgeManager.currentTime(); + while (EnvironmentEdgeManager.currentTime() - start < 180000) { + Scan scan = new Scan(); + scan.setCaching(50); + int count = 0; + try (ResultScanner results = t2.getScanner(scan)) { + for (Result result : results) { + count++; + } + } + if (count < 50) { + LOG.info("Waiting for all logs pushed to slave. Expected 50 , actual " + count); + Threads.sleep(200); + continue; + } + break; + } + } + + watcher.interrupt(); + Assert.assertTrue(testQuotaPass); + Assert.assertTrue(testQuotaNonZero); + } + + private List getRowNumbers(List cells) { + List listOfRowNumbers = new ArrayList<>(); + for (Cell c : cells) { + listOfRowNumbers.add(Integer.parseInt(Bytes + .toString(c.getRowArray(), c.getRowOffset() + ROW.length, + c.getRowLength() - ROW.length))); + } + return listOfRowNumbers; + } +}