From 29e390c80895af54206d6a14eac50ca2859cf2b7 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 20 Dec 2016 19:23:49 +0800 Subject: [PATCH] HBASE-17314 Limit total buffered size for all replication sources --- .../org/apache/hadoop/hbase/HConstants.java | 9 + .../hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSource.java | 36 +++- .../ReplicationSourceManager.java | 8 + .../replication/TestReplicationEndpoint.java | 3 +- .../replication/TestReplicationSource.java | 13 +- .../regionserver/TestGlobalThrottler.java | 187 ++++++++++++++++++ 7 files changed, 247 insertions(+), 12 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4606b03d765..7d3190c00c2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -920,6 +920,15 @@ public final class HConstants { public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; + /** + * Max total size of buffered entries in all replication peers. It will prevent server getting + * OOM if there are many peers. Default value is 256MB which is four times to default + * replication.source.size.capacity. + */ + public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota"; + public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; + + /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6ec4b672473..6bf5786ea79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2222,7 +2222,8 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. */ - ReplicationSourceService getReplicationSourceService() { + @VisibleForTesting + public ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c7809b516e5..594979043a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -150,6 +150,9 @@ public class ReplicationSource extends Thread private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); + private AtomicLong totalBufferUsed; + private long totalBufferQuota; + /** * Instantiation method used by region servers * @@ -201,7 +204,9 @@ public class ReplicationSource extends Thread defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - + this.totalBufferUsed = manager.getTotalBufferUsed(); + this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" @@ -536,7 +541,7 @@ public class ReplicationSource extends Thread private boolean workerRunning = true; // Current number of hfiles that we need to replicate private long currentNbHFiles = 0; - + List entries; // Use guava cache to set ttl for each key private LoadingCache canSkipWaitingSet = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.DAYS).build( @@ -555,6 +560,7 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = replicationQueueInfo; this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.source = source; + this.entries = new ArrayList<>(); } @Override @@ -627,8 +633,7 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; currentNbHFiles = 0; - List entries = new ArrayList(1); - + entries.clear(); Map lastPositionsForSerialScope = new HashMap<>(); currentSize = 0; try { @@ -720,6 +725,7 @@ public class ReplicationSource extends Thread continue; } shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); + releaseBufferQuota(); } if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue @@ -809,6 +815,7 @@ public class ReplicationSource extends Thread } } } + boolean totalBufferTooLarge = false; // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { @@ -826,15 +833,16 @@ public class ReplicationSource extends Thread logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); entries.add(entry); - currentSize += entry.getEdit().heapSize(); - currentSize += calculateTotalSizeOfStoreFiles(edit); + int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); + currentSize += delta; + totalBufferTooLarge = acquireBufferQuota(delta); } else { metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big // FIXME check the relationship between single wal group and overall - if (currentSize >= replicationQueueSizeCapacity + if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity || entries.size() >= replicationQueueNbCapacity) { break; } @@ -1317,5 +1325,19 @@ public class ReplicationSource extends Thread public void setWorkerRunning(boolean workerRunning) { this.workerRunning = workerRunning; } + + /** + * @param size delta size for grown buffer + * @return true if we should clear buffer and push all + */ + private boolean acquireBufferQuota(int size) { + return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + } + + private void releaseBufferQuota() { + totalBufferUsed.addAndGet(-currentSize); + currentSize = 0; + entries.clear(); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index cdc6fce500c..c371e19cb0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -42,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -126,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener { private Connection connection; private long replicationWaitTime; + private AtomicLong totalBufferUsed = new AtomicLong(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -446,6 +449,11 @@ public class ReplicationSourceManager implements ReplicationListener { } } + @VisibleForTesting + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } + /** * Factory method to create a replication source * @param conf the configuration to use diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 5a54314837c..b9d5582a4ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -360,7 +361,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Override public boolean replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); - lastEntries = replicateContext.entries; + lastEntries = new ArrayList<>(replicateContext.entries); return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 9bf0e9338c0..f7e644f9e07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALProvider; @@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import static org.mockito.Mockito.mock; @@ -140,11 +143,15 @@ public class TestReplicationSource { } }; replicationEndpoint.start(); - ReplicationPeers mockPeers = mock(ReplicationPeers.class); + ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); - source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint, - null); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + source.init(testConf, null, manager, null, mockPeers, null, "testPeer", + null, replicationEndpoint, null); ExecutorService executor = Executors.newSingleThreadExecutor(); final Future future = executor.submit(new Runnable() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java new file mode 100644 index 00000000000..48ef781fd8c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestGlobalThrottler { + private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class); + private static Configuration conf1; + private static Configuration conf2; + + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + + private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] VALUE = Bytes.toBytes("v"); + private static final byte[] ROW = Bytes.toBytes("r"); + private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf1.setLong("replication.source.sleepforretries", 100); + // Each WAL is about 120 bytes + conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); + conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster2", null, true); + + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin1.addPeer("peer1", rpc, null); + admin1.addPeer("peer2", rpc, null); + admin1.addPeer("peer3", rpc, null); + + utility1.startMiniCluster(1, 1); + utility2.startMiniCluster(1, 1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + + volatile private boolean testQuotaPass = false; + volatile private boolean testQuotaNonZero = false; + @Test + public void testQuota() throws IOException { + TableName tableName = TableName.valueOf("testQuota"); + HTableDescriptor table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL); + table.addFamily(fam); + utility1.getHBaseAdmin().createTable(table); + utility2.getHBaseAdmin().createTable(table); + + Thread watcher = new Thread(new Runnable() { + @Override + public void run() { + Replication replication = (Replication) utility1.getMiniHBaseCluster() + .getRegionServer(0).getReplicationSourceService(); + AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); + testQuotaPass = true; + while (!Thread.interrupted()) { + long size = bufferUsed.get(); + if (size > 0) { + testQuotaNonZero = true; + } + if (size > 600) { + // We read logs first then check throttler, so if the buffer quota limiter doesn't + // take effect, it will push many logs and exceed the quota. + testQuotaPass = false; + } + Threads.sleep(50); + } + } + }); + + watcher.start(); + + try(Table t1 = utility1.getConnection().getTable(tableName); + Table t2 = utility2.getConnection().getTable(tableName)) { + for (int i = 0; i < 50; i++) { + Put put = new Put(ROWS[i]); + put.addColumn(famName, VALUE, VALUE); + t1.put(put); + } + long start = EnvironmentEdgeManager.currentTime(); + while (EnvironmentEdgeManager.currentTime() - start < 180000) { + Scan scan = new Scan(); + scan.setCaching(50); + int count = 0; + try (ResultScanner results = t2.getScanner(scan)) { + for (Result result : results) { + count++; + } + } + if (count < 50) { + LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count); + Threads.sleep(200); + continue; + } + break; + } + } + + watcher.interrupt(); + Assert.assertTrue(testQuotaPass); + Assert.assertTrue(testQuotaNonZero); + } + + private List getRowNumbers(List cells) { + List listOfRowNumbers = new ArrayList<>(); + for (Cell c : cells) { + listOfRowNumbers.add(Integer.parseInt(Bytes + .toString(c.getRowArray(), c.getRowOffset() + ROW.length, + c.getRowLength() - ROW.length))); + } + return listOfRowNumbers; + } +}