[HBASE-25539] Add age of oldest wal metric (#2963)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Parent: f9a91488b2
Commit: 9e9301a242
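This change adds a per-source gauge, "source.oldestWalAge": the age in milliseconds of the oldest WAL still queued for replication, derived from the creation timestamp encoded at the end of each WAL file name. A minimal sketch of that computation, for orientation only (this is not the committed code; the OldestWalAgeSketch class and its parameters are illustrative):

import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.fs.Path;

final class OldestWalAgeSketch {
  /** Age in ms of the oldest WAL across all wal-group queues; 0 when nothing is queued. */
  static long oldestWalAge(Map<String, PriorityBlockingQueue<Path>> queues, long nowMs) {
    long oldestTs = Long.MAX_VALUE;
    for (PriorityBlockingQueue<Path> queue : queues.values()) {
      Path wal = queue.peek(); // head of each queue is that wal group's oldest WAL
      if (wal != null) {
        String name = wal.getName(); // WAL names end with ".<creationTimeMs>"
        long ts = Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
        oldestTs = Math.min(oldestTs, ts);
      }
    }
    return oldestTs == Long.MAX_VALUE ? 0 : nowMs - oldestTs;
  }
}

The diff below wires this idea through the metrics interfaces, a new ReplicationSourceLogQueue class, and the existing replication source and WAL reader threads.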
@@ -50,6 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of oldest wal in ms since its creation time */
String OLDEST_WAL_AGE = "source.oldestWalAge";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);

@@ -74,5 +76,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
void incrFailedRecoveryQueue();

void setOldestWalAge(long age);
long getOldestWalAge();
}
@@ -196,6 +196,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}

@Override
public void setOldestWalAge(long age) {
// Not implemented
}

@Override
public long getOldestWalAge() {
// Not implemented
return 0;
}

@Override
public void init() {
rms.init();
@@ -40,6 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;

@@ -67,6 +68,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;

@@ -126,6 +128,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou

completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}

@Override public void setLastShippedAge(long age) {

@@ -191,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
}

@Override

@@ -256,6 +262,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
@Override
public void incrFailedRecoveryQueue() {/*no op*/}

@Override public void setOldestWalAge(long age) {
oldestWalAge.set(age);
}

@Override public long getOldestWalAge() {
return oldestWalAge.value();
}

@Override
public void init() {
rms.init();
@@ -395,6 +395,17 @@ public class MetricsSource implements BaseSource {
globalSourceSource.updateHistogram(name, value);
}

/*
Sets the age of oldest log file just for source.
*/
public void setOldestWalAge(long age) {
singleSourceSource.setOldestWalAge(age);
}

public long getOldestWalAge() {
return singleSourceSource.getOldestWalAge();
}

@Override
public String getMetricsContext() {
return globalSourceSource.getMetricsContext();
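Taken together with the pieces that follow, the value flows in one direction: the new ReplicationSourceLogQueue recomputes the age on every enqueue and dequeue and pushes it down through this facade. The relevant calls, all quoted from this patch:

this.metrics.setOldestWalAge(getOldestWalAge());   // ReplicationSourceLogQueue
singleSourceSource.setOldestWalAge(age);           // MetricsSource (above)
oldestWalAge.set(age);                             // MetricsReplicationSourceSourceImpl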
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

@@ -90,10 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
public class ReplicationSource extends Thread implements ReplicationSourceInterface {

private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues =
new HashMap<String, PriorityBlockingQueue<Path>>();
protected ReplicationSourceLogQueue logQueue;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
private ReplicationQueues replicationQueues;

@@ -126,8 +122,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.

@@ -176,6 +170,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueue = new ReplicationSourceLogQueue(conf, metrics);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;

@@ -187,7 +182,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;

defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);

@@ -208,16 +202,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public void enqueueLog(Path log) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
boolean queueExists = logQueue.enqueueLog(log, logPrefix);
if (!queueExists) {
if (this.sourceRunning) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
new ReplicationSourceShipperThread(logPrefix, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);

@@ -227,14 +219,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
}
queue.put(log);
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+ " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
}
}

@Override

@@ -326,11 +310,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : logQueue.getQueues().entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
new ReplicationSourceShipperThread(walGroupId, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);

@@ -483,7 +467,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
public static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1;
return Long.parseLong(p.getName().substring(tsIndex));
}

@@ -530,7 +514,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
String walGroupId = worker.getWalGroupId();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
Path currentPath = worker.getLastLoggedPath();

@@ -566,7 +550,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public class ReplicationSourceShipperThread extends Thread {
ReplicationSourceInterface source;
String walGroupId;
PriorityBlockingQueue<Path> queue;
ReplicationSourceLogQueue logQueue;
ReplicationQueueInfo replicationQueueInfo;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;

@@ -577,10 +561,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
ReplicationSourceWALReaderThread entryReader;

public ReplicationSourceShipperThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceLogQueue logQueue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceInterface source) {
this.walGroupId = walGroupId;
this.queue = queue;
this.logQueue = logQueue;
this.replicationQueueInfo = replicationQueueInfo;
this.source = source;
}

@@ -842,11 +826,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos(long startPosition) {
try {
startPosition =
(replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
startPosition = (replicationQueues.getLogPosition(peerClusterZnode,
this.logQueue.getQueue(walGroupId).peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ startPosition);
LOG.trace("Recovered queue started with log " +
this.logQueue.getQueue(walGroupId).peek() + " at position " + startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);

@@ -860,8 +844,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, logQueue,
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this,
this.walGroupId);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);

@@ -873,6 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);

@@ -922,9 +908,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
logQueue.clear(walGroupId);
for (Path path : newPaths) {
queue.add(path);
logQueue.enqueueLog(path, walGroupId);
}
}
}
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
just at one place.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceLogQueue {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
private MetricsSource metrics;
private Configuration conf;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;

public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics) {
this.conf = conf;
this.metrics = metrics;
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}

/**
* Enqueue the wal
* @param wal wal to be enqueued
* @param walGroupId Key for the wal in @queues map
* @return boolean whether this is the first time we are seeing this walGroupId.
*/
public boolean enqueueLog(Path wal, String walGroupId) {
boolean exists = false;
PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new ReplicationSource.LogsComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walGroupId, queue);
} else {
exists = true;
queue.put(wal);
}
// Increment size of logQueue
this.metrics.incrSizeOfLogQueue();
// Compute oldest wal age
this.metrics.setOldestWalAge(getOldestWalAge());
// This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("WAL group " + walGroupId + " queue size: " + queueSize
+ " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
}
return exists;
}

/**
* Get the queue size for the given walGroupId.
* @param walGroupId walGroupId
*/
public int getQueueSize(String walGroupId) {
Queue queue = queues.get(walGroupId);
if (queue == null) {
return 0;
}
return queue.size();
}

/**
* Returns number of queues.
*/
public int getNumQueues() {
return queues.size();
}

public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return queues;
}

/**
* Return queue for the given walGroupId
* Please don't add or remove elements from the returned queue.
* Use @enqueueLog and @remove methods respectively.
* @param walGroupId walGroupId
*/
public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
return queues.get(walGroupId);
}

/**
* Remove head from the queue corresponding to given walGroupId.
* @param walGroupId walGroupId
*/
public void remove(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
if (queue == null || queue.isEmpty()) {
return;
}
queue.remove();
// Decrease size logQueue.
this.metrics.decrSizeOfLogQueue();
// Re-compute age of oldest wal metric.
this.metrics.setOldestWalAge(getOldestWalAge());
}

/**
* Remove all the elements from the queue corresponding to walGroupId
* @param walGroupId walGroupId
*/
public void clear(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
while (!queue.isEmpty()) {
// Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
queue.remove();
metrics.decrSizeOfLogQueue();
}
this.metrics.setOldestWalAge(getOldestWalAge());
}

/*
Returns the age of oldest wal.
*/
long getOldestWalAge() {
long now = EnvironmentEdgeManager.currentTime();
long timestamp = getOldestWalTimestamp();
if (timestamp == Long.MAX_VALUE) {
// If there are no wals in the queue then set the oldest wal timestamp to current time
// so that the oldest wal age will be 0.
timestamp = now;
}
long age = now - timestamp;
return age;
}

/*
Get the oldest wal timestamp from all the queues.
*/
private long getOldestWalTimestamp() {
long oldestWalTimestamp = Long.MAX_VALUE;
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
PriorityBlockingQueue<Path> queue = entry.getValue();
Path path = queue.peek();
// Can path ever be null ?
if (path != null) {
oldestWalTimestamp = Math.min(oldestWalTimestamp,
ReplicationSource.LogsComparator.getTS(path));
}
}
return oldestWalTimestamp;
}
}
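For reference, a short usage sketch of the new class (it mirrors the values used by the unit tests later in this patch; conf and metrics are assumed to be a live Configuration and MetricsSource, and the clock comes from EnvironmentEdgeManager):

// Sketch only: enqueueLog/remove keep sizeOfLogQueue and oldestWalAge in sync.
ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics);
logQueue.enqueueLog(new Path("log-walgroup-a.8"), "walgroup-a"); // first WAL of group a
logQueue.enqueueLog(new Path("log-walgroup-b.4"), "walgroup-b"); // first WAL of group b
// With the clock at 10, the oldest timestamp is 4, so the reported age is 10 - 4 = 6.
long age = logQueue.getOldestWalAge();
logQueue.remove("walgroup-b"); // dequeues group b's head; age is recomputed as 10 - 8 = 2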
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
public class ReplicationSourceWALReaderThread extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);

private PriorityBlockingQueue<Path> logQueue;
private ReplicationSourceLogQueue logQueue;
private FileSystem fs;
private Configuration conf;
private BlockingQueue<WALEntryBatch> entryBatchQueue;

@@ -79,6 +79,7 @@ public class ReplicationSourceWALReaderThread extends Thread {

private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;

private ReplicationSource source;
private ReplicationSourceManager manager;

@@ -96,12 +97,13 @@ public class ReplicationSourceWALReaderThread extends Thread {
* @param metrics replication metrics
*/
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
ReplicationQueueInfo replicationQueueInfo, ReplicationSourceLogQueue logQueue,
long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
MetricsSource metrics, ReplicationSource source) {
MetricsSource metrics, ReplicationSource source, String walGroupId) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue;
this.lastReadPath = logQueue.peek();
this.walGroupId = walGroupId;
this.lastReadPath = logQueue.getQueue(walGroupId).peek();
this.lastReadPosition = startPosition;
this.fs = fs;
this.conf = conf;

@@ -135,7 +137,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);

@@ -232,24 +234,26 @@ public class ReplicationSourceWALReaderThread extends Thread {
// enabled, then dump the log
private void handleEofException(Exception e) {
boolean isRecoveredSource = manager.getOldSources().contains(source);
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
// add current log to recovered source queue so it is safe to remove.
if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1)
if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1)
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
lastReadPath = logQueue.remove();
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
lastReadPath = queue.peek();
logQueue.remove(walGroupId);
lastReadPosition = 0;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + logQueue.peek());
LOG.warn("Couldn't get file length information about log " + queue.peek());
}
}
}

public Path getCurrentPath() {
return logQueue.peek();
return logQueue.getQueue(walGroupId).peek();
}

//returns false if we've already exceeded the global quota
@@ -59,7 +59,8 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
private PriorityBlockingQueue<Path> logQueue;
private final ReplicationSourceLogQueue logQueue;
private final String walGroupId;
private FileSystem fs;
private Configuration conf;
private MetricsSource metrics;

@@ -70,12 +71,13 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @param walGroupId wal prefix
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics)
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics, String walGroupId)
throws IOException {
this(logQueue, fs, conf, 0, metrics);
this(logQueue, fs, conf, 0, metrics, walGroupId);
}

/**

@@ -83,18 +85,18 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @param logQueue the queue of WAL paths
* @param conf the {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
* @param walGroupId wal prefix
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
long startPosition, MetricsSource metrics) throws IOException {
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
this.metrics = metrics;
this.walGroupId = walGroupId;
}

/**

@@ -198,7 +200,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
if (checkReader()) {
readNextEntryAndSetPosition();
if (currentEntry == null) { // no more entries in this log file - see if log was rolled
if (logQueue.size() > 1) { // log was rolled
if (logQueue.getQueue(walGroupId).size() > 1) { // log was rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader,
// and a new log was enqueued while we were reading. See HBASE-6758

@@ -266,7 +268,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
LOG.debug("Reached the end of log " + currentPath);
}
closeReader();
logQueue.remove();
logQueue.remove(walGroupId);
setCurrentPath(null);
setPosition(0);
metrics.decrSizeOfLogQueue();

@@ -300,7 +302,8 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr

// open a reader on the next log in queue
private boolean openNextLog() throws IOException {
Path nextPath = logQueue.peek();
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) return true;
@@ -52,6 +52,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;

@@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;

@@ -76,7 +79,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;

@@ -299,6 +304,15 @@ public class TestReplicationSource {
return source;
}

ReplicationSource createReplicationSourceWithMocks(MetricsSource metrics,
ReplicationEndpoint endpoint) throws IOException {
final ReplicationSource source = new ReplicationSource();
endpoint.init(context);
source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
"testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
return source;
}

public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
}

@@ -648,5 +662,44 @@
}
}

}
/*
Test age of oldest wal metric.
*/
@Test
public void testAgeOfOldestWal() throws Exception {
try {
ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(manualEdge);

String id = "1";
MetricsSource metrics = new MetricsSource(id);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
Mocks mocks = new Mocks();
ReplicationEndpoint endpoint = mock(ReplicationEndpoint.class);
ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics, endpoint);

final Path log1 = new Path(logDir, "log-walgroup-a.8");
manualEdge.setValue(10);
// Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
source.enqueueLog(log1);
MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
assertEquals(2, metricsSource1.getOldestWalAge());

final Path log2 = new Path(logDir, "log-walgroup-b.4");
// Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
source.enqueueLog(log2);
assertEquals(6, metricsSource1.getOldestWalAge());
// Clear all metrics.
metrics.clear();
} finally {
EnvironmentEdgeManager.reset();
}
}

private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class);
return factory.getSource(sourceId);
}
}
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SmallTests.class,ReplicationTests.class})
public class TestReplicationSourceLogQueue {

/*
Testing enqueue and dequeuing of wal and check age of oldest wal.
*/
@Test
public void testEnqueueDequeue() {
try {
String walGroupId1 = "fake-walgroup-id-1";
String walGroupId2 = "fake-walgroup-id-2";

ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(manualEdge);

MetricsSource metrics = new MetricsSource("1");
Configuration conf = HBaseConfiguration.create();
ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics);
final Path log1 = new Path("log-walgroup-a.8");
manualEdge.setValue(10);
// Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
logQueue.enqueueLog(log1, walGroupId1);
assertEquals(2, logQueue.getOldestWalAge());

final Path log2 = new Path("log-walgroup-b.4");
// Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
logQueue.enqueueLog(log2, walGroupId2);
assertEquals(6, logQueue.getOldestWalAge());

// Remove an element from walGroupId2.
// After this op, there will be only one element in the queue log-walgroup-a.8
logQueue.remove(walGroupId2);
assertEquals(2, logQueue.getOldestWalAge());

// Remove last element from the queue.
logQueue.remove(walGroupId1);
// This will test the case where there are no elements in the queue.
assertEquals(0, logQueue.getOldestWalAge());
} finally {
EnvironmentEdgeManager.reset();
}
}
}
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALRead
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;

@@ -109,9 +111,10 @@ public class TestWALEntryStream {
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
private static NavigableMap<byte[], Integer> scopes;
private final String fakeWalGroupId = "fake-wal-group-id";

private WAL log;
PriorityBlockingQueue<Path> walQueue;
ReplicationSourceLogQueue logQueue;
private PathWatcher pathWatcher;

@Rule

@@ -139,7 +142,7 @@ public class TestWALEntryStream {

@Before
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
logQueue = new ReplicationSourceLogQueue(conf, new MetricsSource("2"));
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
pathWatcher = new PathWatcher();
listeners.add(pathWatcher);

@@ -174,7 +177,7 @@ public class TestWALEntryStream {
log.rollWriter();

try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
for (WAL.Entry e : entryStream) {
assertNotNull(e);

@@ -202,7 +205,7 @@ public class TestWALEntryStream {

long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();

@@ -220,7 +223,7 @@ public class TestWALEntryStream {
appendToLog();

try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());

@@ -234,7 +237,7 @@ public class TestWALEntryStream {
appendToLog();

try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);

@@ -259,7 +262,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next()));

appendToLog("3"); // 3 - comes in after reader opened

@@ -267,12 +270,12 @@ public class TestWALEntryStream {
appendToLog("4"); // 4 - this append is in the rolled log

assertEquals("2", getRow(entryStream.next()));
assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
// entry in first log
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
// and 3 would be skipped
assertEquals("4", getRow(entryStream.next())); // 4
assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
assertFalse(entryStream.hasNext());
}
}

@@ -284,7 +287,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point

// some new entries come in while we're streaming

@@ -307,7 +310,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");

@@ -315,11 +318,12 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
fakeWalGroupId)) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
assertEquals(1, walQueue.size());
assertEquals(1, getQueue().size());
}
}

@@ -333,13 +337,15 @@ public class TestWALEntryStream {
appendEntriesToLog(3);
// read only one element
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
fakeWalGroupId)) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
fakeWalGroupId)) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());

@@ -350,7 +356,7 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
assertFalse(entryStream.hasNext());
}
}

@@ -361,7 +367,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();

@@ -374,9 +380,9 @@ public class TestWALEntryStream {
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"), source);
Path walPath = walQueue.peek();
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),logQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId);
Path walPath = getQueue().peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();

@@ -400,8 +406,13 @@ public class TestWALEntryStream {
appendEntriesToLog(2);

long position;
try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
fs, conf, new MetricsSource("1"))) {
ReplicationSourceLogQueue tempQueue = new ReplicationSourceLogQueue(conf,
getMockMetrics());
for (Path path : getQueue()) {
tempQueue.enqueueLog(path, fakeWalGroupId);
}
try (WALEntryStream entryStream = new WALEntryStream(tempQueue,
fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();

@@ -416,9 +427,9 @@ public class TestWALEntryStream {
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
walQueue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), source);
Path walPath = walQueue.toArray(new Path[2])[1];
logQueue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), source, fakeWalGroupId);
Path walPath = getQueue().toArray(new Path[2])[1];
reader.start();
WALEntryBatch entryBatch = reader.take();

@@ -476,8 +487,8 @@ public class TestWALEntryStream {
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"), source);
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId);
reader.start();

WALEntryBatch entryBatch = reader.take();

@@ -504,7 +515,7 @@ public class TestWALEntryStream {

appendToLogPlus(3, notReplicatedCf);

Path firstWAL = walQueue.peek();
Path firstWAL = getQueue().peek();
final long eof = getPosition(firstWAL);

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);

@@ -512,8 +523,8 @@ public class TestWALEntryStream {
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"), source);
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId);
reader.start();

// reader won't put any batch, even if EOF reached.

@@ -529,21 +540,24 @@ public class TestWALEntryStream {
// should get empty batch with current wal position, after wal rolled
WALEntryBatch entryBatch = reader.take();

Path lastWAL= walQueue.peek();
Path lastWAL= getQueue().peek();
long positionToBeLogged = getPosition(lastWAL);

assertNotNull(entryBatch);
assertTrue(entryBatch.isEmpty());
assertEquals(1, walQueue.size());
assertEquals(1, getQueue().size());
assertNotEquals(firstWAL, entryBatch.getLastWalPath());
assertEquals(lastWAL, entryBatch.getLastWalPath());
assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
}

private long getPosition(Path walPath) throws IOException {
ReplicationSourceLogQueue tempQueue =
new ReplicationSourceLogQueue(conf, getMockMetrics());
String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(walPath.getName());
tempQueue.enqueueLog(walPath, walPrefix);
WALEntryStream entryStream =
new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)),
fs, conf, new MetricsSource("1"));
new WALEntryStream(tempQueue, fs, conf, getMockMetrics(), walPrefix);
entryStream.hasNext();
return entryStream.getPosition();
}

@@ -628,8 +642,8 @@ public class TestWALEntryStream {
Path currentPath;

@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
walQueue.add(newPath);
public void preLogRoll(Path oldPath, Path newPath) {
logQueue.enqueueLog(newPath, fakeWalGroupId);
currentPath = newPath;
}
}

@@ -644,7 +658,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();

@@ -652,7 +666,7 @@ public class TestWALEntryStream {
}

// start up a reader
Path walPath = walQueue.peek();
Path walPath = getQueue().peek();
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));

@@ -667,8 +681,8 @@ public class TestWALEntryStream {
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
0, fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId);

reader.start();
Future<WALEntryBatch> future =

@@ -701,13 +715,11 @@ public class TestWALEntryStream {
*/
@Test
public void testEOFExceptionForRecoveredQueue() throws Exception {
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
// Create a 0 length log.
Path emptyLog = new Path("emptyLog");
Path emptyLog = new Path("emptyLog.1");
FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
queue.add(emptyLog);

ReplicationSource source = Mockito.mock(ReplicationSource.class);

@@ -720,14 +732,29 @@ public class TestWALEntryStream {
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);

ReplicationSourceLogQueue localLogQueue =
new ReplicationSourceLogQueue(conf, getMockMetrics());
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
// Create a reader thread.
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
queue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), (ReplicationSource) source);
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
reader.run();
// ReplicationSourceWALReaderThread#handleEofException method will
// remove empty log from logQueue.
assertEquals(0, queue.size());
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
}

private PriorityBlockingQueue<Path> getQueue() {
return logQueue.getQueue(fakeWalGroupId);
}

private MetricsSource getMockMetrics() {
MetricsSource source = mock(MetricsSource.class);
doNothing().when(source).incrSizeOfLogQueue();
doNothing().when(source).decrSizeOfLogQueue();
doNothing().when(source).setOldestWalAge(Mockito.anyInt());
return source;
}
}