HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold

Closes #2127

Signed-off-by: stack <stack@apache.org>
Viraj Jasani 2020-07-24 13:12:15 +05:30
parent 0b85729da4
commit 09e7ccd42d
7 changed files with 86 additions and 41 deletions
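
In short: ReplicationSink now splits any replication batch whose row count exceeds
hbase.rpc.rows.warning.threshold (default 5000, the same key RSRpcServices already uses
to warn about oversized multi requests) before handing the sub-batches to
AsyncTable.batchAll. A minimal standalone sketch of the splitting rule, assuming the
shaded Guava artifact is on the classpath (the class and method names here are
illustrative, not the committed code):

  import java.util.Collections;
  import java.util.List;
  import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

  public class BatchSplitSketch {
    // Split a row list into consecutive sub-batches of at most threshold rows.
    static <R> List<List<R>> split(List<R> rows, int threshold) {
      return rows.size() > threshold
          ? Lists.partition(rows, threshold) // e.g. 11000 rows -> 5000, 5000, 1000
          : Collections.singletonList(rows);
    }

    public static void main(String[] args) {
      // Three sub-batches for 11000 rows at the default threshold of 5000.
      System.out.println(split(Collections.nCopies(11000, "row"), 5000).size()); // 3
    }
  }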

HConstants.java

@@ -1583,6 +1583,16 @@ public final class HConstants {
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

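Because the threshold now lives in HConstants, it can be tuned like any other setting,
via hbase-site.xml or the Configuration API. A hypothetical override, not part of this
commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  public class ThresholdOverrideExample {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Lower the warning (and replication batching) threshold from 5000 to 2000 rows.
      conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 2000);
      System.out.println(conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
          HConstants.BATCH_ROWS_THRESHOLD_DEFAULT)); // prints 2000
    }
  }
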
RSRpcServices.java

@@ -294,15 +294,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
 
-  /**
-   * Number of rows in a batch operation above which a warning will be logged.
-   */
-  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-
-  /**
-   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
-   */
-  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
   /*
    * Whether to reject rows with size > threshold defined by
    * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1265,7 +1256,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     final Configuration conf = rs.getConfiguration();
     this.ld = ld;
     regionServer = rs;
-    rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     rejectRowsWithSizeOverThreshold =
       conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

Replication.java

@@ -212,7 +212,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkService
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf, this.server);
+    this.replicationSink = new ReplicationSink(this.conf);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);

ReplicationSink.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,15 +94,21 @@ public class ReplicationSink {
   private SourceFSConfigurationProvider provider;
   private WALEntrySinkFilter walEntrySinkFilter;
 
+  /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
   /**
    * Create a sink for replication
    * @param conf conf object
-   * @param stopper boolean to tell this thread to stop
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)
       throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -211,11 +218,7 @@ public class ReplicationSink {
         // Map of table name Vs list of pair of family and list of
         // hfile paths from its namespace
         Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-          bulkLoadsPerClusters.get(bld.getClusterIdsList());
-        if (bulkLoadHFileMap == null) {
-          bulkLoadHFileMap = new HashMap<>();
-          bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-        }
+          bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
         buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
       }
     } else {
@@ -250,7 +253,7 @@ public class ReplicationSink {
     if (!rowMap.isEmpty()) {
       LOG.debug("Started replicating mutations.");
       for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-        batch(entry.getKey(), entry.getValue().values());
+        batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
       }
       LOG.debug("Finished replicating mutations.");
     }
@@ -366,16 +369,8 @@ public class ReplicationSink {
    */
   private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1,
       K2 key2, V value) {
-    Map<K2,List<V>> innerMap = map.get(key1);
-    if (innerMap == null) {
-      innerMap = new HashMap<>();
-      map.put(key1, innerMap);
-    }
-    List<V> values = innerMap.get(key2);
-    if (values == null) {
-      values = new ArrayList<>();
-      innerMap.put(key2, values);
-    }
+    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
     values.add(value);
     return values;
   }
@@ -403,13 +398,24 @@ public class ReplicationSink {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+    throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
     AsyncTable<?> table = getConnection().getTable(tableName);
-    List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+    List<Future<?>> futures = new ArrayList<>();
+    for (List<Row> rows : allRows) {
+      List<List<Row>> batchRows;
+      if (rows.size() > batchRowSizeThreshold) {
+        batchRows = Lists.partition(rows, batchRowSizeThreshold);
+      } else {
+        batchRows = Collections.singletonList(rows);
+      }
+      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
+    }
     for (Future<?> future : futures) {
       try {
         FutureUtils.get(future);

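Aside from the batching change, the ReplicationSink hunks above also fold two
get-or-create patterns (the bulkLoadsPerClusters lookup and addToHashMultiMap) into
Map.computeIfAbsent, which is behaviorally equivalent here since the created values are
never null. A small self-contained illustration; the map contents are made up for the
demo:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public class ComputeIfAbsentDemo {
    public static void main(String[] args) {
      Map<String, List<Integer>> map = new HashMap<>();
      // One call replaces the old get / null-check / put sequence.
      map.computeIfAbsent("key", k -> new ArrayList<>()).add(42);
      System.out.println(map); // prints {key=[42]}
    }
  }
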
TestMultiLogThreshold.java

@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -80,8 +81,8 @@ public class TestMultiLogThreshold {
     final TableName tableName = TableName.valueOf("tableName");
     TEST_UTIL = new HBaseTestingUtility();
     CONF = TEST_UTIL.getConfiguration();
-    THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-      RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+    THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
     TEST_UTIL.startMiniCluster();
     TEST_UTIL.createTable(tableName, TEST_FAM);

TestReplicationSink.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSink {
 
   @ClassRule
@@ -127,9 +126,8 @@ public class TestReplicationSink {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     TEST_UTIL.startMiniCluster(3);
-
-    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -202,6 +200,40 @@ public class TestReplicationSink {
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
   }
 
+  @Test
+  public void testLargeEditsPutDelete() throws Exception {
+    List<WALEntry> entries = new ArrayList<>();
+    List<Cell> cells = new ArrayList<>();
+    for (int i = 0; i < 5510; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+
+    ResultScanner resultScanner = table1.getScanner(new Scan());
+    int totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5510, totalRows);
+
+    entries = new ArrayList<>();
+    cells = new ArrayList<>();
+    for (int i = 0; i < 11000; i++) {
+      entries.add(
+        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+          cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+    resultScanner = table1.getScanner(new Scan());
+    totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5500, totalRows);
+  }
+
   /**
    * Insert to 2 different tables
    * @throws Exception
@@ -220,7 +252,11 @@ public class TestReplicationSink {
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
-      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+    }
+    scanRes = table1.getScanner(scan);
+    for(Result res : scanRes) {
+      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
     }
   }
 

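The new testLargeEditsPutDelete above exercises the partitioning path directly: 5510
puts exceed the 5000-row default, so batch() submits sub-batches of 5000 and 510, and
the 11000-entry put/delete run splits into 5000, 5000 and 1000, netting 5500 surviving
rows because every even row key is deleted. A quick standalone check of that arithmetic
with the same Lists.partition call (again assuming the shaded Guava artifact):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

  public class PartitionArithmetic {
    public static void main(String[] args) {
      List<Integer> rows = new ArrayList<>();
      for (int i = 0; i < 11000; i++) {
        rows.add(i);
      }
      // Guava partitions into consecutive sublists of at most 5000 elements.
      for (List<Integer> batch : Lists.partition(rows, 5000)) {
        System.out.print(batch.size() + " "); // prints 5000 5000 1000
      }
    }
  }
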
TestWALEntrySinkFilter.java

@@ -111,7 +111,7 @@ public class TestWALEntrySinkFilter {
       IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
       DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
-    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+    ReplicationSink sink = new ReplicationSink(conf);
     // Create some dumb walentries.
     List<AdminProtos.WALEntry> entries = new ArrayList<>();
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();