diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 25d72afabf1..a7cea254468 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -50,6 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs"; public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues"; public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues"; + /* Used to track the age of oldest wal in ms since its creation time */ + String OLDEST_WAL_AGE = "source.oldestWalAge"; void setLastShippedAge(long age); void incrSizeOfLogQueue(int size); @@ -74,5 +76,6 @@ public interface MetricsReplicationSourceSource extends BaseSource { void incrCompletedWAL(); void incrCompletedRecoveryQueue(); void incrFailedRecoveryQueue(); - + void setOldestWalAge(long age); + long getOldestWalAge(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 64585fac31e..df774d3100f 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -196,6 +196,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public void incrFailedRecoveryQueue() { failedRecoveryQueue.incr(1L); } + + @Override + public void setOldestWalAge(long age) { + // Not implemented + } + + @Override + public long getOldestWalAge() { + // Not implemented + return 0; + } + @Override public void init() { rms.init(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 0078a978763..c593950eba1 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -40,6 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logReadInBytesKey; private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; + private final String oldestWalAgeKey; private final MutableHistogram ageOfLastShippedOpHist; private final MutableGaugeLong sizeOfLogQueueGauge; @@ -67,6 +68,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter repeatedFileBytes; private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; + private final MutableGaugeLong oldestWalAge; public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; @@ -126,6 +128,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou completedRecoveryKey = this.keyPrefix + "completedRecoverQueues"; completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L); + + oldestWalAgeKey = this.keyPrefix + "oldestWalAge"; + oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L); } @Override public void setLastShippedAge(long age) { @@ -191,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(repeatedBytesKey); rms.removeMetric(completedLogsKey); rms.removeMetric(completedRecoveryKey); + rms.removeMetric(oldestWalAgeKey); } @Override @@ -256,6 +262,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou @Override public void incrFailedRecoveryQueue() {/*no op*/} + @Override public void setOldestWalAge(long age) { + oldestWalAge.set(age); + } + + @Override public long getOldestWalAge() { + return oldestWalAge.value(); + } + @Override public void init() { rms.init(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 5dd17e76c4e..83bc6534432 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -395,6 +395,17 @@ public class MetricsSource implements BaseSource { globalSourceSource.updateHistogram(name, value); } + /* + Sets the age of oldest log file just for source. + */ + public void setOldestWalAge(long age) { + singleSourceSource.setOldestWalAge(age); + } + + public long getOldestWalAge() { + return singleSourceSource.getOldestWalAge(); + } + @Override public String getMetricsContext() { return globalSourceSource.getMetricsContext(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7be880d903d..a58289e6baa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -90,10 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ReplicationSource extends Thread implements ReplicationSourceInterface { private static final Log LOG = LogFactory.getLog(ReplicationSource.class); - // Queues of logs to process, entry in format of walGroupId->queue, - // each presents a queue for one wal group - private Map> queues = - new HashMap>(); + protected ReplicationSourceLogQueue logQueue; // per group queue size, keep no more than this number of logs in each wal group private int queueSizePerGroup; private ReplicationQueues replicationQueues; @@ -126,8 +122,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private volatile boolean sourceRunning = false; // Metrics for this source private MetricsSource metrics; - //WARN threshold for the number of queued logs, defaults to 2 - private int logQueueWarnThreshold; // ReplicationEndpoint which will handle the actual replication private ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. @@ -176,6 +170,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); + this.logQueue = new ReplicationSourceLogQueue(conf, metrics); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; @@ -187,7 +182,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); - this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); @@ -208,16 +202,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void enqueueLog(Path log) { String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName()); - PriorityBlockingQueue queue = queues.get(logPrefix); - if (queue == null) { - queue = new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); - queues.put(logPrefix, queue); + boolean queueExists = logQueue.enqueueLog(log, logPrefix); + if (!queueExists) { if (this.sourceRunning) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker final ReplicationSourceShipperThread worker = - new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this); + new ReplicationSourceShipperThread(logPrefix, logQueue, replicationQueueInfo, this); ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix); @@ -227,14 +219,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } } - queue.put(log); - this.metrics.incrSizeOfLogQueue(); - // This will log a warning for each new log that gets created above the warn threshold - int queueSize = queue.size(); - if (queueSize > this.logQueueWarnThreshold) { - LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize - + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); - } } @Override @@ -326,11 +310,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); // start workers - for (Map.Entry> entry : queues.entrySet()) { + for (Map.Entry> entry : logQueue.getQueues().entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue queue = entry.getValue(); final ReplicationSourceShipperThread worker = - new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this); + new ReplicationSourceShipperThread(walGroupId, logQueue, replicationQueueInfo, this); ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); @@ -483,7 +467,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf * @param p path to split * @return start time */ - private static long getTS(Path p) { + public static long getTS(Path p) { int tsIndex = p.getName().lastIndexOf('.') + 1; return Long.parseLong(p.getName().substring(tsIndex)); } @@ -530,7 +514,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf String walGroupId = worker.getWalGroupId(); lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId); - int queueSize = queues.get(walGroupId).size(); + int queueSize = logQueue.getQueueSize(walGroupId); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); Path currentPath = worker.getLastLoggedPath(); @@ -566,7 +550,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public class ReplicationSourceShipperThread extends Thread { ReplicationSourceInterface source; String walGroupId; - PriorityBlockingQueue queue; + ReplicationSourceLogQueue logQueue; ReplicationQueueInfo replicationQueueInfo; // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; @@ -577,10 +561,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf ReplicationSourceWALReaderThread entryReader; public ReplicationSourceShipperThread(String walGroupId, - PriorityBlockingQueue queue, ReplicationQueueInfo replicationQueueInfo, + ReplicationSourceLogQueue logQueue, ReplicationQueueInfo replicationQueueInfo, ReplicationSourceInterface source) { this.walGroupId = walGroupId; - this.queue = queue; + this.logQueue = logQueue; this.replicationQueueInfo = replicationQueueInfo; this.source = source; } @@ -842,11 +826,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // normally has a position (unless the RS failed between 2 logs) private long getRecoveredQueueStartPos(long startPosition) { try { - startPosition = - (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName())); + startPosition = (replicationQueues.getLogPosition(peerClusterZnode, + this.logQueue.getQueue(walGroupId).peek().getName())); if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " - + startPosition); + LOG.trace("Recovered queue started with log " + + this.logQueue.getQueue(walGroupId).peek() + " at position " + startPosition); } } catch (ReplicationException e) { terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); @@ -860,8 +844,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf ArrayList filters = Lists.newArrayList(walEntryFilter, new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters); - entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue, - startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this); + entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, logQueue, + startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this, + this.walGroupId); Threads.setDaemonThreadRunning(entryReader, threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, handler); @@ -873,6 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf boolean hasPathChanged = false; PriorityBlockingQueue newPaths = new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); pathsLoop: for (Path path : queue) { if (fs.exists(path)) { // still in same location, don't need to do anything newPaths.add(path); @@ -922,9 +908,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // put the correct locations in the queue // since this is a recovered queue with no new incoming logs, // there shouldn't be any concurrency issues - queue.clear(); + logQueue.clear(walGroupId); for (Path path : newPaths) { - queue.add(path); + logQueue.enqueueLog(path, walGroupId); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java new file mode 100644 index 00000000000..1cc8cb99c4c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics + just at one place. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ReplicationSourceLogQueue { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); + // Queues of logs to process, entry in format of walGroupId->queue, + // each presents a queue for one wal group + private Map> queues = new ConcurrentHashMap<>(); + private MetricsSource metrics; + private Configuration conf; + // per group queue size, keep no more than this number of logs in each wal group + private int queueSizePerGroup; + // WARN threshold for the number of queued logs, defaults to 2 + private int logQueueWarnThreshold; + + public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics) { + this.conf = conf; + this.metrics = metrics; + this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); + this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); + } + + /** + * Enqueue the wal + * @param wal wal to be enqueued + * @param walGroupId Key for the wal in @queues map + * @return boolean whether this is the first time we are seeing this walGroupId. + */ + public boolean enqueueLog(Path wal, String walGroupId) { + boolean exists = false; + PriorityBlockingQueue queue = queues.get(walGroupId); + if (queue == null) { + queue = new PriorityBlockingQueue<>(queueSizePerGroup, + new ReplicationSource.LogsComparator()); + // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise + // the shipper may quit immediately + queue.put(wal); + queues.put(walGroupId, queue); + } else { + exists = true; + queue.put(wal); + } + // Increment size of logQueue + this.metrics.incrSizeOfLogQueue(); + // Compute oldest wal age + this.metrics.setOldestWalAge(getOldestWalAge()); + // This will wal a warning for each new wal that gets created above the warn threshold + int queueSize = queue.size(); + if (queueSize > this.logQueueWarnThreshold) { + LOG.warn("WAL group " + walGroupId + " queue size: " + queueSize + + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + } + return exists; + } + + /** + * Get the queue size for the given walGroupId. + * @param walGroupId walGroupId + */ + public int getQueueSize(String walGroupId) { + Queue queue = queues.get(walGroupId); + if (queue == null) { + return 0; + } + return queue.size(); + } + + /** + * Returns number of queues. + */ + public int getNumQueues() { + return queues.size(); + } + + public Map> getQueues() { + return queues; + } + + /** + * Return queue for the given walGroupId + * Please don't add or remove elements from the returned queue. + * Use @enqueueLog and @remove methods respectively. + * @param walGroupId walGroupId + */ + public PriorityBlockingQueue getQueue(String walGroupId) { + return queues.get(walGroupId); + } + + /** + * Remove head from the queue corresponding to given walGroupId. + * @param walGroupId walGroupId + */ + public void remove(String walGroupId) { + PriorityBlockingQueue queue = getQueue(walGroupId); + if (queue == null || queue.isEmpty()) { + return; + } + queue.remove(); + // Decrease size logQueue. + this.metrics.decrSizeOfLogQueue(); + // Re-compute age of oldest wal metric. + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /** + * Remove all the elements from the queue corresponding to walGroupId + * @param walGroupId walGroupId + */ + public void clear(String walGroupId) { + PriorityBlockingQueue queue = getQueue(walGroupId); + while (!queue.isEmpty()) { + // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1. + queue.remove(); + metrics.decrSizeOfLogQueue(); + } + this.metrics.setOldestWalAge(getOldestWalAge()); + } + + /* + Returns the age of oldest wal. + */ + long getOldestWalAge() { + long now = EnvironmentEdgeManager.currentTime(); + long timestamp = getOldestWalTimestamp(); + if (timestamp == Long.MAX_VALUE) { + // If there are no wals in the queue then set the oldest wal timestamp to current time + // so that the oldest wal age will be 0. + timestamp = now; + } + long age = now - timestamp; + return age; + } + + /* + Get the oldest wal timestamp from all the queues. + */ + private long getOldestWalTimestamp() { + long oldestWalTimestamp = Long.MAX_VALUE; + for (Map.Entry> entry : queues.entrySet()) { + PriorityBlockingQueue queue = entry.getValue(); + Path path = queue.peek(); + // Can path ever be null ? + if (path != null) { + oldestWalTimestamp = Math.min(oldestWalTimestamp, + ReplicationSource.LogsComparator.getTS(path)); + } + } + return oldestWalTimestamp; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index b67ff53cf3b..bd155d5642b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.wal.WALKey; public class ReplicationSourceWALReaderThread extends Thread { private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class); - private PriorityBlockingQueue logQueue; + private ReplicationSourceLogQueue logQueue; private FileSystem fs; private Configuration conf; private BlockingQueue entryBatchQueue; @@ -79,6 +79,7 @@ public class ReplicationSourceWALReaderThread extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private final String walGroupId; private ReplicationSource source; private ReplicationSourceManager manager; @@ -96,12 +97,13 @@ public class ReplicationSourceWALReaderThread extends Thread { * @param metrics replication metrics */ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, - ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue logQueue, + ReplicationQueueInfo replicationQueueInfo, ReplicationSourceLogQueue logQueue, long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter, - MetricsSource metrics, ReplicationSource source) { + MetricsSource metrics, ReplicationSource source, String walGroupId) { this.replicationQueueInfo = replicationQueueInfo; this.logQueue = logQueue; - this.lastReadPath = logQueue.peek(); + this.walGroupId = walGroupId; + this.lastReadPath = logQueue.getQueue(walGroupId).peek(); this.lastReadPosition = startPosition; this.fs = fs; this.conf = conf; @@ -135,7 +137,7 @@ public class ReplicationSourceWALReaderThread extends Thread { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) { + new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { Threads.sleep(sleepForRetries); @@ -232,24 +234,26 @@ public class ReplicationSourceWALReaderThread extends Thread { // enabled, then dump the log private void handleEofException(Exception e) { boolean isRecoveredSource = manager.getOldSources().contains(source); + PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't // add current log to recovered source queue so it is safe to remove. - if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1) + if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1) && conf.getBoolean("replication.source.eof.autorecovery", false)) { try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - lastReadPath = logQueue.remove(); + if (fs.getFileStatus(queue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek()); + lastReadPath = queue.peek(); + logQueue.remove(walGroupId); lastReadPosition = 0; } } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + LOG.warn("Couldn't get file length information about log " + queue.peek()); } } } public Path getCurrentPath() { - return logQueue.peek(); + return logQueue.getQueue(walGroupId).peek(); } //returns false if we've already exceeded the global quota diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 7a440750f63..a0b09dd894a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -59,7 +59,8 @@ public class WALEntryStream implements Iterator, Closeable, Iterable logQueue; + private final ReplicationSourceLogQueue logQueue; + private final String walGroupId; private FileSystem fs; private Configuration conf; private MetricsSource metrics; @@ -70,12 +71,13 @@ public class WALEntryStream implements Iterator, Closeable, Iterable logQueue, FileSystem fs, Configuration conf, - MetricsSource metrics) + public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf, + MetricsSource metrics, String walGroupId) throws IOException { - this(logQueue, fs, conf, 0, metrics); + this(logQueue, fs, conf, 0, metrics, walGroupId); } /** @@ -83,18 +85,18 @@ public class WALEntryStream implements Iterator, Closeable, Iterable logQueue, FileSystem fs, Configuration conf, - long startPosition, MetricsSource metrics) throws IOException { + public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf, + long startPosition, MetricsSource metrics, String walGroupId) throws IOException { this.logQueue = logQueue; this.fs = fs; this.conf = conf; this.currentPosition = startPosition; this.metrics = metrics; + this.walGroupId = walGroupId; } /** @@ -198,7 +200,7 @@ public class WALEntryStream implements Iterator, Closeable, Iterable 1) { // log was rolled + if (logQueue.getQueue(walGroupId).size() > 1) { // log was rolled // Before dequeueing, we should always get one more attempt at reading. // This is in case more entries came in after we opened the reader, // and a new log was enqueued while we were reading. See HBASE-6758 @@ -266,7 +268,7 @@ public class WALEntryStream implements Iterator, Closeable, Iterable, Closeable, Iterable queue = logQueue.getQueue(walGroupId); + Path nextPath = queue.peek(); if (nextPath != null) { openReader(nextPath); if (reader != null) return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index f85a52b74db..ce185f480c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -52,6 +52,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; @@ -76,7 +79,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -299,6 +304,15 @@ public class TestReplicationSource { return source; } + ReplicationSource createReplicationSourceWithMocks(MetricsSource metrics, + ReplicationEndpoint endpoint) throws IOException { + final ReplicationSource source = new ReplicationSource(); + endpoint.init(context); + source.init(conf, FS, manager, queues, peers, mock(Stoppable.class), + "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics); + return source; + } + public AtomicLong getTotalBufferUsed() { return totalBufferUsed; } @@ -648,5 +662,44 @@ public class TestReplicationSource { } } -} + /* + Test age of oldest wal metric. + */ + @Test + public void testAgeOfOldestWal() throws Exception { + try { + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + String id = "1"; + MetricsSource metrics = new MetricsSource(id); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt("replication.source.maxretriesmultiplier", 1); + Mocks mocks = new Mocks(); + ReplicationEndpoint endpoint = mock(ReplicationEndpoint.class); + ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics, endpoint); + + final Path log1 = new Path(logDir, "log-walgroup-a.8"); + manualEdge.setValue(10); + // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. + source.enqueueLog(log1); + MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); + assertEquals(2, metricsSource1.getOldestWalAge()); + + final Path log2 = new Path(logDir, "log-walgroup-b.4"); + // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 + source.enqueueLog(log2); + assertEquals(6, metricsSource1.getOldestWalAge()); + // Clear all metrics. + metrics.clear(); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { + MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class); + return factory.getSource(sourceId); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java new file mode 100644 index 00000000000..57c97b8915b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({SmallTests.class,ReplicationTests.class}) +public class TestReplicationSourceLogQueue { + + /* + Testing enqueue and dequeuing of wal and check age of oldest wal. + */ + @Test + public void testEnqueueDequeue() { + try { + String walGroupId1 = "fake-walgroup-id-1"; + String walGroupId2 = "fake-walgroup-id-2"; + + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(manualEdge); + + MetricsSource metrics = new MetricsSource("1"); + Configuration conf = HBaseConfiguration.create(); + ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics); + final Path log1 = new Path("log-walgroup-a.8"); + manualEdge.setValue(10); + // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. + logQueue.enqueueLog(log1, walGroupId1); + assertEquals(2, logQueue.getOldestWalAge()); + + final Path log2 = new Path("log-walgroup-b.4"); + // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 + logQueue.enqueueLog(log2, walGroupId2); + assertEquals(6, logQueue.getOldestWalAge()); + + // Remove an element from walGroupId2. + // After this op, there will be only one element in the queue log-walgroup-a.8 + logQueue.remove(walGroupId2); + assertEquals(2, logQueue.getOldestWalAge()); + + // Remove last element from the queue. + logQueue.remove(walGroupId1); + // This will test the case where there are no elements in the queue. + assertEquals(0, logQueue.getOldestWalAge()); + } finally { + EnvironmentEdgeManager.reset(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index eaf7e0a316a..adf427b39e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALRead import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; @@ -109,9 +111,10 @@ public class TestWALEntryStream { new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false); private static final HTableDescriptor htd = new HTableDescriptor(tableName); private static NavigableMap scopes; + private final String fakeWalGroupId = "fake-wal-group-id"; private WAL log; - PriorityBlockingQueue walQueue; + ReplicationSourceLogQueue logQueue; private PathWatcher pathWatcher; @Rule @@ -139,7 +142,7 @@ public class TestWALEntryStream { @Before public void setUp() throws Exception { - walQueue = new PriorityBlockingQueue<>(); + logQueue = new ReplicationSourceLogQueue(conf, new MetricsSource("2")); List listeners = new ArrayList(); pathWatcher = new PathWatcher(); listeners.add(pathWatcher); @@ -174,7 +177,7 @@ public class TestWALEntryStream { log.rollWriter(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) { int i = 0; for (WAL.Entry e : entryStream) { assertNotNull(e); @@ -202,7 +205,7 @@ public class TestWALEntryStream { long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.next(); @@ -220,7 +223,7 @@ public class TestWALEntryStream { appendToLog(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -234,7 +237,7 @@ public class TestWALEntryStream { appendToLog(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); assertNotNull(entry); @@ -259,7 +262,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -267,12 +270,12 @@ public class TestWALEntryStream { appendToLog("4"); // 4 - this append is in the rolled log assertEquals("2", getRow(entryStream.next())); - assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an + assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an // entry in first log assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 // and 3 would be skipped assertEquals("4", getRow(entryStream.next())); // 4 - assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly + assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly assertFalse(entryStream.hasNext()); } } @@ -284,7 +287,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -307,7 +310,7 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -315,11 +318,12 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"), + fakeWalGroupId)) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done - assertEquals(1, walQueue.size()); + assertEquals(1, getQueue().size()); } } @@ -333,13 +337,15 @@ public class TestWALEntryStream { appendEntriesToLog(3); // read only one element try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"), + fakeWalGroupId)) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"), + fakeWalGroupId)) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -350,7 +356,7 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) { assertFalse(entryStream.hasNext()); } } @@ -361,7 +367,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -374,9 +380,9 @@ public class TestWALEntryStream { when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); ReplicationSourceWALReaderThread batcher = - new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, - fs, conf, getDummyFilter(), new MetricsSource("1"), source); - Path walPath = walQueue.peek(); + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),logQueue, 0, + fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId); + Path walPath = getQueue().peek(); batcher.start(); WALEntryBatch entryBatch = batcher.take(); @@ -400,8 +406,13 @@ public class TestWALEntryStream { appendEntriesToLog(2); long position; - try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue), - fs, conf, new MetricsSource("1"))) { + ReplicationSourceLogQueue tempQueue = new ReplicationSourceLogQueue(conf, + getMockMetrics()); + for (Path path : getQueue()) { + tempQueue.enqueueLog(path, fakeWalGroupId); + } + try (WALEntryStream entryStream = new WALEntryStream(tempQueue, + fs, conf, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -416,9 +427,9 @@ public class TestWALEntryStream { when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); ReplicationSourceWALReaderThread reader = new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), - walQueue, 0, fs, conf, getDummyFilter(), - new MetricsSource("1"), source); - Path walPath = walQueue.toArray(new Path[2])[1]; + logQueue, 0, fs, conf, getDummyFilter(), + new MetricsSource("1"), source, fakeWalGroupId); + Path walPath = getQueue().toArray(new Path[2])[1]; reader.start(); WALEntryBatch entryBatch = reader.take(); @@ -476,8 +487,8 @@ public class TestWALEntryStream { when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); final ReplicationSourceWALReaderThread reader = - new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, - 0, fs, conf, filter, new MetricsSource("1"), source); + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, + 0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId); reader.start(); WALEntryBatch entryBatch = reader.take(); @@ -504,7 +515,7 @@ public class TestWALEntryStream { appendToLogPlus(3, notReplicatedCf); - Path firstWAL = walQueue.peek(); + Path firstWAL = getQueue().peek(); final long eof = getPosition(firstWAL); ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); @@ -512,8 +523,8 @@ public class TestWALEntryStream { when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); final ReplicationSourceWALReaderThread reader = - new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, - 0, fs, conf, filter, new MetricsSource("1"), source); + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, + 0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId); reader.start(); // reader won't put any batch, even if EOF reached. @@ -529,21 +540,24 @@ public class TestWALEntryStream { // should get empty batch with current wal position, after wal rolled WALEntryBatch entryBatch = reader.take(); - Path lastWAL= walQueue.peek(); + Path lastWAL= getQueue().peek(); long positionToBeLogged = getPosition(lastWAL); assertNotNull(entryBatch); assertTrue(entryBatch.isEmpty()); - assertEquals(1, walQueue.size()); + assertEquals(1, getQueue().size()); assertNotEquals(firstWAL, entryBatch.getLastWalPath()); assertEquals(lastWAL, entryBatch.getLastWalPath()); assertEquals(positionToBeLogged, entryBatch.getLastWalPosition()); } private long getPosition(Path walPath) throws IOException { + ReplicationSourceLogQueue tempQueue = + new ReplicationSourceLogQueue(conf, getMockMetrics()); + String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(walPath.getName()); + tempQueue.enqueueLog(walPath, walPrefix); WALEntryStream entryStream = - new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)), - fs, conf, new MetricsSource("1")); + new WALEntryStream(tempQueue, fs, conf, getMockMetrics(), walPrefix); entryStream.hasNext(); return entryStream.getPosition(); } @@ -628,8 +642,8 @@ public class TestWALEntryStream { Path currentPath; @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - walQueue.add(newPath); + public void preLogRoll(Path oldPath, Path newPath) { + logQueue.enqueueLog(newPath, fakeWalGroupId); currentPath = newPath; } } @@ -644,7 +658,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -652,7 +666,7 @@ public class TestWALEntryStream { } // start up a reader - Path walPath = walQueue.peek(); + Path walPath = getQueue().peek(); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); @@ -667,8 +681,8 @@ public class TestWALEntryStream { ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); final ReplicationSourceWALReaderThread reader = - new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, - 0, fs, conf, getDummyFilter(), new MetricsSource("1"), source); + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, + 0, fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId); reader.start(); Future future = @@ -701,13 +715,11 @@ public class TestWALEntryStream { */ @Test public void testEOFExceptionForRecoveredQueue() throws Exception { - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); // Create a 0 length log. - Path emptyLog = new Path("emptyLog"); + Path emptyLog = new Path("emptyLog.1"); FSDataOutputStream fsdos = fs.create(emptyLog); fsdos.close(); assertEquals(0, fs.getFileStatus(emptyLog).getLen()); - queue.add(emptyLog); ReplicationSource source = Mockito.mock(ReplicationSource.class); @@ -720,14 +732,29 @@ public class TestWALEntryStream { // Override the max retries multiplier to fail fast. conf.setInt("replication.source.maxretriesmultiplier", 1); conf.setBoolean("replication.source.eof.autorecovery", true); + + ReplicationSourceLogQueue localLogQueue = + new ReplicationSourceLogQueue(conf, getMockMetrics()); + localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); // Create a reader thread. ReplicationSourceWALReaderThread reader = - new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), - queue, 0, fs, conf, getDummyFilter(), - new MetricsSource("1"), (ReplicationSource) source); + new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), + localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId); reader.run(); // ReplicationSourceWALReaderThread#handleEofException method will // remove empty log from logQueue. - assertEquals(0, queue.size()); + assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); + } + + private PriorityBlockingQueue getQueue() { + return logQueue.getQueue(fakeWalGroupId); + } + + private MetricsSource getMockMetrics() { + MetricsSource source = mock(MetricsSource.class); + doNothing().when(source).incrSizeOfLogQueue(); + doNothing().when(source).decrSizeOfLogQueue(); + doNothing().when(source).setOldestWalAge(Mockito.anyInt()); + return source; } }