HBASE-25539: Add age of oldest wal metric (#2962)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
parent ec680c52e0
commit 6a4c9be967
@@ -52,6 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
/* Used to track the age of oldest wal in ms since its creation time */
String OLDEST_WAL_AGE = "source.oldestWalAge";

void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);

@@ -79,4 +81,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
long getWALEditsRead();
long getShippedOps();
long getEditsFiltered();
void setOldestWalAge(long age);
long getOldestWalAge();
}
@@ -210,6 +210,18 @@ public class MetricsReplicationGlobalSourceSourceImpl
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}

@Override
public void setOldestWalAge(long age) {
// Not implemented
}

@Override
public long getOldestWalAge() {
// Not implemented
return 0;
}

@Override
public void init() {
rms.init();
@@ -44,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;

private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;

@@ -71,6 +72,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;

public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;

@@ -130,6 +132,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou

completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);

oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}

@Override public void setLastShippedAge(long age) {

@@ -195,6 +200,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
}

@Override

@@ -260,6 +266,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
@Override
public void incrFailedRecoveryQueue() {/*no op*/}

@Override public void setOldestWalAge(long age) {
oldestWalAge.set(age);
}

@Override public long getOldestWalAge() {
return oldestWalAge.value();
}

@Override
public void init() {
rms.init();
@@ -397,6 +397,17 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrFailedRecoveryQueue();
}

/*
Sets the age of oldest log file just for source.
*/
public void setOldestWalAge(long age) {
singleSourceSource.setOldestWalAge(age);
}

public long getOldestWalAge() {
return singleSourceSource.getOldestWalAge();
}

@Override
public void init() {
singleSourceSource.init();
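For context, a minimal sketch (not from this patch) of the intended call pattern: the age is computed by the queue layer and pushed through MetricsSource, which delegates to the per-source gauge only; the global implementation above is deliberately a no-op. This assumes the single-argument MetricsSource constructor used by the commit's tests is accessible from caller code.

import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

public class OldestWalAgeMetricSketch {
  public static void main(String[] args) {
    MetricsSource metrics = new MetricsSource("1");
    // Pretend the oldest queued WAL was created 5 seconds ago.
    long oldestWalStartTime = EnvironmentEdgeManager.currentTime() - 5000L;
    long ageMs = EnvironmentEdgeManager.currentTime() - oldestWalStartTime;
    metrics.setOldestWalAge(ageMs);                // updates the per-source "source.oldestWalAge" gauge
    System.out.println(metrics.getOldestWalAge()); // reads the same gauge back, roughly 5000
  }
}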
@@ -58,13 +58,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
}

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
}

public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
public void locateRecoveredPaths(String walGroupId) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
pathsLoop: for (Path path : queue) {

@@ -117,9 +117,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
logQueue.clear(walGroupId);
for (Path path : newPaths) {
queue.add(path);
logQueue.enqueueLog(path, walGroupId);
}
}
}
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;

@@ -40,9 +38,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final ReplicationQueueStorage replicationQueues;

public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
ReplicationQueueStorage queueStorage) {
super(conf, walGroupId, queue, source);
super(conf, walGroupId, logQueue, source);
this.source = source;
this.replicationQueues = queueStorage;
}

@@ -65,7 +63,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
source.locateRecoveredPaths(queue);
source.locateRecoveredPaths(walGroupId);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);

@@ -82,9 +80,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
String peerClusterZNode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
peerClusterZNode, this.queue.peek().getName());
LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
startPosition);
peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
LOG.trace("Recovered queue started with log {} at position {}",
this.logQueue.getQueue(walGroupId).peek(), startPosition);
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
@@ -24,14 +24,12 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -86,11 +84,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
public class ReplicationSource implements ReplicationSourceInterface {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
protected ReplicationSourceLogQueue logQueue;
protected ReplicationQueueStorage queueStorage;
protected ReplicationPeer replicationPeer;

@@ -118,8 +114,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;

@@ -213,6 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
this.manager = manager;

@@ -224,7 +219,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);

@@ -255,35 +249,20 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
// Use WAL prefix as the WALGroupId for this peer.
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walPrefix, queue);
boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

if (!queueExists) {
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(walPrefix, queue);
tryStartNewShipper(walPrefix);
}
} else {
queue.put(wal);
}
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
}
this.metrics.incrSizeOfLogQueue();
// This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+ "replication.source.log.queue.warn: {}", logPeerId(),
walPrefix, queueSize, logQueueWarnThreshold);
}
}

@Override

@@ -375,16 +354,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.walEntryFilter = new ChainWALEntryFilter(filters);
}

private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
createNewWALReader(walGroupId, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,

@@ -404,7 +383,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay = metrics.getReplicationDelay();
Path currentPath = shipper.getCurrentPath();
fileSize = -1;

@@ -443,16 +422,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
return fileSize;
}

protected ReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
protected ReplicationSourceShipper createNewShipper(String walGroupId) {
return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
}

private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
this, walGroupId)
: new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
this, walGroupId);
}

/**

@@ -621,14 +600,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
logPeerId(), this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId,
peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
for (String walGroupId: logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
this.startupOngoing.set(false);
}

@@ -857,7 +834,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
/**
* @return String to use as a log prefix that contains current peerId.
*/
private String logPeerId(){
public String logPeerId(){
return "peerId=" + this.getPeerId() + ",";
}
}
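The rewritten enqueueLog() above leans on the return value of ReplicationSourceLogQueue.enqueueLog(): false means the wal group is being seen for the first time (so a new shipper should be started), true means the group's queue already existed. A small sketch of that contract (not from the patch; the mock setup mirrors TestReplicationSourceLogQueue further down in this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceLogQueue;
import org.mockito.Mockito;

public class EnqueueLogContractSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    Mockito.doReturn("peerId=1,").when(source).logPeerId();
    ReplicationSourceLogQueue logQueue =
        new ReplicationSourceLogQueue(conf, new MetricsSource("1"), source);
    // First WAL of a group: enqueueLog() returns false ("queue did not exist yet"),
    // which is the cue for ReplicationSource to start a new shipper for that group.
    boolean existed = logQueue.enqueueLog(new Path("log-walgroup-a.8"), "walgroup-a");
    System.out.println(existed); // false
    // Subsequent WALs for the same group just grow the existing queue.
    existed = logQueue.enqueueLog(new Path("log-walgroup-a.9"), "walgroup-a");
    System.out.println(existed); // true
  }
}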
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
just at one place.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceLogQueue {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
private MetricsSource metrics;
private Configuration conf;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
private ReplicationSource source;

public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics,
ReplicationSource source) {
this.conf = conf;
this.metrics = metrics;
this.source = source;
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}

/**
* Enqueue the wal
* @param wal wal to be enqueued
* @param walGroupId Key for the wal in @queues map
* @return boolean whether this is the first time we are seeing this walGroupId.
*/
public boolean enqueueLog(Path wal, String walGroupId) {
boolean exists = false;
PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walGroupId, queue);
} else {
exists = true;
queue.put(wal);
}
// Increment size of logQueue
this.metrics.incrSizeOfLogQueue();
// Compute oldest wal age
this.metrics.setOldestWalAge(getOldestWalAge());
// This will wal a warning for each new wal that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
"replication.source.log.queue.warn {}", source.logPeerId(), walGroupId, queueSize,
logQueueWarnThreshold);
}
return exists;
}

/**
* Get the queue size for the given walGroupId.
* @param walGroupId walGroupId
*/
public int getQueueSize(String walGroupId) {
Queue queue = queues.get(walGroupId);
if (queue == null) {
return 0;
}
return queue.size();
}

/**
* Returns number of queues.
*/
public int getNumQueues() {
return queues.size();
}

public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return queues;
}

/**
* Return queue for the given walGroupId
* Please don't add or remove elements from the returned queue.
* Use @enqueueLog and @remove methods respectively.
* @param walGroupId walGroupId
*/
public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
return queues.get(walGroupId);
}

/**
* Remove head from the queue corresponding to given walGroupId.
* @param walGroupId walGroupId
*/
public void remove(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
if (queue == null || queue.isEmpty()) {
return;
}
queue.remove();
// Decrease size logQueue.
this.metrics.decrSizeOfLogQueue();
// Re-compute age of oldest wal metric.
this.metrics.setOldestWalAge(getOldestWalAge());
}

/**
* Remove all the elements from the queue corresponding to walGroupId
* @param walGroupId walGroupId
*/
public void clear(String walGroupId) {
PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
while (!queue.isEmpty()) {
// Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
queue.remove();
metrics.decrSizeOfLogQueue();
}
this.metrics.setOldestWalAge(getOldestWalAge());
}

/*
Returns the age of oldest wal.
*/
long getOldestWalAge() {
long now = EnvironmentEdgeManager.currentTime();
long timestamp = getOldestWalTimestamp();
if (timestamp == Long.MAX_VALUE) {
// If there are no wals in the queue then set the oldest wal timestamp to current time
// so that the oldest wal age will be 0.
timestamp = now;
}
long age = now - timestamp;
return age;
}

/*
Get the oldest wal timestamp from all the queues.
*/
private long getOldestWalTimestamp() {
long oldestWalTimestamp = Long.MAX_VALUE;
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
PriorityBlockingQueue<Path> queue = entry.getValue();
Path path = queue.peek();
// Can path ever be null ?
if (path != null) {
oldestWalTimestamp = Math.min(oldestWalTimestamp,
AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
}
}
return oldestWalTimestamp;
}
}
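The core of the new metric is getOldestWalAge() above: take the smallest WAL start timestamp across every wal-group queue, subtract it from the current time, and report 0 when nothing is queued. A self-contained sketch of that arithmetic (illustrative only; the helper below is hypothetical and simply mirrors getOldestWalAge()/getOldestWalTimestamp()):

import java.util.Arrays;
import java.util.Collection;

public class OldestWalAgeMath {
  // Hypothetical helper: age = now - min(start timestamps), 0 when there are no WALs.
  static long oldestWalAge(Collection<Long> walStartTimestamps, long now) {
    long oldest = walStartTimestamps.stream().min(Long::compare).orElse(now);
    return now - oldest;
  }

  public static void main(String[] args) {
    // Mirrors the numbers used by the tests later in this diff:
    // at a manual clock of 10, WALs created at 8 and 4 give ages 2 and 6.
    System.out.println(oldestWalAge(Arrays.asList(8L), 10L));     // 2
    System.out.println(oldestWalAge(Arrays.asList(8L, 4L), 10L)); // 6
    System.out.println(oldestWalAge(Arrays.asList(), 10L));       // 0 when nothing is queued
  }
}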
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

import org.apache.hadoop.conf.Configuration;

@@ -55,7 +54,7 @@ public class ReplicationSourceShipper extends Thread {

private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
protected final ReplicationSourceLogQueue logQueue;
private final ReplicationSource source;

// Last position in the log that we sent to ZooKeeper

@@ -76,10 +75,10 @@ public class ReplicationSourceShipper extends Thread {
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
ReplicationSourceLogQueue logQueue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
this.logQueue = logQueue;
this.source = source;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
@@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

private final PriorityBlockingQueue<Path> logQueue;
private final ReplicationSourceLogQueue logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;

@@ -77,6 +77,7 @@ class ReplicationSourceWALReader extends Thread {

private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;

/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the

@@ -89,8 +90,8 @@ class ReplicationSourceWALReader extends Thread {
* @param source replication source
*/
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source, String walGroupId) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
this.fs = fs;

@@ -111,6 +112,7 @@ class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.walGroupId = walGroupId;
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity

@@ -125,7 +127,7 @@ class ReplicationSourceWALReader extends Thread {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) {
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);

@@ -246,18 +248,19 @@ class ReplicationSourceWALReader extends Thread {
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(IOException e) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
logQueue.remove();
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
logQueue.remove(walGroupId);
currentPosition = 0;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + logQueue.peek());
LOG.warn("Couldn't get file length information about log " + queue.peek());
}
}
}

@@ -269,7 +272,7 @@ class ReplicationSourceWALReader extends Thread {
return batchQueueHead.getLastWalPath();
}
// otherwise, we must be currently reading from the head of the log queue
return logQueue.peek();
return logQueue.getQueue(walGroupId).peek();
}

//returns false if we've already exceeded the global quota
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

@@ -44,9 +43,9 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
private final SerialReplicationChecker checker;

public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
super(fs, conf, logQueue, startPosition, filter, source);
ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source, String walGroupId) {
super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
checker = new SerialReplicationChecker(conf, source);
}
@@ -63,7 +63,8 @@ class WALEntryStream implements Closeable {
private long currentPositionOfEntry = 0;
// position after reading current entry
private long currentPositionOfReader = 0;
private final PriorityBlockingQueue<Path> logQueue;
private final ReplicationSourceLogQueue logQueue;
private final String walGroupId;
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;

@@ -81,9 +82,9 @@ class WALEntryStream implements Closeable {
* @param metrics the replication metrics
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics) throws IOException {
MetricsSource metrics, String walGroupId) throws IOException {
this.logQueue = logQueue;
this.fs = CommonFSUtils.getWALFileSystem(conf);
this.conf = conf;

@@ -91,6 +92,7 @@ class WALEntryStream implements Closeable {
this.walFileLengthProvider = walFileLengthProvider;
this.serverName = serverName;
this.metrics = metrics;
this.walGroupId = walGroupId;
}

/**

@@ -251,7 +253,7 @@ class WALEntryStream implements Closeable {
private void dequeueCurrentLog() throws IOException {
LOG.debug("EOF, closing {}", currentPath);
closeReader();
logQueue.remove();
logQueue.remove(walGroupId);
setCurrentPath(null);
setPosition(0);
metrics.decrSizeOfLogQueue();

@@ -301,7 +303,8 @@ class WALEntryStream implements Closeable {

// open a reader on the next log in queue
private boolean openNextLog() throws IOException {
Path nextPath = logQueue.peek();
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) {
@@ -436,7 +436,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
public static long getTS(Path p) {
return WAL.getTimestamp(p.getName());
}
}
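getTS() is widened from private to public so that ReplicationSourceLogQueue can read a WAL's start time straight from its file name via AbstractFSWALProvider.WALStartTimeComparator.getTS(). The start timestamp is parsed out of the WAL name; the tests' fake names such as log-walgroup-a.8 parse to 8, which is why an age of 2 is expected at a manual clock value of 10. A tiny sketch of that lookup (not from the patch):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WalNameTimestampSketch {
  public static void main(String[] args) {
    // Same call the new ReplicationSourceLogQueue makes for each queued WAL.
    long ts = AbstractFSWALProvider.WALStartTimeComparator.getTS(new Path("log-walgroup-a.8"));
    System.out.println(ts); // 8, so at a manual clock of 10 the reported age is 2
  }
}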
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -33,7 +34,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;

@@ -62,6 +63,8 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;

@@ -291,7 +294,7 @@ public class TestReplicationSource {
source.init(testConf, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source);
conf, null, 0, null, source, null);
ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;

@@ -484,8 +487,6 @@ public class TestReplicationSource {
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
Server server = mock(Server.class);
when(server.getServerName()).thenReturn(serverName);

@@ -498,8 +499,12 @@ public class TestReplicationSource {
.thenReturn(-1L);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", -1);
MetricsSource metricsSource = mock(MetricsSource.class);
doNothing().when(metricsSource).incrSizeOfLogQueue();
ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
RecoveredReplicationSourceShipper shipper =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
assertEquals(1001L, shipper.getStartPosition());
}

@@ -592,5 +597,59 @@ public class TestReplicationSource {
rss.stop("Done");
}
}
}

/*
Test age of oldest wal metric.
*/
@Test
public void testAgeOfOldestWal() throws Exception {
try {
ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(manualEdge);

String id = "1";
MetricsSource metrics = new MetricsSource(id);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));

ReplicationSource source = new ReplicationSource();
source.init(conf, null, manager, null, mockPeer, rss, id, null,
p -> OptionalLong.empty(), metrics);

final Path log1 = new Path(logDir, "log-walgroup-a.8");
manualEdge.setValue(10);
// Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
source.enqueueLog(log1);
MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
assertEquals(2, metricsSource1.getOldestWalAge());

final Path log2 = new Path(logDir, "log-walgroup-b.4");
// Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
source.enqueueLog(log2);
assertEquals(6, metricsSource1.getOldestWalAge());
// Clear all metrics.
metrics.clear();
} finally {
EnvironmentEdgeManager.reset();
}
}

private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class);
return factory.getSource(sourceId);
}
}
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category({SmallTests.class,ReplicationTests.class})
public class TestReplicationSourceLogQueue {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class);

/*
Testing enqueue and dequeuing of wal and check age of oldest wal.
*/
@Test
public void testEnqueueDequeue() {
try {
String walGroupId1 = "fake-walgroup-id-1";
String walGroupId2 = "fake-walgroup-id-2";

ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(manualEdge);

MetricsSource metrics = new MetricsSource("1");
Configuration conf = HBaseConfiguration.create();
ReplicationSource source = mock(ReplicationSource.class);
Mockito.doReturn("peer").when(source).logPeerId();
ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source);
final Path log1 = new Path("log-walgroup-a.8");
manualEdge.setValue(10);
// Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
logQueue.enqueueLog(log1, walGroupId1);
assertEquals(2, logQueue.getOldestWalAge());

final Path log2 = new Path("log-walgroup-b.4");
// Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
logQueue.enqueueLog(log2, walGroupId2);
assertEquals(6, logQueue.getOldestWalAge());

// Remove an element from walGroupId2.
// After this op, there will be only one element in the queue log-walgroup-a.8
logQueue.remove(walGroupId2);
assertEquals(2, logQueue.getOldestWalAge());

// Remove last element from the queue.
logQueue.remove(walGroupId1);
// This will test the case where there are no elements in the queue.
assertEquals(0, logQueue.getOldestWalAge());
} finally {
EnvironmentEdgeManager.reset();
}
}
}
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;

@@ -99,6 +101,7 @@ public class TestWALEntryStream {
private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
private static final NavigableMap<byte[], Integer> scopes = getScopes();
private final String fakeWalGroupId = "fake-wal-group-id";

private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

@@ -107,7 +110,7 @@ public class TestWALEntryStream {
}

private WAL log;
PriorityBlockingQueue<Path> walQueue;
ReplicationSourceLogQueue logQueue;
private PathWatcher pathWatcher;

@Rule

@@ -131,7 +134,8 @@ public class TestWALEntryStream {

@Before
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
ReplicationSource source = mock(ReplicationSource.class);
logQueue = new ReplicationSourceLogQueue(CONF, new MetricsSource("2"), source);
pathWatcher = new PathWatcher();
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);

@@ -165,7 +169,8 @@ public class TestWALEntryStream {
log.rollWriter();

try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());

@@ -192,7 +197,7 @@ public class TestWALEntryStream {
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();

@@ -206,8 +211,8 @@ public class TestWALEntryStream {

appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());

@@ -220,8 +225,8 @@ public class TestWALEntryStream {
log.rollWriter();
appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
log, null, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);

@@ -246,7 +251,8 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next()));

appendToLog("3"); // 3 - comes in after reader opened

@@ -254,12 +260,12 @@ public class TestWALEntryStream {
appendToLog("4"); // 4 - this append is in the rolled log

assertEquals("2", getRow(entryStream.next()));
assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
// entry in first log
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
// and 3 would be skipped
assertEquals("4", getRow(entryStream.next())); // 4
assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
assertFalse(entryStream.hasNext());
}
}

@@ -267,11 +273,13 @@ public class TestWALEntryStream {
/**
* Tests that if writes come in while we have a stream open, we shouldn't miss them
*/

@Test
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point

// some new entries come in while we're streaming

@@ -294,7 +302,8 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");

@@ -302,11 +311,12 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
assertEquals(1, walQueue.size());
assertEquals(1, getQueue().size());
}
}

@@ -314,19 +324,21 @@ public class TestWALEntryStream {
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
* using the last position
*/

@Test
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
log, null, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());

@@ -337,7 +349,8 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertFalse(entryStream.hasNext());
}
}

@@ -391,7 +404,8 @@ public class TestWALEntryStream {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source,
fakeWalGroupId);
reader.start();
return reader;
}

@@ -402,7 +416,8 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();

@@ -410,7 +425,7 @@ public class TestWALEntryStream {
}

// start up a reader
Path walPath = walQueue.peek();
Path walPath = getQueue().peek();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();

@@ -430,7 +445,7 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
Path walPath = walQueue.peek();
Path walPath = getQueue().peek();
log.rollWriter();
appendEntriesToLogAndSync(5);
log.shutdown();

@@ -450,7 +465,7 @@ public class TestWALEntryStream {
assertEquals(0, batch.getNbEntries());
assertTrue(batch.isEndOfFile());

walPath = walQueue.peek();
walPath = getQueue().peek();
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());

@@ -463,7 +478,7 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
Path walPath = walQueue.peek();
Path walPath = getQueue().peek();
log.rollWriter();
appendEntriesToLogAndSync(20);
TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {

@@ -490,7 +505,7 @@ public class TestWALEntryStream {
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());

Path walPath2 = walQueue.peek();
Path walPath2 = getQueue().peek();
entryBatch = reader.take();
assertEquals(walPath2, entryBatch.getLastWalPath());
assertEquals(20, entryBatch.getNbEntries());

@@ -503,7 +518,7 @@ public class TestWALEntryStream {
assertEquals(0, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());

Path walPath3 = walQueue.peek();
Path walPath3 = getQueue().peek();
entryBatch = reader.take();
assertEquals(walPath3, entryBatch.getLastWalPath());
assertEquals(10, entryBatch.getNbEntries());

@@ -517,7 +532,8 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();

@@ -525,7 +541,7 @@ public class TestWALEntryStream {
}

// start up a reader
Path walPath = walQueue.peek();
Path walPath = getQueue().peek();
ReplicationSource source = mockReplicationSource(false, CONF);
AtomicInteger invokeCount = new AtomicInteger(0);
AtomicBoolean enabled = new AtomicBoolean(false);

@@ -535,7 +551,8 @@ public class TestWALEntryStream {
});

ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(),
source, fakeWalGroupId);
reader.start();
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
return reader.take();

@@ -621,8 +638,8 @@ public class TestWALEntryStream {
Path currentPath;

@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
walQueue.add(newPath);
public void preLogRoll(Path oldPath, Path newPath) {
logQueue.enqueueLog(newPath, fakeWalGroupId);
currentPath = newPath;
}
}

@@ -631,10 +648,10 @@ public class TestWALEntryStream {
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2

@@ -660,13 +677,11 @@ public class TestWALEntryStream {
*/
@Test
public void testEOFExceptionForRecoveredQueue() throws Exception {
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
// Create a 0 length log.
Path emptyLog = new Path("emptyLog");
FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
queue.add(emptyLog);

Configuration conf = new Configuration(CONF);
// Override the max retries multiplier to fail fast.

@@ -675,11 +690,22 @@ public class TestWALEntryStream {
// Create a reader thread with source as recovered source.
ReplicationSource source = mockReplicationSource(true, conf);
when(source.isPeerEnabled()).thenReturn(true);

MetricsSource metrics = mock(MetricsSource.class);
doNothing().when(metrics).incrSizeOfLogQueue();
doNothing().when(metrics).decrSizeOfLogQueue();
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
getDummyFilter(), source, fakeWalGroupId);
reader.run();
// ReplicationSourceWALReaderThread#handleEofException method will
// remove empty log from logQueue.
assertEquals(0, queue.size());
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
}

private PriorityBlockingQueue<Path> getQueue() {
return logQueue.getQueue(fakeWalGroupId);
}
}