HBASE-24779 Report on the WAL edit buffer usage/limit for replication

Closes #2193

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Josh Elser 2020-08-07 12:59:17 -04:00
parent cb3dd990f7
commit 303db63b76
12 changed files with 164 additions and 19 deletions

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {

  /**
   * Name of the gauge that carries the WAL reader edits buffer usage. Interface fields are
   * implicitly {@code public static final}, so the modifiers are not repeated here.
   */
  String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";

  /**
   * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
   * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
   * multiple times for the sake of replicating them to multiple peers.
   * @param usage The memory used by edits in bytes
   */
  void setWALReaderEditsBufferBytes(long usage);

  /**
   * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
   * @return the current edits buffer usage in bytes
   */
  long getWALReaderEditsBufferBytes();
}

View File

@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink(); public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id); public MetricsReplicationSourceSource getSource(String id);
public MetricsReplicationTableSource getTableSource(String tableName); public MetricsReplicationTableSource getTableSource(String tableName);
public MetricsReplicationSourceSource getGlobalSource(); public MetricsReplicationGlobalSourceSource getGlobalSource();
} }

View File

@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ public class MetricsReplicationGlobalSourceSourceImpl
implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source."; private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms; private final MetricsReplicationSourceImpl rms;
@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter completedWAL; private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms; this.rms = rms;
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@ -92,6 +94,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry() failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
} }
@Override public void setLastShippedAge(long age) { @Override public void setLastShippedAge(long age) {
@ -142,7 +147,6 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
} }
} }
} }
@Override public void incrLogReadInBytes(long size) { @Override public void incrLogReadInBytes(long size) {
logReadInBytesCounter.incr(size); logReadInBytesCounter.incr(size);
} }
@ -275,4 +279,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public long getEditsFiltered() { public long getEditsFiltered() {
return this.walEditsFilteredCounter.value(); return this.walEditsFilteredCounter.value();
} }
/**
 * Overwrites the {@code walReaderBufferUsageBytes} gauge with the supplied value.
 * @param usage the new gauge value (declared by the interface as a size in bytes)
 */
@Override
public void setWALReaderEditsBufferBytes(long usage) {
this.walReaderBufferUsageBytes.set(usage);
}
/**
 * Reads back the {@code walReaderBufferUsageBytes} gauge.
 * @return the gauge's current value
 */
@Override
public long getWALReaderEditsBufferBytes() {
return this.walReaderBufferUsageBytes.value();
}
} }

View File

@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
} }
@Override public MetricsReplicationSourceSource getGlobalSource() { @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
} }
} }

View File

@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
@Override public void incrShippedBytes(long size) { @Override public void incrShippedBytes(long size) {
shippedBytesCounter.incr(size); shippedBytesCounter.incr(size);
MetricsReplicationGlobalSourceSource MetricsReplicationGlobalSourceSourceImpl
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
} }

View File

@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
private long timeStampNextToReplicate; private long timeStampNextToReplicate;
private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource; private final MetricsReplicationGlobalSourceSource globalSourceSource;
private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable; private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
/** /**
@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource {
* @param globalSourceSource Class to monitor global-scoped metrics * @param globalSourceSource Class to monitor global-scoped metrics
*/ */
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
MetricsReplicationSourceSource globalSourceSource, MetricsReplicationGlobalSourceSource globalSourceSource,
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) { Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
this.id = id; this.id = id;
this.singleSourceSource = singleSourceSource; this.singleSourceSource = singleSourceSource;
@ -465,4 +465,19 @@ public class MetricsSource implements BaseSource {
public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() { public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable; return singleSourceSourceByTable;
} }
/**
* Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
* The value is forwarded to the global metrics source.
* @param usageInBytes the buffer usage to record, in bytes
*/
public void setWALReaderEditsBufferUsage(long usageInBytes) {
globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
}
/**
* Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
* @return the buffer usage, in bytes, as reported by the global metrics source
*/
public long getWALReaderEditsBufferUsage() {
return globalSourceSource.getWALReaderEditsBufferBytes();
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private int statsThreadPeriod; private int statsThreadPeriod;
// ReplicationLoad to access replication metrics // ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad; private ReplicationLoad replicationLoad;
private MetricsReplicationGlobalSourceSource globalMetricsSource;
private PeerProcedureHandler peerProcedureHandler; private PeerProcedureHandler peerProcedureHandler;
@ -119,9 +121,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke); throw new IOException("Could not read cluster id", ke);
} }
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
globalMetricsSource);
if (walProvider != null) { if (walProvider != null) {
walProvider walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));

View File

@ -775,7 +775,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize); throttler.addPushSize(batchSize);
} }
totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedEdits.addAndGet(entries.size());
totalBufferUsed.addAndGet(-batchSize); long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
} }
@Override @Override

View File

