HBASE-15093 Replication can report incorrect size of log queue for the global source when multiwal is enabled (Ashu Pachauri)

This commit is contained in:
tedyu 2016-04-11 08:17:20 -07:00
parent a395922ad5
commit 8541fe4ad1
5 changed files with 27 additions and 25 deletions

View File

@ -36,7 +36,6 @@ public interface MetricsReplicationSourceSource {
public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue"; public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
void setLastShippedAge(long age); void setLastShippedAge(long age);
void setSizeOfLogQueue(int size);
void incrSizeOfLogQueue(int size); void incrSizeOfLogQueue(int size);
void decrSizeOfLogQueue(int size); void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size); void incrLogEditsFiltered(long size);
@ -47,6 +46,7 @@ public interface MetricsReplicationSourceSource {
void incrLogReadInEdits(long size); void incrLogReadInEdits(long size);
void clear(); void clear();
long getLastShippedAge(); long getLastShippedAge();
int getSizeOfLogQueue();
void incrHFilesShipped(long hfiles); void incrHFilesShipped(long hfiles);
void incrSizeOfHFileRefsQueue(long size); void incrSizeOfHFileRefsQueue(long size);
void decrSizeOfHFileRefsQueue(long size); void decrSizeOfHFileRefsQueue(long size);

View File

@ -64,10 +64,6 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
ageOfLastShippedOpGauge.set(age); ageOfLastShippedOpGauge.set(age);
} }
@Override public void setSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.set(size);
}
@Override public void incrSizeOfLogQueue(int size) { @Override public void incrSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.incr(size); sizeOfLogQueueGauge.incr(size);
} }
@ -121,4 +117,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void decrSizeOfHFileRefsQueue(long size) { public void decrSizeOfHFileRefsQueue(long size) {
sizeOfHFileRefsQueueGauge.decr(size); sizeOfHFileRefsQueueGauge.decr(size);
} }
@Override
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
} }

View File

@ -85,10 +85,6 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
ageOfLastShippedOpGauge.set(age); ageOfLastShippedOpGauge.set(age);
} }
@Override public void setSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.set(size);
}
@Override public void incrSizeOfLogQueue(int size) { @Override public void incrSizeOfLogQueue(int size) {
sizeOfLogQueueGauge.incr(size); sizeOfLogQueueGauge.incr(size);
} }
@ -158,4 +154,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public void decrSizeOfHFileRefsQueue(long size) { public void decrSizeOfHFileRefsQueue(long size) {
sizeOfHFileRefsQueueGauge.decr(size); sizeOfHFileRefsQueueGauge.decr(size);
} }
@Override
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
} }

View File

@ -89,14 +89,16 @@ public class MetricsSource {
} }
/** /**
* Set the size of the log queue * Increment size of the log queue.
*
* @param size the size.
*/ */
public void setSizeOfLogQueue(int size) { public void incrSizeOfLogQueue() {
singleSourceSource.setSizeOfLogQueue(size); singleSourceSource.incrSizeOfLogQueue(1);
globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize); globalSourceSource.incrSizeOfLogQueue(1);
lastQueueSize = size; }
public void decrSizeOfLogQueue() {
singleSourceSource.decrSizeOfLogQueue(1);
globalSourceSource.decrSizeOfLogQueue(1);
} }
/** /**
@ -186,7 +188,7 @@ public class MetricsSource {
* @return sizeOfLogQueue * @return sizeOfLogQueue
*/ */
public int getSizeOfLogQueue() { public int getSizeOfLogQueue() {
return this.lastQueueSize; return singleSourceSource.getSizeOfLogQueue();
} }
/** /**

View File

@ -31,7 +31,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -138,7 +137,6 @@ public class ReplicationSource extends Thread
private WALEntryFilter walEntryFilter; private WALEntryFilter walEntryFilter;
// throttler // throttler
private ReplicationThrottler throttler; private ReplicationThrottler throttler;
private AtomicInteger logQueueSize = new AtomicInteger(0);
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads = private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>(); new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
@ -221,10 +219,10 @@ public class ReplicationSource extends Thread
} }
} }
queue.put(log); queue.put(log);
int queueSize = logQueueSize.incrementAndGet(); this.metrics.incrSizeOfLogQueue();
this.metrics.setSizeOfLogQueue(queueSize);
// This will log a warning for each new log that gets created above the warn threshold // This will log a warning for each new log that gets created above the warn threshold
if (queue.size() > this.logQueueWarnThreshold) { int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+ " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
} }
@ -510,7 +508,8 @@ public class ReplicationSource extends Thread
private long currentNbHFiles = 0; private long currentNbHFiles = 0;
public ReplicationSourceWorkerThread(String walGroupId, public ReplicationSourceWorkerThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSource source) {
this.walGroupId = walGroupId; this.walGroupId = walGroupId;
this.queue = queue; this.queue = queue;
this.replicationQueueInfo = replicationQueueInfo; this.replicationQueueInfo = replicationQueueInfo;
@ -769,8 +768,7 @@ public class ReplicationSource extends Thread
try { try {
if (this.currentPath == null) { if (this.currentPath == null) {
this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS); this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
int queueSize = logQueueSize.decrementAndGet(); metrics.decrSizeOfLogQueue();
metrics.setSizeOfLogQueue(queueSize);
if (this.currentPath != null) { if (this.currentPath != null) {
// For recovered queue: must use peerClusterZnode since peerId is a parsed value // For recovered queue: must use peerClusterZnode since peerId is a parsed value
manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode, manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,