HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2146)

Closes #2127

Signed-off-by: stack <stack@apache.org>
Viraj Jasani 2020-07-28 01:10:02 +05:30 committed by GitHub
parent b154f20c4d
commit dd4417a9e2
6 changed files with 83 additions and 29 deletions
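In short: ReplicationSink now reads hbase.rpc.rows.warning.threshold (surfaced through new public HConstants entries) and, in batch(), splits each per-table list of Rows into sublists of at most that many rows before handing them to Table#batch, so a single oversized replication batch stays under the RPC row-count warning threshold.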

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

@@ -1390,6 +1390,16 @@ public final class HConstants {
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
/**
* Number of rows in a batch operation above which a warning will be logged.
*/
public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
/**
* Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
*/
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
private HConstants() {
// Can't be instantiated with this ctor.
}
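With the constants public on HConstants, the threshold can be tuned wherever the shared Configuration is assembled. A minimal sketch, not part of this commit (the value 2000 is illustrative):

// Assumes imports: org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.HConstants
Configuration conf = HBaseConfiguration.create();
// BATCH_ROWS_THRESHOLD_DEFAULT is 5000; warn (or reject, where so configured) above 2000 rows instead.
conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 2000);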

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

@@ -231,15 +231,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
/**
* Number of rows in a batch operation above which a warning will be logged.
*/
static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
/**
* Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1128,7 +1119,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
rowSizeWarnThreshold = rs.conf.getInt(
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
rejectRowsWithSizeOverThreshold = rs.conf
.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
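Hoisting the two constants from RSRpcServices into HConstants lets the RPC front end and the replication sink read the same hbase.rpc.rows.warning.threshold knob instead of each keeping a private copy of the definition.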

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java

@@ -245,7 +245,7 @@ public class Replication extends WALActionsListener.Base implements
} catch (ReplicationException e) {
throw new IOException(e);
}
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

@@ -18,10 +18,13 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -90,16 +92,21 @@ public class ReplicationSink {
private long hfilesReplicated = 0;
private SourceFSConfigurationProvider provider;
/**
* Row size threshold for multi requests above which a warning is logged
*/
private final int rowSizeWarnThreshold;
/**
* Create a sink for replication
*
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf, Stoppable stopper)
public ReplicationSink(Configuration conf)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
rowSizeWarnThreshold = conf.getInt(
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
@@ -215,7 +222,7 @@ public class ReplicationSink {
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
@@ -380,9 +387,10 @@ public class ReplicationSink {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
* @throws IOException
* @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
throws IOException {
if (allRows.isEmpty()) {
return;
}
@@ -391,7 +399,15 @@ public class ReplicationSink {
Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
table.batch(rows);
List<List<Row>> batchRows;
if (rows.size() > batchRowSizeThreshold) {
batchRows = Lists.partition(rows, batchRowSizeThreshold);
} else {
batchRows = Collections.singletonList(rows);
}
for (List<Row> rowList : batchRows) {
table.batch(rowList);
}
}
} catch (RetriesExhaustedWithDetailsException rewde) {
for (Throwable ex : rewde.getCauses()) {
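The split above relies on Guava's Lists.partition, which returns consecutive sublist views of the input list, each of the requested size except possibly the last. A self-contained sketch of that behavior, assuming Guava on the classpath (the 11000/5000 sizes mirror the test below but are otherwise illustrative):

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {
  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      rows.add(i);
    }
    // Consecutive views over the backing list: 5000, 5000 and 1000 elements,
    // analogous to the chunks ReplicationSink feeds to table.batch(...).
    List<List<Integer>> batches = Lists.partition(rows, 5000);
    System.out.println(batches.size());        // 3
    System.out.println(batches.get(2).size()); // 1000
  }
}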

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java

@@ -24,6 +24,7 @@ import java.util.List;
import org.junit.Before;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,8 +76,8 @@ public class TestMultiLogThreshold {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java

@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -76,7 +75,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
@Category(LargeTests.class)
public class TestReplicationSink {
private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
@@ -123,10 +122,8 @@ public class TestReplicationSink {
HConstants.REPLICATION_ENABLE_DEFAULT);
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
TEST_UTIL.startMiniCluster(3);
SINK =
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -199,6 +196,40 @@ public class TestReplicationSink {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}
@Test
public void testLargeEditsPutDelete() throws Exception {
List<WALEntry> entries = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
for (int i = 0; i < 5510; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);
ResultScanner resultScanner = table1.getScanner(new Scan());
int totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
assertEquals(5510, totalRows);
entries = new ArrayList<>();
cells = new ArrayList<>();
for (int i = 0; i < 11000; i++) {
entries.add(
createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);
resultScanner = table1.getScanner(new Scan());
totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
assertEquals(5500, totalRows);
}
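The expected 5500 follows from the key scheme: the second wave puts odd keys 1 through 10999 (5,500 rows) and issues DeleteColumn mutations for the even keys, removing the even-keyed rows 0 through 5508 left by the first wave (deletes of never-written even keys are no-ops), so exactly the odd-keyed rows survive.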
/**
* Insert to 2 different tables
* @throws Exception
@@ -217,7 +248,11 @@ public class TestReplicationSink {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
assertEquals(0, Bytes.toInt(res.getRow()) % 2);
}
scanRes = table1.getScanner(scan);
for(Result res : scanRes) {
assertEquals(1, Bytes.toInt(res.getRow()) % 2);
}
}