diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 630fdb8d278..e373a6c1349 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,239 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.metrics2.lib.MutableFastCounter; -import org.apache.hadoop.metrics2.lib.MutableGaugeLong; -import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ - private static final String KEY_PREFIX = "source."; +public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource { - private final MetricsReplicationSourceImpl rms; + public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; - private final MutableHistogram ageOfLastShippedOpHist; - private final MutableGaugeLong sizeOfLogQueueGauge; - private final MutableFastCounter logReadInEditsCounter; - private final MutableFastCounter walEditsFilteredCounter; - private final MutableFastCounter shippedBatchesCounter; - private final MutableFastCounter shippedOpsCounter; - private final MutableFastCounter shippedBytesCounter; - private final MutableFastCounter logReadInBytesCounter; - private final MutableFastCounter shippedHFilesCounter; - private final MutableGaugeLong sizeOfHFileRefsQueueGauge; - private final MutableFastCounter unknownFileLengthForClosedWAL; - private final MutableFastCounter uncleanlyClosedWAL; - private final MutableFastCounter uncleanlyClosedSkippedBytes; - private final MutableFastCounter restartWALReading; - private final MutableFastCounter repeatedFileBytes; - private final MutableFastCounter completedWAL; - private final MutableFastCounter completedRecoveryQueue; - private final MutableFastCounter failedRecoveryQueue; + /** + * Sets the total usage of memory used by edits in memory read from WALs. The memory represented + * by this usage measure is across peers/sources. For example, we may batch the same WAL edits + * multiple times for the sake of replicating them to multiple peers.. + * @param usage The memory used by edits in bytes + */ + void setWALReaderEditsBufferBytes(long usage); - public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { - this.rms = rms; - - ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); - - sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); - - shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); - - shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); - - shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); - - logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); - - logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); - - walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); - - shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); - - sizeOfHFileRefsQueueGauge = - rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); - - unknownFileLengthForClosedWAL = rms.getMetricsRegistry() - .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); - uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); - uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() - .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); - restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); - repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); - completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); - completedRecoveryQueue = rms.getMetricsRegistry() - .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); - failedRecoveryQueue = rms.getMetricsRegistry() - .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); - } - - @Override public void setLastShippedAge(long age) { - ageOfLastShippedOpHist.add(age); - } - - @Override public void incrSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.incr(size); - } - - @Override public void decrSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.decr(size); - } - - @Override public void incrLogReadInEdits(long size) { - logReadInEditsCounter.incr(size); - } - - @Override public void incrLogEditsFiltered(long size) { - walEditsFilteredCounter.incr(size); - } - - @Override public void incrBatchesShipped(int batches) { - shippedBatchesCounter.incr(batches); - } - - @Override public void incrOpsShipped(long ops) { - shippedOpsCounter.incr(ops); - } - - @Override public void incrShippedBytes(long size) { - shippedBytesCounter.incr(size); - } - - @Override public void incrLogReadInBytes(long size) { - logReadInBytesCounter.incr(size); - } - - @Override public void clear() { - } - - @Override - public long getLastShippedAge() { - return ageOfLastShippedOpHist.getMax(); - } - - @Override public void incrHFilesShipped(long hfiles) { - shippedHFilesCounter.incr(hfiles); - } - - @Override - public void incrSizeOfHFileRefsQueue(long size) { - sizeOfHFileRefsQueueGauge.incr(size); - } - - @Override - public void decrSizeOfHFileRefsQueue(long size) { - sizeOfHFileRefsQueueGauge.decr(size); - } - - @Override - public int getSizeOfLogQueue() { - return (int)sizeOfLogQueueGauge.value(); - } - - @Override - public void incrUnknownFileLengthForClosedWAL() { - unknownFileLengthForClosedWAL.incr(1L); - } - @Override - public void incrUncleanlyClosedWALs() { - uncleanlyClosedWAL.incr(1L); - } - @Override - public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { - uncleanlyClosedSkippedBytes.incr(bytes); - } - @Override - public void incrRestartedWALReading() { - restartWALReading.incr(1L); - } - @Override - public void incrRepeatedFileBytes(final long bytes) { - repeatedFileBytes.incr(bytes); - } - @Override - public void incrCompletedWAL() { - completedWAL.incr(1L); - } - @Override - public void incrCompletedRecoveryQueue() { - completedRecoveryQueue.incr(1L); - } - @Override - public void incrFailedRecoveryQueue() { - failedRecoveryQueue.incr(1L); - } - @Override - public void init() { - rms.init(); - } - - @Override - public void setGauge(String gaugeName, long value) { - rms.setGauge(KEY_PREFIX + gaugeName, value); - } - - @Override - public void incGauge(String gaugeName, long delta) { - rms.incGauge(KEY_PREFIX + gaugeName, delta); - } - - @Override - public void decGauge(String gaugeName, long delta) { - rms.decGauge(KEY_PREFIX + gaugeName, delta); - } - - @Override - public void removeMetric(String key) { - rms.removeMetric(KEY_PREFIX + key); - } - - @Override - public void incCounters(String counterName, long delta) { - rms.incCounters(KEY_PREFIX + counterName, delta); - } - - @Override - public void updateHistogram(String name, long value) { - rms.updateHistogram(KEY_PREFIX + name, value); - } - - @Override - public String getMetricsContext() { - return rms.getMetricsContext(); - } - - @Override - public String getMetricsDescription() { - return rms.getMetricsDescription(); - } - - @Override - public String getMetricsJmxContext() { - return rms.getMetricsJmxContext(); - } - - @Override - public String getMetricsName() { - return rms.getMetricsName(); - } - - @Override - public long getWALEditsRead() { - return this.logReadInEditsCounter.value(); - } - - @Override - public long getShippedOps() { - return this.shippedOpsCounter.value(); - } - - @Override - public long getEditsFiltered() { - return this.walEditsFilteredCounter.value(); - } + /** + * Returns the size, in bytes, of edits held in memory to be replicated across all peers. + */ + long getWALReaderEditsBufferBytes(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java new file mode 100644 index 00000000000..1c04109ed62 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableHistogram; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsReplicationGlobalSourceSourceImpl + implements MetricsReplicationGlobalSourceSource { + private static final String KEY_PREFIX = "source."; + + private final MetricsReplicationSourceImpl rms; + + private final MutableHistogram ageOfLastShippedOpHist; + private final MutableGaugeLong sizeOfLogQueueGauge; + private final MutableFastCounter logReadInEditsCounter; + private final MutableFastCounter walEditsFilteredCounter; + private final MutableFastCounter shippedBatchesCounter; + private final MutableFastCounter shippedOpsCounter; + private final MutableFastCounter shippedBytesCounter; + private final MutableFastCounter logReadInBytesCounter; + private final MutableFastCounter shippedHFilesCounter; + private final MutableGaugeLong sizeOfHFileRefsQueueGauge; + private final MutableFastCounter unknownFileLengthForClosedWAL; + private final MutableFastCounter uncleanlyClosedWAL; + private final MutableFastCounter uncleanlyClosedSkippedBytes; + private final MutableFastCounter restartWALReading; + private final MutableFastCounter repeatedFileBytes; + private final MutableFastCounter completedWAL; + private final MutableFastCounter completedRecoveryQueue; + private final MutableFastCounter failedRecoveryQueue; + private final MutableGaugeLong walReaderBufferUsageBytes; + + public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { + this.rms = rms; + + ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); + + sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); + + shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); + + shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); + + shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); + + logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); + + logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); + + walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); + + shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); + + sizeOfHFileRefsQueueGauge = + rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); + + unknownFileLengthForClosedWAL = rms.getMetricsRegistry() + .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); + uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); + uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() + .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); + restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); + repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); + completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); + completedRecoveryQueue = rms.getMetricsRegistry() + .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); + failedRecoveryQueue = rms.getMetricsRegistry() + .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); + + walReaderBufferUsageBytes = rms.getMetricsRegistry() + .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); + } + + @Override public void setLastShippedAge(long age) { + ageOfLastShippedOpHist.add(age); + } + + @Override public void incrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.incr(size); + } + + @Override public void decrSizeOfLogQueue(int size) { + sizeOfLogQueueGauge.decr(size); + } + + @Override public void incrLogReadInEdits(long size) { + logReadInEditsCounter.incr(size); + } + + @Override public void incrLogEditsFiltered(long size) { + walEditsFilteredCounter.incr(size); + } + + @Override public void incrBatchesShipped(int batches) { + shippedBatchesCounter.incr(batches); + } + + @Override public void incrOpsShipped(long ops) { + shippedOpsCounter.incr(ops); + } + + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + } + + @Override public void incrLogReadInBytes(long size) { + logReadInBytesCounter.incr(size); + } + + @Override public void clear() { + } + + @Override + public long getLastShippedAge() { + return ageOfLastShippedOpHist.getMax(); + } + + @Override public void incrHFilesShipped(long hfiles) { + shippedHFilesCounter.incr(hfiles); + } + + @Override + public void incrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.incr(size); + } + + @Override + public void decrSizeOfHFileRefsQueue(long size) { + sizeOfHFileRefsQueueGauge.decr(size); + } + + @Override + public int getSizeOfLogQueue() { + return (int)sizeOfLogQueueGauge.value(); + } + + @Override + public void incrUnknownFileLengthForClosedWAL() { + unknownFileLengthForClosedWAL.incr(1L); + } + @Override + public void incrUncleanlyClosedWALs() { + uncleanlyClosedWAL.incr(1L); + } + @Override + public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { + uncleanlyClosedSkippedBytes.incr(bytes); + } + @Override + public void incrRestartedWALReading() { + restartWALReading.incr(1L); + } + @Override + public void incrRepeatedFileBytes(final long bytes) { + repeatedFileBytes.incr(bytes); + } + @Override + public void incrCompletedWAL() { + completedWAL.incr(1L); + } + @Override + public void incrCompletedRecoveryQueue() { + completedRecoveryQueue.incr(1L); + } + @Override + public void incrFailedRecoveryQueue() { + failedRecoveryQueue.incr(1L); + } + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(KEY_PREFIX + gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(KEY_PREFIX + gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(KEY_PREFIX + gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(KEY_PREFIX + key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(KEY_PREFIX + counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(KEY_PREFIX + name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } + + @Override + public long getWALEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override + public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override + public long getEditsFiltered() { + return this.walEditsFilteredCounter.value(); + } + + @Override + public void setWALReaderEditsBufferBytes(long usage) { + this.walReaderBufferUsageBytes.set(usage); + } + + @Override + public long getWALReaderEditsBufferBytes() { + return this.walReaderBufferUsageBytes.value(); + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java index 2816f832edf..73d2cfd62f4 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory { public MetricsReplicationSinkSource getSink(); public MetricsReplicationSourceSource getSource(String id); public MetricsReplicationTableSource getTableSource(String tableName); - public MetricsReplicationSourceSource getGlobalSource(); + public MetricsReplicationGlobalSourceSourceImpl getGlobalSource(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java index a3b34620041..061fc58296e 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); } - @Override public MetricsReplicationSourceSource getGlobalSource() { - return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); + @Override public MetricsReplicationGlobalSourceSourceImpl getGlobalSource() { + return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 39fe7b429d3..0f73576feaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource { private long timeStampNextToReplicate; private final MetricsReplicationSourceSource singleSourceSource; - private final MetricsReplicationSourceSource globalSourceSource; + private final MetricsReplicationGlobalSourceSource globalSourceSource; private Map singleSourceSourceByTable; /** @@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationSourceSource globalSourceSource, + MetricsReplicationGlobalSourceSource globalSourceSource, Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; @@ -454,4 +454,19 @@ public class MetricsSource implements BaseSource { public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } + + /** + * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. + */ + public void setWALReaderEditsBufferUsage(long usageInBytes) { + globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); + } + + /** + * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. + * @return + */ + public long getWALReaderEditsBufferUsage() { + return globalSourceSource.getWALReaderEditsBufferBytes(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4cbce8c7273..195877bf5f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; @@ -76,6 +77,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + private MetricsReplicationGlobalSourceSource globalMetricsSource; private PeerProcedureHandler peerProcedureHandler; @@ -124,10 +126,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer throw new IOException("Could not read cluster id", ke); } SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); + this.globalMetricsSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + mapping, globalMetricsSource); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1d9269d4abc..f24ecfa5539 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -771,7 +771,9 @@ public class ReplicationSource implements ReplicationSourceInterface { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - totalBufferUsed.addAndGet(-batchSize); + long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); + // Record the new buffer usage + this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 1a012bd5db4..2cf91ed65b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -169,6 +169,9 @@ public class ReplicationSourceManager implements ReplicationListener { // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. private final int maxRetriesMultiplier; + // Total buffer size on this RegionServer for holding batched edits to be shipped. + private final long totalBufferLimit; + private final MetricsReplicationGlobalSourceSource globalMetrics; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -186,7 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, + MetricsReplicationGlobalSourceSource globalMetrics) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -222,6 +226,9 @@ public class ReplicationSourceManager implements ReplicationListener { this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); + this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.globalMetrics = globalMetrics; } /** @@ -1069,6 +1076,14 @@ public class ReplicationSourceManager implements ReplicationListener { return totalBufferUsed; } + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer. + */ + public long getTotalBufferLimit() { + return totalBufferLimit; + } + /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -1106,6 +1121,10 @@ public class ReplicationSourceManager implements ReplicationListener { */ public String getStats() { StringBuilder stats = new StringBuilder(); + // Print stats that apply across all Replication Sources + stats.append("Global stats: "); + stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") + .append(getTotalBufferLimit()).append("B\n"); for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append(source.getStats() + "\n"); @@ -1131,4 +1150,8 @@ public class ReplicationSourceManager implements ReplicationListener { int activeFailoverTaskCount() { return executor.getActiveCount(); } + + MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7e0e550106e..c71db1bf785 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread { // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -276,6 +274,8 @@ class ReplicationSourceWALReader extends Thread { private boolean checkQuota() { // try not to go over total quota if (totalBufferUsed.get() > totalBufferQuota) { + LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", + this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); Threads.sleep(sleepForRetries); return false; } @@ -404,7 +404,10 @@ class ReplicationSourceWALReader extends Thread { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + long newBufferUsed = totalBufferUsed.addAndGet(size); + // Record the new buffer usage + this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= totalBufferQuota; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 4dd264cd5b2..5a6ac0c4874 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; @@ -329,9 +330,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); - MetricsReplicationSourceSource globalSourceSource = - new MetricsReplicationGlobalSourceSource(globalRms); - MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); + MetricsReplicationGlobalSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSourceImpl(globalRms); + MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); Map singleSourceSourceByTable = @@ -497,6 +498,44 @@ public class TestReplicationEndpoint extends TestReplicationBase { } } + /** + * Not used by unit tests, helpful for manual testing with replication. + *

+ * Snippet for `hbase shell`: + *

+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \
+   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
+   * 
+ */ + public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { + private long duration; + public SleepingReplicationEndpointForTest() { + super(); + } + + @Override + public void init(Context context) throws IOException { + super.init(context); + if (this.ctx != null) { + duration = this.ctx.getConfiguration().getLong( + "hbase.test.sleep.replication.endpoint.duration.millis", 5000L); + } + } + + @Override + public boolean replicate(ReplicateContext context) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return super.replicate(context); + } + } + public static class InterClusterReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2a21660dd47..63e7a8b9049 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -371,6 +371,8 @@ public class TestWALEntryStream { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(mockSourceManager.getTotalBufferLimit()).thenReturn( + (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); @@ -378,6 +380,9 @@ public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( + MetricsReplicationGlobalSourceSource.class); + when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); return source; }