diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 15794c4310d..2b337ecc395 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -148,6 +148,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; @@ -1107,11 +1108,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } status.setStatus("Initializaing region replication sink"); - regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> { - rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()), - FlushLifeCycleTracker.DUMMY); - }, rss.getAsyncClusterConnection())); - + regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, + rss.getRegionReplicationBufferManager(), () -> rss.getFlushRequester().requestFlush(this, + new ArrayList<>(td.getColumnFamilyNames()), FlushLifeCycleTracker.DUMMY), + rss.getAsyncClusterConnection())); } /** @@ -2494,7 +2494,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean isCompactionNeeded(); } - FlushResultImpl flushcache(boolean flushAllStores, boolean 
writeFlushRequestWalMarker, + public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException { List families = null; if (flushAllStores) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8ebc2f89e7b..16e88fcafe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -461,6 +462,7 @@ public class HRegionServer extends HBaseServerBase // A timer to shutdown the process if abort takes too long private Timer abortMonitor; + private RegionReplicationBufferManager regionReplicationBufferManager; /** * Starts a HRegionServer at the default location. *

@@ -645,6 +647,7 @@ public class HRegionServer extends HBaseServerBase initializeZooKeeper(); setupClusterConnection(); bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); + regionReplicationBufferManager = new RegionReplicationBufferManager(this); // Setup RPC client for master communication this.rpcClient = asyncClusterConnection.getRpcClient(); } catch (Throwable t) { @@ -883,7 +886,7 @@ public class HRegionServer extends HBaseServerBase closeUserRegions(abortRequested.get()); LOG.info("stopping server " + this.serverName); } - + regionReplicationBufferManager.stop(); closeClusterConnection(); // Closing the compactSplit thread before closing meta regions if (!this.killed && containsMetaTableRegions()) { @@ -3547,4 +3550,9 @@ public class HRegionServer extends HBaseServerBase shutdownChore(slowLogTableOpsChore); shutdownChore(brokenStoreFileCleaner); } + + @Override + public RegionReplicationBufferManager getRegionReplicationBufferManager() { + return regionReplicationBufferManager; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index fce8df17264..ec27fb2bb35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.quotas.RegionSizeStore; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import 
org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -317,4 +318,15 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo * @return {@link ZKPermissionWatcher} */ ZKPermissionWatcher getZKPermissionWatcher(); + + RegionReplicationBufferManager getRegionReplicationBufferManager(); + + @Override + HRegion getRegion(String encodedRegionName); + + @Override + List getRegions(TableName tableName) throws IOException; + + @Override + List getRegions(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java new file mode 100644 index 00000000000..bda3cb42b33 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.regionreplication; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Manages the buffer size for all {@link RegionReplicationSink}. + *

+ * If the buffer size exceeds the soft limit, we will find the region with the largest pending size + * and trigger a flush, so it can drop all the pending entries and save memory. + *

+ * If the buffer size exceeds the hard limit, we will return {@code false} for + * {@link #increase(long)} and let the {@link RegionReplicationSink} to drop the edits immediately. + */ +@InterfaceAudience.Private +public class RegionReplicationBufferManager { + + private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class); + + /** + * This is the total size of pending entries for all the sinks. + */ + public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size"; + + public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024; + + public static final String SOFT_LIMIT_PERCENTAGE = + "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage"; + + public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f; + + private final RegionServerServices rsServices; + + private final long maxPendingSize; + + private final long softMaxPendingSize; + + private final AtomicLong pendingSize = new AtomicLong(); + + private final ThreadPoolExecutor executor; + + public RegionReplicationBufferManager(RegionServerServices rsServices) { + this.rsServices = rsServices; + Configuration conf = rsServices.getConfiguration(); + this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT); + this.softMaxPendingSize = + (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize); + this.executor = new ThreadPoolExecutor( + 1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(), + (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one")); + executor.allowCoreThreadTimeOut(true); + } + + private void flush() { + long max = Long.MIN_VALUE; + HRegion toFlush = null; + for (HRegion region : rsServices.getRegions()) { + Optional sink = region.getRegionReplicationSink(); + if (sink.isPresent()) { + RegionReplicationSink s = sink.get(); 
+ long p = s.pendingSize(); + if (p > max) { + max = p; + toFlush = region; + } + } + } + if (toFlush != null) { + // here we need to write flush marker out, so we can drop all the pending edits in the region + // replication sink. + try { + LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(), + StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1)); + FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY); + if (!result.isFlushSucceeded()) { + LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(), + result.getResult()); + } + } catch (IOException e) { + LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e); + } + } else { + // usually this should not happen but since the flush operation is async, theoretically it + // could happen. Let's log it, no real harm. + LOG.warn("Can not find a region to flush"); + } + } + + /** + * Return whether we should just drop all the edits, if we have reached the hard limit of max + * pending size. + * @return {@code true} means OK, {@code false} means drop all the edits. + */ + public boolean increase(long size) { + long sz = pendingSize.addAndGet(size); + if (sz > softMaxPendingSize) { + executor.execute(this::flush); + } + return sz <= maxPendingSize; + } + + /** + * Called after you ship the edits out. 
+ */ + public void decrease(long size) { + pendingSize.addAndGet(-size); + } + + public void stop() { + executor.shutdown(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java similarity index 74% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index cdd77e8ea61..9c6f6e281d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.regionserver; +package org.apache.hadoop.hbase.regionserver.regionreplication; import java.io.IOException; import java.util.ArrayDeque; @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +61,6 @@ public class RegionReplicationSink { private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class); - public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size"; - - public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024; - public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number"; public static final int RETRIES_NUMBER_DEFAULT = 3; @@ -85,10 +82,13 @@ public class 
RegionReplicationSink { final ServerCall rpcCall; + final long size; + SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall rpcCall) { this.key = key; this.edit = edit; this.rpcCall = rpcCall; + this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf(); if (rpcCall != null) { // increase the reference count to avoid the rpc framework free the memory before we // actually sending them out. @@ -112,6 +112,11 @@ public class RegionReplicationSink { private final TableDescriptor tableDesc; + // store it here to avoid passing it every time when calling TableDescriptor.getRegionReplication. + private final int regionReplication; + + private final RegionReplicationBufferManager manager; + private final Runnable flushRequester; private final AsyncClusterConnection conn; @@ -128,20 +133,24 @@ public class RegionReplicationSink { private final long operationTimeoutNs; + private volatile long pendingSize; + private boolean sending; private boolean stopping; private boolean stopped; - RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td, - Runnable flushRequester, AsyncClusterConnection conn) { + public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td, + RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) { Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary", primary); - Preconditions.checkArgument(td.getRegionReplication() > 1, - "region replication should be greater than 1 but got %s", td.getRegionReplication()); + this.regionReplication = td.getRegionReplication(); + Preconditions.checkArgument(this.regionReplication > 1, + "region replication should be greater than 1 but got %s", this.regionReplication); this.primary = primary; this.tableDesc = td; + this.manager = manager; this.flushRequester = flushRequester; this.conn = conn; this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT); @@ -153,7 +162,12 
@@ public class RegionReplicationSink { private void onComplete(List sent, Map> replica2Error) { - sent.forEach(SinkEntry::replicated); + long toReleaseSize = 0; + for (SinkEntry entry : sent) { + entry.replicated(); + toReleaseSize += entry.size; + } + manager.decrease(toReleaseSize); Set failed = new HashSet<>(); for (Map.Entry> entry : replica2Error.entrySet()) { Integer replicaId = entry.getKey(); @@ -165,6 +179,7 @@ public class RegionReplicationSink { } } synchronized (entries) { + pendingSize -= toReleaseSize; if (!failed.isEmpty()) { failedReplicas.addAll(failed); flushRequester.run(); @@ -190,7 +205,7 @@ public class RegionReplicationSink { } toSend.add(entry); } - int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size(); + int toSendReplicaCount = regionReplication - 1 - failedReplicas.size(); if (toSendReplicaCount <= 0) { return; } @@ -199,7 +214,7 @@ public class RegionReplicationSink { toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList()); AtomicInteger remaining = new AtomicInteger(toSendReplicaCount); Map> replica2Error = new HashMap<>(); - for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) { + for (int replicaId = 1; replicaId < regionReplication; replicaId++) { MutableObject error = new MutableObject<>(); replica2Error.put(replicaId, error); RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId); @@ -223,6 +238,17 @@ public class RegionReplicationSink { return storesFlushed.containsAll(tableDesc.getColumnFamilyNames()); } + private void clearAllEntries() { + long toClearSize = 0; + for (SinkEntry entry : entries) { + toClearSize += entry.size; + entry.replicated(); + } + entries.clear(); + pendingSize -= toClearSize; + manager.decrease(toClearSize); + } + /** * Add this edit to replication queue. *

@@ -251,31 +277,67 @@ public class RegionReplicationSink { continue; } if (flushDesc != null && flushAllStores(flushDesc)) { - LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" + - " replication entries", failedReplicas, entries.size()); - entries.clear(); + int toClearCount = 0; + long toClearSize = 0; + for (;;) { + SinkEntry e = entries.peek(); + if (e == null) { + break; + } + if (e.key.getSequenceId() < flushDesc.getFlushSequenceNumber()) { + entries.poll(); + toClearCount++; + toClearSize += e.size; + } else { + break; + } + } failedReplicas.clear(); + LOG.debug( + "Got a flush all request with sequence id {}, clear failed replicas {}" + + " and {} pending entries with size {}", + flushDesc.getFlushSequenceNumber(), failedReplicas, toClearCount, + StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1)); } } } } - // TODO: limit the total cached entries here, and we should have a global limitation, not for - // only this region. - entries.add(new SinkEntry(key, edit, rpcCall)); - if (!sending) { - send(); + if (failedReplicas.size() == regionReplication - 1) { + // this means we have marked all the replicas as failed, so just give up here + return; + } + SinkEntry entry = new SinkEntry(key, edit, rpcCall); + entries.add(entry); + pendingSize += entry.size; + if (manager.increase(entry.size)) { + if (!sending) { + send(); + } + } else { + // we have run out of the max pending size, drop all the edits, and mark all replicas as + // failed + clearAllEntries(); + for (int replicaId = 1; replicaId < regionReplication; replicaId++) { + failedReplicas.add(replicaId); + } + flushRequester.run(); } } } + long pendingSize() { + return pendingSize; + } + /** * Stop the replication sink. *

* Usually this should only be called when you want to close a region. */ - void stop() { + public void stop() { synchronized (entries) { stopping = true; + clearAllEntries(); if (!sending) { stopped = true; entries.notifyAll(); @@ -291,7 +353,7 @@ public class RegionReplicationSink { *

* This is used to keep the replicating order the same with the WAL edit order when writing. */ - void waitUntilStopped() throws InterruptedException { + public void waitUntilStopped() throws InterruptedException { synchronized (entries) { while (!stopped) { entries.wait(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 15be95eda2f..2076dd4fb35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; -import org.apache.hadoop.hbase.regionserver.RegionReplicationSink; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 82f7a85df39..ebe6edd73c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -47,13 +47,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; import 
org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; */ public class MockRegionServerServices implements RegionServerServices { protected static final Logger LOG = LoggerFactory.getLogger(MockRegionServerServices.class); - private final Map regions = new HashMap<>(); + private final Map regions = new HashMap<>(); private final ConcurrentSkipListMap rit = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); private HFileSystem hfs = null; @@ -108,17 +108,17 @@ public class MockRegionServerServices implements RegionServerServices { } @Override - public Region getRegion(String encodedRegionName) { + public HRegion getRegion(String encodedRegionName) { return this.regions.get(encodedRegionName); } @Override - public List getRegions(TableName tableName) throws IOException { + public List getRegions(TableName tableName) throws IOException { return null; } @Override - public List getRegions() { + public List getRegions() { return null; } @@ -379,4 +379,9 @@ public class MockRegionServerServices implements RegionServerServices { public AsyncClusterConnection getAsyncClusterConnection() { return null; } + + @Override + public RegionReplicationBufferManager getRegionReplicationBufferManager() { + return null; + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 69a7a79644e..56813af6b60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -61,17 +61,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.LeaseManager; import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -138,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; @@ -463,7 +463,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, } @Override - public List getRegions() { + public List getRegions() { return null; } @@ -527,7 +527,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, } @Override - public List getRegions(TableName tableName) throws IOException { + public List getRegions(TableName tableName) throws IOException { return null; } @@ -750,4 +750,9 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, public AsyncClusterConnection getAsyncClusterConnection() { return null; } + + @Override + public RegionReplicationBufferManager getRegionReplicationBufferManager() { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java new file mode 100644 index 00000000000..8b56d09de18 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.regionreplication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRegionReplicationBufferManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestRegionReplicationBufferManager.class); + + private Configuration conf; + + private RegionServerServices rsServices; + + private RegionReplicationBufferManager manager; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + rsServices = mock(RegionServerServices.class); + when(rsServices.getConfiguration()).thenReturn(conf); + } + + @After + public void tearDown() { + if (manager != null) { + manager.stop(); + } + } + + private HRegion mockRegion(RegionInfo regionInfo, long pendingSize) throws IOException { + HRegion region = mock(HRegion.class); + when(region.getRegionInfo()).thenReturn(regionInfo); + if (pendingSize < 0) { + when(region.getRegionReplicationSink()).thenReturn(Optional.empty()); + } else { + RegionReplicationSink sink = mock(RegionReplicationSink.class); + when(sink.pendingSize()).thenReturn(pendingSize); + when(region.getRegionReplicationSink()).thenReturn(Optional.of(sink)); + } + return region; + } + + @Test + public void testScheduleFlush() throws IOException, InterruptedException { + conf.setLong(RegionReplicationBufferManager.MAX_PENDING_SIZE, 1024 * 1024); + manager = new RegionReplicationBufferManager(rsServices); + RegionInfo info1 = RegionInfoBuilder.newBuilder(TableName.valueOf("info1")).build(); + RegionInfo info2 = RegionInfoBuilder.newBuilder(TableName.valueOf("info2")).build(); + HRegion region1 = mockRegion(info1, 1000); + HRegion region2 = mockRegion(info2, 10000); + when(rsServices.getRegions()).thenReturn(Arrays.asList(region1, region2)); + CountDownLatch arrive = new CountDownLatch(1); + CountDownLatch resume = new CountDownLatch(1); + when(region2.flushcache(anyBoolean(), anyBoolean(), any())).then(i -> { + arrive.countDown(); + resume.await(); + FlushResultImpl result = mock(FlushResultImpl.class); + when(result.isFlushSucceeded()).thenReturn(true); + return result; + }); + // hit the soft limit, should trigger a flush + assertTrue(manager.increase(1000 * 1024)); + 
+ arrive.await(); + + // we should have called getRegions once to find the region to flush + verify(rsServices, times(1)).getRegions(); + + // hit the hard limit, but since the background thread is running as we haven't called the + // resume.countDown yet, the schedule of the new background flush task should be discarded + // silently. + assertFalse(manager.increase(100 * 1024)); + resume.countDown(); + + // wait several seconds and then check the getRegions call, we should not call it a second time + Thread.sleep(2000); + verify(rsServices, times(1)).getRegions(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java new file mode 100644 index 00000000000..19b16982d1d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.regionreplication; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.ipc.ServerCall; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRegionReplicationSink { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionReplicationSink.class); + + private Configuration conf; + + private TableDescriptor td; + + private 
RegionInfo primary; + + private Runnable flushRequester; + + private AsyncClusterConnection conn; + + private RegionReplicationBufferManager manager; + + @Rule + public final TableNameTestRule name = new TableNameTestRule(); + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + td = TableDescriptorBuilder.newBuilder(name.getTableName()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build(); + primary = RegionInfoBuilder.newBuilder(name.getTableName()).build(); + flushRequester = mock(Runnable.class); + conn = mock(AsyncClusterConnection.class); + manager = mock(RegionReplicationBufferManager.class); + } + + private RegionReplicationSink create() { + return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn); + } + + @Test + public void testNormal() { + RegionReplicationSink sink = create(); + MutableInt next = new MutableInt(0); + List> futures = + Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>()); + when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) + .then(i -> futures.get(next.getAndIncrement())); + ServerCall rpcCall = mock(ServerCall.class); + WALKeyImpl key = mock(WALKeyImpl.class); + when(key.estimatedSerializedSizeOf()).thenReturn(100L); + WALEdit edit = mock(WALEdit.class); + when(edit.estimatedSerializedSizeOf()).thenReturn(1000L); + when(manager.increase(anyLong())).thenReturn(true); + + sink.add(key, edit, rpcCall); + // should call increase on manager + verify(manager, times(1)).increase(anyLong()); + // should have been retained + verify(rpcCall, times(1)).retainByWAL(); + assertEquals(1100, sink.pendingSize()); + + futures.get(0).complete(null); + // should not call decrease yet + verify(manager, never()).decrease(anyLong()); + // should not call release yet + verify(rpcCall, never()).releaseByWAL(); + assertEquals(1100, sink.pendingSize()); + + futures.get(1).complete(null); + // should call decrease + verify(manager, 
times(1)).decrease(anyLong()); + // should call release + verify(rpcCall, times(1)).releaseByWAL(); + assertEquals(0, sink.pendingSize()); + } + + @Test + public void testDropEdits() { + RegionReplicationSink sink = create(); + MutableInt next = new MutableInt(0); + List> futures = + Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>()); + when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) + .then(i -> futures.get(next.getAndIncrement())); + ServerCall rpcCall1 = mock(ServerCall.class); + WALKeyImpl key1 = mock(WALKeyImpl.class); + when(key1.estimatedSerializedSizeOf()).thenReturn(100L); + WALEdit edit1 = mock(WALEdit.class); + when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L); + when(manager.increase(anyLong())).thenReturn(true); + + sink.add(key1, edit1, rpcCall1); + verify(manager, times(1)).increase(anyLong()); + verify(manager, never()).decrease(anyLong()); + verify(rpcCall1, times(1)).retainByWAL(); + assertEquals(1100, sink.pendingSize()); + + ServerCall rpcCall2 = mock(ServerCall.class); + WALKeyImpl key2 = mock(WALKeyImpl.class); + when(key2.estimatedSerializedSizeOf()).thenReturn(200L); + WALEdit edit2 = mock(WALEdit.class); + when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L); + + sink.add(key2, edit2, rpcCall2); + verify(manager, times(2)).increase(anyLong()); + verify(manager, never()).decrease(anyLong()); + verify(rpcCall2, times(1)).retainByWAL(); + assertEquals(3300, sink.pendingSize()); + + ServerCall rpcCall3 = mock(ServerCall.class); + WALKeyImpl key3 = mock(WALKeyImpl.class); + when(key3.estimatedSerializedSizeOf()).thenReturn(200L); + WALEdit edit3 = mock(WALEdit.class); + when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L); + when(manager.increase(anyLong())).thenReturn(false); + + // should not buffer this edit + sink.add(key3, edit3, rpcCall3); + verify(manager, times(3)).increase(anyLong()); + verify(manager, times(1)).decrease(anyLong()); + // should retain and then release immediately 
+ verify(rpcCall3, times(1)).retainByWAL(); + verify(rpcCall3, times(1)).releaseByWAL(); + // should also clear the pending edit + verify(rpcCall2, times(1)).releaseByWAL(); + assertEquals(1100, sink.pendingSize()); + // should have request flush + verify(flushRequester, times(1)).run(); + + // finish the replication for first edit, we should decrease the size, release the rpc call,and + // the pendingSize should be 0 as there are no pending entries + futures.forEach(f -> f.complete(null)); + verify(manager, times(2)).decrease(anyLong()); + verify(rpcCall1, times(1)).releaseByWAL(); + assertEquals(0, sink.pendingSize()); + + // should only call replicate 2 times for replicating the first edit, as we have 2 secondary + // replicas + verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java index b501ab27fdf..a4da6407b51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java @@ -86,11 +86,6 @@ public class TestMetaRegionReplicaReplication { @Before public void before() throws Exception { Configuration conf = HTU.getConfiguration(); - conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); - conf.setInt("replication.source.size.capacity", 10240); - conf.setLong("replication.source.sleepforretries", 100); - conf.setInt("hbase.regionserver.maxlogs", 10); - conf.setLong("hbase.master.logcleaner.ttl", 10); conf.setInt("zookeeper.recovery.retry", 1); conf.setInt("zookeeper.recovery.retry.intervalmill", 10); conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);