diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8e82025f0a5..d1f187458a9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1583,6 +1583,16 @@ public final class HConstants {
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d49bf4cba04..dec966ce42d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -294,15 +294,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
-  /**
-   * Number of rows in a batch operation above which a warning will be logged.
-   */
-  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-  /**
-   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
-   */
-  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
   /*
    * Whether to reject rows with size > threshold defined by
    * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
@@ -1265,7 +1256,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       final Configuration conf = rs.getConfiguration();
       this.ld = ld;
       regionServer = rs;
-      rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+      rowSizeWarnThreshold = conf.getInt(
+        HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
       rejectRowsWithSizeOverThreshold =
         conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 82422374cff..4cbce8c7273 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -212,7 +212,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf, this.server);
+    this.replicationSink = new ReplicationSink(this.conf);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
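With the threshold constants promoted into HConstants, RSRpcServices above and the replication sink below resolve the same configuration key the same way. A minimal sketch of that lookup, assuming only hadoop-common and hbase-common on the classpath (the `BatchThresholdDemo` class name is hypothetical, not part of this patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical demo class, not part of this patch.
public class BatchThresholdDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Nothing is set here, so the lookup falls back to the default of 5000.
    // Operators override it via hbase.rpc.rows.warning.threshold in
    // hbase-site.xml.
    int threshold = conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    System.out.println("batch row warning threshold = " + threshold);
  }
}
```

Keeping the key and default in hbase-common spares the replication code a reference into the hbase-server-internal RSRpcServices class.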
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 18926b7a156..d1ee0220a9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,15 +94,21 @@ public class ReplicationSink {
   private SourceFSConfigurationProvider provider;
   private WALEntrySinkFilter walEntrySinkFilter;
 
+  /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
   /**
    * Create a sink for replication
    * @param conf conf object
-   * @param stopper boolean to tell this thread to stop
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)
       throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -211,11 +218,7 @@ public class ReplicationSink {
           // Map of table name Vs list of pair of family and list of
           // hfile paths from its namespace
           Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-            bulkLoadsPerClusters.get(bld.getClusterIdsList());
-          if (bulkLoadHFileMap == null) {
-            bulkLoadHFileMap = new HashMap<>();
-            bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-          }
+            bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
           buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
         }
       } else {
@@ -250,7 +253,7 @@
       if (!rowMap.isEmpty()) {
         LOG.debug("Started replicating mutations.");
         for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-          batch(entry.getKey(), entry.getValue().values());
+          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
         }
         LOG.debug("Finished replicating mutations.");
       }
@@ -366,16 +369,8 @@
    */
   private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
       V value) {
-    Map<K2, List<V>> innerMap = map.get(key1);
-    if (innerMap == null) {
-      innerMap = new HashMap<>();
-      map.put(key1, innerMap);
-    }
-    List<V> values = innerMap.get(key2);
-    if (values == null) {
-      values = new ArrayList<>();
-      innerMap.put(key2, values);
-    }
+    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
     values.add(value);
     return values;
   }
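Both refactors above swap the get/null-check/put idiom for `Map.computeIfAbsent`, which creates and registers the inner container only when the key is absent and returns the live value either way. A self-contained sketch of the idiom (the `MultiMapDemo` class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class illustrating the computeIfAbsent refactor above.
public class MultiMapDemo {
  public static void main(String[] args) {
    Map<String, List<Integer>> multiMap = new HashMap<>();
    // The first call creates the list and stores it under the key; the
    // second call returns that same list, so both values land in one bucket.
    multiMap.computeIfAbsent("row-1", k -> new ArrayList<>()).add(10);
    multiMap.computeIfAbsent("row-1", k -> new ArrayList<>()).add(20);
    System.out.println(multiMap); // {row-1=[10, 20]}
  }
}
```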
@@ -403,13 +398,24 @@ public class ReplicationSink {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+      throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
     AsyncTable<?> table = getConnection().getTable(tableName);
-    List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+    List<Future<?>> futures = new ArrayList<>();
+    for (List<Row> rows : allRows) {
+      List<List<Row>> batchRows;
+      if (rows.size() > batchRowSizeThreshold) {
+        batchRows = Lists.partition(rows, batchRowSizeThreshold);
+      } else {
+        batchRows = Collections.singletonList(rows);
+      }
+      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
+    }
     for (Future<?> future : futures) {
       try {
         FutureUtils.get(future);
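The behavioral change of the patch is the loop above: a row list larger than batchRowSizeThreshold is split with Guava's Lists.partition before each slice is handed to AsyncTable.batchAll, so a single replicated multi request can no longer exceed the threshold. A standalone sketch of the partition semantics, using the same shaded Guava the patch imports (the `PartitionDemo` class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

// Hypothetical demo class; plain com.google.common.collect.Lists behaves
// the same outside HBase's shaded thirdparty jar.
public class PartitionDemo {
  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      rows.add(i);
    }
    // Consecutive views of at most 5000 elements each; only the last
    // slice may be shorter: sizes here are 5000, 5000 and 1000.
    List<List<Integer>> slices = Lists.partition(rows, 5000);
    System.out.println(slices.size());        // 3
    System.out.println(slices.get(2).size()); // 1000
  }
}
```

With the default threshold of 5000, the 11000-edit case in the new test below therefore goes out as three batchAll calls instead of one.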
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index 393037a8f99..614b04b8eb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -80,8 +81,8 @@ public class TestMultiLogThreshold {
     final TableName tableName = TableName.valueOf("tableName");
     TEST_UTIL = new HBaseTestingUtility();
     CONF = TEST_UTIL.getConfiguration();
-    THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-      RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+    THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
     TEST_UTIL.startMiniCluster();
     TEST_UTIL.createTable(tableName, TEST_FAM);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 843fe6fd95b..ffe0957a9e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSink {
 
   @ClassRule
@@ -127,9 +126,8 @@ public class TestReplicationSink {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
-    TEST_UTIL.startMiniCluster(3);
-    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -202,6 +200,40 @@ public class TestReplicationSink {
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
   }
+  @Test
+  public void testLargeEditsPutDelete() throws Exception {
+    List<WALEntry> entries = new ArrayList<>();
+    List<Cell> cells = new ArrayList<>();
+    for (int i = 0; i < 5510; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+
+    ResultScanner resultScanner = table1.getScanner(new Scan());
+    int totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5510, totalRows);
+
+    entries = new ArrayList<>();
+    cells = new ArrayList<>();
+    for (int i = 0; i < 11000; i++) {
+      entries.add(
+        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+          cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+    resultScanner = table1.getScanner(new Scan());
+    totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5500, totalRows);
+  }
+
   /**
    * Insert to 2 different tables
    * @throws Exception
   */
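The expected counts in testLargeEditsPutDelete follow from the parity of the loop index: the second wave issues a Put for every odd row in [0, 11000) and a DeleteColumn for every even row, so exactly the 5500 odd rows survive the scan. A back-of-envelope check (the `ExpectedRowsDemo` class name is hypothetical, not in the patch):

```java
// Hypothetical sketch verifying the 5500 expectation in the test above.
public class ExpectedRowsDemo {
  public static void main(String[] args) {
    int remaining = 0;
    for (int i = 0; i < 11000; i++) {
      // i % 2 != 0 -> Put in the second wave; even rows get DeleteColumn.
      if (i % 2 != 0) {
        remaining++;
      }
    }
    System.out.println(remaining); // 5500
  }
}
```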
@@ -220,7 +252,11 @@ public class TestReplicationSink {
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
-      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+    }
+    scanRes = table1.getScanner(scan);
+    for(Result res : scanRes) {
+      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 5e167238c6c..beaa78cd294 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -111,7 +111,7 @@ public class TestWALEntrySinkFilter {
       IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
       DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
-    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+    ReplicationSink sink = new ReplicationSink(conf);
     // Create some dumb walentries.
     List<AdminProtos.WALEntry> entries = new ArrayList<>();
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
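After this change the sink is constructed from configuration alone, as the two test call sites above show. A caller-side sketch under stated assumptions (the `SinkConstructionDemo` class name is hypothetical, and in practice only Replication and the tests construct a sink, inside a running region server):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;

// Hypothetical demo class, not part of this patch.
public class SinkConstructionDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Optional: lower the per-multi batch ceiling from its default of 5000.
    conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 2000);
    // No Stoppable argument anymore; the sink reads the threshold once here.
    ReplicationSink sink = new ReplicationSink(conf);
    System.out.println("sink created: " + sink);
  }
}
```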