Revert "HBASE-17314 Limit total buffered size for all replication sources"

This reverts commit 3826e63967.
This commit is contained in:
Michael Stack 2016-12-21 11:17:28 -08:00
parent acd0218d91
commit a1d2ff4646
6 changed files with 10 additions and 230 deletions

View File

@ -932,10 +932,6 @@ public final class HConstants {
public static final long public static final long
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota";
public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
/** /**
* Directory where the source cluster file system client configuration are placed which is used by * Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system * sink cluster to copy HFiles from source cluster file system

View File

@ -2340,8 +2340,7 @@ public class HRegionServer extends HasThread implements
* @return Return the object that implements the replication * @return Return the object that implements the replication
* source service. * source service.
*/ */
@VisibleForTesting ReplicationSourceService getReplicationSourceService() {
public ReplicationSourceService getReplicationSourceService() {
return replicationSourceHandler; return replicationSourceHandler;
} }

View File

@ -38,7 +38,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -151,9 +150,6 @@ public class ReplicationSource extends Thread
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads = private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>(); new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
private AtomicInteger totalBufferUsed;
private int totalBufferQuota;
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
* *
@ -205,9 +201,7 @@ public class ReplicationSource extends Thread
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth(); currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
+ ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
@ -542,7 +536,7 @@ public class ReplicationSource extends Thread
private boolean workerRunning = true; private boolean workerRunning = true;
// Current number of hfiles that we need to replicate // Current number of hfiles that we need to replicate
private long currentNbHFiles = 0; private long currentNbHFiles = 0;
List<WAL.Entry> entries;
// Use guava cache to set ttl for each key // Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder() private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build( .expireAfterAccess(1, TimeUnit.DAYS).build(
@ -562,7 +556,6 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = replicationQueueInfo; this.replicationQueueInfo = replicationQueueInfo;
this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.repLogReader = new ReplicationWALReaderManager(fs, conf);
this.source = source; this.source = source;
this.entries = new ArrayList<>();
} }
@Override @Override
@ -635,7 +628,8 @@ public class ReplicationSource extends Thread
boolean gotIOE = false; boolean gotIOE = false;
currentNbOperations = 0; currentNbOperations = 0;
currentNbHFiles = 0; currentNbHFiles = 0;
entries.clear(); List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
Map<String, Long> lastPositionsForSerialScope = new HashMap<>(); Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0; currentSize = 0;
try { try {
@ -727,7 +721,6 @@ public class ReplicationSource extends Thread
continue; continue;
} }
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
releaseBufferQuota();
} }
if (replicationQueueInfo.isQueueRecovered()) { if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue // use synchronize to make sure one last thread will clean the queue
@ -817,7 +810,7 @@ public class ReplicationSource extends Thread
} }
} }
} }
boolean totalBufferTooLarge = false;
// don't replicate if the log entries have already been consumed by the cluster // don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster() if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) { || !entry.getKey().getClusterIds().contains(peerClusterId)) {
@ -835,16 +828,15 @@ public class ReplicationSource extends Thread
logKey.addClusterId(clusterId); logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit); currentNbOperations += countDistinctRowKeys(edit);
entries.add(entry); entries.add(entry);
int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); currentSize += entry.getEdit().heapSize();
currentSize += delta; currentSize += calculateTotalSizeOfStoreFiles(edit);
totalBufferTooLarge = acquireBufferQuota(delta);
} else { } else {
metrics.incrLogEditsFiltered(); metrics.incrLogEditsFiltered();
} }
} }
// Stop if too many entries or too big // Stop if too many entries or too big
// FIXME check the relationship between single wal group and overall // FIXME check the relationship between single wal group and overall
if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity if (currentSize >= replicationQueueSizeCapacity
|| entries.size() >= replicationQueueNbCapacity) { || entries.size() >= replicationQueueNbCapacity) {
break; break;
} }
@ -1325,19 +1317,5 @@ public class ReplicationSource extends Thread
public void setWorkerRunning(boolean workerRunning) { public void setWorkerRunning(boolean workerRunning) {
this.workerRunning = workerRunning; this.workerRunning = workerRunning;
} }
/**
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(int size) {
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
}
private void releaseBufferQuota() {
totalBufferUsed.addAndGet(-currentSize);
currentSize = 0;
entries.clear();
}
} }
} }

View File

@ -42,7 +42,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -127,8 +126,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private Connection connection; private Connection connection;
private long replicationWaitTime; private long replicationWaitTime;
private AtomicInteger totalBufferUsed = new AtomicInteger();
/** /**
* Creates a replication manager and sets the watch on all the other registered region servers * Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues * @param replicationQueues the interface for manipulating replication queues
@ -438,11 +435,6 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
} }
@VisibleForTesting
AtomicInteger getTotalBufferUsed() {
return totalBufferUsed;
}
/** /**
* Factory method to create a replication source * Factory method to create a replication source
* @param conf the configuration to use * @param conf the configuration to use

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -362,7 +361,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@Override @Override
public boolean replicate(ReplicateContext replicateContext) { public boolean replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet(); replicateCount.incrementAndGet();
lastEntries = new ArrayList<>(replicateContext.entries); lastEntries = replicateContext.entries;
return true; return true;
} }

View File

@ -1,184 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestGlobalThrottler {
private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class);
private static Configuration conf1;
private static Configuration conf2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] VALUE = Bytes.toBytes("v");
private static final byte[] ROW = Bytes.toBytes("r");
private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
conf1.setLong("replication.source.sleepforretries", 100);
// Each WAL is about 120 bytes
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200);
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
admin1.addPeer("peer1", rpc, null);
admin1.addPeer("peer2", rpc, null);
admin1.addPeer("peer3", rpc, null);
utility1.startMiniCluster(1, 1);
utility2.startMiniCluster(1, 1);
}
@AfterClass
public static void setDownAfterClass() throws Exception {
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
volatile private boolean testQuotaPass = false;
volatile private boolean testQuotaNonZero = false;
@Test
public void testQuota() throws IOException {
TableName tableName = TableName.valueOf("testQuota");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
table.addFamily(fam);
utility1.getHBaseAdmin().createTable(table);
utility2.getHBaseAdmin().createTable(table);
Thread watcher = new Thread(()->{
Replication replication = (Replication)utility1.getMiniHBaseCluster()
.getRegionServer(0).getReplicationSourceService();
AtomicInteger bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
testQuotaPass = true;
while (!Thread.interrupted()) {
int size = bufferUsed.get();
if (size > 0) {
testQuotaNonZero = true;
}
if (size > 600) {
// We read logs first then check throttler, so if the buffer quota limiter doesn't
// take effect, it will push many logs and exceed the quota.
testQuotaPass = false;
}
Threads.sleep(50);
}
});
watcher.start();
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
for (int i = 0; i < 50; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
scan.setCaching(50);
int count = 0;
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
count++;
}
}
if (count < 50) {
LOG.info("Waiting for all logs pushed to slave. Expected 50 , actual " + count);
Threads.sleep(200);
continue;
}
break;
}
}
watcher.interrupt();
Assert.assertTrue(testQuotaPass);
Assert.assertTrue(testQuotaNonZero);
}
private List<Integer> getRowNumbers(List<Cell> cells) {
List<Integer> listOfRowNumbers = new ArrayList<>();
for (Cell c : cells) {
listOfRowNumbers.add(Integer.parseInt(Bytes
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
c.getRowLength() - ROW.length)));
}
return listOfRowNumbers;
}
}