From 6a4c9be967f1b0c29cf937927e0b6aec929721f9 Mon Sep 17 00:00:00 2001 From: shahrs87 Date: Thu, 18 Feb 2021 20:59:07 -0800 Subject: [PATCH] HBASE-25539: Add age of oldest wal metric (#2962) Signed-off-by: Bharath Vissapragada --- .../MetricsReplicationSourceSource.java | 4 + ...ricsReplicationGlobalSourceSourceImpl.java | 12 ++ .../MetricsReplicationSourceSourceImpl.java | 14 ++ .../regionserver/MetricsSource.java | 11 + .../RecoveredReplicationSource.java | 12 +- .../RecoveredReplicationSourceShipper.java | 14 +- .../regionserver/ReplicationSource.java | 65 ++---- .../ReplicationSourceLogQueue.java | 189 ++++++++++++++++++ .../ReplicationSourceShipper.java | 7 +- .../ReplicationSourceWALReader.java | 23 ++- .../SerialReplicationSourceWALReader.java | 7 +- .../regionserver/WALEntryStream.java | 13 +- .../hbase/wal/AbstractFSWALProvider.java | 2 +- .../regionserver/TestReplicationSource.java | 71 ++++++- .../TestReplicationSourceLogQueue.java | 83 ++++++++ .../regionserver/TestWALEntryStream.java | 104 ++++++---- 16 files changed, 504 insertions(+), 127 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 3fd5ac62b8d..b5eb0aa190d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -52,6 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; + /* Used to track the age of oldest wal in ms since its creation time */ + String OLDEST_WAL_AGE = "source.oldestWalAge"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -79,4 +81,6 @@ public interface MetricsReplicationSourceSource extends BaseSource { long getWALEditsRead(); long getShippedOps(); long getEditsFiltered(); + void setOldestWalAge(long age); + long getOldestWalAge(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 52aa1b03d8f..9e69f188f55 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -210,6 +210,18 @@ public class MetricsReplicationGlobalSourceSourceImpl public void incrFailedRecoveryQueue() { failedRecoveryQueue.incr(1L); } + + @Override + public void setOldestWalAge(long age) { + // Not implemented + } + + @Override + public long getOldestWalAge() { + // Not implemented + return 0; + } + @Override public void init() { rms.init(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 2af3ac92f4d..f9a907ff235 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -44,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logReadInBytesKey; private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; + private final String oldestWalAgeKey; private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; @@ -71,6 +72,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter repeatedFileBytes; private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; + private final MutableGaugeLong oldestWalAge; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -130,6 +132,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou completedRecoveryKey = this.keyPrefix + "completedRecoverQueues"; completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L); + + oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; + oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); } @Override public void setLastShippedAge(long age) { @@ -195,6 +200,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(repeatedBytesKey); rms.removeMetric(completedLogsKey); rms.removeMetric(completedRecoveryKey); + rms.removeMetric(oldestWalAgeKey); } @Override @@ -260,6 +266,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou @Override public void incrFailedRecoveryQueue() {/*no op*/} + @Override public void setOldestWalAge(long age) { + oldestWalAge.set(age); + } + + @Override public long getOldestWalAge() { + return oldestWalAge.value(); + } + @Override public void init() { rms.init(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 650af2e103a..f11b7de9c54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -397,6 +397,17 @@ public class MetricsSource implements BaseSource { globalSourceSource.incrFailedRecoveryQueue(); } + /* + Sets the age of oldest log file just for source. + */ + public void setOldestWalAge(long age) { + singleSourceSource.setOldestWalAge(age); + } + + public long getOldestWalAge() { + return singleSourceSource.getOldestWalAge(); + } + @Override public void init() { singleSourceSource.init(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 39c4beb0436..516ccf14929 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -58,13 +58,13 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { - return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) { + return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage); } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { + public void locateRecoveredPaths(String walGroupId) throws IOException { boolean hasPathChanged = false; + PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); PriorityBlockingQueue newPaths = new PriorityBlockingQueue(queueSizePerGroup, new AbstractFSWALProvider.WALStartTimeComparator()); pathsLoop: for (Path path : queue) { @@ -117,9 +117,9 @@ public class RecoveredReplicationSource extends ReplicationSource { // put the correct locations in the queue // since this is a recovered queue with no new incoming logs, // there shouldn't be any concurrency issues - queue.clear(); + logQueue.clear(walGroupId); for (Path path : newPaths) { - queue.add(path); + logQueue.enqueueLog(path, walGroupId); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 91109cf76fa..83256c9b436 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; @@ -40,9 +38,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, RecoveredReplicationSource source, + ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source, ReplicationQueueStorage queueStorage) { - super(conf, walGroupId, queue, source); + super(conf, walGroupId, logQueue, source); this.source = source; this.replicationQueues = queueStorage; } @@ -65,7 +63,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { try { - source.locateRecoveredPaths(queue); + source.locateRecoveredPaths(walGroupId); break; } catch (IOException e) { LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); @@ -82,9 +80,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper String peerClusterZNode = source.getQueueId(); try { startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(), - peerClusterZNode, this.queue.peek().getName()); - LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(), - startPosition); + peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName()); + LOG.trace("Recovered queue started with log {} at position {}", + this.logQueue.getQueue(walGroupId).peek(), startPosition); } catch (ReplicationException e) { terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6a64dd864f9..063b3d458f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -24,14 +24,12 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,11 +84,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); - // Queues of logs to process, entry in format of walGroupId->queue, - // each presents a queue for one wal group - private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; + protected ReplicationSourceLogQueue logQueue; protected ReplicationQueueStorage queueStorage; protected ReplicationPeer replicationPeer; @@ -118,8 +114,6 @@ public class ReplicationSource implements ReplicationSourceInterface { volatile boolean sourceRunning = false; // Metrics for this source private MetricsSource metrics; - // WARN threshold for the number of queued logs, defaults to 2 - private int logQueueWarnThreshold; // ReplicationEndpoint which will handle the actual replication private volatile ReplicationEndpoint replicationEndpoint; @@ -213,6 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); + this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; this.manager = manager; @@ -224,7 +219,6 @@ public class ReplicationSource implements ReplicationSourceInterface { this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); - this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling. defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); @@ -255,35 +249,20 @@ public class ReplicationSource implements ReplicationSourceInterface { } // Use WAL prefix as the WALGroupId for this peer. String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName()); - PriorityBlockingQueue queue = queues.get(walPrefix); - if (queue == null) { - queue = new PriorityBlockingQueue<>(queueSizePerGroup, - new AbstractFSWALProvider.WALStartTimeComparator()); - // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise - // the shipper may quit immediately - queue.put(wal); - queues.put(walPrefix, queue); + boolean queueExists = logQueue.enqueueLog(wal, walPrefix); + + if (!queueExists) { if (this.isSourceActive() && this.walEntryFilter != null) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that wal enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker - tryStartNewShipper(walPrefix, queue); + tryStartNewShipper(walPrefix); } - } else { - queue.put(wal); } if (LOG.isTraceEnabled()) { LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix, this.replicationQueueInfo.getQueueId()); } - this.metrics.incrSizeOfLogQueue(); - // This will wal a warning for each new wal that gets created above the warn threshold - int queueSize = queue.size(); - if (queueSize > this.logQueueWarnThreshold) { - LOG.warn("{} WAL group {} queue size: {} exceeds value of " - + "replication.source.log.queue.warn: {}", logPeerId(), - walPrefix, queueSize, logQueueWarnThreshold); - } } @Override @@ -375,16 +354,16 @@ public class ReplicationSource implements ReplicationSourceInterface { this.walEntryFilter = new ChainWALEntryFilter(filters); } - private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + private void tryStartNewShipper(String walGroupId) { workerThreads.compute(walGroupId, (key, value) -> { if (value != null) { LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId); return value; } else { LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId); - ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); + ReplicationSourceShipper worker = createNewShipper(walGroupId); ReplicationSourceWALReader walReader = - createNewWALReader(walGroupId, queue, worker.getStartPosition()); + createNewWALReader(walGroupId, worker.getStartPosition()); Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId, @@ -404,7 +383,7 @@ public class ReplicationSource implements ReplicationSourceInterface { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId); - int queueSize = queues.get(walGroupId).size(); + int queueSize = logQueue.getQueueSize(walGroupId); replicationDelay = metrics.getReplicationDelay(); Path currentPath = shipper.getCurrentPath(); fileSize = -1; @@ -443,16 +422,16 @@ public class ReplicationSource implements ReplicationSourceInterface { return fileSize; } - protected ReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { - return new ReplicationSourceShipper(conf, walGroupId, queue, this); + protected ReplicationSourceShipper createNewShipper(String walGroupId) { + return new ReplicationSourceShipper(conf, walGroupId, logQueue, this); } - private ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { + private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { return replicationPeer.getPeerConfig().isSerial() - ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) - : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, + this, walGroupId) + : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, + this, walGroupId); } /** @@ -621,14 +600,12 @@ public class ReplicationSource implements ReplicationSourceInterface { throw new IllegalStateException("Source should be active."); } LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", - logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId, + logPeerId(), this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId); initializeWALEntryFilter(peerClusterId); // Start workers - for (Map.Entry> entry : queues.entrySet()) { - String walGroupId = entry.getKey(); - PriorityBlockingQueue queue = entry.getValue(); - tryStartNewShipper(walGroupId, queue); + for (String walGroupId: logQueue.getQueues().keySet()) { + tryStartNewShipper(walGroupId); } this.startupOngoing.set(false); } @@ -857,7 +834,7 @@ public class ReplicationSource implements ReplicationSourceInterface { /** * @return String to use as a log prefix that contains current peerId. */ - private String logPeerId(){ + public String logPeerId(){ return "peerId=" + this.getPeerId() + ","; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java new file mode 100644 index 00000000000..8a774fb18be --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics + just at one place. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ReplicationSourceLogQueue { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); + // Queues of logs to process, entry in format of walGroupId->queue, + // each presents a queue for one wal group + private Map> queues = new ConcurrentHashMap<>(); + private MetricsSource metrics; + private Configuration conf; + // per group queue size, keep no more than this number of logs in each wal group + private int queueSizePerGroup; + // WARN threshold for the number of queued logs, defaults to 2 + private int logQueueWarnThreshold; + private ReplicationSource source; + + public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics, + ReplicationSource source) { + this.conf = conf; + this.metrics = metrics; + this.source = source; + this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); + this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); + } + + /** + * Enqueue the wal + * @param wal wal to be enqueued + * @param walGroupId Key for the wal in @queues map + * @return boolean whether this is the first time we are seeing this walGroupId. + */ + public boolean enqueueLog(Path wal, String walGroupId) { + boolean exists = false; + PriorityBlockingQueue queue = queues.get(walGroupId); + if (queue == null) { + queue = new PriorityBlockingQueue<>(queueSizePerGroup, + new AbstractFSWALProvider.WALStartTimeComparator()); + // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise + // the shipper may quit immediately + queue.put(wal); + queues.put(walGroupId, queue); + } else { + exists = true; + queue.put(wal); + } + // Increment size of logQueue + this.metrics.incrSizeOfLogQueue(); + // Compute oldest wal age + this.metrics.setOldestWalAge(getOldestWalAge()); + // This will wal a warning for each new wal that gets created above the warn threshold + int queueSize = queue.size(); + if (queueSize > this.logQueueWarnThreshold) { + LOG.warn("{} WAL group {} queue size: {} exceeds value of " + + "replication.source.log.queue.warn {}", source.logPeerId(), walGroupId, queueSize, + logQueueWarnThreshold); + } + return exists; + } + + /** + * Get the queue size for the given walGroupId. + * @param walGroupId walGroupId + */ + public int getQueueSize(String walGroupId) { + Queue queue = queues.get(walGroupId); + if (queue == null) { + return 0; + } + return queue.size(); + } + + /** + * Returns number of queues. + */ + public int getNumQueues() { + return queues.size(); + } + + public Map> getQueues() { + return queues; + } + + /** + * Return queue for the given walGroupId + * Please don't add or remove elements from the returned queue. + * Use @enqueueLog and @remove methods respectively. + * @param walGroupId walGroupId + */ + public PriorityBlockingQueue getQueue(String walGroupId) { + return queues.get(walGroupId); + } + + /** + * Remove head from the queue corresponding to given walGroupId. + * @param walGroupId walGroupId + */ + public void remove(String walGroupId) { + PriorityBlockingQueue queue = getQueue(walGroupId); + if (queue == null || queue.isEmpty()) { + return; + } + queue.remove(); + // Decrease size logQueue. + this.metrics.decrSizeOfLogQueue(); + // Re-compute age of oldest wal metric. + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /** + * Remove all the elements from the queue corresponding to walGroupId + * @param walGroupId walGroupId + */ + public void clear(String walGroupId) { + PriorityBlockingQueue queue = getQueue(walGroupId); + while (!queue.isEmpty()) { + // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1. + queue.remove(); + metrics.decrSizeOfLogQueue(); + } + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /* + Returns the age of oldest wal. + */ + long getOldestWalAge() { + long now = EnvironmentEdgeManager.currentTime(); + long timestamp = getOldestWalTimestamp(); + if (timestamp == Long.MAX_VALUE) { + // If there are no wals in the queue then set the oldest wal timestamp to current time + // so that the oldest wal age will be 0. + timestamp = now; + } + long age = now - timestamp; + return age; + } + + /* + Get the oldest wal timestamp from all the queues. + */ + private long getOldestWalTimestamp() { + long oldestWalTimestamp = Long.MAX_VALUE; + for (Map.Entry> entry : queues.entrySet()) { + PriorityBlockingQueue queue = entry.getValue(); + Path path = queue.peek(); + // Can path ever be null ? + if (path != null) { + oldestWalTimestamp = Math.min(oldestWalTimestamp, + AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); + } + } + return oldestWalTimestamp; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d4a71b260c..f188e7ba50d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; import java.io.IOException; import java.util.List; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.conf.Configuration; @@ -55,7 +54,7 @@ public class ReplicationSourceShipper extends Thread { private final Configuration conf; protected final String walGroupId; - protected final PriorityBlockingQueue queue; + protected final ReplicationSourceLogQueue logQueue; private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper @@ -76,10 +75,10 @@ public class ReplicationSourceShipper extends Thread { private final int shipEditsTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, ReplicationSource source) { + ReplicationSourceLogQueue logQueue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; - this.queue = queue; + this.logQueue = logQueue; this.source = source; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 05b34a98c36..52ac144aa14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); - private final PriorityBlockingQueue logQueue; + private final ReplicationSourceLogQueue logQueue; private final FileSystem fs; private final Configuration conf; private final WALEntryFilter filter; @@ -77,6 +77,7 @@ class ReplicationSourceWALReader extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private final String walGroupId; /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the @@ -89,8 +90,8 @@ class ReplicationSourceWALReader extends Thread { * @param source replication source */ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, - ReplicationSource source) { + ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, + ReplicationSource source, String walGroupId) { this.logQueue = logQueue; this.currentPosition = startPosition; this.fs = fs; @@ -111,6 +112,7 @@ class ReplicationSourceWALReader extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); + this.walGroupId = walGroupId; LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity @@ -125,7 +127,7 @@ class ReplicationSourceWALReader extends Thread { try (WALEntryStream entryStream = new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics())) { + source.getSourceMetrics(), walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { Threads.sleep(sleepForRetries); @@ -246,18 +248,19 @@ class ReplicationSourceWALReader extends Thread { // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // enabled, then dump the log private void handleEofException(IOException e) { + PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) { + (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); + if (fs.getFileStatus(queue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek()); + logQueue.remove(walGroupId); currentPosition = 0; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + LOG.warn("Couldn't get file length information about log " + queue.peek()); } } } @@ -269,7 +272,7 @@ class ReplicationSourceWALReader extends Thread { return batchQueueHead.getLastWalPath(); } // otherwise, we must be currently reading from the head of the log queue - return logQueue.peek(); + return logQueue.getQueue(walGroupId).peek(); } //returns false if we've already exceeded the global quota diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a17a5..d0e76fbd77e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,9 +43,9 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader private final SerialReplicationChecker checker; public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, - ReplicationSource source) { - super(fs, conf, logQueue, startPosition, filter, source); + ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter, + ReplicationSource source, String walGroupId) { + super(fs, conf, logQueue, startPosition, filter, source, walGroupId); checker = new SerialReplicationChecker(conf, source); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 752872a7868..5b8f0572143 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -63,7 +63,8 @@ class WALEntryStream implements Closeable { private long currentPositionOfEntry = 0; // position after reading current entry private long currentPositionOfReader = 0; - private final PriorityBlockingQueue logQueue; + private final ReplicationSourceLogQueue logQueue; + private final String walGroupId; private final FileSystem fs; private final Configuration conf; private final WALFileLengthProvider walFileLengthProvider; @@ -81,9 +82,9 @@ class WALEntryStream implements Closeable { * @param metrics the replication metrics * @throws IOException */ - public WALEntryStream(PriorityBlockingQueue logQueue, Configuration conf, + public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, - MetricsSource metrics) throws IOException { + MetricsSource metrics, String walGroupId) throws IOException { this.logQueue = logQueue; this.fs = CommonFSUtils.getWALFileSystem(conf); this.conf = conf; @@ -91,6 +92,7 @@ class WALEntryStream implements Closeable { this.walFileLengthProvider = walFileLengthProvider; this.serverName = serverName; this.metrics = metrics; + this.walGroupId = walGroupId; } /** @@ -251,7 +253,7 @@ class WALEntryStream implements Closeable { private void dequeueCurrentLog() throws IOException { LOG.debug("EOF, closing {}", currentPath); closeReader(); - logQueue.remove(); + logQueue.remove(walGroupId); setCurrentPath(null); setPosition(0); metrics.decrSizeOfLogQueue(); @@ -301,7 +303,8 @@ class WALEntryStream implements Closeable { // open a reader on the next log in queue private boolean openNextLog() throws IOException { - Path nextPath = logQueue.peek(); + PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); + Path nextPath = queue.peek(); if (nextPath != null) { openReader(nextPath); if (reader != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 187bb1191cb..a7e3ef5b8eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -436,7 +436,7 @@ public abstract class AbstractFSWALProvider> implemen * @param p path to split * @return start time */ - private static long getTS(Path p) { + public static long getTS(Path p) { return WAL.getTimestamp(p.getName()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 2b4b1efdc7f..bb2b1da1abd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,7 +34,6 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -62,6 +63,8 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; @@ -291,7 +294,7 @@ public class TestReplicationSource { source.init(testConf, null, mockManager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, - conf, null, 0, null, source); + conf, null, 0, null, source, null); ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); shipper.entryReader = reader; @@ -484,8 +487,6 @@ public class TestReplicationSource { String walGroupId = "fake-wal-group-id"; ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); - queue.put(new Path("/www/html/test")); RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); Server server = mock(Server.class); when(server.getServerName()).thenReturn(serverName); @@ -498,8 +499,12 @@ public class TestReplicationSource { .thenReturn(-1L); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", -1); + MetricsSource metricsSource = mock(MetricsSource.class); + doNothing().when(metricsSource).incrSizeOfLogQueue(); + ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source); + logQueue.enqueueLog(new Path("/www/html/test"), walGroupId); RecoveredReplicationSourceShipper shipper = - new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); + new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage); assertEquals(1001L, shipper.getStartPosition()); } @@ -592,5 +597,59 @@ public class TestReplicationSource { rss.stop("Done"); } } -} + /* + Test age of oldest wal metric. + */ + @Test + public void testAgeOfOldestWal() throws Exception { + try { + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + + String id = "1"; + MetricsSource metrics = new MetricsSource(id); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + Mockito.when(peerConfig.getReplicationEndpointImpl()). + thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getGlobalMetrics()). + thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + + ReplicationSource source = new ReplicationSource(); + source.init(conf, null, manager, null, mockPeer, rss, id, null, + p -> OptionalLong.empty(), metrics); + + final Path log1 = new Path(logDir, "log-walgroup-a.8"); + manualEdge.setValue(10); + // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. + source.enqueueLog(log1); + MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); + assertEquals(2, metricsSource1.getOldestWalAge()); + + final Path log2 = new Path(logDir, "log-walgroup-b.4"); + // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 + source.enqueueLog(log2); + assertEquals(6, metricsSource1.getOldestWalAge()); + // Clear all metrics. + metrics.clear(); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { + MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class); + return factory.getSource(sourceId); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java new file mode 100644 index 00000000000..c28b18003c5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({SmallTests.class,ReplicationTests.class}) +public class TestReplicationSourceLogQueue { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class); + + /* + Testing enqueue and dequeuing of wal and check age of oldest wal. + */ + @Test + public void testEnqueueDequeue() { + try { + String walGroupId1 = "fake-walgroup-id-1"; + String walGroupId2 = "fake-walgroup-id-2"; + + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + + MetricsSource metrics = new MetricsSource("1"); + Configuration conf = HBaseConfiguration.create(); + ReplicationSource source = mock(ReplicationSource.class); + Mockito.doReturn("peer").when(source).logPeerId(); + ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source); + final Path log1 = new Path("log-walgroup-a.8"); + manualEdge.setValue(10); + // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. + logQueue.enqueueLog(log1, walGroupId1); + assertEquals(2, logQueue.getOldestWalAge()); + + final Path log2 = new Path("log-walgroup-b.4"); + // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 + logQueue.enqueueLog(log2, walGroupId2); + assertEquals(6, logQueue.getOldestWalAge()); + + // Remove an element from walGroupId2. + // After this op, there will be only one element in the queue log-walgroup-a.8 + logQueue.remove(walGroupId2); + assertEquals(2, logQueue.getOldestWalAge()); + + // Remove last element from the queue. + logQueue.remove(walGroupId1); + // This will test the case where there are no elements in the queue. + assertEquals(0, logQueue.getOldestWalAge()); + } finally { + EnvironmentEdgeManager.reset(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 1db9c175e92..9c6fafcf803 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; @@ -99,6 +101,7 @@ public class TestWALEntryStream { private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); private static final NavigableMap scopes = getScopes(); + private final String fakeWalGroupId = "fake-wal-group-id"; private static NavigableMap getScopes() { NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -107,7 +110,7 @@ public class TestWALEntryStream { } private WAL log; - PriorityBlockingQueue walQueue; + ReplicationSourceLogQueue logQueue; private PathWatcher pathWatcher; @Rule @@ -131,7 +134,8 @@ public class TestWALEntryStream { @Before public void setUp() throws Exception { - walQueue = new PriorityBlockingQueue<>(); + ReplicationSource source = mock(ReplicationSource.class); + logQueue = new ReplicationSourceLogQueue(CONF, new MetricsSource("2"), source); pathWatcher = new PathWatcher(); final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); wals.getWALProvider().addWALActionsListener(pathWatcher); @@ -165,7 +169,8 @@ public class TestWALEntryStream { log.rollWriter(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -192,7 +197,7 @@ public class TestWALEntryStream { appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -206,8 +211,8 @@ public class TestWALEntryStream { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + log, null, new MetricsSource("1"), fakeWalGroupId)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -220,8 +225,8 @@ public class TestWALEntryStream { log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + log, null, new MetricsSource("1"), fakeWalGroupId)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); assertNotNull(entry); @@ -246,7 +251,8 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -254,12 +260,12 @@ public class TestWALEntryStream { appendToLog("4"); // 4 - this append is in the rolled log assertEquals("2", getRow(entryStream.next())); - assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an + assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an // entry in first log assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 // and 3 would be skipped assertEquals("4", getRow(entryStream.next())); // 4 - assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly + assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly assertFalse(entryStream.hasNext()); } } @@ -267,11 +273,13 @@ public class TestWALEntryStream { /** * Tests that if writes come in while we have a stream open, we shouldn't miss them */ + @Test public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -294,7 +302,8 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -302,11 +311,12 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, lastPosition, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done - assertEquals(1, walQueue.size()); + assertEquals(1, getQueue().size()); } } @@ -314,19 +324,21 @@ public class TestWALEntryStream { * Tests that if we stop before hitting the end of a stream, we can continue where we left off * using the last position */ + @Test public void testPosition() throws Exception { long lastPosition = 0; appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, + log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, lastPosition, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -337,7 +349,8 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertFalse(entryStream.hasNext()); } } @@ -391,7 +404,8 @@ public class TestWALEntryStream { ReplicationSource source = mockReplicationSource(recovered, conf); when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); + new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source, + fakeWalGroupId); reader.start(); return reader; } @@ -402,7 +416,8 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -410,7 +425,7 @@ public class TestWALEntryStream { } // start up a reader - Path walPath = walQueue.peek(); + Path walPath = getQueue().peek(); ReplicationSourceWALReader reader = createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); @@ -430,7 +445,7 @@ public class TestWALEntryStream { @Test public void testReplicationSourceWALReaderRecovered() throws Exception { appendEntriesToLogAndSync(10); - Path walPath = walQueue.peek(); + Path walPath = getQueue().peek(); log.rollWriter(); appendEntriesToLogAndSync(5); log.shutdown(); @@ -450,7 +465,7 @@ public class TestWALEntryStream { assertEquals(0, batch.getNbEntries()); assertTrue(batch.isEndOfFile()); - walPath = walQueue.peek(); + walPath = getQueue().peek(); batch = reader.take(); assertEquals(walPath, batch.getLastWalPath()); assertEquals(5, batch.getNbEntries()); @@ -463,7 +478,7 @@ public class TestWALEntryStream { @Test public void testReplicationSourceWALReaderWrongPosition() throws Exception { appendEntriesToLogAndSync(1); - Path walPath = walQueue.peek(); + Path walPath = getQueue().peek(); log.rollWriter(); appendEntriesToLogAndSync(20); TEST_UTIL.waitFor(5000, new ExplainingPredicate() { @@ -490,7 +505,7 @@ public class TestWALEntryStream { assertEquals(1, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath2 = walQueue.peek(); + Path walPath2 = getQueue().peek(); entryBatch = reader.take(); assertEquals(walPath2, entryBatch.getLastWalPath()); assertEquals(20, entryBatch.getNbEntries()); @@ -503,7 +518,7 @@ public class TestWALEntryStream { assertEquals(0, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath3 = walQueue.peek(); + Path walPath3 = getQueue().peek(); entryBatch = reader.take(); assertEquals(walPath3, entryBatch.getLastWalPath()); assertEquals(10, entryBatch.getNbEntries()); @@ -517,7 +532,8 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -525,7 +541,7 @@ public class TestWALEntryStream { } // start up a reader - Path walPath = walQueue.peek(); + Path walPath = getQueue().peek(); ReplicationSource source = mockReplicationSource(false, CONF); AtomicInteger invokeCount = new AtomicInteger(0); AtomicBoolean enabled = new AtomicBoolean(false); @@ -535,7 +551,8 @@ public class TestWALEntryStream { }); ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source); + new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(), + source, fakeWalGroupId); reader.start(); Future future = ForkJoinPool.commonPool().submit(() -> { return reader.take(); @@ -621,8 +638,8 @@ public class TestWALEntryStream { Path currentPath; @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - walQueue.add(newPath); + public void preLogRoll(Path oldPath, Path newPath) { + logQueue.enqueueLog(newPath, fakeWalGroupId); currentPath = newPath; } } @@ -631,10 +648,10 @@ public class TestWALEntryStream { public void testReadBeyondCommittedLength() throws IOException, InterruptedException { appendToLog("1"); appendToLog("2"); - long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); + long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0, - p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0, + p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next()); // can not get log 2 @@ -660,13 +677,11 @@ public class TestWALEntryStream { */ @Test public void testEOFExceptionForRecoveredQueue() throws Exception { - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); // Create a 0 length log. Path emptyLog = new Path("emptyLog"); FSDataOutputStream fsdos = fs.create(emptyLog); fsdos.close(); assertEquals(0, fs.getFileStatus(emptyLog).getLen()); - queue.add(emptyLog); Configuration conf = new Configuration(CONF); // Override the max retries multiplier to fail fast. @@ -675,11 +690,22 @@ public class TestWALEntryStream { // Create a reader thread with source as recovered source. ReplicationSource source = mockReplicationSource(true, conf); when(source.isPeerEnabled()).thenReturn(true); + + MetricsSource metrics = mock(MetricsSource.class); + doNothing().when(metrics).incrSizeOfLogQueue(); + doNothing().when(metrics).decrSizeOfLogQueue(); + ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); + localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source); + new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, + getDummyFilter(), source, fakeWalGroupId); reader.run(); // ReplicationSourceWALReaderThread#handleEofException method will // remove empty log from logQueue. - assertEquals(0, queue.size()); + assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); + } + + private PriorityBlockingQueue getQueue() { + return logQueue.getQueue(fakeWalGroupId); } }