HBASE-24779 Report on the WAL edit buffer usage/limit for replication

Closes #2193

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Josh Elser 2020-08-07 12:59:17 -04:00
parent 6ef90aad7d
commit 124af6392c
11 changed files with 388 additions and 243 deletions
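For reviewers: the net effect of the change set below is one new RegionServer-wide gauge, source.walReaderEditsBufferUsage, reporting how many bytes of WAL edits are currently buffered in memory for replication, plus a line in the replication stats output that prints that usage against the configured limit (HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY). As a hedged illustration of how the gauge might be observed, assuming the replication source metrics keep their usual JMX object name (an assumption based on MetricsReplicationSourceImpl's JMX context, not something this commit states), and with a made-up helper class name:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Illustrative sketch only; the object name below is an assumption, not part of this commit.
public final class WalReaderBufferGaugeProbe {
  private WalReaderBufferGaugeProbe() { }

  /** Reads the new gauge via JMX; run inside the RegionServer JVM or adapt to a JMXConnector. */
  public static long readWalReaderEditsBufferUsage() throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    ObjectName replicationMetrics =
      new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Replication");
    return ((Number) server.getAttribute(replicationMetrics,
      "source.walReaderEditsBufferUsage")).longValue();
  }
}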

View File: MetricsReplicationGlobalSourceSource.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,239 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.metrics2.lib.MutableFastCounter;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
   [... the former fields, constructor and counter/gauge method implementations are deleted from
    this file; they now live in the new MetricsReplicationGlobalSourceSourceImpl class below ...]
+public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
+
+  public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
+
+  /**
+   * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
+   * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
+   * multiple times for the sake of replicating them to multiple peers..
+   * @param usage The memory used by edits in bytes
+   */
+  void setWALReaderEditsBufferBytes(long usage);
+
+  /**
+   * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
+   */
+  long getWALReaderEditsBufferBytes();
 }

View File: MetricsReplicationGlobalSourceSourceImpl.java (new file)

@@ -0,0 +1,268 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MetricsReplicationGlobalSourceSourceImpl
implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms;
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter;
private final MutableFastCounter shippedHFilesCounter;
private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
private final MutableFastCounter unknownFileLengthForClosedWAL;
private final MutableFastCounter uncleanlyClosedWAL;
private final MutableFastCounter uncleanlyClosedSkippedBytes;
private final MutableFastCounter restartWALReading;
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes;
public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms;
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
sizeOfHFileRefsQueueGauge =
rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
.getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
.getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
}
@Override public void setLastShippedAge(long age) {
ageOfLastShippedOpHist.add(age);
}
@Override public void incrSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.incr(size);
}
@Override public void decrSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.decr(size);
}
@Override public void incrLogReadInEdits(long size) {
logReadInEditsCounter.incr(size);
}
@Override public void incrLogEditsFiltered(long size) {
walEditsFilteredCounter.incr(size);
}
@Override public void incrBatchesShipped(int batches) {
shippedBatchesCounter.incr(batches);
}
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
@Override public void incrShippedBytes(long size) {
shippedBytesCounter.incr(size);
}
@Override public void incrLogReadInBytes(long size) {
logReadInBytesCounter.incr(size);
}
@Override public void clear() {
}
@Override
public long getLastShippedAge() {
return ageOfLastShippedOpHist.getMax();
}
@Override public void incrHFilesShipped(long hfiles) {
shippedHFilesCounter.incr(hfiles);
}
@Override
public void incrSizeOfHFileRefsQueue(long size) {
sizeOfHFileRefsQueueGauge.incr(size);
}
@Override
public void decrSizeOfHFileRefsQueue(long size) {
sizeOfHFileRefsQueueGauge.decr(size);
}
@Override
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
@Override
public void incrUnknownFileLengthForClosedWAL() {
unknownFileLengthForClosedWAL.incr(1L);
}
@Override
public void incrUncleanlyClosedWALs() {
uncleanlyClosedWAL.incr(1L);
}
@Override
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
uncleanlyClosedSkippedBytes.incr(bytes);
}
@Override
public void incrRestartedWALReading() {
restartWALReading.incr(1L);
}
@Override
public void incrRepeatedFileBytes(final long bytes) {
repeatedFileBytes.incr(bytes);
}
@Override
public void incrCompletedWAL() {
completedWAL.incr(1L);
}
@Override
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
@Override
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}
@Override
public void init() {
rms.init();
}
@Override
public void setGauge(String gaugeName, long value) {
rms.setGauge(KEY_PREFIX + gaugeName, value);
}
@Override
public void incGauge(String gaugeName, long delta) {
rms.incGauge(KEY_PREFIX + gaugeName, delta);
}
@Override
public void decGauge(String gaugeName, long delta) {
rms.decGauge(KEY_PREFIX + gaugeName, delta);
}
@Override
public void removeMetric(String key) {
rms.removeMetric(KEY_PREFIX + key);
}
@Override
public void incCounters(String counterName, long delta) {
rms.incCounters(KEY_PREFIX + counterName, delta);
}
@Override
public void updateHistogram(String name, long value) {
rms.updateHistogram(KEY_PREFIX + name, value);
}
@Override
public String getMetricsContext() {
return rms.getMetricsContext();
}
@Override
public String getMetricsDescription() {
return rms.getMetricsDescription();
}
@Override
public String getMetricsJmxContext() {
return rms.getMetricsJmxContext();
}
@Override
public String getMetricsName() {
return rms.getMetricsName();
}
@Override
public long getWALEditsRead() {
return this.logReadInEditsCounter.value();
}
@Override
public long getShippedOps() {
return this.shippedOpsCounter.value();
}
@Override
public long getEditsFiltered() {
return this.walEditsFilteredCounter.value();
}
@Override
public void setWALReaderEditsBufferBytes(long usage) {
this.walReaderBufferUsageBytes.set(usage);
}
@Override
public long getWALReaderEditsBufferBytes() {
return this.walReaderBufferUsageBytes.value();
}
}
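Taken together with the interface above, the pattern this commit uses elsewhere is: fetch the process-wide global metrics source once via CompatibilitySingletonFactory, then publish the shared buffer counter into the gauge every time it changes. A minimal sketch of that wiring follows; the helper class and variable names are made up for illustration, and only the HBase types and the factory call come from this diff.

package org.apache.hadoop.hbase.replication.regionserver;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;

// Illustrative only: shows how the new gauge is obtained and kept in sync with a shared counter.
public final class WalReaderBufferGaugeSketch {
  private WalReaderBufferGaugeSketch() { }

  static long recordBufferChange(AtomicLong totalBufferUsed, long deltaBytes) {
    MetricsReplicationGlobalSourceSource globalMetrics = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    // Apply the change to the RegionServer-wide counter and publish the new value.
    long newBufferUsed = totalBufferUsed.addAndGet(deltaBytes);
    globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return globalMetrics.getWALReaderEditsBufferBytes();
  }
}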

View File: MetricsReplicationSourceFactory.java

@@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
   public MetricsReplicationTableSource getTableSource(String tableName);
-  public MetricsReplicationSourceSource getGlobalSource();
+  public MetricsReplicationGlobalSourceSourceImpl getGlobalSource();
 }

View File: MetricsReplicationSourceFactoryImpl.java

@@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSourceFactory {
     return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
   }
 
-  @Override public MetricsReplicationSourceSource getGlobalSource() {
-    return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+  @Override public MetricsReplicationGlobalSourceSourceImpl getGlobalSource() {
+    return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
   }
 }

View File: MetricsSource.java

@@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
   private long timeStampNextToReplicate;
 
   private final MetricsReplicationSourceSource singleSourceSource;
-  private final MetricsReplicationSourceSource globalSourceSource;
+  private final MetricsReplicationGlobalSourceSource globalSourceSource;
   private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
@@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
-      MetricsReplicationSourceSource globalSourceSource,
+      MetricsReplicationGlobalSourceSource globalSourceSource,
       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
@@ -454,4 +454,19 @@ public class MetricsSource implements BaseSource {
   public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
+
+  /**
+   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
+   */
+  public void setWALReaderEditsBufferUsage(long usageInBytes) {
+    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+  }
+
+  /**
+   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
+   * @return
+   */
+  public long getWALReaderEditsBufferUsage() {
+    return globalSourceSource.getWALReaderEditsBufferBytes();
+  }
 }

View File: Replication.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -76,6 +77,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkService {
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+  private MetricsReplicationGlobalSourceSource globalMetricsSource;
 
   private PeerProcedureHandler peerProcedureHandler;
@@ -124,10 +126,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkService {
       throw new IOException("Could not read cluster id", ke);
     }
     SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
+    this.globalMetricsSource = CompatibilitySingletonFactory
+      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
       replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
       walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
-      mapping);
+      mapping, globalMetricsSource);
     this.syncReplicationPeerInfoProvider =
       new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
     PeerActionListener peerActionListener = PeerActionListener.DUMMY;

View File: ReplicationSource.java

@@ -771,7 +771,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
         throttler.addPushSize(batchSize);
       }
       totalReplicatedEdits.addAndGet(entries.size());
-      totalBufferUsed.addAndGet(-batchSize);
+      long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+      // Record the new buffer usage
+      this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
     }
 
     @Override

View File: ReplicationSourceManager.java

@@ -169,6 +169,9 @@ public class ReplicationSourceManager implements ReplicationListener {
   // Maximum number of retries before taking bold actions when deleting remote wal files for sync
   // replication peer.
   private final int maxRetriesMultiplier;
+  // Total buffer size on this RegionServer for holding batched edits to be shipped.
+  private final long totalBufferLimit;
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -186,7 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
       WALFileLengthProvider walFileLengthProvider,
-      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException {
+      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
+      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
@@ -222,6 +226,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
     this.maxRetriesMultiplier =
       this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
+    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.globalMetrics = globalMetrics;
   }
 
   /**
@@ -1069,6 +1076,14 @@ public class ReplicationSourceManager implements ReplicationListener {
     return totalBufferUsed;
   }
 
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer.
+   */
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -1106,6 +1121,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
+    // Print stats that apply across all Replication Sources
+    stats.append("Global stats: ");
+    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+      .append(getTotalBufferLimit()).append("B\n");
     for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
@@ -1131,4 +1150,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
 }
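With the getStats() addition above, the per-RegionServer replication status string (for example, what the periodic replication statistics thread logs) now opens with a global line before the per-peer sections. It would look roughly like the following, where the numbers are illustrative and the limit assumes the default HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT value (256 MB, worth verifying against your version):

Global stats: WAL Edits Buffer Used=1048576B, Limit=268435456B
Normal source for cluster 1: ...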

View File: ReplicationSourceWALReader.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
       this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
     this.maxRetriesMultiplier =
@@ -276,6 +274,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean checkQuota() {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferQuota) {
+      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+        this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -404,7 +404,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    // Record the new buffer usage
+    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= totalBufferQuota;
   }
 
   /**
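The WAL reader changes above close the accounting loop: acquireBufferQuota() adds a batch's estimated size to the shared AtomicLong and publishes the new total to the gauge, checkQuota() now logs a warning and backs off while the total exceeds the limit, and the shipper side (ReplicationSource, earlier in this commit) subtracts the size and publishes again once a batch has been replicated. A self-contained sketch of that quota pattern, using hypothetical names rather than the real HBase classes:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Standalone sketch of the shared buffer-quota accounting described above; not HBase code.
final class SharedEditsBufferQuota {
  private final AtomicLong used = new AtomicLong();   // shared by all reader threads
  private final long limitBytes;                      // the configured total buffer quota
  private final LongConsumer usageGauge;              // e.g. metrics.setWALReaderEditsBufferBytes

  SharedEditsBufferQuota(long limitBytes, LongConsumer usageGauge) {
    this.limitBytes = limitBytes;
    this.usageGauge = usageGauge;
  }

  /** Reader side: reserve space for a batch; returns true if the quota is now exhausted. */
  boolean acquire(long batchBytes) {
    long newUsed = used.addAndGet(batchBytes);
    usageGauge.accept(newUsed);           // report usage every time it changes
    return newUsed >= limitBytes;         // caller stops reading and ships what it has
  }

  /** Shipper side: give the space back once the batch has been replicated. */
  void release(long batchBytes) {
    long newUsed = used.addAndGet(-batchBytes);
    usageGauge.accept(newUsed);
  }

  /** checkQuota(): readers sleep and retry while this is true. */
  boolean overLimit() {
    return used.get() > limitBytes;
  }
}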

View File: TestReplicationEndpoint.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
@@ -329,9 +330,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     MetricsReplicationSourceSource singleSourceSource =
       new MetricsReplicationSourceSourceImpl(singleRms, id);
-    MetricsReplicationSourceSource globalSourceSource =
-      new MetricsReplicationGlobalSourceSource(globalRms);
-    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+    MetricsReplicationGlobalSourceSource globalSourceSource =
+      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
     Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
@@ -497,6 +498,44 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
   }
 
+  /**
+   * Not used by unit tests, helpful for manual testing with replication.
+   * <p>
+   * Snippet for `hbase shell`:
+   * <pre>
+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
+   *   'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
+   * </pre>
+   */
+  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
+    private long duration;
+
+    public SleepingReplicationEndpointForTest() {
+      super();
+    }
+
+    @Override
+    public void init(Context context) throws IOException {
+      super.init(context);
+      if (this.ctx != null) {
+        duration = this.ctx.getConfiguration().getLong(
+          "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext context) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return super.replicate(context);
+    }
+  }
+
   public static class InterClusterReplicationEndpointForTest
       extends HBaseInterClusterReplicationEndpoint {

View File: TestWALEntryStream.java

@@ -371,6 +371,8 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+      (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -378,6 +380,9 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+      MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }