diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bf319199a02..bd9d42a7d80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3313,4 +3313,13 @@ public class HRegionServer extends HasThread implements } return max; } + + /** + * For testing + * @return whether all wal roll request finished for this regionserver + */ + @VisibleForTesting + public boolean walRollRequestFinished() { + return this.walRoller.walRollFinished(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index f35fdb7e473..136e03ef591 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -197,4 +197,18 @@ public class LogRoller extends HasThread { requester); } } -} \ No newline at end of file + + /** + * For testing only + * @return true if all WAL roll finished + */ + @VisibleForTesting + public boolean walRollFinished() { + for (boolean needRoll : walNeedsRoll.values()) { + if (needRoll) { + return false; + } + } + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index e8a7ddca82e..ac1257fa416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -137,6 +137,7 @@ public interface ReplicationEndpoint extends Service { static class ReplicateContext { List entries; int size; + String walGroupId; @InterfaceAudience.Private public ReplicateContext() { } @@ -149,12 +150,19 @@ public interface ReplicationEndpoint extends Service { this.size = size; return this; } + public ReplicateContext setWalGroupId(String walGroupId) { + this.walGroupId = walGroupId; + return this; + } public List getEntries() { return entries; } public int getSize() { return size; } + public String getWalGroupId(){ + return walGroupId; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index eb42e693dc0..685e95c7b2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -154,6 +154,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override public boolean replicate(ReplicateContext replicateContext) { List entries = replicateContext.getEntries(); + String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; if (!peersSelected && this.isRunning()) { @@ -219,12 +220,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi throw iox; } // update metrics - this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); + 
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), + walGroupId); return true; } catch (IOException ioe) { // Didn't ship anything, but must still age the last time we did - this.metrics.refreshAgeOfLastShippedOp(); + this.metrics.refreshAgeOfLastShippedOp(walGroupId); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index cf0878725d7..f9f7001653e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.replication.regionserver; +import java.util.HashMap; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -34,7 +37,8 @@ public class MetricsSource { private static final Log LOG = LogFactory.getLog(MetricsSource.class); - private long lastTimestamp = 0; + // tracks last shipped timestamp for each wal group + private Map lastTimeStamps = new HashMap(); private int lastQueueSize = 0; private String id; @@ -56,23 +60,29 @@ public class MetricsSource { /** * Set the age of the last edit that was shipped - * * @param timestamp write time of the edit + * @param walGroup which group we are setting */ - public void setAgeOfLastShippedOp(long timestamp) { + public void setAgeOfLastShippedOp(long timestamp, String walGroup) { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); globalSourceSource.setLastShippedAge(age); - this.lastTimestamp = timestamp; + this.lastTimeStamps.put(walGroup, timestamp); } /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. 
+ * @param walGroupId id of the group to update */ - public void refreshAgeOfLastShippedOp() { - if (this.lastTimestamp > 0) { - setAgeOfLastShippedOp(this.lastTimestamp); + public void refreshAgeOfLastShippedOp(String walGroupId) { + Long lastTimestamp = this.lastTimeStamps.get(walGroupId); + if (lastTimestamp == null) { + this.lastTimeStamps.put(walGroupId, 0L); + lastTimestamp = 0L; + } + if (lastTimestamp > 0) { + setAgeOfLastShippedOp(lastTimestamp, walGroupId); } } @@ -143,6 +153,7 @@ public class MetricsSource { public void clear() { singleSourceSource.clear(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); + lastTimeStamps.clear(); lastQueueSize = 0; } @@ -163,10 +174,16 @@ public class MetricsSource { } /** - * Get the timeStampsOfLastShippedOp + * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one * @return lastTimestampForAge */ public long getTimeStampOfLastShippedOp() { + long lastTimestamp = 0L; + for (long ts : lastTimeStamps.values()) { + if (ts > lastTimestamp) { + lastTimestamp = ts; + } + } return lastTimestamp; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2126f6db6b5..62c31e6bf83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -22,11 +22,17 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -77,8 +83,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface { private static final Log LOG = LogFactory.getLog(ReplicationSource.class); - // Queue of logs to process - private PriorityBlockingQueue queue; + // Queues of logs to process, entry in format of walGroupId->queue, + // each presents a queue for one wal group + private Map> queues = + new HashMap>(); + // per group queue size, keep no more than this number of logs in each wal group + private int queueSizePerGroup; private ReplicationQueues replicationQueues; private ReplicationPeers replicationPeers; @@ -96,35 +106,23 @@ public class ReplicationSource extends Thread private long replicationQueueSizeCapacity; // Max number of entries in entriesArray private int replicationQueueNbCapacity; - // Our reader for the current log. 
open/close handled by repLogReader - private WAL.Reader reader; - // Last position in the log that we sent to ZooKeeper - private long lastLoggedPosition = -1; - // Path of the current log - private volatile Path currentPath; private FileSystem fs; // id of this cluster private UUID clusterId; // id of the other cluster private UUID peerClusterId; // total number of edits we replicated - private long totalReplicatedEdits = 0; + private AtomicLong totalReplicatedEdits = new AtomicLong(0); // total number of edits we replicated - private long totalReplicatedOperations = 0; + private AtomicLong totalReplicatedOperations = new AtomicLong(0); // The znode we currently play with private String peerClusterZnode; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; - // Current number of operations (Put/Delete) that we need to replicate - private int currentNbOperations = 0; - // Current size of data we need to replicate - private int currentSize = 0; // Indicates if this particular source is running - private volatile boolean running = true; + private volatile boolean sourceRunning = false; // Metrics for this source private MetricsSource metrics; - // Handle on the log reader helper - private ReplicationWALReaderManager repLogReader; //WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; // ReplicationEndpoint which will handle the actual replication @@ -133,6 +131,9 @@ public class ReplicationSource extends Thread private WALEntryFilter walEntryFilter; // throttler private ReplicationThrottler throttler; + private AtomicInteger logQueueSize = new AtomicInteger(0); + private ConcurrentHashMap workerThreads = + new ConcurrentHashMap(); /** * Instantiation method used by region servers @@ -165,10 +166,7 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per - this.queue = - new PriorityBlockingQueue( - this.conf.getInt("hbase.regionserver.maxlogs", 32), - new LogsComparator()); + this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; @@ -176,7 +174,6 @@ public class ReplicationSource extends Thread this.manager = manager; this.fs = fs; this.metrics = metrics; - this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf); this.clusterId = clusterId; this.peerClusterZnode = peerClusterZnode; @@ -196,13 +193,33 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { - this.queue.put(log); - int queueSize = queue.size(); + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName()); + PriorityBlockingQueue queue = queues.get(logPrefix); + if (queue == null) { + queue = new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + queues.put(logPrefix, queue); + if (this.sourceRunning) { + // new wal group observed after source startup, start a new worker thread to track it + // notice: it's possible that log enqueued when this.running is set but worker thread + // still not launched, so it's necessary to check workerThreads before start the worker + final ReplicationSourceWorkerThread worker = + new ReplicationSourceWorkerThread(logPrefix, queue, 
replicationQueueInfo, this); + ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker); + if (extant != null) { + LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix); + } else { + LOG.debug("Starting up worker for wal group " + logPrefix); + worker.startup(); + } + } + } + queue.put(log); + int queueSize = logQueueSize.incrementAndGet(); this.metrics.setSizeOfLogQueue(queueSize); // This will log a warning for each new log that gets created above the warn threshold - if (queueSize > this.logQueueWarnThreshold) { - LOG.warn("Queue size: " + queueSize + - " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + if (queue.size() > this.logQueueWarnThreshold) { + LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize + + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); } } @@ -217,12 +234,8 @@ public class ReplicationSource extends Thread @Override public void run() { - // We were stopped while looping to connect to sinks, just abort - if (!this.isActive()) { - uninitialize(); - return; - } - + // mark we are running now + this.sourceRunning = true; try { // start the endpoint, connect to the cluster Service.State state = replicationEndpoint.start().get(); @@ -247,22 +260,14 @@ public class ReplicationSource extends Thread int sleepMultiplier = 1; // delay this until we are in an asynchronous thread - while (this.isActive() && this.peerClusterId == null) { + while (this.isSourceActive() && this.peerClusterId == null) { this.peerClusterId = replicationEndpoint.getPeerUUID(); - if (this.isActive() && this.peerClusterId == null) { + if (this.isSourceActive() && this.peerClusterId == null) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } } } - // We were stopped while looping to contact peer's zk ensemble, just abort - if (!this.isActive()) { - uninitialize(); - return; - } - - // resetting to 1 to reuse later - sleepMultiplier = 1; // In rare case, zookeeper setting may be messed up. 
That leads to the incorrect // peerClusterId value, which is the same as the source clusterId @@ -271,349 +276,21 @@ public class ReplicationSource extends Thread + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); } - LOG.info("Replicating "+clusterId + " -> " + peerClusterId); - - // If this is recovered, the queue is already full and the first log - // normally has a position (unless the RS failed between 2 logs) - if (this.replicationQueueInfo.isQueueRecovered()) { - try { - this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode, - this.queue.peek().getName())); - if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + - " at position " + this.repLogReader.getPosition()); - } - } catch (ReplicationException e) { - this.terminate("Couldn't get the position of this recovered queue " + - this.peerClusterZnode, e); + LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + // start workers + for (Map.Entry> entry : queues.entrySet()) { + String walGroupId = entry.getKey(); + PriorityBlockingQueue queue = entry.getValue(); + final ReplicationSourceWorkerThread worker = + new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this); + ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker); + if (extant != null) { + LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); + } else { + LOG.debug("Starting up worker for wal group " + walGroupId); + worker.startup(); } } - // Loop until we close down - while (isActive()) { - // Sleep until replication is enabled again - if (!isPeerEnabled()) { - if (sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - Path oldPath = getCurrentPath(); //note that in the current scenario, - //oldPath will be null when a log roll - //happens. - // Get a new path - boolean hasCurrentPath = getNextPath(); - if (getCurrentPath() != null && oldPath == null) { - sleepMultiplier = 1; //reset the sleepMultiplier on a path change - } - if (!hasCurrentPath) { - if (sleepForRetries("No log to process", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - boolean currentWALisBeingWrittenTo = false; - //For WAL files we own (rather than recovered), take a snapshot of whether the - //current WAL file (this.currentPath) is in use (for writing) NOW! - //Since the new WAL paths are enqueued only after the prev WAL file - //is 'closed', presence of an element in the queue means that - //the previous WAL file was closed, else the file is in use (currentPath) - //We take the snapshot now so that we are protected against races - //where a new file gets enqueued while the current file is being processed - //(and where we just finished reading the current file). 
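The read-and-ship loop that used to live directly in run() is replaced above by one ReplicationSourceWorkerThread per WAL group, registered through workerThreads.putIfAbsent so a group never gets two workers. A minimal sketch of that fan-out pattern, using hypothetical WorkerRegistry naming rather than the classes in this patch:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.fs.Path;

// Sketch: start at most one daemon worker per WAL group.
class WorkerRegistry {
  private final ConcurrentHashMap<String, Thread> workers =
      new ConcurrentHashMap<String, Thread>();

  void startWorkerIfAbsent(final String walGroupId, final PriorityBlockingQueue<Path> queue) {
    Thread candidate = new Thread(new Runnable() {
      @Override
      public void run() {
        // take WALs from this group's queue and replicate them; omitted in the sketch
      }
    }, "replicationSource." + walGroupId);
    candidate.setDaemon(true);
    Thread existing = workers.putIfAbsent(walGroupId, candidate);
    if (existing == null) {
      candidate.start();  // we won the race for this group
    }                     // otherwise another caller already registered a worker
  }
}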
- if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) { - currentWALisBeingWrittenTo = true; - } - // Open a reader on it - if (!openReader(sleepMultiplier)) { - // Reset the sleep multiplier, else it'd be reused for the next file - sleepMultiplier = 1; - continue; - } - - // If we got a null reader but didn't continue, then sleep and continue - if (this.reader == null) { - if (sleepForRetries("Unable to open a reader", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - - boolean gotIOE = false; - currentNbOperations = 0; - List entries = new ArrayList(1); - currentSize = 0; - try { - if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) { - continue; - } - } catch (IOException ioe) { - LOG.warn(this.peerClusterZnode + " Got: ", ioe); - gotIOE = true; - if (ioe.getCause() instanceof EOFException) { - - boolean considerDumping = false; - if (this.replicationQueueInfo.isQueueRecovered()) { - try { - FileStatus stat = this.fs.getFileStatus(this.currentPath); - if (stat.getLen() == 0) { - LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty"); - } - considerDumping = true; - } catch (IOException e) { - LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e); - } - } - - if (considerDumping && - sleepMultiplier == this.maxRetriesMultiplier && - processEndOfFile()) { - continue; - } - } - } finally { - try { - this.reader = null; - this.repLogReader.closeReader(); - } catch (IOException e) { - gotIOE = true; - LOG.warn("Unable to finalize the tailing of a file", e); - } - } - - // If we didn't get anything to replicate, or if we hit a IOE, - // wait a bit and retry. - // But if we need to stop, don't bother sleeping - if (this.isActive() && (gotIOE || entries.isEmpty())) { - if (this.lastLoggedPosition != this.repLogReader.getPosition()) { - this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.repLogReader.getPosition(), - this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); - this.lastLoggedPosition = this.repLogReader.getPosition(); - } - // Reset the sleep multiplier if nothing has actually gone wrong - if (!gotIOE) { - sleepMultiplier = 1; - // if there was nothing to ship and it's not an error - // set "ageOfLastShippedOp" to to indicate that we're current - this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis()); - } - if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - sleepMultiplier = 1; - shipEdits(currentWALisBeingWrittenTo, entries); - } - uninitialize(); - } - - /** - * Read all the entries from the current log files and retain those - * that need to be replicated. Else, process the end of the current file. 
- * @param currentWALisBeingWrittenTo is the current WAL being written to - * @param entries resulting entries to be replicated - * @return true if we got nothing and went to the next file, false if we got - * entries - * @throws IOException - */ - protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, - List entries) throws IOException { - long seenEntries = 0; - if (LOG.isTraceEnabled()) { - LOG.trace("Seeking in " + this.currentPath + " at position " - + this.repLogReader.getPosition()); - } - this.repLogReader.seek(); - long positionBeforeRead = this.repLogReader.getPosition(); - WAL.Entry entry = - this.repLogReader.readNextAndSetPosition(); - while (entry != null) { - this.metrics.incrLogEditsRead(); - seenEntries++; - - // don't replicate if the log entries have already been consumed by the cluster - if (replicationEndpoint.canReplicateToSameCluster() - || !entry.getKey().getClusterIds().contains(peerClusterId)) { - // Remove all KVs that should not be replicated - entry = walEntryFilter.filter(entry); - WALEdit edit = null; - WALKey logKey = null; - if (entry != null) { - edit = entry.getEdit(); - logKey = entry.getKey(); - } - - if (edit != null && edit.size() != 0) { - //Mark that the current cluster has the change - logKey.addClusterId(clusterId); - currentNbOperations += countDistinctRowKeys(edit); - entries.add(entry); - currentSize += entry.getEdit().heapSize(); - } else { - this.metrics.incrLogEditsFiltered(); - } - } - // Stop if too many entries or too big - if (currentSize >= this.replicationQueueSizeCapacity || - entries.size() >= this.replicationQueueNbCapacity) { - break; - } - try { - entry = this.repLogReader.readNextAndSetPosition(); - } catch (IOException ie) { - LOG.debug("Break on IOE: " + ie.getMessage()); - break; - } - } - metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead); - if (currentWALisBeingWrittenTo) { - return false; - } - // If we didn't get anything and the queue has an object, it means we - // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); - } - - /** - * Poll for the next path - * @return true if a path was obtained, false if not - */ - protected boolean getNextPath() { - try { - if (this.currentPath == null) { - this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); - this.metrics.setSizeOfLogQueue(queue.size()); - if (this.currentPath != null) { - this.manager.cleanOldLogs(this.currentPath.getName(), - this.peerId, - this.replicationQueueInfo.isQueueRecovered()); - if (LOG.isTraceEnabled()) { - LOG.trace("New log: " + this.currentPath); - } - } - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while reading edits", e); - } - return this.currentPath != null; - } - - /** - * Open a reader on the current path - * - * @param sleepMultiplier by how many times the default sleeping time is augmented - * @return true if we should continue with that file, false if we are over with it - */ - protected boolean openReader(int sleepMultiplier) { - try { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Opening log " + this.currentPath); - } - this.reader = repLogReader.openReader(this.currentPath); - } catch (FileNotFoundException fnfe) { - if (this.replicationQueueInfo.isQueueRecovered()) { - // We didn't find the log in the archive directory, look if it still - // exists in the dead RS folder (there could be a chain of failures - // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - 
LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path rootDir = FSUtils.getRootDir(this.conf); - for (String curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = new Path(rootDir, - DefaultWALProvider.getWALDirectoryName(curDeadServerName)); - Path[] locs = new Path[] { - new Path(deadRsDirectory, currentPath.getName()), - new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), - currentPath.getName()), - }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (this.manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + this.currentPath + " still exists at " + - possibleLogLocation); - // Breaking here will make us sleep since reader is null - // TODO why don't we need to set currentPath and call openReader here? - return true; - } - } - } - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists - if (stopper instanceof ReplicationSyncUp.DummyServer) { - // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal - // area rather than to the wal area for a particular region server. - FileStatus[] rss = fs.listStatus(manager.getLogDir()); - for (FileStatus rs : rss) { - Path p = rs.getPath(); - FileStatus[] logs = fs.listStatus(p); - for (FileStatus log : logs) { - p = new Path(p, log.getPath().getName()); - if (p.getName().equals(currentPath.getName())) { - currentPath = p; - LOG.info("Log " + currentPath.getName() + " found at " + currentPath); - // Open the log at the new location - this.openReader(sleepMultiplier); - return true; - } - } - } - } - - // TODO What happens if the log was missing from every single location? - // Although we need to check a couple of times as the log could have - // been moved by the master between the checks - // It can also happen if a recovered queue wasn't properly cleaned, - // such that the znode pointing to a log exists but the log was - // deleted a long time ago. - // For the moment, we'll throw the IO and processEndOfFile - throw new IOException("File from recovered queue is " + - "nowhere to be found", fnfe); - } else { - // If the log was archived, continue reading from there - Path archivedLogLocation = - new Path(manager.getOldLogDir(), currentPath.getName()); - if (this.manager.getFs().exists(archivedLogLocation)) { - currentPath = archivedLogLocation; - LOG.info("Log " + this.currentPath + " was moved to " + - archivedLogLocation); - // Open the log at the new location - this.openReader(sleepMultiplier); - - } - // TODO What happens the log is missing in both places? - } - } - } catch (IOException ioe) { - if (ioe instanceof EOFException && isCurrentLogEmpty()) return true; - LOG.warn(this.peerClusterZnode + " Got: ", ioe); - this.reader = null; - if (ioe.getCause() instanceof NullPointerException) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
- LOG.warn("Got NPE opening reader, will retry."); - } else if (sleepMultiplier == this.maxRetriesMultiplier) { - // TODO Need a better way to determine if a file is really gone but - // TODO without scanning all logs dir - LOG.warn("Waited too long for this file, considering dumping"); - return !processEndOfFile(); - } - } - return true; - } - - /* - * Checks whether the current log file is empty, and it is not a recovered queue. This is to - * handle scenario when in an idle cluster, there is no entry in the current log and we keep on - * trying to read the log file and get EOFException. In case of a recovered queue the last log - * file may be empty, and we don't want to retry that. - */ - private boolean isCurrentLogEmpty() { - return (this.repLogReader.getPosition() == 0 && - !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0); } /** @@ -635,100 +312,6 @@ public class ReplicationSource extends Thread return sleepMultiplier < maxRetriesMultiplier; } - /** - * Count the number of different row keys in the given edit because of - * mini-batching. We assume that there's at least one Cell in the WALEdit. - * @param edit edit to count row keys from - * @return number of different row keys - */ - private int countDistinctRowKeys(WALEdit edit) { - List cells = edit.getCells(); - int distinctRowKeys = 1; - Cell lastCell = cells.get(0); - for (int i = 0; i < edit.size(); i++) { - if (!CellUtil.matchingRow(cells.get(i), lastCell)) { - distinctRowKeys++; - } - } - return distinctRowKeys; - } - - /** - * Do the shipping logic - * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) - * written to when this method was called - */ - protected void shipEdits(boolean currentWALisBeingWrittenTo, List entries) { - int sleepMultiplier = 0; - if (entries.isEmpty()) { - LOG.warn("Was given 0 edits to ship"); - return; - } - while (this.isActive()) { - try { - if (this.throttler.isEnabled()) { - long sleepTicks = this.throttler.getNextSleepInterval(currentSize); - if (sleepTicks > 0) { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("To sleep " + sleepTicks + "ms for throttling control"); - } - Thread.sleep(sleepTicks); - } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping for throttling control"); - Thread.currentThread().interrupt(); - // current thread might be interrupted to terminate - // directly go back to while() for confirm this - continue; - } - // reset throttler's cycle start tick when sleep for throttling occurs - this.throttler.resetStartTick(); - } - } - // create replicateContext here, so the entries can be GC'd upon return from this call stack - ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext(); - replicateContext.setEntries(entries).setSize(currentSize); - - long startTimeNs = System.nanoTime(); - // send the edits to the endpoint. 
Will block until the edits are shipped and acknowledged - boolean replicated = replicationEndpoint.replicate(replicateContext); - long endTimeNs = System.nanoTime(); - - if (!replicated) { - continue; - } else { - sleepMultiplier = Math.max(sleepMultiplier-1, 0); - } - - if (this.lastLoggedPosition != this.repLogReader.getPosition()) { - this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.repLogReader.getPosition(), - this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); - this.lastLoggedPosition = this.repLogReader.getPosition(); - } - if (this.throttler.isEnabled()) { - this.throttler.addPushSize(currentSize); - } - this.totalReplicatedEdits += entries.size(); - this.totalReplicatedOperations += currentNbOperations; - this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024); - this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); - if (LOG.isTraceEnabled()) { - LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " - + this.totalReplicatedOperations + " operations in " + - ((endTimeNs - startTimeNs)/1000000) + " ms"); - } - break; - } catch (Exception ex) { - LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + - org.apache.hadoop.util.StringUtils.stringifyException(ex)); - if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - } - /** * check whether the peer is enabled or not * @@ -738,53 +321,17 @@ public class ReplicationSource extends Thread return this.replicationPeers.getStatusOfPeer(this.peerId); } - /** - * If the queue isn't empty, switch to the next one - * Else if this is a recovered queue, it means we're done! - * Else we'll just continue to try reading the log file - * @return true if we're done with the current file, false if we should - * continue trying to read from it - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", - justification="Yeah, this is how it works") - protected boolean processEndOfFile() { - if (this.queue.size() != 0) { - if (LOG.isTraceEnabled()) { - String filesize = "N/A"; - try { - FileStatus stat = this.fs.getFileStatus(this.currentPath); - filesize = stat.getLen()+""; - } catch (IOException ex) {} - LOG.trace("Reached the end of a log, stats: " + getStats() + - ", and the length of the file is " + filesize); - } - this.currentPath = null; - this.repLogReader.finishCurrentFile(); - this.reader = null; - return true; - } else if (this.replicationQueueInfo.isQueueRecovered()) { - this.manager.closeRecoveredQueue(this); - LOG.info("Finished recovering the queue with the following stats " + getStats()); - this.running = false; - return true; - } - return false; - } - @Override public void startup() { String n = Thread.currentThread().getName(); - Thread.UncaughtExceptionHandler handler = - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.error("Unexpected exception in ReplicationSource," + - " currentPath=" + currentPath, e); - } - }; - Threads.setDaemonThreadRunning( - this, n + ".replicationSource," + - this.peerClusterZnode, handler); + Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Unexpected exception in ReplicationSource", e); + } + }; + Threads + .setDaemonThreadRunning(this, n + 
".replicationSource," + this.peerClusterZnode, handler); } @Override @@ -806,14 +353,21 @@ public class ReplicationSource extends Thread LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, cause); } - this.running = false; - this.interrupt(); + this.sourceRunning = false; + Collection workers = workerThreads.values(); + for (ReplicationSourceWorkerThread worker : workers) { + worker.setWorkerRunning(false); + worker.interrupt(); + } ListenableFuture future = null; if (this.replicationEndpoint != null) { future = this.replicationEndpoint.stop(); } if (join) { - Threads.shutdown(this, this.sleepForRetries); + for (ReplicationSourceWorkerThread worker : workers) { + Threads.shutdown(worker, this.sleepForRetries); + LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); + } if (future != null) { try { future.get(); @@ -836,11 +390,15 @@ public class ReplicationSource extends Thread @Override public Path getCurrentPath() { - return this.currentPath; + // only for testing + for (ReplicationSourceWorkerThread worker : workerThreads.values()) { + if (worker.getCurrentPath() != null) return worker.getCurrentPath(); + } + return null; } - private boolean isActive() { - return !this.stopper.isStopped() && this.running && !isInterrupted(); + private boolean isSourceActive() { + return !this.stopper.isStopped() && this.sourceRunning; } /** @@ -867,10 +425,23 @@ public class ReplicationSource extends Thread @Override public String getStats() { - long position = this.repLogReader.getPosition(); - return "Total replicated edits: " + totalReplicatedEdits + - ", currently replicating from: " + this.currentPath + - " at position: " + position; + StringBuilder sb = new StringBuilder(); + sb.append("Total replicated edits: ").append(totalReplicatedEdits) + .append(", current progress: \n"); + for (Map.Entry entry : workerThreads.entrySet()) { + String walGroupId = entry.getKey(); + ReplicationSourceWorkerThread worker = entry.getValue(); + long position = worker.getCurrentPosition(); + Path currentPath = worker.getCurrentPath(); + sb.append("walGroup [").append(walGroupId).append("]: "); + if (currentPath != null) { + sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + .append(position).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + } + return sb.toString(); } /** @@ -880,4 +451,572 @@ public class ReplicationSource extends Thread public MetricsSource getSourceMetrics() { return this.metrics; } + + public class ReplicationSourceWorkerThread extends Thread { + private ReplicationSource source; + private String walGroupId; + private PriorityBlockingQueue queue; + private ReplicationQueueInfo replicationQueueInfo; + // Our reader for the current log. 
open/close handled by repLogReader + private WAL.Reader reader; + // Last position in the log that we sent to ZooKeeper + private long lastLoggedPosition = -1; + // Path of the current log + private volatile Path currentPath; + // Handle on the log reader helper + private ReplicationWALReaderManager repLogReader; + // Current number of operations (Put/Delete) that we need to replicate + private int currentNbOperations = 0; + // Current size of data we need to replicate + private int currentSize = 0; + // Indicates whether this particular worker is running + private boolean workerRunning = true; + + public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue queue, + ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { + this.walGroupId = walGroupId; + this.queue = queue; + this.replicationQueueInfo = replicationQueueInfo; + this.repLogReader = new ReplicationWALReaderManager(fs, conf); + this.source = source; + } + + @Override + public void run() { + // If this is recovered, the queue is already full and the first log + // normally has a position (unless the RS failed between 2 logs) + if (this.replicationQueueInfo.isQueueRecovered()) { + try { + this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode, + this.queue.peek().getName())); + if (LOG.isTraceEnabled()) { + LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + + this.repLogReader.getPosition()); + } + } catch (ReplicationException e) { + terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); + } + } + // Loop until we close down + while (isWorkerActive()) { + int sleepMultiplier = 1; + // Sleep until replication is enabled again + if (!isPeerEnabled()) { + if (sleepForRetries("Replication is disabled", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + Path oldPath = getCurrentPath(); //note that in the current scenario, + //oldPath will be null when a log roll + //happens. + // Get a new path + boolean hasCurrentPath = getNextPath(); + if (getCurrentPath() != null && oldPath == null) { + sleepMultiplier = 1; //reset the sleepMultiplier on a path change + } + if (!hasCurrentPath) { + if (sleepForRetries("No log to process", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + boolean currentWALisBeingWrittenTo = false; + //For WAL files we own (rather than recovered), take a snapshot of whether the + //current WAL file (this.currentPath) is in use (for writing) NOW! + //Since the new WAL paths are enqueued only after the prev WAL file + //is 'closed', presence of an element in the queue means that + //the previous WAL file was closed, else the file is in use (currentPath) + //We take the snapshot now so that we are protected against races + //where a new file gets enqueued while the current file is being processed + //(and where we just finished reading the current file). 
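Every metrics call in this worker carries the walGroupId, so the MetricsSource changes earlier in the patch can keep one last-shipped timestamp per group and report the source-level value as the newest of them. A simplified, self-contained model of that bookkeeping, without the underlying metrics sinks:

import java.util.HashMap;
import java.util.Map;

// Simplified model of per-WAL-group "age of last shipped op" tracking.
class PerGroupShippedAge {
  private final Map<String, Long> lastTimeStamps = new HashMap<String, Long>();

  // record the write time of the newest edit shipped for this group
  synchronized void setAgeOfLastShippedOp(long writeTime, String walGroupId) {
    lastTimeStamps.put(walGroupId, writeTime);
  }

  // re-report the age for a group that failed to ship anything new
  synchronized long refreshAgeOfLastShippedOp(String walGroupId, long now) {
    Long last = lastTimeStamps.get(walGroupId);
    if (last == null || last.longValue() == 0L) {
      return 0L;                      // nothing shipped yet for this group
    }
    return now - last.longValue();    // age keeps growing until a new ship succeeds
  }

  // the source-level timestamp is the most recent one across all groups
  synchronized long latestShippedTimestamp() {
    long latest = 0L;
    for (long ts : lastTimeStamps.values()) {
      latest = Math.max(latest, ts);
    }
    return latest;
  }
}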
+ if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) { + currentWALisBeingWrittenTo = true; + } + // Open a reader on it + if (!openReader(sleepMultiplier)) { + // Reset the sleep multiplier, else it'd be reused for the next file + sleepMultiplier = 1; + continue; + } + + // If we got a null reader but didn't continue, then sleep and continue + if (this.reader == null) { + if (sleepForRetries("Unable to open a reader", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + + boolean gotIOE = false; + currentNbOperations = 0; + List entries = new ArrayList(1); + currentSize = 0; + try { + if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) { + continue; + } + } catch (IOException ioe) { + LOG.warn(peerClusterZnode + " Got: ", ioe); + gotIOE = true; + if (ioe.getCause() instanceof EOFException) { + + boolean considerDumping = false; + if (this.replicationQueueInfo.isQueueRecovered()) { + try { + FileStatus stat = fs.getFileStatus(this.currentPath); + if (stat.getLen() == 0) { + LOG.warn(peerClusterZnode + " Got EOF and the file was empty"); + } + considerDumping = true; + } catch (IOException e) { + LOG.warn(peerClusterZnode + " Got while getting file size: ", e); + } + } + + if (considerDumping && + sleepMultiplier == maxRetriesMultiplier && + processEndOfFile()) { + continue; + } + } + } finally { + try { + this.reader = null; + this.repLogReader.closeReader(); + } catch (IOException e) { + gotIOE = true; + LOG.warn("Unable to finalize the tailing of a file", e); + } + } + + // If we didn't get anything to replicate, or if we hit a IOE, + // wait a bit and retry. + // But if we need to stop, don't bother sleeping + if (isWorkerActive() && (gotIOE || entries.isEmpty())) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { + manager.logPositionAndCleanOldLogs(this.currentPath, + peerClusterZnode, this.repLogReader.getPosition(), + this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); + this.lastLoggedPosition = this.repLogReader.getPosition(); + } + // Reset the sleep multiplier if nothing has actually gone wrong + if (!gotIOE) { + sleepMultiplier = 1; + // if there was nothing to ship and it's not an error + // set "ageOfLastShippedOp" to to indicate that we're current + metrics.setAgeOfLastShippedOp(System.currentTimeMillis(), walGroupId); + } + if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + sleepMultiplier = 1; + shipEdits(currentWALisBeingWrittenTo, entries); + } + if (replicationQueueInfo.isQueueRecovered()) { + // use synchronize to make sure one last thread will clean the queue + synchronized (workerThreads) { + Threads.sleep(100);// wait a short while for other worker thread to fully exit + boolean allOtherTaskDone = true; + for (ReplicationSourceWorkerThread worker : workerThreads.values()) { + if (!worker.equals(this) && worker.isAlive()) { + allOtherTaskDone = false; + break; + } + } + if (allOtherTaskDone) { + manager.closeRecoveredQueue(this.source); + LOG.info("Finished recovering queue " + peerClusterZnode + + " with the following stats: " + getStats()); + } + } + } + } + + /** + * Read all the entries from the current log files and retain those that need to be replicated. + * Else, process the end of the current file. 
+ * @param currentWALisBeingWrittenTo is the current WAL being written to + * @param entries resulting entries to be replicated + * @return true if we got nothing and went to the next file, false if we got entries + * @throws IOException + */ + protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, + List entries) throws IOException { + long seenEntries = 0; + if (LOG.isTraceEnabled()) { + LOG.trace("Seeking in " + this.currentPath + " at position " + + this.repLogReader.getPosition()); + } + this.repLogReader.seek(); + long positionBeforeRead = this.repLogReader.getPosition(); + WAL.Entry entry = this.repLogReader.readNextAndSetPosition(); + while (entry != null) { + metrics.incrLogEditsRead(); + seenEntries++; + + // don't replicate if the log entries have already been consumed by the cluster + if (replicationEndpoint.canReplicateToSameCluster() + || !entry.getKey().getClusterIds().contains(peerClusterId)) { + // Remove all KVs that should not be replicated + entry = walEntryFilter.filter(entry); + WALEdit edit = null; + WALKey logKey = null; + if (entry != null) { + edit = entry.getEdit(); + logKey = entry.getKey(); + } + + if (edit != null && edit.size() != 0) { + // Mark that the current cluster has the change + logKey.addClusterId(clusterId); + currentNbOperations += countDistinctRowKeys(edit); + entries.add(entry); + currentSize += entry.getEdit().heapSize(); + } else { + metrics.incrLogEditsFiltered(); + } + } + // Stop if too many entries or too big + // FIXME check the relationship between single wal group and overall + if (currentSize >= replicationQueueSizeCapacity + || entries.size() >= replicationQueueNbCapacity) { + break; + } + try { + entry = this.repLogReader.readNextAndSetPosition(); + } catch (IOException ie) { + LOG.debug("Break on IOE: " + ie.getMessage()); + break; + } + } + metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead); + if (currentWALisBeingWrittenTo) { + return false; + } + // If we didn't get anything and the queue has an object, it means we + // hit the end of the file for sure + return seenEntries == 0 && processEndOfFile(); + } + + /** + * Poll for the next path + * @return true if a path was obtained, false if not + */ + protected boolean getNextPath() { + try { + if (this.currentPath == null) { + this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS); + int queueSize = logQueueSize.decrementAndGet(); + metrics.setSizeOfLogQueue(queueSize); + if (this.currentPath != null) { + // For recovered queue: must use peerClusterZnode since peerId is a parsed value + manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode, + this.replicationQueueInfo.isQueueRecovered()); + if (LOG.isTraceEnabled()) { + LOG.trace("New log: " + this.currentPath); + } + } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while reading edits", e); + } + return this.currentPath != null; + } + + /** + * Open a reader on the current path + * + * @param sleepMultiplier by how many times the default sleeping time is augmented + * @return true if we should continue with that file, false if we are over with it + */ + protected boolean openReader(int sleepMultiplier) { + + try { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Opening log " + this.currentPath); + } + this.reader = repLogReader.openReader(this.currentPath); + } catch (FileNotFoundException fnfe) { + if (this.replicationQueueInfo.isQueueRecovered()) { + // We didn't find the log in the archive directory, look if it still 
+ // exists in the dead RS folder (there could be a chain of failures + // to look at) + List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); + LOG.info("NB dead servers : " + deadRegionServers.size()); + final Path rootDir = FSUtils.getRootDir(conf); + for (String curDeadServerName : deadRegionServers) { + final Path deadRsDirectory = new Path(rootDir, + DefaultWALProvider.getWALDirectoryName(curDeadServerName)); + Path[] locs = new Path[] { + new Path(deadRsDirectory, currentPath.getName()), + new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), + currentPath.getName()), + }; + for (Path possibleLogLocation : locs) { + LOG.info("Possible location " + possibleLogLocation.toUri().toString()); + if (manager.getFs().exists(possibleLogLocation)) { + // We found the right new location + LOG.info("Log " + this.currentPath + " still exists at " + + possibleLogLocation); + // Breaking here will make us sleep since reader is null + // TODO why don't we need to set currentPath and call openReader here? + return true; + } + } + } + // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data + // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists + if (stopper instanceof ReplicationSyncUp.DummyServer) { + // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal + // area rather than to the wal area for a particular region server. + FileStatus[] rss = fs.listStatus(manager.getLogDir()); + for (FileStatus rs : rss) { + Path p = rs.getPath(); + FileStatus[] logs = fs.listStatus(p); + for (FileStatus log : logs) { + p = new Path(p, log.getPath().getName()); + if (p.getName().equals(currentPath.getName())) { + currentPath = p; + LOG.info("Log " + currentPath.getName() + " found at " + currentPath); + // Open the log at the new location + this.openReader(sleepMultiplier); + return true; + } + } + } + } + + // TODO What happens if the log was missing from every single location? + // Although we need to check a couple of times as the log could have + // been moved by the master between the checks + // It can also happen if a recovered queue wasn't properly cleaned, + // such that the znode pointing to a log exists but the log was + // deleted a long time ago. + // For the moment, we'll throw the IO and processEndOfFile + throw new IOException("File from recovered queue is " + + "nowhere to be found", fnfe); + } else { + // If the log was archived, continue reading from there + Path archivedLogLocation = + new Path(manager.getOldLogDir(), currentPath.getName()); + if (manager.getFs().exists(archivedLogLocation)) { + currentPath = archivedLogLocation; + LOG.info("Log " + this.currentPath + " was moved to " + + archivedLogLocation); + // Open the log at the new location + this.openReader(sleepMultiplier); + + } + // TODO What happens the log is missing in both places? + } + } + } catch (IOException ioe) { + if (ioe instanceof EOFException && isCurrentLogEmpty()) return true; + LOG.warn(peerClusterZnode + " Got: ", ioe); + this.reader = null; + if (ioe.getCause() instanceof NullPointerException) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
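A recovered queue can now be served by several workers, one per WAL group, so the queue is only released after the last of them finishes; the run() method above synchronizes on workerThreads and checks its siblings before calling closeRecoveredQueue. A compact sketch of that last-one-out check, with the close action passed in as a plain Runnable:

import java.util.concurrent.ConcurrentHashMap;

// Sketch: the last finished worker of a recovered queue is the one that closes it.
class RecoveredQueueCloser {
  private final ConcurrentHashMap<String, Thread> workers =
      new ConcurrentHashMap<String, Thread>();

  void register(String walGroupId, Thread worker) {
    workers.put(walGroupId, worker);
  }

  void onWorkerFinished(Thread finishedWorker, Runnable closeQueue) {
    synchronized (workers) {            // serialize the check so exactly one worker closes
      boolean othersStillRunning = false;
      for (Thread worker : workers.values()) {
        if (worker != finishedWorker && worker.isAlive()) {
          othersStillRunning = true;
          break;
        }
      }
      if (!othersStillRunning) {
        closeQueue.run();               // e.g. manager.closeRecoveredQueue(source)
      }
    }
  }
}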
+ LOG.warn("Got NPE opening reader, will retry."); + } else if (sleepMultiplier == maxRetriesMultiplier) { + // TODO Need a better way to determine if a file is really gone but + // TODO without scanning all logs dir + LOG.warn("Waited too long for this file, considering dumping"); + return !processEndOfFile(); + } + } + return true; + } + + /* + * Checks whether the current log file is empty, and it is not a recovered queue. This is to + * handle scenario when in an idle cluster, there is no entry in the current log and we keep on + * trying to read the log file and get EOFException. In case of a recovered queue the last log + * file may be empty, and we don't want to retry that. + */ + private boolean isCurrentLogEmpty() { + return (this.repLogReader.getPosition() == 0 && + !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0); + } + + /** + * Count the number of different row keys in the given edit because of mini-batching. We assume + * that there's at least one Cell in the WALEdit. + * @param edit edit to count row keys from + * @return number of different row keys + */ + private int countDistinctRowKeys(WALEdit edit) { + List cells = edit.getCells(); + int distinctRowKeys = 1; + Cell lastCell = cells.get(0); + for (int i = 0; i < edit.size(); i++) { + if (!CellUtil.matchingRow(cells.get(i), lastCell)) { + distinctRowKeys++; + } + } + return distinctRowKeys; + } + + /** + * Do the shipping logic + * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) + * written to when this method was called + */ + protected void shipEdits(boolean currentWALisBeingWrittenTo, List entries) { + int sleepMultiplier = 0; + if (entries.isEmpty()) { + LOG.warn("Was given 0 edits to ship"); + return; + } + while (isWorkerActive()) { + try { + if (throttler.isEnabled()) { + long sleepTicks = throttler.getNextSleepInterval(currentSize); + if (sleepTicks > 0) { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("To sleep " + sleepTicks + "ms for throttling control"); + } + Thread.sleep(sleepTicks); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping for throttling control"); + Thread.currentThread().interrupt(); + // current thread might be interrupted to terminate + // directly go back to while() for confirm this + continue; + } + // reset throttler's cycle start tick when sleep for throttling occurs + throttler.resetStartTick(); + } + } + // create replicateContext here, so the entries can be GC'd upon return from this call + // stack + ReplicationEndpoint.ReplicateContext replicateContext = + new ReplicationEndpoint.ReplicateContext(); + replicateContext.setEntries(entries).setSize(currentSize); + replicateContext.setWalGroupId(walGroupId); + + long startTimeNs = System.nanoTime(); + // send the edits to the endpoint. 
Will block until the edits are shipped and acknowledged + boolean replicated = replicationEndpoint.replicate(replicateContext); + long endTimeNs = System.nanoTime(); + + if (!replicated) { + continue; + } else { + sleepMultiplier = Math.max(sleepMultiplier - 1, 0); + } + + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { + manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, + this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), + currentWALisBeingWrittenTo); + this.lastLoggedPosition = this.repLogReader.getPosition(); + } + if (throttler.isEnabled()) { + throttler.addPushSize(currentSize); + } + totalReplicatedEdits.addAndGet(entries.size()); + totalReplicatedOperations.addAndGet(currentNbOperations); + // FIXME check relationship between wal group and overall + metrics.shipBatch(currentNbOperations, currentSize / 1024); + metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), + walGroupId); + if (LOG.isTraceEnabled()) { + LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or " + + totalReplicatedOperations + " operations in " + + ((endTimeNs - startTimeNs) / 1000000) + " ms"); + } + break; + } catch (Exception ex) { + LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + + org.apache.hadoop.util.StringUtils.stringifyException(ex)); + if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + } + + /** + * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means + * we're done! Else we'll just continue to try reading the log file + * @return true if we're done with the current file, false if we should continue trying to read + * from it + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", + justification = "Yeah, this is how it works") + protected boolean processEndOfFile() { + if (this.queue.size() != 0) { + if (LOG.isTraceEnabled()) { + String filesize = "N/A"; + try { + FileStatus stat = fs.getFileStatus(this.currentPath); + filesize = stat.getLen() + ""; + } catch (IOException ex) { + } + LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() + + ", and the length of the file is " + filesize); + } + this.currentPath = null; + this.repLogReader.finishCurrentFile(); + this.reader = null; + return true; + } else if (this.replicationQueueInfo.isQueueRecovered()) { + LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + + peerClusterZnode); + workerRunning = false; + return true; + } + return false; + } + + public void startup() { + String n = Thread.currentThread().getName(); + Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath=" + + getCurrentPath(), e); + } + }; + Threads.setDaemonThreadRunning(this, n + ".replicationSource." 
+ walGroupId + "," + + peerClusterZnode, handler); + workerThreads.put(walGroupId, this); + } + + public Path getCurrentPath() { + return this.currentPath; + } + + public long getCurrentPosition() { + return this.repLogReader.getPosition(); + } + + private boolean isWorkerActive() { + return !stopper.isStopped() && workerRunning && !isInterrupted(); + } + + private void terminate(String reason, Exception cause) { + if (cause == null) { + LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason); + + } else { + LOG.error("Closing worker for wal group " + this.walGroupId + + " because an error occurred: " + reason, cause); + } + this.interrupt(); + Threads.shutdown(this, sleepForRetries); + LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); + } + + public void setWorkerRunning(boolean workerRunning) { + this.workerRunning = workerRunning; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 0c8f6f9f40f..a8cffbad9af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -23,9 +23,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener { // All about stopping private final Server server; // All logs we are currently tracking - private final Map> walsById; + // Index structure of the map is: peer_id->logPrefix/logGroup->logs + private final Map>> walsById; // Logs for recovered sources we are currently tracking - private final Map> walsByIdRecoveredQueues; + private final Map>> walsByIdRecoveredQueues; private final Configuration conf; private final FileSystem fs; - // The path to the latest log we saw, for new coming sources - private Path latestPath; + // The paths to the latest log of each wal group, for new coming peers + private Set latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -133,8 +138,8 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; - this.walsById = new HashMap>(); - this.walsByIdRecoveredQueues = new ConcurrentHashMap>(); + this.walsById = new HashMap>>(); + this.walsByIdRecoveredQueues = new ConcurrentHashMap>>(); this.oldsources = new CopyOnWriteArrayList(); this.conf = conf; this.fs = fs; @@ -158,6 +163,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setDaemon(true); 
@@ -133,8 +138,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new HashMap<String, SortedSet<String>>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
+    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
+    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -158,6 +163,7 @@
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.rand = new Random();
+    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
   }
 
   /**
@@ -189,15 +195,16 @@
    * @param queueRecovered Whether this is a recovered queue
    */
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
+      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
       if (wals != null && !wals.first().equals(key)) {
         cleanOldLogs(wals, key, id);
       }
     } else {
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(id);
-        if (!wals.first().equals(key)) {
+        SortedSet<String> wals = walsById.get(id).get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
@@ -238,36 +245,44 @@
   }
 
   /**
-   * Add a new normal source to this region server
+   * Add sources for the given peer cluster on this region server. For the newly added peer, we
+   * only need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
    * @throws IOException
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
-    ReplicationPeerConfig peerConfig
-      = replicationPeers.getReplicationPeerConfig(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
-      this.walsById.put(id, new TreeSet<String>());
+      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+      this.walsById.put(id, walsByGroup);
       // Add the latest wal to that source's queue
-      if (this.latestPath != null) {
-        String name = this.latestPath.getName();
-        this.walsById.get(id).add(name);
-        try {
-          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
-        } catch (ReplicationException e) {
-          String message =
-              "Cannot add log to queue when creating a new source, queueId="
-                  + src.getPeerClusterZnode() + ", filename=" + name;
-          server.stop(message);
-          throw e;
+      synchronized (latestPaths) {
+        if (this.latestPaths.size() > 0) {
+          for (Path logPath : latestPaths) {
+            String name = logPath.getName();
+            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+            SortedSet<String> logs = new TreeSet<String>();
+            logs.add(name);
+            walsByGroup.put(walPrefix, logs);
+            try {
+              this.replicationQueues.addLog(id, name);
+            } catch (ReplicationException e) {
+              String message =
+                  "Cannot add log to queue when creating a new source, queueId=" + id
+                      + ", filename=" + name;
+              server.stop(message);
+              throw e;
+            }
+            src.enqueueLog(logPath);
+          }
         }
-      src.enqueueLog(this.latestPath);
       }
     }
     src.startup();
@@ -302,7 +317,7 @@
    * Get a copy of the wals of the first source on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWALs() {
+  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 
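Per the revised addSource() javadoc above, a freshly added peer only needs the newest wal of every group. A toy sketch of that selection, assuming the group is whatever precedes the last dot of the file name (prefixOf below is a hypothetical stand-in for the real DefaultWALProvider helper, and all file names are invented):

    // Rough sketch, not the patch's code: a new peer is seeded with exactly one wal per
    // group, which is what the Set of "latest paths" provides.
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class LatestPathPerGroupSketch {
      /** Hypothetical stand-in for DefaultWALProvider.getWALPrefixFromWALName(). */
      static String prefixOf(String walName) {
        return walName.substring(0, walName.lastIndexOf('.'));
      }

      public static void main(String[] args) {
        Set<String> latestPaths = new HashSet<String>();
        latestPaths.add("rs%2C16020%2C1.null0.1435179153906");
        latestPaths.add("rs%2C16020%2C1.null1.1435179153907");

        // Seed the new peer's queue with the newest wal of each group.
        Map<String, String> queueSeed = new HashMap<String, String>();
        for (String wal : latestPaths) {
          queueSeed.put(prefixOf(wal), wal);
        }
        System.out.println(queueSeed); // one entry per wal group
      }
    }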
@@ -310,7 +325,7 @@
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
+  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -331,27 +346,70 @@
   }
 
   void preLogRoll(Path newLog) throws IOException {
-    synchronized (this.walsById) {
-      String name = newLog.getName();
-      for (ReplicationSourceInterface source : this.sources) {
-        try {
-          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
-        } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue with id="
-              + source.getPeerClusterZnode() + ", filename=" + name, e);
+    recordLog(newLog);
+    String logName = newLog.getName();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    synchronized (latestPaths) {
+      Iterator<Path> iterator = latestPaths.iterator();
+      while (iterator.hasNext()) {
+        Path path = iterator.next();
+        if (path.getName().contains(logPrefix)) {
+          iterator.remove();
+          break;
         }
       }
-      for (SortedSet<String> wals : this.walsById.values()) {
-        if (this.sources.isEmpty()) {
-          // If there's no slaves, don't need to keep the old wals since
-          // we only consider the last one when a new slave comes in
-          wals.clear();
+      this.latestPaths.add(newLog);
+    }
+  }
+
+  /**
+   * Check and enqueue the given log to the correct source. If there's still no source for the
+   * group to which the given log belongs, create one.
+   * @param logPath the log path to check and enqueue
+   * @throws IOException
+   */
+  private void recordLog(Path logPath) throws IOException {
+    String logName = logPath.getName();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    // update replication queues on ZK
+    synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+      // the to-be-removed peer
+      for (String id : replicationPeers.getPeerIds()) {
+        try {
+          this.replicationQueues.addLog(id, logName);
+        } catch (ReplicationException e) {
+          throw new IOException("Cannot add log to replication queue"
+              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+        }
+      }
+    }
+    // update walsById map
+    synchronized (walsById) {
+      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+        String peerId = entry.getKey();
+        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+        boolean existingPrefix = false;
+        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+          SortedSet<String> wals = walsEntry.getValue();
+          if (this.sources.isEmpty()) {
+            // If there's no slaves, don't need to keep the old wals since
+            // we only consider the last one when a new slave comes in
+            wals.clear();
+          }
+          if (logPrefix.equals(walsEntry.getKey())) {
+            wals.add(logName);
+            existingPrefix = true;
+          }
+        }
+        if (!existingPrefix) {
+          // The new log belongs to a new group, add it into this peer
+          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+          SortedSet<String> wals = new TreeSet<String>();
+          wals.add(logName);
+          walsByPrefix.put(logPrefix, wals);
         }
-        wals.add(name);
       }
     }
-
-    this.latestPath = newLog;
   }
 
   void postLogRoll(Path newLog) throws IOException {
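recordLog() either appends a rolled wal to the group it already belongs to or starts tracking a brand-new group for the peer. A compact sketch of just that step, with invented wal names and without the ZooKeeper bookkeeping:

    // Sketch only: the "append to an existing group or start a new one" step that
    // recordLog() performs for every tracked peer. Wal names below are invented.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class RecordLogSketch {
      static void record(Map<String, SortedSet<String>> walsByPrefix, String walPrefix,
          String walName) {
        SortedSet<String> group = walsByPrefix.get(walPrefix);
        if (group == null) {
          group = new TreeSet<String>();       // first wal of a brand new group
          walsByPrefix.put(walPrefix, group);
        }
        group.add(walName);
      }

      public static void main(String[] args) {
        Map<String, SortedSet<String>> walsByPrefix = new HashMap<String, SortedSet<String>>();
        record(walsByPrefix, "rs.null0", "rs.null0.100");
        record(walsByPrefix, "rs.null0", "rs.null0.200"); // joins the existing group
        record(walsByPrefix, "rs.null1", "rs.null1.100"); // starts a new group
        System.out.println(walsByPrefix); // two groups, the first tracking both of its wals
      }
    }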
@@ -376,7 +434,7 @@
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
       final Server server, final String peerId, final UUID clusterId,
       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
-          throws IOException {
+      throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -414,7 +472,8 @@
         }
       }
     } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors", e);
+      LOG.warn("Passed replication endpoint implementation throws errors"
+          + " while initializing ReplicationSource for peer: " + peerId, e);
       throw new IOException(e);
     }
 
@@ -470,7 +529,7 @@
         + sources.size() + " and another " + oldsources.size() + " that were recovered");
     String terminateMessage = "Replication stream was removed by a user";
-    ReplicationSourceInterface srcToRemove = null;
+    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
     List<ReplicationSourceInterface> oldSourcesToDelete =
         new ArrayList<ReplicationSourceInterface>();
     // First close all the recovered sources for this peer
@@ -486,19 +545,23 @@
     LOG.info("Number of deleted recovered sources for " + id + ": "
         + oldSourcesToDelete.size());
     // Now look for the one on this cluster
-    for (ReplicationSourceInterface src : this.sources) {
-      if (id.equals(src.getPeerClusterId())) {
-        srcToRemove = src;
-        break;
+    synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
+      // for the to-be-removed peer
+      for (ReplicationSourceInterface src : this.sources) {
+        if (id.equals(src.getPeerClusterId())) {
+          srcToRemove.add(src);
+        }
       }
+      if (srcToRemove.size() == 0) {
+        LOG.error("The queue we wanted to close is missing " + id);
+        return;
+      }
+      for (ReplicationSourceInterface toRemove : srcToRemove) {
+        toRemove.terminate(terminateMessage);
+        this.sources.remove(toRemove);
+      }
+      deleteSource(id, true);
     }
-    if (srcToRemove == null) {
-      LOG.error("The queue we wanted to close is missing " + id);
-      return;
-    }
-    srcToRemove.terminate(terminateMessage);
-    this.sources.remove(srcToRemove);
-    deleteSource(id, true);
   }
 
   @Override
@@ -580,6 +643,7 @@
     for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
       String peerId = entry.getKey();
+      SortedSet<String> walsSet = entry.getValue();
       try {
         // there is not an actual peer defined corresponding to peerId for the failover.
         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
@@ -596,7 +660,20 @@
           LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
           continue;
         }
+        // track sources in walsByIdRecoveredQueues
+        Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+        walsByIdRecoveredQueues.put(peerId, walsByGroup);
+        for (String wal : walsSet) {
+          String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
+          SortedSet<String> wals = walsByGroup.get(walPrefix);
+          if (wals == null) {
+            wals = new TreeSet<String>();
+            walsByGroup.put(walPrefix, wals);
+          }
+          wals.add(wal);
+        }
+        // enqueue sources
         ReplicationSourceInterface src =
             getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
               server, peerId, this.clusterId, peerConfig, peer);
@@ -605,12 +682,10 @@
           break;
         }
         oldsources.add(src);
-        SortedSet<String> walsSet = entry.getValue();
         for (String wal : walsSet) {
           src.enqueueLog(new Path(oldLogDir, wal));
         }
         src.startup();
-        walsByIdRecoveredQueues.put(peerId, walsSet);
       } catch (IOException e) {
         // TODO manage it
         LOG.error("Failed creating a source", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 44a033be028..661016d13e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -366,4 +366,15 @@
     }
   }
 
+  /**
+   * Get prefix of the log from its name, assuming the WAL name is in the format of
+   * log_prefix.filenumber.log_suffix; see {@link FSHLog#getCurrentFileName()}
+   * @param name Name of the WAL to parse
+   * @return prefix of the log
+   */
+  public static String getWALPrefixFromWALName(String name) {
+    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+    return name.substring(0, endIndex);
+  }
+
 }
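As a quick illustration of what getWALPrefixFromWALName() returns: everything before the last dot of a log_prefix.filenumber name is the group prefix (the real helper additionally strips the meta provider id first). The wal name below is invented and the helper is a simplification:

    // Quick illustration of the prefix extraction; not the real DefaultWALProvider method.
    public class WalPrefixExample {
      static String prefixOf(String walName) {
        return walName.substring(0, walName.lastIndexOf('.'));
      }

      public static void main(String[] args) {
        // encoded server name plus wal group, then the file number after the last dot
        String wal = "host%2C16020%2C1435179151031.null0.1435179153906";
        System.out.println(prefixOf(wal)); // host%2C16020%2C1435179151031.null0
      }
    }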
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 4ef320a49ec..e44a4d1cac1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -84,7 +84,7 @@
     }
   }
 
-  static final String WAL_PROVIDER = "hbase.wal.provider";
+  public static final String WAL_PROVIDER = "hbase.wal.provider";
   static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
 
   static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 49ca4869c2b..03d8cb58398 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -73,15 +73,40 @@
   }
 
   @Before
-  public void setup() throws FailedLogCloseException, IOException {
+  public void setup() throws Exception {
     ReplicationEndpointForTest.contructedCount.set(0);
     ReplicationEndpointForTest.startedCount.set(0);
     ReplicationEndpointForTest.replicateCount.set(0);
     ReplicationEndpointReturningFalse.replicated.set(false);
     ReplicationEndpointForTest.lastEntries = null;
-    for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
+    final List<RegionServerThread> rsThreads =
+        utility1.getMiniHBaseCluster().getRegionServerThreads();
+    for (RegionServerThread rs : rsThreads) {
       utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
     }
+    // Wait for all log roll to finish
+    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (RegionServerThread rs : rsThreads) {
+          if (!rs.getRegionServer().walRollRequestFinished()) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        List<String> logRollInProgressRsList = new ArrayList<String>();
+        for (RegionServerThread rs : rsThreads) {
+          if (!rs.getRegionServer().walRollRequestFinished()) {
+            logRollInProgressRsList.add(rs.getRegionServer().toString());
+          }
+        }
+        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
+      }
+    });
   }
 
   @Test (timeout=120000)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
new file mode 100644
index 00000000000..c4da4a3bf21
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpoint {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    TestReplicationEndpoint.setUpBeforeClass();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
new file mode 100644
index 00000000000..bbe17b2aa47
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
+    TestReplicationKillMasterRSCompressed {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    TestReplicationKillMasterRSCompressed.setUpBeforeClass();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
new file mode 100644
index 00000000000..64b79198336
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.multiwal;
+
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    TestReplicationBase.setUpBeforeClass();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 0a82161ad14..f8f71a495a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -260,7 +260,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     Assert.assertEquals(1000, entries.size());
 
     // replay the edits to the secondary using replay callable
-    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));
+    final String fakeWalGroupId = "fakeWALGroup";
+    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
+        .setWalGroupId(fakeWalGroupId));
 
     Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index bb8e7bdcc35..d6fb401d850 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -25,6 +25,7 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -228,7 +229,11 @@
     }
     wal.sync();
 
-    assertEquals(6, manager.getWALs().get(slaveId).size());
+    int logNumber = 0;
+    for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
+      logNumber += entry.getValue().size();
+    }
+    assertEquals(6, logNumber);
 
     wal.rollWriter();
 
@@ -297,8 +302,11 @@
     rq.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     SortedSet<String> files = new TreeSet<String>();
-    files.add("log1");
-    files.add("log2");
+    String group = "testgroup";
+    String file1 = group + ".log1";
+    String file2 = group + ".log2";
+    files.add(file1);
+    files.add(file2);
     for (String file : files) {
       rq.addLog("1", file);
     }
@@ -316,10 +324,10 @@
     w1.join(5000);
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
-    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
-    manager.cleanOldLogs("log2", id, true);
+    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
+    manager.cleanOldLogs(file2, id, true);
     // log1 should be deleted
-    assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
+    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
index bec6d4530a8..7f57b39e182 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -39,8 +39,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager {
   private int nbRows;
   private int walEditKVs;
   private final AtomicLong sequenceId = new AtomicLong(1);
+  @Rule public TestName tn = new TestName();
 
   @Parameters
   public static Collection<Object[]> parameters() {
@@ -127,7 +130,7 @@
     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
     pathWatcher = new PathWatcher();
     listeners.add(pathWatcher);
-    final WALFactory wals = new WALFactory(conf, listeners, "some server");
+    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
     log = wals.getWAL(info.getEncodedNameAsBytes());
   }
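The last hunk replaces the hard-coded "some server" WALFactory id with the running test's method name. A small stand-alone example of that JUnit TestName pattern (the class, assertion and test name below are invented, not part of the patch):

    // Example of the JUnit TestName rule used above: each test run gets a distinct
    // method name, which keeps per-test resources such as WAL factories apart.
    import static org.junit.Assert.assertTrue;

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TestName;

    public class TestNameRuleExample {
      @Rule
      public TestName tn = new TestName();

      @Test
      public void testSomething() {
        // Under the Parameterized runner the name also carries the run index, e.g.
        // "testSomething[0]", so different parameterized runs never share an id.
        String factoryId = tn.getMethodName();
        assertTrue(factoryId.startsWith("testSomething"));
      }
    }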