HBASE-6617 ReplicationSourceManager should be able to track multiple WAL paths (Yu Li)

tedyu 2015-09-11 09:30:58 -07:00
parent 84dbe39f5d
commit be96bb6adf
16 changed files with 1027 additions and 613 deletions

HRegionServer.java

@@ -3313,4 +3313,13 @@ public class HRegionServer extends HasThread implements
     }
     return max;
   }
+
+  /**
+   * For testing
+   * @return whether all wal roll request finished for this regionserver
+   */
+  @VisibleForTesting
+  public boolean walRollRequestFinished() {
+    return this.walRoller.walRollFinished();
+  }
 }

LogRoller.java

@@ -197,4 +197,18 @@ public class LogRoller extends HasThread {
           requester);
       }
     }
+
+  /**
+   * For testing only
+   * @return true if all WAL roll finished
+   */
+  @VisibleForTesting
+  public boolean walRollFinished() {
+    for (boolean needRoll : walNeedsRoll.values()) {
+      if (needRoll) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
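
The two additions above form a test-only handshake: LogRoller tracks a needs-roll flag per WAL in walNeedsRoll, walRollFinished() reports true only once every flag has cleared, and HRegionServer.walRollRequestFinished() exposes that to tests. A minimal sketch of how a test might poll the hook; the helper class, poll interval and timeout are illustrative, not part of this commit:

import org.apache.hadoop.hbase.regionserver.HRegionServer;

// Hypothetical test helper: spin until the region server reports that every
// requested WAL roll has completed, or give up after a deadline.
public final class WalRollTestUtil {
  private WalRollTestUtil() {
  }

  public static boolean waitForWalRoll(HRegionServer rs, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (rs.walRollRequestFinished()) {
        return true;
      }
      Thread.sleep(100); // arbitrary poll interval
    }
    return false;
  }
}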

ReplicationEndpoint.java

@@ -137,6 +137,7 @@ public interface ReplicationEndpoint extends Service {
   static class ReplicateContext {
     List<Entry> entries;
     int size;
+    String walGroupId;
     @InterfaceAudience.Private
     public ReplicateContext() {
     }
@@ -149,12 +150,19 @@ public interface ReplicationEndpoint extends Service {
       this.size = size;
       return this;
     }
+    public ReplicateContext setWalGroupId(String walGroupId) {
+      this.walGroupId = walGroupId;
+      return this;
+    }
     public List<Entry> getEntries() {
       return entries;
     }
     public int getSize() {
       return size;
     }
+    public String getWalGroupId(){
+      return walGroupId;
+    }
   }

   /**
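
ReplicateContext now carries the id of the WAL group a batch came from, so an endpoint can attribute shipped batches to their group. The setters stay fluent. A standalone mirror of the pattern, so the sketch compiles without HBase on the classpath; Entry is simplified to String and the group id is illustrative:

import java.util.ArrayList;
import java.util.List;

// Standalone mirror of the fluent-setter pattern added above.
public class ReplicateContextSketch {
  List<String> entries;
  int size;
  String walGroupId;

  ReplicateContextSketch setEntries(List<String> entries) {
    this.entries = entries;
    return this; // returning this is what allows the chained calls below
  }

  ReplicateContextSketch setSize(int size) {
    this.size = size;
    return this;
  }

  ReplicateContextSketch setWalGroupId(String walGroupId) {
    this.walGroupId = walGroupId;
    return this;
  }

  public static void main(String[] args) {
    ReplicateContextSketch ctx = new ReplicateContextSketch()
        .setEntries(new ArrayList<String>())
        .setSize(0)
        .setWalGroupId("host%2C16020%2C1441989635444"); // illustrative group id
    System.out.println(ctx.walGroupId);
  }
}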

HBaseInterClusterReplicationEndpoint.java

@@ -154,6 +154,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     List<Entry> entries = replicateContext.getEntries();
+    String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;

     if (!peersSelected && this.isRunning()) {
@@ -219,12 +220,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         throw iox;
       }
       // update metrics
-      this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+      this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+        walGroupId);
       return true;
     } catch (IOException ioe) {
       // Didn't ship anything, but must still age the last time we did
-      this.metrics.refreshAgeOfLastShippedOp();
+      this.metrics.refreshAgeOfLastShippedOp(walGroupId);
       if (ioe instanceof RemoteException) {
         ioe = ((RemoteException) ioe).unwrapRemoteException();
         LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);

MetricsSource.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.replication.regionserver;

+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -34,7 +37,8 @@ public class MetricsSource {

   private static final Log LOG = LogFactory.getLog(MetricsSource.class);

-  private long lastTimestamp = 0;
+  // tracks last shipped timestamp for each wal group
+  private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
   private int lastQueueSize = 0;
   private String id;
@@ -56,23 +60,29 @@ public class MetricsSource {
   /**
    * Set the age of the last edit that was shipped
-   *
    * @param timestamp write time of the edit
+   * @param walGroup which group we are setting
    */
-  public void setAgeOfLastShippedOp(long timestamp) {
+  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
-    this.lastTimestamp = timestamp;
+    this.lastTimeStamps.put(walGroup, timestamp);
   }

   /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
+   * @param walGroupId id of the group to update
    */
-  public void refreshAgeOfLastShippedOp() {
-    if (this.lastTimestamp > 0) {
-      setAgeOfLastShippedOp(this.lastTimestamp);
+  public void refreshAgeOfLastShippedOp(String walGroupId) {
+    Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
+    if (lastTimestamp == null) {
+      this.lastTimeStamps.put(walGroupId, 0L);
+      lastTimestamp = 0L;
+    }
+    if (lastTimestamp > 0) {
+      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
     }
   }
@@ -143,6 +153,7 @@ public class MetricsSource {
   public void clear() {
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    lastTimeStamps.clear();
     lastQueueSize = 0;
   }
@@ -163,10 +174,16 @@ public class MetricsSource {
   }

   /**
-   * Get the timeStampsOfLastShippedOp
+   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
    * @return lastTimestampForAge
    */
   public long getTimeStampOfLastShippedOp() {
+    long lastTimestamp = 0L;
+    for (long ts : lastTimeStamps.values()) {
+      if (ts > lastTimestamp) {
+        lastTimestamp = ts;
+      }
+    }
     return lastTimestamp;
   }
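
The per-group bookkeeping above is easy to exercise in isolation: one timestamp is kept per WAL group, and the read-back returns the newest across groups. A simplified, self-contained sketch of that behavior; System.currentTimeMillis stands in for EnvironmentEdgeManager and the group names are illustrative:

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the per-wal-group timestamp map added above.
public class PerGroupAgeSketch {
  private final Map<String, Long> lastTimeStamps = new HashMap<String, Long>();

  void setAgeOfLastShippedOp(long timestamp, String walGroup) {
    long age = System.currentTimeMillis() - timestamp; // would feed the age gauge
    lastTimeStamps.put(walGroup, timestamp);
  }

  long getTimeStampOfLastShippedOp() {
    long lastTimestamp = 0L;
    for (long ts : lastTimeStamps.values()) {
      if (ts > lastTimestamp) {
        lastTimestamp = ts;
      }
    }
    return lastTimestamp;
  }

  public static void main(String[] args) {
    PerGroupAgeSketch m = new PerGroupAgeSketch();
    m.setAgeOfLastShippedOp(1000L, "group-0");
    m.setAgeOfLastShippedOp(2000L, "group-1");
    System.out.println(m.getTimeStampOfLastShippedOp()); // prints 2000
  }
}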

ReplicationSource.java

@@ -22,11 +22,17 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -77,8 +83,12 @@ public class ReplicationSource extends Thread
     implements ReplicationSourceInterface {

   private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
-  // Queue of logs to process
-  private PriorityBlockingQueue<Path> queue;
+  // Queues of logs to process, entry in format of walGroupId->queue,
+  // each presents a queue for one wal group
+  private Map<String, PriorityBlockingQueue<Path>> queues =
+      new HashMap<String, PriorityBlockingQueue<Path>>();
+  // per group queue size, keep no more than this number of logs in each wal group
+  private int queueSizePerGroup;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
@@ -96,35 +106,23 @@ public class ReplicationSource extends Thread
   private long replicationQueueSizeCapacity;
   // Max number of entries in entriesArray
   private int replicationQueueNbCapacity;
-  // Our reader for the current log. open/close handled by repLogReader
-  private WAL.Reader reader;
-  // Last position in the log that we sent to ZooKeeper
-  private long lastLoggedPosition = -1;
-  // Path of the current log
-  private volatile Path currentPath;
   private FileSystem fs;
   // id of this cluster
   private UUID clusterId;
   // id of the other cluster
   private UUID peerClusterId;
   // total number of edits we replicated
-  private long totalReplicatedEdits = 0;
+  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
   // total number of edits we replicated
-  private long totalReplicatedOperations = 0;
+  private AtomicLong totalReplicatedOperations = new AtomicLong(0);
   // The znode we currently play with
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
-  // Current number of operations (Put/Delete) that we need to replicate
-  private int currentNbOperations = 0;
-  // Current size of data we need to replicate
-  private int currentSize = 0;
   // Indicates if this particular source is running
-  private volatile boolean running = true;
+  private volatile boolean sourceRunning = false;
   // Metrics for this source
   private MetricsSource metrics;
-  // Handle on the log reader helper
-  private ReplicationWALReaderManager repLogReader;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
@@ -133,6 +131,9 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
+  private AtomicInteger logQueueSize = new AtomicInteger(0);
+  private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
+      new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();

   /**
    * Instantiation method used by region servers
@@ -165,10 +166,7 @@ public class ReplicationSource extends Thread
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
-    this.queue =
-        new PriorityBlockingQueue<Path>(
-            this.conf.getInt("hbase.regionserver.maxlogs", 32),
-            new LogsComparator());
+    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
@@ -176,7 +174,6 @@ public class ReplicationSource extends Thread
     this.manager = manager;
     this.fs = fs;
     this.metrics = metrics;
-    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
     this.clusterId = clusterId;

     this.peerClusterZnode = peerClusterZnode;
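
totalReplicatedEdits and totalReplicatedOperations switch from plain long to AtomicLong because a single source now hosts several worker threads that bump the same totals concurrently; a plain "+=" on a shared long would lose counts. A self-contained demonstration of the difference; iteration and thread counts are arbitrary:

import java.util.concurrent.atomic.AtomicLong;

// With AtomicLong, concurrent increments never lose updates. Replacing the
// field with a plain long and "count++" would print varying, smaller totals.
public class CounterSketch {
  static final AtomicLong totalReplicatedEdits = new AtomicLong(0);

  public static void main(String[] args) throws InterruptedException {
    Runnable r = new Runnable() {
      @Override
      public void run() {
        for (int i = 0; i < 100000; i++) {
          totalReplicatedEdits.addAndGet(1);
        }
      }
    };
    Thread t1 = new Thread(r), t2 = new Thread(r);
    t1.start(); t2.start();
    t1.join(); t2.join();
    System.out.println(totalReplicatedEdits.get()); // always 200000
  }
}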
@@ -196,13 +193,33 @@ public class ReplicationSource extends Thread

   @Override
   public void enqueueLog(Path log) {
-    this.queue.put(log);
-    int queueSize = queue.size();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
+    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    if (queue == null) {
+      queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+      queues.put(logPrefix, queue);
+      if (this.sourceRunning) {
+        // new wal group observed after source startup, start a new worker thread to track it
+        // notice: it's possible that log enqueued when this.running is set but worker thread
+        // still not launched, so it's necessary to check workerThreads before start the worker
+        final ReplicationSourceWorkerThread worker =
+            new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
+        ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
+        if (extant != null) {
+          LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
+        } else {
+          LOG.debug("Starting up worker for wal group " + logPrefix);
+          worker.startup();
+        }
+      }
+    }
+    queue.put(log);
+    int queueSize = logQueueSize.incrementAndGet();
     this.metrics.setSizeOfLogQueue(queueSize);
     // This will log a warning for each new log that gets created above the warn threshold
-    if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("Queue size: " + queueSize +
-        " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+    if (queue.size() > this.logQueueWarnThreshold) {
+      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
     }
   }
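
enqueueLog() now routes each rolled log to a per-group queue keyed by the WAL name prefix, creating the queue (and, once the source is running, a worker) on first sight of a new group. A self-contained sketch of just the routing; the prefix extraction here, stripping the trailing ".<timestamp>", is a simplification of DefaultWALProvider.getWALPrefixFromWALName, and the worker startup is elided:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

// Standalone sketch of prefix-keyed queue routing.
public class WalGroupRouterSketch {
  private final Map<String, PriorityBlockingQueue<String>> queues =
      new ConcurrentHashMap<String, PriorityBlockingQueue<String>>();

  void enqueueLog(String logName) {
    String logPrefix = logName.substring(0, logName.lastIndexOf('.'));
    PriorityBlockingQueue<String> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<String>();
      queues.put(logPrefix, queue); // the real code also spins up a worker here
    }
    queue.put(logName);
  }

  public static void main(String[] args) {
    WalGroupRouterSketch r = new WalGroupRouterSketch();
    r.enqueueLog("host%2C16020%2C1441989635444.default.1441989700000");
    r.enqueueLog("host%2C16020%2C1441989635444.default.1441989800000");
    System.out.println(r.queues.keySet()); // one group, two queued logs
  }
}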
@@ -217,12 +234,8 @@ public class ReplicationSource extends Thread

   @Override
   public void run() {
-    // We were stopped while looping to connect to sinks, just abort
-    if (!this.isActive()) {
-      uninitialize();
-      return;
-    }
+    // mark we are running now
+    this.sourceRunning = true;
     try {
       // start the endpoint, connect to the cluster
       Service.State state = replicationEndpoint.start().get();
@@ -247,22 +260,14 @@ public class ReplicationSource extends Thread
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
-    while (this.isActive() && this.peerClusterId == null) {
+    while (this.isSourceActive() && this.peerClusterId == null) {
       this.peerClusterId = replicationEndpoint.getPeerUUID();
-      if (this.isActive() && this.peerClusterId == null) {
+      if (this.isSourceActive() && this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
       }
     }
-    // We were stopped while looping to contact peer's zk ensemble, just abort
-    if (!this.isActive()) {
-      uninitialize();
-      return;
-    }
-    // resetting to 1 to reuse later
-    sleepMultiplier = 1;

     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
     // peerClusterId value, which is the same as the source clusterId
@@ -271,25 +276,230 @@ public class ReplicationSource extends Thread
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
     }
-    LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
+    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+    // start workers
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+      String walGroupId = entry.getKey();
+      PriorityBlockingQueue<Path> queue = entry.getValue();
+      final ReplicationSourceWorkerThread worker =
+          new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
+      ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+      if (extant != null) {
+        LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+      } else {
+        LOG.debug("Starting up worker for wal group " + walGroupId);
+        worker.startup();
+      }
+    }
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * check whether the peer is enabled or not
+   *
+   * @return true if the peer is enabled, otherwise false
+   */
+  protected boolean isPeerEnabled() {
+    return this.replicationPeers.getStatusOfPeer(this.peerId);
+  }
+
+  @Override
+  public void startup() {
+    String n = Thread.currentThread().getName();
+    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(final Thread t, final Throwable e) {
+        LOG.error("Unexpected exception in ReplicationSource", e);
+      }
+    };
+    Threads
+        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
+  }
+
+  @Override
+  public void terminate(String reason) {
+    terminate(reason, null);
+  }
+
+  @Override
+  public void terminate(String reason, Exception cause) {
+    terminate(reason, cause, true);
+  }
+
+  public void terminate(String reason, Exception cause, boolean join) {
+    if (cause == null) {
+      LOG.info("Closing source "
+          + this.peerClusterZnode + " because: " + reason);
+    } else {
+      LOG.error("Closing source " + this.peerClusterZnode
+          + " because an error occurred: " + reason, cause);
+    }
+    this.sourceRunning = false;
+    Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
+    for (ReplicationSourceWorkerThread worker : workers) {
+      worker.setWorkerRunning(false);
+      worker.interrupt();
+    }
+    ListenableFuture<Service.State> future = null;
+    if (this.replicationEndpoint != null) {
+      future = this.replicationEndpoint.stop();
+    }
+    if (join) {
+      for (ReplicationSourceWorkerThread worker : workers) {
+        Threads.shutdown(worker, this.sleepForRetries);
+        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+      }
+      if (future != null) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.warn("Got exception:" + e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getPeerClusterZnode() {
+    return this.peerClusterZnode;
+  }
+
+  @Override
+  public String getPeerClusterId() {
+    return this.peerId;
+  }
+
+  @Override
+  public Path getCurrentPath() {
+    // only for testing
+    for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+      if (worker.getCurrentPath() != null) return worker.getCurrentPath();
+    }
+    return null;
+  }
+
+  private boolean isSourceActive() {
+    return !this.stopper.isStopped() && this.sourceRunning;
+  }
+
+  /**
+   * Comparator used to compare logs together based on their start time
+   */
+  public static class LogsComparator implements Comparator<Path> {
+
+    @Override
+    public int compare(Path o1, Path o2) {
+      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
+    }
+
+    /**
+     * Split a path to get the start time
+     * For example: 10.20.20.171%3A60020.1277499063250
+     * @param p path to split
+     * @return start time
+     */
+    private long getTS(Path p) {
+      String[] parts = p.getName().split("\\.");
+      return Long.parseLong(parts[parts.length-1]);
+    }
+  }
+
+  @Override
+  public String getStats() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
+        .append(", current progress: \n");
+    for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
+      String walGroupId = entry.getKey();
+      ReplicationSourceWorkerThread worker = entry.getValue();
+      long position = worker.getCurrentPosition();
+      Path currentPath = worker.getCurrentPath();
+      sb.append("walGroup [").append(walGroupId).append("]: ");
+      if (currentPath != null) {
+        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+            .append(position).append("\n");
+      } else {
+        sb.append("no replication ongoing, waiting for new log");
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Get Replication Source Metrics
+   * @return sourceMetrics
+   */
+  public MetricsSource getSourceMetrics() {
+    return this.metrics;
+  }
+
+  public class ReplicationSourceWorkerThread extends Thread {
+    private ReplicationSource source;
+    private String walGroupId;
+    private PriorityBlockingQueue<Path> queue;
+    private ReplicationQueueInfo replicationQueueInfo;
+    // Our reader for the current log. open/close handled by repLogReader
+    private WAL.Reader reader;
+    // Last position in the log that we sent to ZooKeeper
+    private long lastLoggedPosition = -1;
+    // Path of the current log
+    private volatile Path currentPath;
+    // Handle on the log reader helper
+    private ReplicationWALReaderManager repLogReader;
+    // Current number of operations (Put/Delete) that we need to replicate
+    private int currentNbOperations = 0;
+    // Current size of data we need to replicate
+    private int currentSize = 0;
+    // Indicates whether this particular worker is running
+    private boolean workerRunning = true;
+
+    public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
+        ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+      this.walGroupId = walGroupId;
+      this.queue = queue;
+      this.replicationQueueInfo = replicationQueueInfo;
+      this.repLogReader = new ReplicationWALReaderManager(fs, conf);
+      this.source = source;
+    }
+
+    @Override
+    public void run() {
       // If this is recovered, the queue is already full and the first log
       // normally has a position (unless the RS failed between 2 logs)
       if (this.replicationQueueInfo.isQueueRecovered()) {
         try {
-          this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
+          this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
             this.queue.peek().getName()));
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Recovered queue started with log " + this.queue.peek() +
-              " at position " + this.repLogReader.getPosition());
+            LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+                + this.repLogReader.getPosition());
           }
         } catch (ReplicationException e) {
-          this.terminate("Couldn't get the position of this recovered queue " +
-            this.peerClusterZnode, e);
+          terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
         }
       }
       // Loop until we close down
-      while (isActive()) {
+      while (isWorkerActive()) {
+        int sleepMultiplier = 1;
         // Sleep until replication is enabled again
         if (!isPeerEnabled()) {
           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -347,25 +557,25 @@ public class ReplicationSource extends Thread
            continue;
           }
         } catch (IOException ioe) {
-          LOG.warn(this.peerClusterZnode + " Got: ", ioe);
+          LOG.warn(peerClusterZnode + " Got: ", ioe);
           gotIOE = true;
           if (ioe.getCause() instanceof EOFException) {

             boolean considerDumping = false;
             if (this.replicationQueueInfo.isQueueRecovered()) {
               try {
-                FileStatus stat = this.fs.getFileStatus(this.currentPath);
+                FileStatus stat = fs.getFileStatus(this.currentPath);
                 if (stat.getLen() == 0) {
-                  LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
+                  LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
                 }
                 considerDumping = true;
               } catch (IOException e) {
-                LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
+                LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
               }
             }

             if (considerDumping &&
-                sleepMultiplier == this.maxRetriesMultiplier &&
+                sleepMultiplier == maxRetriesMultiplier &&
                 processEndOfFile()) {
               continue;
             }
@@ -383,10 +593,10 @@ public class ReplicationSource extends Thread
         // If we didn't get anything to replicate, or if we hit a IOE,
         // wait a bit and retry.
         // But if we need to stop, don't bother sleeping
-        if (this.isActive() && (gotIOE || entries.isEmpty())) {
+        if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-            this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.repLogReader.getPosition(),
+            manager.logPositionAndCleanOldLogs(this.currentPath,
+              peerClusterZnode, this.repLogReader.getPosition(),
               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
             this.lastLoggedPosition = this.repLogReader.getPosition();
           }
@@ -395,7 +605,7 @@ public class ReplicationSource extends Thread
           sleepMultiplier = 1;
           // if there was nothing to ship and it's not an error
           // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
+          metrics.setAgeOfLastShippedOp(System.currentTimeMillis(), walGroupId);
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
@@ -405,16 +615,32 @@ public class ReplicationSource extends Thread
         sleepMultiplier = 1;
         shipEdits(currentWALisBeingWrittenTo, entries);
       }
-      uninitialize();
+      if (replicationQueueInfo.isQueueRecovered()) {
+        // use synchronize to make sure one last thread will clean the queue
+        synchronized (workerThreads) {
+          Threads.sleep(100);// wait a short while for other worker thread to fully exit
+          boolean allOtherTaskDone = true;
+          for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+            if (!worker.equals(this) && worker.isAlive()) {
+              allOtherTaskDone = false;
+              break;
+            }
+          }
+          if (allOtherTaskDone) {
+            manager.closeRecoveredQueue(this.source);
+            LOG.info("Finished recovering queue " + peerClusterZnode
+                + " with the following stats: " + getStats());
+          }
+        }
+      }
     }

     /**
-     * Read all the entries from the current log files and retain those
-     * that need to be replicated. Else, process the end of the current file.
+     * Read all the entries from the current log files and retain those that need to be replicated.
+     * Else, process the end of the current file.
      * @param currentWALisBeingWrittenTo is the current WAL being written to
      * @param entries resulting entries to be replicated
-     * @return true if we got nothing and went to the next file, false if we got
-     *         entries
+     * @return true if we got nothing and went to the next file, false if we got entries
      * @throws IOException
      */
     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
@@ -426,10 +652,9 @@ public class ReplicationSource extends Thread
       }
       this.repLogReader.seek();
       long positionBeforeRead = this.repLogReader.getPosition();
-      WAL.Entry entry =
-          this.repLogReader.readNextAndSetPosition();
+      WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
       while (entry != null) {
-        this.metrics.incrLogEditsRead();
+        metrics.incrLogEditsRead();
         seenEntries++;

         // don't replicate if the log entries have already been consumed by the cluster
@@ -445,18 +670,19 @@ public class ReplicationSource extends Thread
           }
           if (edit != null && edit.size() != 0) {
-            //Mark that the current cluster has the change
+            // Mark that the current cluster has the change
             logKey.addClusterId(clusterId);
             currentNbOperations += countDistinctRowKeys(edit);
             entries.add(entry);
             currentSize += entry.getEdit().heapSize();
           } else {
-            this.metrics.incrLogEditsFiltered();
+            metrics.incrLogEditsFiltered();
           }
         }
         // Stop if too many entries or too big
-        if (currentSize >= this.replicationQueueSizeCapacity ||
-            entries.size() >= this.replicationQueueNbCapacity) {
+        // FIXME check the relationship between single wal group and overall
+        if (currentSize >= replicationQueueSizeCapacity
+            || entries.size() >= replicationQueueNbCapacity) {
           break;
         }
         try {
@@ -482,11 +708,12 @@ public class ReplicationSource extends Thread
     protected boolean getNextPath() {
       try {
         if (this.currentPath == null) {
-          this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
-          this.metrics.setSizeOfLogQueue(queue.size());
+          this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
+          int queueSize = logQueueSize.decrementAndGet();
+          metrics.setSizeOfLogQueue(queueSize);
           if (this.currentPath != null) {
-            this.manager.cleanOldLogs(this.currentPath.getName(),
-              this.peerId,
+            // For recovered queue: must use peerClusterZnode since peerId is a parsed value
+            manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
               this.replicationQueueInfo.isQueueRecovered());
             if (LOG.isTraceEnabled()) {
               LOG.trace("New log: " + this.currentPath);
@@ -506,6 +733,7 @@ public class ReplicationSource extends Thread
      * @return true if we should continue with that file, false if we are over with it
      */
     protected boolean openReader(int sleepMultiplier) {
       try {
         try {
           if (LOG.isTraceEnabled()) {
@@ -519,7 +747,7 @@ public class ReplicationSource extends Thread
             // to look at)
             List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
             LOG.info("NB dead servers : " + deadRegionServers.size());
-            final Path rootDir = FSUtils.getRootDir(this.conf);
+            final Path rootDir = FSUtils.getRootDir(conf);
             for (String curDeadServerName : deadRegionServers) {
               final Path deadRsDirectory = new Path(rootDir,
                   DefaultWALProvider.getWALDirectoryName(curDeadServerName));
@@ -530,7 +758,7 @@ public class ReplicationSource extends Thread
               };
               for (Path possibleLogLocation : locs) {
                 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-                if (this.manager.getFs().exists(possibleLogLocation)) {
+                if (manager.getFs().exists(possibleLogLocation)) {
                   // We found the right new location
                   LOG.info("Log " + this.currentPath + " still exists at " +
                       possibleLogLocation);
@@ -575,7 +803,7 @@ public class ReplicationSource extends Thread
           // If the log was archived, continue reading from there
           Path archivedLogLocation =
               new Path(manager.getOldLogDir(), currentPath.getName());
-          if (this.manager.getFs().exists(archivedLogLocation)) {
+          if (manager.getFs().exists(archivedLogLocation)) {
             currentPath = archivedLogLocation;
             LOG.info("Log " + this.currentPath + " was moved to " +
                 archivedLogLocation);
@@ -588,14 +816,14 @@ public class ReplicationSource extends Thread
         }
       } catch (IOException ioe) {
         if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
-        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
+        LOG.warn(peerClusterZnode + " Got: ", ioe);
         this.reader = null;
         if (ioe.getCause() instanceof NullPointerException) {
           // Workaround for race condition in HDFS-4380
           // which throws a NPE if we open a file before any data node has the most recent block
           // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
           LOG.warn("Got NPE opening reader, will retry.");
-        } else if (sleepMultiplier == this.maxRetriesMultiplier) {
+        } else if (sleepMultiplier == maxRetriesMultiplier) {
           // TODO Need a better way to determine if a file is really gone but
           // TODO without scanning all logs dir
           LOG.warn("Waited too long for this file, considering dumping");
@@ -617,27 +845,8 @@ public class ReplicationSource extends Thread
     }

     /**
-     * Do the sleeping logic
-     * @param msg Why we sleep
-     * @param sleepMultiplier by how many times the default sleeping time is augmented
-     * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
-     */
-    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
-      try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
-        }
-        Thread.sleep(this.sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted while sleeping between retries");
-        Thread.currentThread().interrupt();
-      }
-      return sleepMultiplier < maxRetriesMultiplier;
-    }
-
-    /**
-     * Count the number of different row keys in the given edit because of
-     * mini-batching. We assume that there's at least one Cell in the WALEdit.
+     * Count the number of different row keys in the given edit because of mini-batching. We assume
+     * that there's at least one Cell in the WALEdit.
      * @param edit edit to count row keys from
      * @return number of different row keys
      */
@@ -664,10 +873,10 @@ public class ReplicationSource extends Thread
         LOG.warn("Was given 0 edits to ship");
         return;
       }
-      while (this.isActive()) {
+      while (isWorkerActive()) {
         try {
-          if (this.throttler.isEnabled()) {
-            long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
+          if (throttler.isEnabled()) {
+            long sleepTicks = throttler.getNextSleepInterval(currentSize);
             if (sleepTicks > 0) {
               try {
                 if (LOG.isTraceEnabled()) {
@@ -682,12 +891,15 @@ public class ReplicationSource extends Thread
                 continue;
               }
               // reset throttler's cycle start tick when sleep for throttling occurs
-              this.throttler.resetStartTick();
+              throttler.resetStartTick();
             }
           }
-          // create replicateContext here, so the entries can be GC'd upon return from this call stack
-          ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
+          // create replicateContext here, so the entries can be GC'd upon return from this call
+          // stack
+          ReplicationEndpoint.ReplicateContext replicateContext =
+              new ReplicationEndpoint.ReplicateContext();
           replicateContext.setEntries(entries).setSize(currentSize);
+          replicateContext.setWalGroupId(walGroupId);

           long startTimeNs = System.nanoTime();
           // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
@@ -697,31 +909,33 @@ public class ReplicationSource extends Thread
           if (!replicated) {
             continue;
           } else {
-            sleepMultiplier = Math.max(sleepMultiplier-1, 0);
+            sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
           }

           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-            this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.repLogReader.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+              this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
+              currentWALisBeingWrittenTo);
             this.lastLoggedPosition = this.repLogReader.getPosition();
           }
-          if (this.throttler.isEnabled()) {
-            this.throttler.addPushSize(currentSize);
+          if (throttler.isEnabled()) {
+            throttler.addPushSize(currentSize);
           }
-          this.totalReplicatedEdits += entries.size();
-          this.totalReplicatedOperations += currentNbOperations;
-          this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
-          this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+          totalReplicatedEdits.addAndGet(entries.size());
+          totalReplicatedOperations.addAndGet(currentNbOperations);
+          // FIXME check relationship between wal group and overall
+          metrics.shipBatch(currentNbOperations, currentSize / 1024);
+          metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+            walGroupId);
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
-                + this.totalReplicatedOperations + " operations in " +
-                ((endTimeNs - startTimeNs)/1000000) + " ms");
+            LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+                + totalReplicatedOperations + " operations in "
+                + ((endTimeNs - startTimeNs) / 1000000) + " ms");
           }
           break;
         } catch (Exception ex) {
-          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
-            org.apache.hadoop.util.StringUtils.stringifyException(ex));
+          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+              + org.apache.hadoop.util.StringUtils.stringifyException(ex));
           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
             sleepMultiplier++;
           }
@@ -730,154 +944,79 @@ public class ReplicationSource extends Thread
     }

     /**
-     * check whether the peer is enabled or not
-     *
-     * @return true if the peer is enabled, otherwise false
-     */
-    protected boolean isPeerEnabled() {
-      return this.replicationPeers.getStatusOfPeer(this.peerId);
-    }
-
-    /**
-     * If the queue isn't empty, switch to the next one
-     * Else if this is a recovered queue, it means we're done!
-     * Else we'll just continue to try reading the log file
-     * @return true if we're done with the current file, false if we should
-     *         continue trying to read from it
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
-        justification="Yeah, this is how it works")
+     * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means
+     * we're done! Else we'll just continue to try reading the log file
+     * @return true if we're done with the current file, false if we should continue trying to read
+     *         from it
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
+        justification = "Yeah, this is how it works")
     protected boolean processEndOfFile() {
       if (this.queue.size() != 0) {
         if (LOG.isTraceEnabled()) {
           String filesize = "N/A";
           try {
-            FileStatus stat = this.fs.getFileStatus(this.currentPath);
-            filesize = stat.getLen()+"";
-          } catch (IOException ex) {}
-          LOG.trace("Reached the end of a log, stats: " + getStats() +
-            ", and the length of the file is " + filesize);
+            FileStatus stat = fs.getFileStatus(this.currentPath);
+            filesize = stat.getLen() + "";
+          } catch (IOException ex) {
+          }
+          LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
+              + ", and the length of the file is " + filesize);
         }
         this.currentPath = null;
         this.repLogReader.finishCurrentFile();
         this.reader = null;
         return true;
       } else if (this.replicationQueueInfo.isQueueRecovered()) {
-        this.manager.closeRecoveredQueue(this);
-        LOG.info("Finished recovering the queue with the following stats " + getStats());
-        this.running = false;
+        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+            + peerClusterZnode);
+        workerRunning = false;
         return true;
       }
       return false;
     }

-    @Override
     public void startup() {
       String n = Thread.currentThread().getName();
-      Thread.UncaughtExceptionHandler handler =
-          new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-              LOG.error("Unexpected exception in ReplicationSource," +
-                " currentPath=" + currentPath, e);
-            }
-          };
-      Threads.setDaemonThreadRunning(
-        this, n + ".replicationSource," +
-        this.peerClusterZnode, handler);
+      Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(final Thread t, final Throwable e) {
+          LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+              + getCurrentPath(), e);
+        }
+      };
+      Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+          + peerClusterZnode, handler);
+      workerThreads.put(walGroupId, this);
     }

-    @Override
-    public void terminate(String reason) {
-      terminate(reason, null);
-    }
-
-    @Override
-    public void terminate(String reason, Exception cause) {
-      terminate(reason, cause, true);
-    }
-
-    public void terminate(String reason, Exception cause, boolean join) {
-      if (cause == null) {
-        LOG.info("Closing source "
-            + this.peerClusterZnode + " because: " + reason);
-      } else {
-        LOG.error("Closing source " + this.peerClusterZnode
-            + " because an error occurred: " + reason, cause);
-      }
-      this.running = false;
-      this.interrupt();
-      ListenableFuture<Service.State> future = null;
-      if (this.replicationEndpoint != null) {
-        future = this.replicationEndpoint.stop();
-      }
-      if (join) {
-        Threads.shutdown(this, this.sleepForRetries);
-        if (future != null) {
-          try {
-            future.get();
-          } catch (Exception e) {
-            LOG.warn("Got exception:" + e);
-          }
-        }
-      }
-    }
-
-    @Override
-    public String getPeerClusterZnode() {
-      return this.peerClusterZnode;
-    }
-
-    @Override
-    public String getPeerClusterId() {
-      return this.peerId;
-    }
-
-    @Override
     public Path getCurrentPath() {
       return this.currentPath;
     }

-    private boolean isActive() {
-      return !this.stopper.isStopped() && this.running && !isInterrupted();
+    public long getCurrentPosition() {
+      return this.repLogReader.getPosition();
     }

-    /**
-     * Comparator used to compare logs together based on their start time
-     */
-    public static class LogsComparator implements Comparator<Path> {
-
-      @Override
-      public int compare(Path o1, Path o2) {
-        return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
-      }
-
-      /**
-       * Split a path to get the start time
-       * For example: 10.20.20.171%3A60020.1277499063250
-       * @param p path to split
-       * @return start time
-       */
-      private long getTS(Path p) {
-        String[] parts = p.getName().split("\\.");
-        return Long.parseLong(parts[parts.length-1]);
-      }
+    private boolean isWorkerActive() {
+      return !stopper.isStopped() && workerRunning && !isInterrupted();
     }

-    @Override
-    public String getStats() {
-      long position = this.repLogReader.getPosition();
-      return "Total replicated edits: " + totalReplicatedEdits +
-        ", currently replicating from: " + this.currentPath +
-        " at position: " + position;
+    private void terminate(String reason, Exception cause) {
+      if (cause == null) {
+        LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+      } else {
+        LOG.error("Closing worker for wal group " + this.walGroupId
+            + " because an error occurred: " + reason, cause);
+      }
+      this.interrupt();
+      Threads.shutdown(this, sleepForRetries);
+      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
     }

-    /**
-     * Get Replication Source Metrics
-     * @return sourceMetrics
-     */
-    public MetricsSource getSourceMetrics() {
-      return this.metrics;
+    public void setWorkerRunning(boolean workerRunning) {
+      this.workerRunning = workerRunning;
     }
   }
 }
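
Two patterns in this file are worth isolating. First, LogsComparator keeps each group's queue ordered by the numeric timestamp suffix of the WAL name, so logs replay oldest-first; a self-contained sketch using the example name from the comment above:

import java.util.Comparator;
import java.util.PriorityQueue;

// WAL names end in ".<startTimestamp>", so comparing that suffix orders logs
// by creation time within a group.
public class LogsComparatorSketch {
  static long getTS(String name) {
    String[] parts = name.split("\\.");
    return Long.parseLong(parts[parts.length - 1]);
  }

  public static void main(String[] args) {
    PriorityQueue<String> q = new PriorityQueue<String>(2, new Comparator<String>() {
      @Override
      public int compare(String o1, String o2) {
        return Long.valueOf(getTS(o1)).compareTo(Long.valueOf(getTS(o2)));
      }
    });
    q.add("10.20.20.171%3A60020.1277499063250");
    q.add("10.20.20.171%3A60020.1277499001000");
    System.out.println(q.poll()); // the older log, ...1277499001000
  }
}

Second, the end of the worker's run() implements "last worker out closes the recovered queue": each finishing worker checks, under a shared lock, whether every other worker has already exited. A simplified, self-contained sketch of that check; names are illustrative and the real code also sleeps briefly before checking:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LastWorkerCleanupSketch {
  static final Map<String, Thread> workers = new ConcurrentHashMap<String, Thread>();

  static void onWorkerExit(Thread self) {
    synchronized (workers) {
      boolean allOtherTaskDone = true;
      for (Thread worker : workers.values()) {
        if (worker != self && worker.isAlive()) {
          allOtherTaskDone = false;
          break;
        }
      }
      if (allOtherTaskDone) {
        System.out.println(self.getName() + " is last out, closing recovered queue");
      }
    }
  }

  public static void main(String[] args) {
    Thread other = new Thread(); // stands in for a worker that already exited
    workers.put("group-a", other);
    workers.put("group-b", Thread.currentThread());
    onWorkerExit(Thread.currentThread()); // prints: main is last out, ...
  }
}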

ReplicationSourceManager.java

@@ -23,9 +23,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All about stopping
   private final Server server;
   // All logs we are currently tracking
-  private final Map<String, SortedSet<String>> walsById;
+  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
+  private final Map<String, Map<String, SortedSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
-  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
+  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
-  // The path to the latest log we saw, for new coming sources
-  private Path latestPath;
+  // The paths to the latest log of each wal group, for new coming peers
+  private Set<Path> latestPaths;
   // Path to the wals directories
   private final Path logDir;
   // Path to the wal archive
@@ -133,8 +138,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new HashMap<String, SortedSet<String>>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
+    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
+    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -158,6 +163,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.rand = new Random();
+    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
   }

   /**
@@ -189,15 +195,16 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered Whether this is a recovered queue
    */
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
+      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
       if (wals != null && !wals.first().equals(key)) {
         cleanOldLogs(wals, key, id);
       }
     } else {
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(id);
-        if (!wals.first().equals(key)) {
+        SortedSet<String> wals = walsById.get(id).get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
           cleanOldLogs(wals, key, id);
         }
       }
@@ -238,36 +245,44 @@ public class ReplicationSourceManager implements ReplicationListener {
   }

   /**
-   * Add a new normal source to this region server
+   * Add sources for the given peer cluster on this region server. For the newly added peer, we
+   * only need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
    * @throws IOException
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
-    ReplicationPeerConfig peerConfig
-      = replicationPeers.getReplicationPeerConfig(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
-      this.walsById.put(id, new TreeSet<String>());
+      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+      this.walsById.put(id, walsByGroup);
       // Add the latest wal to that source's queue
-      if (this.latestPath != null) {
-        String name = this.latestPath.getName();
-        this.walsById.get(id).add(name);
-        try {
-          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
-        } catch (ReplicationException e) {
-          String message =
-              "Cannot add log to queue when creating a new source, queueId="
-              + src.getPeerClusterZnode() + ", filename=" + name;
-          server.stop(message);
-          throw e;
+      synchronized (latestPaths) {
+        if (this.latestPaths.size() > 0) {
+          for (Path logPath : latestPaths) {
+            String name = logPath.getName();
+            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+            SortedSet<String> logs = new TreeSet<String>();
+            logs.add(name);
+            walsByGroup.put(walPrefix, logs);
+            try {
+              this.replicationQueues.addLog(id, name);
+            } catch (ReplicationException e) {
+              String message =
+                  "Cannot add log to queue when creating a new source, queueId=" + id
+                      + ", filename=" + name;
+              server.stop(message);
+              throw e;
+            }
+            src.enqueueLog(logPath);
+          }
         }
-        src.enqueueLog(this.latestPath);
       }
     }
     src.startup();
@@ -302,7 +317,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the first source on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWALs() {
+  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }

@@ -310,7 +325,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
+  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
@@ -331,27 +346,70 @@ public class ReplicationSourceManager implements ReplicationListener {
  }

  void preLogRoll(Path newLog) throws IOException {
-   synchronized (this.walsById) {
-     String name = newLog.getName();
-     for (ReplicationSourceInterface source : this.sources) {
+   recordLog(newLog);
+   String logName = newLog.getName();
+   String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+   synchronized (latestPaths) {
+     Iterator<Path> iterator = latestPaths.iterator();
+     while (iterator.hasNext()) {
+       Path path = iterator.next();
+       if (path.getName().contains(logPrefix)) {
+         iterator.remove();
+         break;
+       }
+     }
+     this.latestPaths.add(newLog);
+   }
+ }
+
+ /**
+  * Check and enqueue the given log to the correct source. If there's still no source for the
+  * group to which the given log belongs, create one
+  * @param logPath the log path to check and enqueue
+  * @throws IOException
+  */
+ private void recordLog(Path logPath) throws IOException {
+   String logName = logPath.getName();
+   String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+   // update replication queues on ZK
+   synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+                                    // the to-be-removed peer
+     for (String id : replicationPeers.getPeerIds()) {
        try {
-         this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
+         this.replicationQueues.addLog(id, logName);
        } catch (ReplicationException e) {
-         throw new IOException("Cannot add log to replication queue with id="
-           + source.getPeerClusterZnode() + ", filename=" + name, e);
+         throw new IOException("Cannot add log to replication queue"
+           + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
        }
      }
-     for (SortedSet<String> wals : this.walsById.values()) {
+   }
+   // update walsById map
+   synchronized (walsById) {
+     for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+       String peerId = entry.getKey();
+       Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+       boolean existingPrefix = false;
+       for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+         SortedSet<String> wals = walsEntry.getValue();
          if (this.sources.isEmpty()) {
            // If there's no slaves, don't need to keep the old wals since
            // we only consider the last one when a new slave comes in
            wals.clear();
          }
-         wals.add(name);
+         if (logPrefix.equals(walsEntry.getKey())) {
+           wals.add(logName);
+           existingPrefix = true;
+         }
+       }
+       if (!existingPrefix) {
+         // The new log belongs to a new group, add it into this peer
+         LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+         SortedSet<String> wals = new TreeSet<String>();
+         wals.add(logName);
+         walsByPrefix.put(logPrefix, wals);
+       }
      }
    }
-   this.latestPath = newLog;
  }
  void postLogRoll(Path newLog) throws IOException {
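
To make the preLogRoll bookkeeping concrete: latestPaths is meant to hold exactly one path per wal group, so a roll replaces only the entry whose file name carries the rolled group's prefix. A minimal stand-alone sketch of that replacement step; the directory layout, server names and file numbers are invented:

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.fs.Path;

public class LatestPathsSketch {
  public static void main(String[] args) {
    Set<Path> latestPaths = new HashSet<Path>();
    latestPaths.add(new Path("/hbase/WALs/rs1/rs1%2C16020%2C1.4")); // group rs1%2C16020%2C1
    latestPaths.add(new Path("/hbase/WALs/rs1/rs1%2C16020%2C2.7")); // group rs1%2C16020%2C2

    Path newLog = new Path("/hbase/WALs/rs1/rs1%2C16020%2C1.5"); // roll of the first group
    String logPrefix = "rs1%2C16020%2C1"; // what getWALPrefixFromWALName would return

    // Drop the previous latest path of the same group, then record the new one.
    Iterator<Path> iterator = latestPaths.iterator();
    while (iterator.hasNext()) {
      if (iterator.next().getName().contains(logPrefix)) {
        iterator.remove();
        break;
      }
    }
    latestPaths.add(newLog);
    System.out.println(latestPaths); // still one entry per group
  }
}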
@@ -414,7 +472,8 @@ public class ReplicationSourceManager implements ReplicationListener {
        }
      }
    } catch (Exception e) {
-     LOG.warn("Passed replication endpoint implementation throws errors", e);
+     LOG.warn("Passed replication endpoint implementation throws errors"
+       + " while initializing ReplicationSource for peer: " + peerId, e);
      throw new IOException(e);
    }
@@ -470,7 +529,7 @@ public class ReplicationSourceManager implements ReplicationListener {
      + sources.size() + " and another "
      + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
-   ReplicationSourceInterface srcToRemove = null;
+   List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    List<ReplicationSourceInterface> oldSourcesToDelete =
      new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
@@ -486,20 +545,24 @@ public class ReplicationSourceManager implements ReplicationListener {
    LOG.info("Number of deleted recovered sources for " + id + ": "
      + oldSourcesToDelete.size());
    // Now look for the one on this cluster
+   synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
+                                         // for the to-be-removed peer
      for (ReplicationSourceInterface src : this.sources) {
        if (id.equals(src.getPeerClusterId())) {
-         srcToRemove = src;
-         break;
+         srcToRemove.add(src);
        }
      }
-     if (srcToRemove == null) {
+     if (srcToRemove.size() == 0) {
        LOG.error("The queue we wanted to close is missing " + id);
        return;
      }
-     srcToRemove.terminate(terminateMessage);
-     this.sources.remove(srcToRemove);
+     for (ReplicationSourceInterface toRemove : srcToRemove) {
+       toRemove.terminate(terminateMessage);
+       this.sources.remove(toRemove);
+     }
      deleteSource(id, true);
    }
+ }
  @Override
  public void regionServerRemoved(String regionserver) {
@@ -580,6 +643,7 @@ public class ReplicationSourceManager implements ReplicationListener {
      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
+       SortedSet<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
@@ -596,7 +660,20 @@ public class ReplicationSourceManager implements ReplicationListener {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
            continue;
          }
+         // track sources in walsByIdRecoveredQueues
+         Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+         walsByIdRecoveredQueues.put(peerId, walsByGroup);
+         for (String wal : walsSet) {
+           String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
+           SortedSet<String> wals = walsByGroup.get(walPrefix);
+           if (wals == null) {
+             wals = new TreeSet<String>();
+             walsByGroup.put(walPrefix, wals);
+           }
+           wals.add(wal);
+         }
+         // enqueue sources
          ReplicationSourceInterface src =
            getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
              server, peerId, this.clusterId, peerConfig, peer);
@@ -605,12 +682,10 @@ public class ReplicationSourceManager implements ReplicationListener {
            break;
          }
          oldsources.add(src);
-         SortedSet<String> walsSet = entry.getValue();
          for (String wal : walsSet) {
            src.enqueueLog(new Path(oldLogDir, wal));
          }
          src.startup();
-         walsByIdRecoveredQueues.put(peerId, walsSet);
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);

@@ -366,4 +366,15 @@ public class DefaultWALProvider implements WALProvider {
    }
  }

+ /**
+  * Get prefix of the log from its name, assuming WAL name in format of
+  * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()}
+  * @param name Name of the WAL to parse
+  * @return prefix of the log
+  */
+ public static String getWALPrefixFromWALName(String name) {
+   int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+   return name.substring(0, endIndex);
+ }
}
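
A quick usage sketch of the parser above, assuming the usual <prefix>.<filenumber> naming; the WAL name here is invented:

import org.apache.hadoop.hbase.wal.DefaultWALProvider;

public class WALPrefixExample {
  public static void main(String[] args) {
    String prefix = DefaultWALProvider.getWALPrefixFromWALName(
        "rs1%2C16020%2C1441987227890.1441987228989");
    System.out.println(prefix); // rs1%2C16020%2C1441987227890, the wal group id
  }
}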

@@ -84,7 +84,7 @@ public class WALFactory {
    }
  }

- static final String WAL_PROVIDER = "hbase.wal.provider";
+ public static final String WAL_PROVIDER = "hbase.wal.provider";
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
  static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";

@@ -73,15 +73,40 @@ public class TestReplicationEndpoint extends TestReplicationBase {
  }

  @Before
- public void setup() throws FailedLogCloseException, IOException {
+ public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    ReplicationEndpointForTest.lastEntries = null;
-   for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
+   final List<RegionServerThread> rsThreads =
+     utility1.getMiniHBaseCluster().getRegionServerThreads();
+   for (RegionServerThread rs : rsThreads) {
      utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
+   // Wait for all log roll to finish
+   utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       for (RegionServerThread rs : rsThreads) {
+         if (!rs.getRegionServer().walRollRequestFinished()) {
+           return false;
+         }
+       }
+       return true;
+     }
+
+     @Override
+     public String explainFailure() throws Exception {
+       List<String> logRollInProgressRsList = new ArrayList<String>();
+       for (RegionServerThread rs : rsThreads) {
+         if (!rs.getRegionServer().walRollRequestFinished()) {
+           logRollInProgressRsList.add(rs.getRegionServer().toString());
+         }
+       }
+       return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
+     }
+   });
  }
  @Test (timeout=120000)

@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpoint {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
TestReplicationEndpoint.setUpBeforeClass();
}
}

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
TestReplicationKillMasterRSCompressed {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.multiwal;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
TestReplicationBase.setUpBeforeClass();
}
}

@@ -260,7 +260,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
    Assert.assertEquals(1000, entries.size());

    // replay the edits to the secondary using replay callable
-   replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));
+   final String fakeWalGroupId = "fakeWALGroup";
+   replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
+     .setWalGroupId(fakeWalGroupId));

    Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
    HTU.verifyNumericRows(region, f, 0, 1000);

@@ -25,6 +25,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -228,7 +229,11 @@ public class TestReplicationSourceManager {
    }
    wal.sync();

-   assertEquals(6, manager.getWALs().get(slaveId).size());
+   int logNumber = 0;
+   for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
+     logNumber += entry.getValue().size();
+   }
+   assertEquals(6, logNumber);
    wal.rollWriter();
@@ -297,8 +302,11 @@ public class TestReplicationSourceManager {
    rq.init(server.getServerName().toString());
    // populate some znodes in the peer znode
    SortedSet<String> files = new TreeSet<String>();
-   files.add("log1");
-   files.add("log2");
+   String group = "testgroup";
+   String file1 = group + ".log1";
+   String file2 = group + ".log2";
+   files.add(file1);
+   files.add(file2);
    for (String file : files) {
      rq.addLog("1", file);
    }
@@ -316,10 +324,10 @@ public class TestReplicationSourceManager {
    w1.join(5000);
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
-   assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
-   manager.cleanOldLogs("log2", id, true);
+   assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
+   manager.cleanOldLogs(file2, id, true);
    // log1 should be deleted
-   assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
+   assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

  @Test
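
The assertions above encode the per-group contract of cleanOldLogs: within one wal group, every name sorted strictly before the given file is dropped and the file itself is kept. A stand-alone sketch of that pruning rule over a plain sorted set; the class name and log names are invented:

import java.util.SortedSet;
import java.util.TreeSet;

public class CleanOldLogsSketch {
  public static void main(String[] args) {
    SortedSet<String> group = new TreeSet<String>();
    group.add("testgroup.log1");
    group.add("testgroup.log2");
    // Mimic cleanOldLogs("testgroup.log2", ...): headSet is the strictly-older
    // portion of the group; clearing the view prunes the backing set.
    group.headSet("testgroup.log2").clear();
    System.out.println(group); // [testgroup.log2]
  }
}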

@@ -39,8 +39,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager {
  private int nbRows;
  private int walEditKVs;
  private final AtomicLong sequenceId = new AtomicLong(1);
+ @Rule public TestName tn = new TestName();

  @Parameters
  public static Collection<Object[]> parameters() {
@@ -127,7 +130,7 @@ public class TestReplicationWALReaderManager {
    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
    pathWatcher = new PathWatcher();
    listeners.add(pathWatcher);
-   final WALFactory wals = new WALFactory(conf, listeners, "some server");
+   final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
    log = wals.getWAL(info.getEncodedNameAsBytes());
  }