@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong(); private AtomicLong totalBufferUsed = new AtomicLong();
// Total buffer size on this RegionServer for holding batched edits to be shipped.
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;
/** /**
* Creates a replication manager and sets the watch on all the other registered region servers * Creates a replication manager and sets the watch on all the other registered region servers
@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException { WALFileLengthProvider walFileLengthProvider,
MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
// CopyOnWriteArrayList is thread-safe. // CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying. // Generally, reading is more than modifying.
this.sources = new ConcurrentHashMap<>(); this.sources = new ConcurrentHashMap<>();
@ -205,6 +209,9 @@ public class ReplicationSourceManager implements ReplicationListener {
this.latestPaths = new HashSet<Path>(); this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.globalMetrics = globalMetrics;
} }
/** /**
@ -879,6 +886,14 @@ public class ReplicationSourceManager implements ReplicationListener {
return totalBufferUsed; return totalBufferUsed;
} }
/**
* Returns the maximum size in bytes of edits held in memory which are pending replication
* across all sources inside this RegionServer.
* @return the limit read from {@code HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY} when this
* manager was constructed
*/
public long getTotalBufferLimit() {
return totalBufferLimit;
}
/** /**
* Get the directory where wals are archived * Get the directory where wals are archived
* @return the directory where wals are archived * @return the directory where wals are archived
@ -916,6 +931,10 @@ public class ReplicationSourceManager implements ReplicationListener {
*/ */
public String getStats() { public String getStats() {
StringBuilder stats = new StringBuilder(); StringBuilder stats = new StringBuilder();
// Print stats that apply across all Replication Sources
stats.append("Global stats: ");
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
.append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) { for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n"); stats.append(source.getStats() + "\n");
@ -941,4 +960,8 @@ public class ReplicationSourceManager implements ReplicationListener {
int activeFailoverTaskCount() { int activeFailoverTaskCount() {
return executor.getActiveCount(); return executor.getActiveCount();
} }
/**
* Returns the global replication metrics source that was handed to this manager's constructor.
* @return the {@code MetricsReplicationGlobalSourceSource} held by this manager
*/
MetricsReplicationGlobalSourceSource getGlobalMetrics() {
return this.globalMetrics;
}
} }

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
// the +1 is for the current thread reading before placing onto the queue // the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1); int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.sleepForRetries = this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier = this.maxRetriesMultiplier =
@ -275,6 +273,8 @@ class ReplicationSourceWALReader extends Thread {
private boolean checkQuota() { private boolean checkQuota() {
// try not to go over total quota // try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) { if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
Threads.sleep(sleepForRetries); Threads.sleep(sleepForRetries);
return false; return false;
} }
@ -403,7 +403,10 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all * @return true if we should clear buffer and push all
*/ */
private boolean acquireBufferQuota(long size) { private boolean acquireBufferQuota(long size) {
return totalBufferUsed.addAndGet(size) >= totalBufferQuota; long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed >= totalBufferQuota;
} }
/** /**

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
@ -339,9 +340,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource = MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id); new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = MetricsReplicationGlobalSourceSource globalSourceSource =
new MetricsReplicationGlobalSourceSource(globalRms); new MetricsReplicationGlobalSourceSourceImpl(globalRms);
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
@ -507,6 +508,44 @@ public class TestReplicationEndpoint extends TestReplicationBase {
} }
} }
/**
 * Replication endpoint that pauses before every {@code replicate} call. It is not exercised by
 * the unit tests; it exists for manual experiments with replication.
 * <p>
 * Snippet for `hbase shell`:
 * <pre>
 * create 't', 'f'
 * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
 * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
 * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
 * </pre>
 */
public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
  /** Pause, in milliseconds, applied before delegating each replicate call. */
  private long sleepMillis;

  public SleepingReplicationEndpointForTest() {
    // The superclass no-arg constructor is invoked implicitly.
  }

  /**
   * Initializes the parent endpoint and, when a context is available, reads the pause length
   * from {@code hbase.test.sleep.replication.endpoint.duration.millis} (default 5000 ms).
   */
  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    if (this.ctx == null) {
      // No context to read configuration from; the pause keeps its initial value.
      return;
    }
    sleepMillis = this.ctx.getConfiguration()
        .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
  }

  /**
   * Sleeps for the configured pause and then delegates to the parent implementation.
   * @return {@code false} if the sleep was interrupted, otherwise the parent's result
   */
  @Override
  public boolean replicate(ReplicateContext context) {
    try {
      Thread.sleep(sleepMillis);
    } catch (InterruptedException interrupted) {
      // Restore the interrupt flag so callers can observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
    return super.replicate(context);
  }
}
public static class InterClusterReplicationEndpointForTest public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint { extends HBaseInterClusterReplicationEndpoint {

View File

@ -371,6 +371,8 @@ public class TestWALEntryStream {
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
when(mockSourceManager.getTotalBufferLimit()).thenReturn(
(long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class); Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class); ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceManager()).thenReturn(mockSourceManager);
@ -378,6 +380,9 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer); when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered); when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
MetricsReplicationGlobalSourceSource.class);
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source; return source;
} }