HBASE-26413 Limit the total size of buffered region replication entries (#3844)

Signed-off-by: GeorryHuang <huangzhuoyue@apache.org>

parent 2ee18988e6
commit 0b29a7934a
@@ -148,6 +148,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -1107,11 +1108,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializaing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
-      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
-        FlushLifeCycleTracker.DUMMY);
-    }, rss.getAsyncClusterConnection()));
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td,
+      rss.getRegionReplicationBufferManager(), () -> rss.getFlushRequester().requestFlush(this,
+        new ArrayList<>(td.getColumnFamilyNames()), FlushLifeCycleTracker.DUMMY),
+      rss.getAsyncClusterConnection()));
   }

   /**
@@ -2494,7 +2494,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean isCompactionNeeded();
   }

-  FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
+  public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
     FlushLifeCycleTracker tracker) throws IOException {
     List<byte[]> families = null;
     if (flushAllStores) {
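The visibility widening above is what lets the new buffer manager, which lives in the regionreplication sub-package, trigger a full flush on a primary region across package boundaries. The call site, repeated from the new class later in this commit for context:

    // From RegionReplicationBufferManager.flush(): flush all stores and write
    // the flush marker out, so the sink can drop all of its pending entries.
    FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    if (!result.isFlushSucceeded()) {
      LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(),
        result.getResult());
    }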
@@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
 import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -461,6 +462,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;

+  private RegionReplicationBufferManager regionReplicationBufferManager;
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -645,6 +647,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       initializeZooKeeper();
       setupClusterConnection();
       bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
+      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
       // Setup RPC client for master communication
       this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
@@ -883,7 +886,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       closeUserRegions(abortRequested.get());
       LOG.info("stopping server " + this.serverName);
     }
+    regionReplicationBufferManager.stop();
     closeClusterConnection();
     // Closing the compactSplit thread before closing meta regions
     if (!this.killed && containsMetaTableRegions()) {
@@ -3547,4 +3550,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
     shutdownChore(slowLogTableOpsChore);
     shutdownChore(brokenStoreFileCleaner);
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return regionReplicationBufferManager;
+  }
 }
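Taken together, the three HRegionServer hunks give the buffer manager a server-scoped lifecycle: constructed once the cluster connection exists, stopped after user regions are closed, and exposed through an accessor. A condensed sketch of the resulting flow (the enclosing method names here are placeholders; the hunks do not show them in full):

    // Condensed from the hunks above; not a compilable class on its own.
    public class HRegionServer /* extends HBaseServerBase<RSRpcServices> */ {

      private RegionReplicationBufferManager regionReplicationBufferManager;

      private void initializeServices() throws Exception {
        // created right after the cluster connection is available (the @@ -645 hunk)
        regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      }

      private void shutdownServices() {
        // stopped after user regions close, before the cluster connection (the @@ -883 hunk)
        regionReplicationBufferManager.stop();
      }

      public RegionReplicationBufferManager getRegionReplicationBufferManager() {
        return regionReplicationBufferManager;
      }
    }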
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -317,4 +318,15 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
+
+  RegionReplicationBufferManager getRegionReplicationBufferManager();
+
+  @Override
+  HRegion getRegion(String encodedRegionName);
+
+  @Override
+  List<HRegion> getRegions(TableName tableName) throws IOException;
+
+  @Override
+  List<HRegion> getRegions();
 }
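The re-declared getRegion/getRegions methods narrow the return types inherited from the parent interfaces from Region to HRegion, which is why the mock implementations further down change their signatures as well. Java allows this through covariant return types; a minimal, self-contained illustration with stand-in types (the bodies are not the real HBase definitions):

    interface Region {}

    class HRegion implements Region {}

    interface OnlineRegions {
      Region getRegion(String encodedRegionName); // returns the broad interface type
    }

    // Re-declaring the method with a narrower (covariant) return type means every
    // caller holding the sub-interface gets an HRegion without casting.
    interface RegionServerServices extends OnlineRegions {
      @Override
      HRegion getRegion(String encodedRegionName);
    }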
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manager the buffer size for all {@link RegionReplicationSink}.
+ * <p/>
+ * If the buffer size exceeds the soft limit, we will find out the region with largest pending size
+ * and trigger a flush, so it can drop all the pending entries and save memories.
+ * <p/>
+ * If the buffer size exceeds the hard limit, we will return {@code false} for
+ * {@link #increase(long)} and let the {@link RegionReplicationSink} to drop the edits immediately.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationBufferManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class);
+
+  /**
+   * This is the total size of pending entries for all the sinks.
+   */
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024;
+
+  public static final String SOFT_LIMIT_PERCENTAGE =
+    "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage";
+
+  public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f;
+
+  private final RegionServerServices rsServices;
+
+  private final long maxPendingSize;
+
+  private final long softMaxPendingSize;
+
+  private final AtomicLong pendingSize = new AtomicLong();
+
+  private final ThreadPoolExecutor executor;
+
+  public RegionReplicationBufferManager(RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+    Configuration conf = rsServices.getConfiguration();
+    this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT);
+    this.softMaxPendingSize =
+      (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize);
+    this.executor = new ThreadPoolExecutor(
+      1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(),
+      (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one"));
+    executor.allowCoreThreadTimeOut(true);
+  }
+
+  private void flush() {
+    long max = Long.MIN_VALUE;
+    HRegion toFlush = null;
+    for (HRegion region : rsServices.getRegions()) {
+      Optional<RegionReplicationSink> sink = region.getRegionReplicationSink();
+      if (sink.isPresent()) {
+        RegionReplicationSink s = sink.get();
+        long p = s.pendingSize();
+        if (p > max) {
+          max = p;
+          toFlush = region;
+        }
+      }
+    }
+    if (toFlush != null) {
+      // here we need to write flush marker out, so we can drop all the pending edits in the region
+      // replication sink.
+      try {
+        LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(),
+          StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1));
+        FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
+        if (!result.isFlushSucceeded()) {
+          LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(),
+            result.getResult());
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e);
+      }
+    } else {
+      // usually this should not happen but since the flush operation is async, theoretically it
+      // could happen. Let's log it, no real harm.
+      LOG.warn("Can not find a region to flush");
+    }
+  }
+
+  /**
+   * Return whether we should just drop all the edits, if we have reached the hard limit of max
+   * pending size.
+   * @return {@code true} means OK, {@code false} means drop all the edits.
+   */
+  public boolean increase(long size) {
+    long sz = pendingSize.addAndGet(size);
+    if (sz > softMaxPendingSize) {
+      executor.execute(this::flush);
+    }
+    return sz <= maxPendingSize;
+  }
+
+  /**
+   * Called after you ship the edits out.
+   */
+  public void decrease(long size) {
+    pendingSize.addAndGet(-size);
+  }
+
+  public void stop() {
+    executor.shutdown();
+  }
+}
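The flusher executor in the constructor above is a single-worker, drop-if-busy pattern: one core thread, a SynchronousQueue so nothing queues up, and a rejection handler that silently discards a flush request while one is already in flight. A self-contained demo of just that behavior (names here are illustrative, not part of the commit):

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class DropIfBusyExecutorDemo {
      public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
          new SynchronousQueue<>(),
          // same shape as the manager's handler: just log and drop the extra task
          (r, e) -> System.out.println("flush already running, dropping new request"));
        executor.allowCoreThreadTimeOut(true);

        Runnable slowFlush = () -> {
          try {
            Thread.sleep(500); // pretend the flush takes a while
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
        };
        executor.execute(slowFlush); // accepted, the single worker becomes busy
        executor.execute(slowFlush); // rejected: no queue slot, no spare thread
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
      }
    }

This is why TestRegionReplicationBufferManager below can assert that getRegions() is only called once even after a second increase() crosses the limit while a flush is blocked.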
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.regionreplication;

 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
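The new org.apache.hadoop.util.StringUtils import is used below to log pending-entry byte counts in human-readable form. A small usage sketch; the exact rendering (roughly "100 M" for 100 MiB with an empty unit) is determined by the Hadoop implementation:

    import org.apache.hadoop.util.StringUtils;

    public class SizeLogDemo {
      public static void main(String[] args) {
        long pending = 100L * 1024 * 1024;
        // Formats a byte count with a traditional binary prefix (K, M, G, ...),
        // using the given unit suffix and number of decimal places.
        System.out.println(StringUtils.TraditionalBinaryPrefix.long2String(pending, "", 1));
      }
    }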
@@ -60,10 +61,6 @@ public class RegionReplicationSink {

   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

-  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
-
-  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
-
   public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

   public static final int RETRIES_NUMBER_DEFAULT = 3;
@@ -85,10 +82,13 @@ public class RegionReplicationSink {

     final ServerCall<?> rpcCall;

+    final long size;
+
     SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
       this.key = key;
       this.edit = edit;
       this.rpcCall = rpcCall;
+      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
       if (rpcCall != null) {
         // increase the reference count to avoid the rpc framework free the memory before we
         // actually sending them out.
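Each SinkEntry now captures its estimated serialized size once at construction, so exactly the same number can later be subtracted from the sink-local pendingSize and returned to the global manager via decrease(). A sketch of that symmetric accounting with hypothetical stand-in names (the real counters are RegionReplicationSink.pendingSize and the manager's AtomicLong):

    final class SizeAccounting {
      private long local; // per-sink pending bytes, guarded by the sink's lock
      private final java.util.concurrent.atomic.AtomicLong global =
        new java.util.concurrent.atomic.AtomicLong(); // shared across all sinks

      long enqueue(long keySize, long editSize) {
        long size = keySize + editSize; // computed once, stored on the entry
        local += size;
        global.addAndGet(size);
        return size;
      }

      void shipped(long size) { // must subtract the identical stored value
        local -= size;
        global.addAndGet(-size);
      }
    }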
@@ -112,6 +112,11 @@ public class RegionReplicationSink {

   private final TableDescriptor tableDesc;

+  // store it here to avoid passing it every time when calling TableDescriptor.getRegionReplication.
+  private final int regionReplication;
+
+  private final RegionReplicationBufferManager manager;
+
   private final Runnable flushRequester;

   private final AsyncClusterConnection conn;
@@ -128,20 +133,24 @@ public class RegionReplicationSink {

   private final long operationTimeoutNs;

+  private volatile long pendingSize;
+
   private boolean sending;

   private boolean stopping;

   private boolean stopped;

-  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
-    Runnable flushRequester, AsyncClusterConnection conn) {
+  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(td.getRegionReplication() > 1,
-      "region replication should be greater than 1 but got %s", td.getRegionReplication());
+    this.regionReplication = td.getRegionReplication();
+    Preconditions.checkArgument(this.regionReplication > 1,
+      "region replication should be greater than 1 but got %s", this.regionReplication);
     this.primary = primary;
     this.tableDesc = td;
+    this.manager = manager;
     this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
@@ -153,7 +162,12 @@ public class RegionReplicationSink {

   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
-    sent.forEach(SinkEntry::replicated);
+    long toReleaseSize = 0;
+    for (SinkEntry entry : sent) {
+      entry.replicated();
+      toReleaseSize += entry.size;
+    }
+    manager.decrease(toReleaseSize);
     Set<Integer> failed = new HashSet<>();
     for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
       Integer replicaId = entry.getKey();
@@ -165,6 +179,7 @@ public class RegionReplicationSink {
       }
     }
     synchronized (entries) {
+      pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
         failedReplicas.addAll(failed);
         flushRequester.run();
@@ -190,7 +205,7 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
     }
-    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
     if (toSendReplicaCount <= 0) {
       return;
     }
@@ -199,7 +214,7 @@ public class RegionReplicationSink {
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
-    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -223,6 +238,17 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }

+  private void clearAllEntries() {
+    long toClearSize = 0;
+    for (SinkEntry entry : entries) {
+      toClearSize += entry.size;
+      entry.replicated();
+    }
+    entries.clear();
+    pendingSize -= toClearSize;
+    manager.decrease(toClearSize);
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
@@ -251,31 +277,67 @@ public class RegionReplicationSink {
               continue;
             }
             if (flushDesc != null && flushAllStores(flushDesc)) {
-              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
-                " replication entries", failedReplicas, entries.size());
-              entries.clear();
+              int toClearCount = 0;
+              long toClearSize = 0;
+              for (;;) {
+                SinkEntry e = entries.peek();
+                if (e == null) {
+                  break;
+                }
+                if (e.key.getSequenceId() < flushDesc.getFlushSequenceNumber()) {
+                  entries.poll();
+                  toClearCount++;
+                  toClearSize += e.size;
+                } else {
+                  break;
+                }
+              }
               failedReplicas.clear();
+              LOG.debug(
+                "Got a flush all request with sequence id {}, clear failed replicas {}" +
+                  " and {} pending entries with size {}",
+                flushDesc.getFlushSequenceNumber(), failedReplicas, toClearCount,
+                StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
             }
           }
         }
       }
-      // TODO: limit the total cached entries here, and we should have a global limitation, not for
-      // only this region.
-      entries.add(new SinkEntry(key, edit, rpcCall));
-      if (!sending) {
-        send();
+      if (failedReplicas.size() == regionReplication - 1) {
+        // this means we have marked all the replicas as failed, so just give up here
+        return;
+      }
+      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
+      entries.add(entry);
+      pendingSize += entry.size;
+      if (manager.increase(entry.size)) {
+        if (!sending) {
+          send();
+        }
+      } else {
+        // we have run out of the max pending size, drop all the edits, and mark all replicas as
+        // failed
+        clearAllEntries();
+        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+          failedReplicas.add(replicaId);
+        }
+        flushRequester.run();
       }
     }
   }

+  long pendingSize() {
+    return pendingSize;
+  }
+
   /**
    * Stop the replication sink.
    * <p/>
    * Usually this should only be called when you want to close a region.
    */
-  void stop() {
+  public void stop() {
     synchronized (entries) {
       stopping = true;
+      clearAllEntries();
       if (!sending) {
         stopped = true;
         entries.notifyAll();
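Condensed, the reworked add() path has three outcomes: give up immediately when every secondary replica is already marked failed, buffer and send when the manager accepts the size, or drop everything and request a flush when the hard limit is hit. A summary sketch of the hunk above (not a drop-in replacement; the flush-marker trimming is elided to a comment):

    // Summary sketch of RegionReplicationSink.add() after this change.
    void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      synchronized (entries) {
        // 1. a "flush all stores" marker trims every buffered entry whose sequence
        //    id is below the flush sequence number, and clears failedReplicas
        // 2. if every secondary is already marked failed, buffering is pointless
        if (failedReplicas.size() == regionReplication - 1) {
          return;
        }
        SinkEntry entry = new SinkEntry(key, edit, rpcCall);
        entries.add(entry);
        pendingSize += entry.size;
        if (manager.increase(entry.size)) {
          if (!sending) {
            send(); // start draining if no send is in flight
          }
        } else {
          clearAllEntries(); // hard limit: drop all buffered edits...
          for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
            failedReplicas.add(replicaId); // ...mark every replica failed...
          }
          flushRequester.run(); // ...and ask the primary to flush; the resulting
                                // flush marker re-enables replication later
        }
      }
    }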
@@ -291,7 +353,7 @@ public class RegionReplicationSink {
    * <p/>
    * This is used to keep the replicating order the same with the WAL edit order when writing.
    */
-  void waitUntilStopped() throws InterruptedException {
+  public void waitUntilStopped() throws InterruptedException {
     synchronized (entries) {
       while (!stopped) {
         entries.wait();
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
-import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -47,13 +47,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
  */
 public class MockRegionServerServices implements RegionServerServices {
   protected static final Logger LOG = LoggerFactory.getLogger(MockRegionServerServices.class);
-  private final Map<String, Region> regions = new HashMap<>();
+  private final Map<String, HRegion> regions = new HashMap<>();
   private final ConcurrentSkipListMap<byte[], Boolean> rit =
     new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   private HFileSystem hfs = null;
@@ -108,17 +108,17 @@ public class MockRegionServerServices implements RegionServerServices {
   }

   @Override
-  public Region getRegion(String encodedRegionName) {
+  public HRegion getRegion(String encodedRegionName) {
     return this.regions.get(encodedRegionName);
   }

   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }

   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
@@ -379,4 +379,9 @@ public class MockRegionServerServices implements RegionServerServices {
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
@@ -61,17 +61,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -138,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
@@ -463,7 +463,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }

   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
@@ -527,7 +527,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }

   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }
@@ -750,4 +750,9 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationBufferManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationBufferManager.class);
+
+  private Configuration conf;
+
+  private RegionServerServices rsServices;
+
+  private RegionReplicationBufferManager manager;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    rsServices = mock(RegionServerServices.class);
+    when(rsServices.getConfiguration()).thenReturn(conf);
+  }
+
+  @After
+  public void tearDown() {
+    if (manager != null) {
+      manager.stop();
+    }
+  }
+
+  private HRegion mockRegion(RegionInfo regionInfo, long pendingSize) throws IOException {
+    HRegion region = mock(HRegion.class);
+    when(region.getRegionInfo()).thenReturn(regionInfo);
+    if (pendingSize < 0) {
+      when(region.getRegionReplicationSink()).thenReturn(Optional.empty());
+    } else {
+      RegionReplicationSink sink = mock(RegionReplicationSink.class);
+      when(sink.pendingSize()).thenReturn(pendingSize);
+      when(region.getRegionReplicationSink()).thenReturn(Optional.of(sink));
+    }
+    return region;
+  }
+
+  @Test
+  public void testScheduleFlush() throws IOException, InterruptedException {
+    conf.setLong(RegionReplicationBufferManager.MAX_PENDING_SIZE, 1024 * 1024);
+    manager = new RegionReplicationBufferManager(rsServices);
+    RegionInfo info1 = RegionInfoBuilder.newBuilder(TableName.valueOf("info1")).build();
+    RegionInfo info2 = RegionInfoBuilder.newBuilder(TableName.valueOf("info2")).build();
+    HRegion region1 = mockRegion(info1, 1000);
+    HRegion region2 = mockRegion(info2, 10000);
+    when(rsServices.getRegions()).thenReturn(Arrays.asList(region1, region2));
+    CountDownLatch arrive = new CountDownLatch(1);
+    CountDownLatch resume = new CountDownLatch(1);
+    when(region2.flushcache(anyBoolean(), anyBoolean(), any())).then(i -> {
+      arrive.countDown();
+      resume.await();
+      FlushResultImpl result = mock(FlushResultImpl.class);
+      when(result.isFlushSucceeded()).thenReturn(true);
+      return result;
+    });
+    // hit the soft limit, should trigger a flush
+    assertTrue(manager.increase(1000 * 1024));
+    arrive.await();
+
+    // we should have called getRegions once to find the region to flush
+    verify(rsServices, times(1)).getRegions();
+
+    // hit the hard limit, but since the background thread is running as we haven't call the
+    // resume.countDown yet, the schedule of the new background flush task should be discard
+    // silently.
+    assertFalse(manager.increase(100 * 1024));
+    resume.countDown();
+
+    // wait several seconds and then check the getRegions call, we should not call it second time
+    Thread.sleep(2000);
+    verify(rsServices, times(1)).getRegions();
+  }
+}
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationSink {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationSink.class);
+
+  private Configuration conf;
+
+  private TableDescriptor td;
+
+  private RegionInfo primary;
+
+  private Runnable flushRequester;
+
+  private AsyncClusterConnection conn;
+
+  private RegionReplicationBufferManager manager;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    td = TableDescriptorBuilder.newBuilder(name.getTableName())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
+    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
+    flushRequester = mock(Runnable.class);
+    conn = mock(AsyncClusterConnection.class);
+    manager = mock(RegionReplicationBufferManager.class);
+  }
+
+  private RegionReplicationSink create() {
+    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  }
+
+  @Test
+  public void testNormal() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall = mock(ServerCall.class);
+    WALKeyImpl key = mock(WALKeyImpl.class);
+    when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit = mock(WALEdit.class);
+    when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key, edit, rpcCall);
+    // should call increase on manager
+    verify(manager, times(1)).increase(anyLong());
+    // should have been retained
+    verify(rpcCall, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    futures.get(0).complete(null);
+    // should not call decrease yet
+    verify(manager, never()).decrease(anyLong());
+    // should not call release yet
+    verify(rpcCall, never()).releaseByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    futures.get(1).complete(null);
+    // should call decrease
+    verify(manager, times(1)).decrease(anyLong());
+    // should call release
+    verify(rpcCall, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testDropEdits() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key1, edit1, rpcCall1);
+    verify(manager, times(1)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall1, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+
+    sink.add(key2, edit2, rpcCall2);
+    verify(manager, times(2)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall2, times(1)).retainByWAL();
+    assertEquals(3300, sink.pendingSize());
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit3 = mock(WALEdit.class);
+    when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L);
+    when(manager.increase(anyLong())).thenReturn(false);
+
+    // should not buffer this edit
+    sink.add(key3, edit3, rpcCall3);
+    verify(manager, times(3)).increase(anyLong());
+    verify(manager, times(1)).decrease(anyLong());
+    // should retain and then release immediately
+    verify(rpcCall3, times(1)).retainByWAL();
+    verify(rpcCall3, times(1)).releaseByWAL();
+    // should also clear the pending edit
+    verify(rpcCall2, times(1)).releaseByWAL();
+    assertEquals(1100, sink.pendingSize());
+    // should have request flush
+    verify(flushRequester, times(1)).run();
+
+    // finish the replication for first edit, we should decrease the size, release the rpc call,and
+    // the pendingSize should be 0 as there are no pending entries
+    futures.forEach(f -> f.complete(null));
+    verify(manager, times(2)).decrease(anyLong());
+    verify(rpcCall1, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+
+    // should only call replicate 2 times for replicating the first edit, as we have 2 secondary
+    // replicas
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
+}
@@ -86,11 +86,6 @@ public class TestMetaRegionReplicaReplication {
   @Before
   public void before() throws Exception {
     Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
     conf.setInt("zookeeper.recovery.retry", 1);
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);