diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 188c3a3f658..3aa01ab9526 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -36,7 +36,6 @@ public interface MetricsReplicationSourceSource { public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue"; void setLastShippedAge(long age); - void setSizeOfLogQueue(int size); void incrSizeOfLogQueue(int size); void decrSizeOfLogQueue(int size); void incrLogEditsFiltered(long size); @@ -47,6 +46,7 @@ public interface MetricsReplicationSourceSource { void incrLogReadInEdits(long size); void clear(); long getLastShippedAge(); + int getSizeOfLogQueue(); void incrHFilesShipped(long hfiles); void incrSizeOfHFileRefsQueue(long size); void decrSizeOfHFileRefsQueue(long size); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 93b10b604dd..2526f32c7bb 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -64,10 +64,6 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS ageOfLastShippedOpGauge.set(age); } - @Override public void setSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.set(size); - } - @Override public void incrSizeOfLogQueue(int size) { sizeOfLogQueueGauge.incr(size); } @@ -121,4 +117,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public void decrSizeOfHFileRefsQueue(long size) { sizeOfHFileRefsQueueGauge.decr(size); } + + @Override + public int getSizeOfLogQueue() { + return (int)sizeOfLogQueueGauge.value(); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 99417122b8c..03e31167179 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -85,10 +85,6 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou ageOfLastShippedOpGauge.set(age); } - @Override public void setSizeOfLogQueue(int size) { - sizeOfLogQueueGauge.set(size); - } - @Override public void incrSizeOfLogQueue(int size) { sizeOfLogQueueGauge.incr(size); } @@ -158,4 +154,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public void decrSizeOfHFileRefsQueue(long size) { sizeOfHFileRefsQueueGauge.decr(size); } + + @Override + public int getSizeOfLogQueue() { + return (int)sizeOfLogQueueGauge.value(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 9687af7693d..4a044bfb970 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -89,14 +89,16 @@ public class MetricsSource { } /** - * Set the size of the log queue - * - * @param size the size. + * Increment size of the log queue. */ - public void setSizeOfLogQueue(int size) { - singleSourceSource.setSizeOfLogQueue(size); - globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize); - lastQueueSize = size; + public void incrSizeOfLogQueue() { + singleSourceSource.incrSizeOfLogQueue(1); + globalSourceSource.incrSizeOfLogQueue(1); + } + + public void decrSizeOfLogQueue() { + singleSourceSource.decrSizeOfLogQueue(1); + globalSourceSource.decrSizeOfLogQueue(1); } /** @@ -186,7 +188,7 @@ public class MetricsSource { * @return sizeOfLogQueue */ public int getSizeOfLogQueue() { - return this.lastQueueSize; + return singleSourceSource.getSizeOfLogQueue(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 51ca7edcd77..7e58e4102aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -31,7 +31,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; @@ -138,7 +137,6 @@ public class ReplicationSource extends Thread private WALEntryFilter walEntryFilter; // throttler private ReplicationThrottler throttler; - private AtomicInteger logQueueSize = new AtomicInteger(0); private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); @@ -221,10 +219,10 @@ public class ReplicationSource extends Thread } } queue.put(log); - int queueSize = logQueueSize.incrementAndGet(); - this.metrics.setSizeOfLogQueue(queueSize); + this.metrics.incrSizeOfLogQueue(); // This will log a warning for each new log that gets created above the warn threshold - if (queue.size() > this.logQueueWarnThreshold) { + int queueSize = queue.size(); + if (queueSize > this.logQueueWarnThreshold) { LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); } @@ -510,7 +508,8 @@ public class ReplicationSource extends Thread private long currentNbHFiles = 0; public ReplicationSourceWorkerThread(String walGroupId, - PriorityBlockingQueue queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { + PriorityBlockingQueue queue, ReplicationQueueInfo replicationQueueInfo, + ReplicationSource source) { this.walGroupId = walGroupId; this.queue = queue; this.replicationQueueInfo = replicationQueueInfo; @@ -769,8 +768,7 @@ public class ReplicationSource extends Thread try { if (this.currentPath == null) { this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS); - int queueSize = logQueueSize.decrementAndGet(); - metrics.setSizeOfLogQueue(queueSize); + metrics.decrSizeOfLogQueue(); if (this.currentPath != null) { // For recovered queue: must use peerClusterZnode since peerId is a parsed value manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,