diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java new file mode 100644 index 00000000000..7cd3fedc8d9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java @@ -0,0 +1,70 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.UUID; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WAL.Entry; + + +/** + * Filters out entries with our peerClusterId (i.e. already replicated) + * and marks all other entries with our clusterID + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +@InterfaceStability.Evolving +public class ClusterMarkingEntryFilter implements WALEntryFilter { + private UUID clusterId; + private UUID peerClusterId; + private ReplicationEndpoint replicationEndpoint; + + /** + * @param clusterId id of this cluster + * @param peerClusterId of the other cluster + * @param replicationEndpoint ReplicationEndpoint which will handle the actual replication + */ + public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId, ReplicationEndpoint replicationEndpoint) { + this.clusterId = clusterId; + this.peerClusterId = peerClusterId; + this.replicationEndpoint = replicationEndpoint; + } + @Override + public Entry filter(Entry entry) { + // don't replicate if the log entries have already been consumed by the cluster + if (replicationEndpoint.canReplicateToSameCluster() + || !entry.getKey().getClusterIds().contains(peerClusterId)) { + WALEdit edit = entry.getEdit(); + WALKey logKey = entry.getKey(); + + if (edit != null && !edit.isEmpty()) { + // Mark that the current cluster has the change + logKey.addClusterId(clusterId); + // We need to set the CC to null else it will be compressed when sent to the sink + entry.setCompressionContext(null); + return entry; + } + } + return null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index e48bda30295..124c3401413 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -25,8 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; -import java.io.EOFException; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; +import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -65,19 +64,16 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WAL.Entry; /** * Class that handles the source of a replication stream. @@ -92,8 +88,7 @@ import org.apache.hadoop.hbase.wal.WALKey; * */ @InterfaceAudience.Private -public class ReplicationSource extends Thread - implements ReplicationSourceInterface { +public class ReplicationSource extends Thread implements ReplicationSourceInterface { private static final Log LOG = LogFactory.getLog(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, @@ -117,10 +112,6 @@ public class ReplicationSource extends Thread private Stoppable stopper; // How long should we sleep for each retry private long sleepForRetries; - // Max size in bytes of entriesArray - private long replicationQueueSizeCapacity; - // Max number of entries in entriesArray - private int replicationQueueNbCapacity; private FileSystem fs; // id of this cluster private UUID clusterId; @@ -148,11 +139,10 @@ public class ReplicationSource extends Thread private ReplicationThrottler throttler; private long defaultBandwidth; private long currentBandwidth; - private ConcurrentHashMap workerThreads = - new ConcurrentHashMap(); + private ConcurrentHashMap workerThreads = + new ConcurrentHashMap(); private AtomicLong totalBufferUsed; - private long totalBufferQuota; /** * Instantiation method used by region servers @@ -177,10 +167,6 @@ public class ReplicationSource extends Thread this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); decorateConf(); - this.replicationQueueSizeCapacity = - this.conf.getLong("replication.source.size.capacity", 1024*1024*64); - this.replicationQueueNbCapacity = - this.conf.getInt("replication.source.nb.capacity", 25000); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -206,12 +192,8 @@ public class ReplicationSource extends Thread currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); - this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId - + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity - + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" - + this.currentBandwidth); + + ", currentBandwidth=" + this.currentBandwidth); } private void decorateConf() { @@ -232,9 +214,9 @@ public class ReplicationSource extends Thread // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker - final ReplicationSourceWorkerThread worker = - new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this); - ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker); + final ReplicationSourceShipperThread worker = + new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this); + ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix); } else { @@ -341,9 +323,9 @@ public class ReplicationSource extends Thread for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue queue = entry.getValue(); - final ReplicationSourceWorkerThread worker = - new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this); - ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker); + final ReplicationSourceShipperThread worker = + new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this); + ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { @@ -414,9 +396,10 @@ public class ReplicationSource extends Thread + " because an error occurred: " + reason, cause); } this.sourceRunning = false; - Collection workers = workerThreads.values(); - for (ReplicationSourceWorkerThread worker : workers) { + Collection workers = workerThreads.values(); + for (ReplicationSourceShipperThread worker : workers) { worker.setWorkerRunning(false); + worker.entryReader.interrupt(); worker.interrupt(); } ListenableFuture future = null; @@ -424,7 +407,7 @@ public class ReplicationSource extends Thread future = this.replicationEndpoint.stop(); } if (join) { - for (ReplicationSourceWorkerThread worker : workers) { + for (ReplicationSourceShipperThread worker : workers) { Threads.shutdown(worker, this.sleepForRetries); LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); } @@ -453,7 +436,7 @@ public class ReplicationSource extends Thread @Override public Path getCurrentPath() { // only for testing - for (ReplicationSourceWorkerThread worker : workerThreads.values()) { + for (ReplicationSourceShipperThread worker : workerThreads.values()) { if (worker.getCurrentPath() != null) return worker.getCurrentPath(); } return null; @@ -490,9 +473,9 @@ public class ReplicationSource extends Thread StringBuilder sb = new StringBuilder(); sb.append("Total replicated edits: ").append(totalReplicatedEdits) .append(", current progress: \n"); - for (Map.Entry entry : workerThreads.entrySet()) { + for (Map.Entry entry : workerThreads.entrySet()) { String walGroupId = entry.getKey(); - ReplicationSourceWorkerThread worker = entry.getValue(); + ReplicationSourceShipperThread worker = entry.getValue(); long position = worker.getCurrentPosition(); Path currentPath = worker.getCurrentPath(); sb.append("walGroup [").append(walGroupId).append("]: "); @@ -521,28 +504,20 @@ public class ReplicationSource extends Thread return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } - public class ReplicationSourceWorkerThread extends Thread { - ReplicationSource source; + // This thread reads entries from a queue and ships them. + // Entries are placed onto the queue by ReplicationSourceWALReaderThread + public class ReplicationSourceShipperThread extends Thread { + ReplicationSourceInterface source; String walGroupId; PriorityBlockingQueue queue; ReplicationQueueInfo replicationQueueInfo; - // Our reader for the current log. open/close handled by repLogReader - private WAL.Reader reader; // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; // Path of the current log private volatile Path currentPath; - // Handle on the log reader helper - private ReplicationWALReaderManager repLogReader; - // Current number of operations (Put/Delete) that we need to replicate - private int currentNbOperations = 0; - // Current size of data we need to replicate - private int currentSize = 0; // Indicates whether this particular worker is running private boolean workerRunning = true; - // Current number of hfiles that we need to replicate - private long currentNbHFiles = 0; - List entries; + ReplicationSourceWALReaderThread entryReader; // Use guava cache to set ttl for each key private LoadingCache canSkipWaitingSet = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.DAYS).build( @@ -554,33 +529,17 @@ public class ReplicationSource extends Thread } ); - public ReplicationSourceWorkerThread(String walGroupId, + public ReplicationSourceShipperThread(String walGroupId, PriorityBlockingQueue queue, ReplicationQueueInfo replicationQueueInfo, - ReplicationSource source) { + ReplicationSourceInterface source) { this.walGroupId = walGroupId; this.queue = queue; this.replicationQueueInfo = replicationQueueInfo; - this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.source = source; - this.entries = new ArrayList<>(); } @Override public void run() { - // If this is recovered, the queue is already full and the first log - // normally has a position (unless the RS failed between 2 logs) - if (this.replicationQueueInfo.isQueueRecovered()) { - try { - this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode, - this.queue.peek().getName())); - if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " - + this.repLogReader.getPosition()); - } - } catch (ReplicationException e) { - terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); - } - } // Loop until we close down while (isWorkerActive()) { int sleepMultiplier = 1; @@ -591,150 +550,43 @@ public class ReplicationSource extends Thread } continue; } - Path oldPath = getCurrentPath(); //note that in the current scenario, - //oldPath will be null when a log roll - //happens. - // Get a new path - boolean hasCurrentPath = getNextPath(); - if (getCurrentPath() != null && oldPath == null) { - sleepMultiplier = 1; //reset the sleepMultiplier on a path change - } - if (!hasCurrentPath) { - if (sleepForRetries("No log to process", sleepMultiplier)) { + while (entryReader == null) { + if (sleepForRetries("Replication WAL entry reader thread not initialized", + sleepMultiplier)) { sleepMultiplier++; } - continue; - } - boolean currentWALisBeingWrittenTo = false; - //For WAL files we own (rather than recovered), take a snapshot of whether the - //current WAL file (this.currentPath) is in use (for writing) NOW! - //Since the new WAL paths are enqueued only after the prev WAL file - //is 'closed', presence of an element in the queue means that - //the previous WAL file was closed, else the file is in use (currentPath) - //We take the snapshot now so that we are protected against races - //where a new file gets enqueued while the current file is being processed - //(and where we just finished reading the current file). - if (!this.replicationQueueInfo.isQueueRecovered() && queue.isEmpty()) { - currentWALisBeingWrittenTo = true; - } - // Open a reader on it - if (!openReader(sleepMultiplier)) { - // Reset the sleep multiplier, else it'd be reused for the next file - sleepMultiplier = 1; - continue; - } - - // If we got a null reader but didn't continue, then sleep and continue - if (this.reader == null) { - if (sleepForRetries("Unable to open a reader", sleepMultiplier)) { - sleepMultiplier++; + if (sleepMultiplier == maxRetriesMultiplier) { + LOG.warn("Replication WAL entry reader thread not initialized"); } - continue; } - boolean gotIOE = false; - currentNbOperations = 0; - currentNbHFiles = 0; - entries.clear(); - Map lastPositionsForSerialScope = new HashMap<>(); - currentSize = 0; try { - if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries, - lastPositionsForSerialScope)) { - for (Map.Entry entry : lastPositionsForSerialScope.entrySet()) { - waitingUntilCanPush(entry); - } - try { - MetaTableAccessor - .updateReplicationPositions(manager.getConnection(), actualPeerId, - lastPositionsForSerialScope); - } catch (IOException e) { - LOG.error("updateReplicationPositions fail", e); - stopper.stop("updateReplicationPositions fail"); - } - + WALEntryBatch entryBatch = entryReader.take(); + for (Map.Entry entry : entryBatch.getLastSeqIds().entrySet()) { + waitingUntilCanPush(entry); + } + shipEdits(entryBatch); + releaseBufferQuota((int) entryBatch.getHeapSize()); + if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty() + && entryBatch.getLastSeqIds().isEmpty()) { + LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + + peerClusterZnode); + metrics.incrCompletedRecoveryQueue(); + setWorkerRunning(false); continue; } - } catch (IOException ioe) { - LOG.warn(peerClusterZnode + " Got: ", ioe); - gotIOE = true; - if (ioe.getCause() instanceof EOFException) { - - boolean considerDumping = false; - if (this.replicationQueueInfo.isQueueRecovered()) { - try { - FileStatus stat = fs.getFileStatus(this.currentPath); - if (stat.getLen() == 0) { - LOG.warn(peerClusterZnode + " Got EOF and the file was empty"); - } - considerDumping = true; - } catch (IOException e) { - LOG.warn(peerClusterZnode + " Got while getting file size: ", e); - } - } - - if (considerDumping && - sleepMultiplier == maxRetriesMultiplier && - processEndOfFile()) { - continue; - } - } - } finally { - try { - this.reader = null; - this.repLogReader.closeReader(); - } catch (IOException e) { - gotIOE = true; - LOG.warn("Unable to finalize the tailing of a file", e); - } + } catch (InterruptedException e) { + LOG.trace("Interrupted while waiting for next replication entry batch", e); + Thread.currentThread().interrupt(); } - for(Map.Entry entry: lastPositionsForSerialScope.entrySet()) { - waitingUntilCanPush(entry); - } - // If we didn't get anything to replicate, or if we hit a IOE, - // wait a bit and retry. - // But if we need to stop, don't bother sleeping - if (isWorkerActive() && (gotIOE || entries.isEmpty())) { - if (this.lastLoggedPosition != this.repLogReader.getPosition()) { - - // Save positions to meta table before zk. - if (!gotIOE) { - try { - MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, - lastPositionsForSerialScope); - } catch (IOException e) { - LOG.error("updateReplicationPositions fail", e); - stopper.stop("updateReplicationPositions fail"); - } - } - - manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, - this.repLogReader.getPosition(), - this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); - - this.lastLoggedPosition = this.repLogReader.getPosition(); - } - // Reset the sleep multiplier if nothing has actually gone wrong - if (!gotIOE) { - sleepMultiplier = 1; - // if there was nothing to ship and it's not an error - // set "ageOfLastShippedOp" to to indicate that we're current - metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId); - } - if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { - sleepMultiplier++; - } - continue; - } - shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); - releaseBufferQuota(); } + if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue synchronized (workerThreads) { Threads.sleep(100);// wait a short while for other worker thread to fully exit boolean allOtherTaskDone = true; - for (ReplicationSourceWorkerThread worker : workerThreads.values()) { + for (ReplicationSourceShipperThread worker : workerThreads.values()) { if (!worker.equals(this) && worker.isAlive()) { allOtherTaskDone = false; break; @@ -773,127 +625,6 @@ public class ReplicationSource extends Thread } } - /** - * Read all the entries from the current log files and retain those that need to be replicated. - * Else, process the end of the current file. - * @param currentWALisBeingWrittenTo is the current WAL being written to - * @param entries resulting entries to be replicated - * @param lastPosition save the last sequenceid for each region if the table has - * serial-replication scope - * @return true if we got nothing and went to the next file, false if we got entries - * @throws IOException - */ - protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, - List entries, Map lastPosition) throws IOException { - long seenEntries = 0; - if (LOG.isTraceEnabled()) { - LOG.trace("Seeking in " + this.currentPath + " at position " - + this.repLogReader.getPosition()); - } - this.repLogReader.seek(); - long positionBeforeRead = this.repLogReader.getPosition(); - WAL.Entry entry = this.repLogReader.readNextAndSetPosition(); - while (entry != null) { - metrics.incrLogEditsRead(); - seenEntries++; - - if (entry.hasSerialReplicationScope()) { - String key = Bytes.toString(entry.getKey().getEncodedRegionName()); - lastPosition.put(key, entry.getKey().getSequenceId()); - if (entry.getEdit().getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); - if (maybeEvent != null && maybeEvent.getEventType() - == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { - // In serially replication, if we move a region to another RS and move it back, we may - // read logs crossing two sections. We should break at REGION_CLOSE and push the first - // section first in case of missing the middle section belonging to the other RS. - // In a worker thread, if we can push the first log of a region, we can push all logs - // in the same region without waiting until we read a close marker because next time - // we read logs in this region, it must be a new section and not adjacent with this - // region. Mark it negative. - lastPosition.put(key, -entry.getKey().getSequenceId()); - break; - } - } - } - boolean totalBufferTooLarge = false; - // don't replicate if the log entries have already been consumed by the cluster - if (replicationEndpoint.canReplicateToSameCluster() - || !entry.getKey().getClusterIds().contains(peerClusterId)) { - // Remove all KVs that should not be replicated - entry = walEntryFilter.filter(entry); - WALEdit edit = null; - WALKey logKey = null; - if (entry != null) { - edit = entry.getEdit(); - logKey = entry.getKey(); - } - - if (edit != null && edit.size() != 0) { - // Mark that the current cluster has the change - logKey.addClusterId(clusterId); - currentNbOperations += countDistinctRowKeys(edit); - entries.add(entry); - int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); - currentSize += delta; - totalBufferTooLarge = acquireBufferQuota(delta); - } else { - metrics.incrLogEditsFiltered(); - } - } - // Stop if too many entries or too big - // FIXME check the relationship between single wal group and overall - if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity - || entries.size() >= replicationQueueNbCapacity) { - break; - } - - try { - entry = this.repLogReader.readNextAndSetPosition(); - } catch (IOException ie) { - LOG.debug("Break on IOE: " + ie.getMessage()); - break; - } - } - metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead); - if (currentWALisBeingWrittenTo) { - return false; - } - // If we didn't get anything and the queue has an object, it means we - // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); - } - - /** - * Calculate the total size of all the store files - * @param edit edit to count row keys from - * @return the total size of the store files - */ - private int calculateTotalSizeOfStoreFiles(WALEdit edit) { - List cells = edit.getCells(); - int totalStoreFilesSize = 0; - - int totalCells = edit.size(); - for (int i = 0; i < totalCells; i++) { - if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { - try { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); - List stores = bld.getStoresList(); - int totalStores = stores.size(); - for (int j = 0; j < totalStores; j++) { - totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes(); - } - } catch (IOException e) { - LOG.error("Failed to deserialize bulk load entry from wal edit. " - + "Size of HFiles part of cell will not be considered in replication " - + "request size calculation.", e); - } - } - } - return totalStoreFilesSize; - } - private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = peerClusterZnode; if (peerId.contains("-")) { @@ -918,204 +649,6 @@ public class ReplicationSource extends Thread } } - /** - * Poll for the next path - * @return true if a path was obtained, false if not - */ - protected boolean getNextPath() { - try { - if (this.currentPath == null) { - this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS); - metrics.decrSizeOfLogQueue(); - if (this.currentPath != null) { - // For recovered queue: must use peerClusterZnode since peerId is a parsed value - manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode, - this.replicationQueueInfo.isQueueRecovered()); - if (LOG.isTraceEnabled()) { - LOG.trace("New log: " + this.currentPath); - } - } - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while reading edits", e); - } - return this.currentPath != null; - } - - /** - * Open a reader on the current path - * - * @param sleepMultiplier by how many times the default sleeping time is augmented - * @return true if we should continue with that file, false if we are over with it - */ - protected boolean openReader(int sleepMultiplier) { - try { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Opening log " + this.currentPath); - } - this.reader = repLogReader.openReader(this.currentPath); - } catch (FileNotFoundException fnfe) { - if (this.replicationQueueInfo.isQueueRecovered()) { - // We didn't find the log in the archive directory, look if it still - // exists in the dead RS folder (there could be a chain of failures - // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path walDir = FSUtils.getWALRootDir(conf); - for (String curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = new Path(walDir, - AbstractFSWALProvider.getWALDirectoryName(curDeadServerName)); - Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()), - new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), - currentPath.getName()) }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + this.currentPath + " still exists at " + - possibleLogLocation); - // Breaking here will make us sleep since reader is null - // TODO why don't we need to set currentPath and call openReader here? - return true; - } - } - } - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists - if (stopper instanceof ReplicationSyncUp.DummyServer) { - // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal - // area rather than to the wal area for a particular region server. - FileStatus[] rss = fs.listStatus(manager.getLogDir()); - for (FileStatus rs : rss) { - Path p = rs.getPath(); - FileStatus[] logs = fs.listStatus(p); - for (FileStatus log : logs) { - p = new Path(p, log.getPath().getName()); - if (p.getName().equals(currentPath.getName())) { - currentPath = p; - LOG.info("Log " + currentPath.getName() + " found at " + currentPath); - // Open the log at the new location - this.openReader(sleepMultiplier); - return true; - } - } - } - } - - // TODO What happens if the log was missing from every single location? - // Although we need to check a couple of times as the log could have - // been moved by the master between the checks - // It can also happen if a recovered queue wasn't properly cleaned, - // such that the znode pointing to a log exists but the log was - // deleted a long time ago. - // For the moment, we'll throw the IO and processEndOfFile - throw new IOException("File from recovered queue is " + - "nowhere to be found", fnfe); - } else { - // If the log was archived, continue reading from there - Path archivedLogLocation = - new Path(manager.getOldLogDir(), currentPath.getName()); - if (manager.getFs().exists(archivedLogLocation)) { - currentPath = archivedLogLocation; - LOG.info("Log " + this.currentPath + " was moved to " + - archivedLogLocation); - // Open the log at the new location - this.openReader(sleepMultiplier); - - } - // TODO What happens the log is missing in both places? - } - } - } catch (LeaseNotRecoveredException lnre) { - // HBASE-15019 the WAL was not closed due to some hiccup. - LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre); - recoverLease(conf, currentPath); - this.reader = null; - } catch (IOException ioe) { - if (ioe instanceof EOFException && isCurrentLogEmpty()) return true; - LOG.warn(peerClusterZnode + " Got: ", ioe); - this.reader = null; - if (ioe.getCause() instanceof NullPointerException) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. - LOG.warn("Got NPE opening reader, will retry."); - } else if (sleepMultiplier >= maxRetriesMultiplier) { - // TODO Need a better way to determine if a file is really gone but - // TODO without scanning all logs dir - LOG.warn("Waited too long for this file, considering dumping"); - return !processEndOfFile(); - } - } - return true; - } - - private void recoverLease(final Configuration conf, final Path path) { - try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { - @Override - public boolean progress() { - LOG.debug("recover WAL lease: " + path); - return isWorkerActive(); - } - }); - } catch (IOException e) { - LOG.warn("unable to recover lease for WAL: " + path, e); - } - } - - /* - * Checks whether the current log file is empty, and it is not a recovered queue. This is to - * handle scenario when in an idle cluster, there is no entry in the current log and we keep on - * trying to read the log file and get EOFException. In case of a recovered queue the last log - * file may be empty, and we don't want to retry that. - */ - private boolean isCurrentLogEmpty() { - return (this.repLogReader.getPosition() == 0 && - !this.replicationQueueInfo.isQueueRecovered() && queue.isEmpty()); - } - - /** - * Count the number of different row keys in the given edit because of mini-batching. We assume - * that there's at least one Cell in the WALEdit. - * @param edit edit to count row keys from - * @return number of different row keys - */ - private int countDistinctRowKeys(WALEdit edit) { - List cells = edit.getCells(); - int distinctRowKeys = 1; - int totalHFileEntries = 0; - Cell lastCell = cells.get(0); - - int totalCells = edit.size(); - for (int i = 0; i < totalCells; i++) { - // Count HFiles to be replicated - if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { - try { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); - List stores = bld.getStoresList(); - int totalStores = stores.size(); - for (int j = 0; j < totalStores; j++) { - totalHFileEntries += stores.get(j).getStoreFileList().size(); - } - } catch (IOException e) { - LOG.error("Failed to deserialize bulk load entry from wal edit. " - + "Then its hfiles count will not be added into metric."); - } - } - - if (!CellUtil.matchingRows(cells.get(i), lastCell)) { - distinctRowKeys++; - } - lastCell = cells.get(i); - } - currentNbHFiles += totalHFileEntries; - return distinctRowKeys + totalHFileEntries; - } - private void checkBandwidthChangeAndResetThrottler() { long peerBandwidth = getCurrentBandwidth(); if (peerBandwidth != currentBandwidth) { @@ -1128,16 +661,24 @@ public class ReplicationSource extends Thread /** * Do the shipping logic - * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) - * written to when this method was called */ - protected void shipEdits(boolean currentWALisBeingWrittenTo, List entries, - Map lastPositionsForSerialScope) { + protected void shipEdits(WALEntryBatch entryBatch) { + List entries = entryBatch.getWalEntries(); + long lastReadPosition = entryBatch.getLastWalPosition(); + currentPath = entryBatch.getLastWalPath(); int sleepMultiplier = 0; if (entries.isEmpty()) { - LOG.warn("Was given 0 edits to ship"); + if (lastLoggedPosition != lastReadPosition) { + // Save positions to meta table before zk. + updateSerialRepPositions(entryBatch.getLastSeqIds()); + updateLogPosition(lastReadPosition); + // if there was nothing to ship and it's not an error + // set "ageOfLastShippedOp" to to indicate that we're current + metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId); + } return; } + int currentSize = (int) entryBatch.getHeapSize(); while (isWorkerActive()) { try { checkBandwidthChangeAndResetThrottler(); @@ -1178,7 +719,7 @@ public class ReplicationSource extends Thread sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - if (this.lastLoggedPosition != this.repLogReader.getPosition()) { + if (this.lastLoggedPosition != lastReadPosition) { //Clean up hfile references int size = entries.size(); for (int i = 0; i < size; i++) { @@ -1186,27 +727,18 @@ public class ReplicationSource extends Thread } // Save positions to meta table before zk. - try { - MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, - lastPositionsForSerialScope); - } catch (IOException e) { - LOG.error("updateReplicationPositions fail", e); - stopper.stop("updateReplicationPositions fail"); - } + updateSerialRepPositions(entryBatch.getLastSeqIds()); //Log and clean up WAL logs - manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, - this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), - currentWALisBeingWrittenTo); - this.lastLoggedPosition = this.repLogReader.getPosition(); + updateLogPosition(lastReadPosition); } if (throttler.isEnabled()) { throttler.addPushSize(currentSize); } totalReplicatedEdits.addAndGet(entries.size()); - totalReplicatedOperations.addAndGet(currentNbOperations); + totalReplicatedOperations.addAndGet(entryBatch.getNbOperations()); // FIXME check relationship between wal group and overall - metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles); + metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles()); metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { @@ -1225,62 +757,20 @@ public class ReplicationSource extends Thread } } - /** - * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means - * we're done! Else we'll just continue to try reading the log file - * @return true if we're done with the current file, false if we should continue trying to read - * from it - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", - justification = "Yeah, this is how it works") - protected boolean processEndOfFile() { - // We presume this means the file we're reading is closed. - if (this.queue.size() != 0) { - // -1 means the wal wasn't closed cleanly. - final long trailerSize = this.repLogReader.currentTrailerSize(); - final long currentPosition = this.repLogReader.getPosition(); - FileStatus stat = null; - try { - stat = fs.getFileStatus(this.currentPath); - } catch (IOException exception) { - LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly" - + ", stats: " + getStats()); - metrics.incrUnknownFileLengthForClosedWAL(); - } - if (stat != null) { - if (trailerSize < 0) { - if (currentPosition < stat.getLen()) { - final long skippedBytes = stat.getLen() - currentPosition; - LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data."); - metrics.incrUncleanlyClosedWALs(); - metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); - } - } else if (currentPosition + trailerSize < stat.getLen()){ - LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() + - ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats()); - repLogReader.setPosition(0); - metrics.incrRestartedWALReading(); - metrics.incrRepeatedFileBytes(currentPosition); - return false; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() - + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen())); - } - this.currentPath = null; - this.repLogReader.finishCurrentFile(); - this.reader = null; - metrics.incrCompletedWAL(); - return true; - } else if (this.replicationQueueInfo.isQueueRecovered()) { - LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + peerClusterZnode); - metrics.incrCompletedRecoveryQueue(); - workerRunning = false; - return true; + private void updateSerialRepPositions(Map lastPositionsForSerialScope) { + try { + MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, + lastPositionsForSerialScope); + } catch (IOException e) { + LOG.error("updateReplicationPositions fail", e); + stopper.stop("updateReplicationPositions fail"); } - return false; + } + + private void updateLogPosition(long lastReadPosition) { + manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition, + this.replicationQueueInfo.isQueueRecovered(), false); + lastLoggedPosition = lastReadPosition; } public void startup() { @@ -1295,6 +785,134 @@ public class ReplicationSource extends Thread Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + "," + peerClusterZnode, handler); workerThreads.put(walGroupId, this); + + long startPosition = 0; + + if (this.replicationQueueInfo.isQueueRecovered()) { + startPosition = getRecoveredQueueStartPos(startPosition); + int numRetries = 0; + while (numRetries <= maxRetriesMultiplier) { + try { + locateRecoveredPaths(); + break; + } catch (IOException e) { + LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); + numRetries++; + } + } + } + + startWALReaderThread(n, handler, startPosition); + } + + // If this is a recovered queue, the queue is already full and the first log + // normally has a position (unless the RS failed between 2 logs) + private long getRecoveredQueueStartPos(long startPosition) { + try { + startPosition = + (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName())); + if (LOG.isTraceEnabled()) { + LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + + startPosition); + } + } catch (ReplicationException e) { + terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); + } + return startPosition; + } + + // start a background thread to read and batch entries + private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler, + long startPosition) { + ArrayList filters = Lists.newArrayList(walEntryFilter, + new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); + ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters); + entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue, + startPosition, fs, conf, readerFilter, metrics); + Threads.setDaemonThreadRunning(entryReader, threadName + + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, + handler); + } + + // Loops through the recovered queue and tries to find the location of each log + // this is necessary because the logs may have moved before recovery was initiated + private void locateRecoveredPaths() throws IOException { + boolean hasPathChanged = false; + PriorityBlockingQueue newPaths = + new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + pathsLoop: for (Path path : queue) { + if (fs.exists(path)) { // still in same location, don't need to do anything + newPaths.add(path); + continue; + } + // Path changed - try to find the right path. + hasPathChanged = true; + if (stopper instanceof ReplicationSyncUp.DummyServer) { + // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data + // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists + Path newPath = getReplSyncUpPath(path); + newPaths.add(newPath); + continue; + } else { + // See if Path exists in the dead RS folder (there could be a chain of failures + // to look at) + List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); + LOG.info("NB dead servers : " + deadRegionServers.size()); + final Path walDir = FSUtils.getWALRootDir(conf); + for (String curDeadServerName : deadRegionServers) { + final Path deadRsDirectory = + new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName)); + Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path( + deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; + for (Path possibleLogLocation : locs) { + LOG.info("Possible location " + possibleLogLocation.toUri().toString()); + if (manager.getFs().exists(possibleLogLocation)) { + // We found the right new location + LOG.info("Log " + path + " still exists at " + possibleLogLocation); + newPaths.add(possibleLogLocation); + continue pathsLoop; + } + } + } + // didn't find a new location + LOG.error( + String.format("WAL Path %s doesn't exist and couldn't find its new location", path)); + newPaths.add(path); + } + } + + if (hasPathChanged) { + if (newPaths.size() != queue.size()) { // this shouldn't happen + LOG.error("Recovery queue size is incorrect"); + throw new IOException("Recovery queue size error"); + } + // put the correct locations in the queue + // since this is a recovered queue with no new incoming logs, + // there shouldn't be any concurrency issues + queue.clear(); + for (Path path : newPaths) { + queue.add(path); + } + } + } + + // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal + // area rather than to the wal area for a particular region server. + private Path getReplSyncUpPath(Path path) throws IOException { + FileStatus[] rss = fs.listStatus(manager.getLogDir()); + for (FileStatus rs : rss) { + Path p = rs.getPath(); + FileStatus[] logs = fs.listStatus(p); + for (FileStatus log : logs) { + p = new Path(p, log.getPath().getName()); + if (p.getName().equals(path.getName())) { + LOG.info("Log " + p.getName() + " found at " + p); + return p; + } + } + } + LOG.error("Didn't find path for: " + path.getName()); + return path; } public Path getCurrentPath() { @@ -1302,7 +920,7 @@ public class ReplicationSource extends Thread } public long getCurrentPosition() { - return this.repLogReader.getPosition(); + return this.lastLoggedPosition; } private boolean isWorkerActive() { @@ -1317,27 +935,20 @@ public class ReplicationSource extends Thread LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, cause); } + entryReader.interrupt(); + Threads.shutdown(entryReader, sleepForRetries); this.interrupt(); Threads.shutdown(this, sleepForRetries); LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); } public void setWorkerRunning(boolean workerRunning) { + entryReader.setReaderRunning(workerRunning); this.workerRunning = workerRunning; } - /** - * @param size delta size for grown buffer - * @return true if we should clear buffer and push all - */ - private boolean acquireBufferQuota(long size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; - } - - private void releaseBufferQuota() { - totalBufferUsed.addAndGet(-currentSize); - currentSize = 0; - entries.clear(); + private void releaseBufferQuota(int size) { + totalBufferUsed.addAndGet(-size); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java new file mode 100644 index 00000000000..29808e97577 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -0,0 +1,471 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +/** + * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ReplicationSourceWALReaderThread extends Thread { + private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class); + + private PriorityBlockingQueue logQueue; + private FileSystem fs; + private Configuration conf; + private BlockingQueue entryBatchQueue; + // max (heap) size of each batch - multiply by number of batches in queue to get total + private long replicationBatchSizeCapacity; + // max count of each batch - multiply by number of batches in queue to get total + private int replicationBatchCountCapacity; + // position in the WAL to start reading at + private long currentPosition; + private WALEntryFilter filter; + private long sleepForRetries; + //Indicates whether this particular worker is running + private boolean isReaderRunning = true; + private ReplicationQueueInfo replicationQueueInfo; + private int maxRetriesMultiplier; + private MetricsSource metrics; + + private AtomicLong totalBufferUsed; + private long totalBufferQuota; + + /** + * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the + * entries, and puts them on a batch queue. + * @param manager replication manager + * @param replicationQueueInfo + * @param logQueue The WAL queue to read off of + * @param startPosition position in the first WAL to start reading from + * @param fs the files system to use + * @param conf configuration to use + * @param filter The filter to use while reading + * @param metrics replication metrics + */ + public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, + ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue logQueue, + long startPosition, + FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) { + this.replicationQueueInfo = replicationQueueInfo; + this.logQueue = logQueue; + this.currentPosition = startPosition; + this.fs = fs; + this.conf = conf; + this.filter = filter; + this.replicationBatchSizeCapacity = + this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); + this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); + // memory used will be batchSizeCapacity * (nb.batches + 1) + // the +1 is for the current thread reading before placing onto the queue + int batchCount = conf.getInt("replication.source.nb.batches", 1); + this.totalBufferUsed = manager.getTotalBufferUsed(); + this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.sleepForRetries = + this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second + this.maxRetriesMultiplier = + this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per + this.metrics = metrics; + this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); + LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode() + + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId() + + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity + + ", replicationBatchQueueCapacity=" + batchCount); + } + + @Override + public void run() { + int sleepMultiplier = 1; + while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream + try (WALEntryStream entryStream = + new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) { + while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!checkQuota()) { + continue; + } + WALEntryBatch batch = null; + while (entryStream.hasNext()) { + if (batch == null) { + batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + } + Entry entry = entryStream.next(); + if (updateSerialReplPos(batch, entry)) { + batch.lastWalPosition = entryStream.getPosition(); + break; + } + entry = filterEntry(entry); + if (entry != null) { + WALEdit edit = entry.getEdit(); + if (edit != null && !edit.isEmpty()) { + long entrySize = getEntrySize(entry); + batch.addEntry(entry); + updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + // Stop if too many entries or too big + if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity + || batch.getNbEntries() >= replicationBatchCountCapacity) { + break; + } + } + } + } + if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Read %s WAL entries eligible for replication", + batch.getNbEntries())); + } + entryBatchQueue.put(batch); + sleepMultiplier = 1; + } else { // got no entries and didn't advance position in WAL + LOG.trace("Didn't read any new entries from WAL"); + if (replicationQueueInfo.isQueueRecovered()) { + // we're done with queue recovery, shut ourself down + setReaderRunning(false); + // shuts down shipper thread immediately + entryBatchQueue.put(batch != null ? batch + : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath())); + } else { + Thread.sleep(sleepForRetries); + } + } + currentPosition = entryStream.getPosition(); + entryStream.reset(); // reuse stream + } + } catch (IOException | WALEntryStreamRuntimeException e) { // stream related + if (sleepMultiplier < maxRetriesMultiplier) { + LOG.debug("Failed to read stream of replication entries: " + e); + sleepMultiplier++; + } else { + LOG.error("Failed to read stream of replication entries", e); + } + Threads.sleep(sleepForRetries * sleepMultiplier); + } catch (InterruptedException e) { + LOG.trace("Interrupted while sleeping between WAL reads"); + Thread.currentThread().interrupt(); + } + } + } + + //returns false if we've already exceeded the global quota + private boolean checkQuota() { + // try not to go over total quota + if (totalBufferUsed.get() > totalBufferQuota) { + Threads.sleep(sleepForRetries); + return false; + } + return true; + } + + private Entry filterEntry(Entry entry) { + Entry filtered = filter.filter(entry); + if (entry != null && filtered == null) { + metrics.incrLogEditsFiltered(); + } + return filtered; + } + + /** + * @return true if we should stop reading because we're at REGION_CLOSE + */ + private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException { + if (entry.hasSerialReplicationScope()) { + String key = Bytes.toString(entry.getKey().getEncodedRegionName()); + batch.setLastPosition(key, entry.getKey().getSequenceId()); + if (!entry.getEdit().getCells().isEmpty()) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); + if (maybeEvent != null && maybeEvent + .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { + // In serially replication, if we move a region to another RS and move it back, we may + // read logs crossing two sections. We should break at REGION_CLOSE and push the first + // section first in case of missing the middle section belonging to the other RS. + // In a worker thread, if we can push the first log of a region, we can push all logs + // in the same region without waiting until we read a close marker because next time + // we read logs in this region, it must be a new section and not adjacent with this + // region. Mark it negative. + batch.setLastPosition(key, -entry.getKey().getSequenceId()); + return true; + } + } + } + return false; + } + + /** + * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a + * batch to become available + * @return A batch of entries, along with the position in the log after reading the batch + * @throws InterruptedException if interrupted while waiting + */ + public WALEntryBatch take() throws InterruptedException { + return entryBatchQueue.take(); + } + + private long getEntrySize(Entry entry) { + WALEdit edit = entry.getEdit(); + return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); + } + + private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) { + WALEdit edit = entry.getEdit(); + if (edit != null && !edit.isEmpty()) { + batch.incrementHeapSize(entrySize); + Pair nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); + batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); + batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); + } + batch.lastWalPosition = entryPosition; + } + + /** + * Count the number of different row keys in the given edit because of mini-batching. We assume + * that there's at least one Cell in the WALEdit. + * @param edit edit to count row keys from + * @return number of different row keys and HFiles + */ + private Pair countDistinctRowKeysAndHFiles(WALEdit edit) { + List cells = edit.getCells(); + int distinctRowKeys = 1; + int totalHFileEntries = 0; + Cell lastCell = cells.get(0); + + int totalCells = edit.size(); + for (int i = 0; i < totalCells; i++) { + // Count HFiles to be replicated + if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); + List stores = bld.getStoresList(); + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { + totalHFileEntries += stores.get(j).getStoreFileList().size(); + } + } catch (IOException e) { + LOG.error("Failed to deserialize bulk load entry from wal edit. " + + "Then its hfiles count will not be added into metric."); + } + } + + if (!CellUtil.matchingRows(cells.get(i), lastCell)) { + distinctRowKeys++; + } + lastCell = cells.get(i); + } + + Pair result = new Pair<>(distinctRowKeys, totalHFileEntries); + return result; + } + + /** + * Calculate the total size of all the store files + * @param edit edit to count row keys from + * @return the total size of the store files + */ + private int calculateTotalSizeOfStoreFiles(WALEdit edit) { + List cells = edit.getCells(); + int totalStoreFilesSize = 0; + + int totalCells = edit.size(); + for (int i = 0; i < totalCells; i++) { + if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); + List stores = bld.getStoresList(); + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { + totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes(); + } + } catch (IOException e) { + LOG.error("Failed to deserialize bulk load entry from wal edit. " + + "Size of HFiles part of cell will not be considered in replication " + + "request size calculation.", + e); + } + } + } + return totalStoreFilesSize; + } + + /** + * @param size delta size for grown buffer + * @return true if we should clear buffer and push all + */ + private boolean acquireBufferQuota(long size) { + return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + } + + /** + * @return whether the reader thread is running + */ + public boolean isReaderRunning() { + return isReaderRunning && !isInterrupted(); + } + + /** + * @param readerRunning the readerRunning to set + */ + public void setReaderRunning(boolean readerRunning) { + this.isReaderRunning = readerRunning; + } + + /** + * Holds a batch of WAL entries to replicate, along with some statistics + * + */ + static class WALEntryBatch { + private List walEntries; + // last WAL that was read + private Path lastWalPath; + // position in WAL of last entry in this batch + private long lastWalPosition = 0; + // number of distinct row keys in this batch + private int nbRowKeys = 0; + // number of HFiles + private int nbHFiles = 0; + // heap size of data we need to replicate + private long heapSize = 0; + // save the last sequenceid for each region if the table has serial-replication scope + private Map lastSeqIds = new HashMap<>(); + + /** + * @param walEntries + * @param lastWalPath Path of the WAL the last entry in this batch was read from + * @param lastWalPosition Position in the WAL the last entry in this batch was read from + */ + private WALEntryBatch(int maxNbEntries, Path lastWalPath) { + this.walEntries = new ArrayList<>(maxNbEntries); + this.lastWalPath = lastWalPath; + } + + public void addEntry(Entry entry) { + walEntries.add(entry); + } + + /** + * @return the WAL Entries. + */ + public List getWalEntries() { + return walEntries; + } + + /** + * @return the path of the last WAL that was read. + */ + public Path getLastWalPath() { + return lastWalPath; + } + + /** + * @return the position in the last WAL that was read. + */ + public long getLastWalPosition() { + return lastWalPosition; + } + + public int getNbEntries() { + return walEntries.size(); + } + + /** + * @return the number of distinct row keys in this batch + */ + public int getNbRowKeys() { + return nbRowKeys; + } + + /** + * @return the number of HFiles in this batch + */ + public int getNbHFiles() { + return nbHFiles; + } + + /** + * @return total number of operations in this batch + */ + public int getNbOperations() { + return getNbRowKeys() + getNbHFiles(); + } + + /** + * @return the heap size of this batch + */ + public long getHeapSize() { + return heapSize; + } + + /** + * @return the last sequenceid for each region if the table has serial-replication scope + */ + public Map getLastSeqIds() { + return lastSeqIds; + } + + private void incrementNbRowKeys(int increment) { + nbRowKeys += increment; + } + + private void incrementNbHFiles(int increment) { + nbHFiles += increment; + } + + private void incrementHeapSize(long increment) { + heapSize += increment; + } + + private void setLastPosition(String region, Long sequenceId) { + getLastSeqIds().put(region, sequenceId); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java deleted file mode 100644 index 3558d088cb2..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALFactory; - -import java.io.IOException; - -/** - * Wrapper class around WAL to help manage the implementation details - * such as compression. - */ -@InterfaceAudience.Private -public class ReplicationWALReaderManager { - - private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class); - private final FileSystem fs; - private final Configuration conf; - private long position = 0; - private Reader reader; - private Path lastPath; - - /** - * Creates the helper but doesn't open any file - * Use setInitialPosition after using the constructor if some content needs to be skipped - * @param fs - * @param conf - */ - public ReplicationWALReaderManager(FileSystem fs, Configuration conf) { - this.fs = fs; - this.conf = conf; - } - - /** - * Opens the file at the current position - * @param path - * @return an WAL reader. - * @throws IOException - */ - public Reader openReader(Path path) throws IOException { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (this.reader == null || !this.lastPath.equals(path)) { - this.closeReader(); - this.reader = WALFactory.createReader(this.fs, path, this.conf); - this.lastPath = path; - } else { - try { - this.reader.reset(); - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - return this.reader; - } - - /** - * Get the next entry, returned and also added in the array - * @return a new entry or null - * @throws IOException - */ - public Entry readNextAndSetPosition() throws IOException { - Entry entry = this.reader.next(); - // Store the position so that in the future the reader can start - // reading from here. If the above call to next() throws an - // exception, the position won't be changed and retry will happen - // from the last known good position - this.position = this.reader.getPosition(); - // We need to set the CC to null else it will be compressed when sent to the sink - if (entry != null) { - entry.setCompressionContext(null); - } - return entry; - } - - /** - * Advance the reader to the current position - * @throws IOException - */ - public void seek() throws IOException { - if (this.position != 0) { - this.reader.seek(this.position); - } - } - - /** - * Get the position that we stopped reading at - * @return current position, cannot be negative - */ - public long getPosition() { - return this.position; - } - - public void setPosition(long pos) { - this.position = pos; - } - - public long currentTrailerSize() { - long size = -1L; - if (reader instanceof ProtobufLogReader) { - final ProtobufLogReader pblr = (ProtobufLogReader)reader; - size = pblr.trailerSize(); - } - return size; - } - - /** - * Close the current reader - * @throws IOException - */ - public void closeReader() throws IOException { - if (this.reader != null) { - this.reader.close(); - this.reader = null; - } - } - - /** - * Tell the helper to reset internal state - */ - void finishCurrentFile() { - this.position = 0; - try { - this.closeReader(); - } catch (IOException e) { - LOG.warn("Unable to close reader", e); - } - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java new file mode 100644 index 00000000000..c4d552c17d0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -0,0 +1,411 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.PriorityBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALFactory; + +/** + * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually + * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it + * dequeues it and starts reading from the next. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class WALEntryStream implements Iterator, Closeable, Iterable { + private static final Log LOG = LogFactory.getLog(WALEntryStream.class); + + private Reader reader; + private Path currentPath; + // cache of next entry for hasNext() + private Entry currentEntry; + // position after reading current entry + private long currentPosition = 0; + private PriorityBlockingQueue logQueue; + private FileSystem fs; + private Configuration conf; + private MetricsSource metrics; + + /** + * Create an entry stream over the given queue + * @param logQueue the queue of WAL paths + * @param fs {@link FileSystem} to use to create {@link Reader} for this stream + * @param conf {@link Configuration} to use to create {@link Reader} for this stream + * @param metrics replication metrics + * @throws IOException + */ + public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, + MetricsSource metrics) + throws IOException { + this(logQueue, fs, conf, 0, metrics); + } + + /** + * Create an entry stream over the given queue at the given start position + * @param logQueue the queue of WAL paths + * @param fs {@link FileSystem} to use to create {@link Reader} for this stream + * @param conf {@link Configuration} to use to create {@link Reader} for this stream + * @param startPosition the position in the first WAL to start reading at + * @param metrics replication metrics + * @throws IOException + */ + public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, + long startPosition, MetricsSource metrics) throws IOException { + this.logQueue = logQueue; + this.fs = fs; + this.conf = conf; + this.currentPosition = startPosition; + this.metrics = metrics; + } + + /** + * @return true if there is another WAL {@link Entry} + * @throws WALEntryStreamRuntimeException if there was an Exception while reading + */ + @Override + public boolean hasNext() { + if (currentEntry == null) { + try { + tryAdvanceEntry(); + } catch (Exception e) { + throw new WALEntryStreamRuntimeException(e); + } + } + return currentEntry != null; + } + + /** + * @return the next WAL entry in this stream + * @throws WALEntryStreamRuntimeException if there was an IOException + * @throws NoSuchElementException if no more entries in the stream. + */ + @Override + public Entry next() { + if (!hasNext()) throw new NoSuchElementException(); + Entry save = currentEntry; + currentEntry = null; // gets reloaded by hasNext() + return save; + } + + /** + * Not supported. + */ + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + closeReader(); + } + + /** + * @return the iterator over WAL entries in the queue. + */ + @Override + public Iterator iterator() { + return this; + } + + /** + * @return the position of the last Entry returned by next() + */ + public long getPosition() { + return currentPosition; + } + + /** + * @return the {@link Path} of the current WAL + */ + public Path getCurrentPath() { + return currentPath; + } + + private String getCurrentPathStat() { + StringBuilder sb = new StringBuilder(); + if (currentPath != null) { + sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + .append(currentPosition).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + return sb.toString(); + } + + /** + * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned + * false) + * @throws IOException + */ + public void reset() throws IOException { + if (reader != null && currentPath != null) { + resetReader(); + } + } + + private void setPosition(long position) { + currentPosition = position; + } + + private void setCurrentPath(Path path) { + this.currentPath = path; + } + + private void tryAdvanceEntry() throws IOException { + if (checkReader()) { + readNextEntryAndSetPosition(); + if (currentEntry == null) { // no more entries in this log file - see if log was rolled + if (logQueue.size() > 1) { // log was rolled + // Before dequeueing, we should always get one more attempt at reading. + // This is in case more entries came in after we opened the reader, + // and a new log was enqueued while we were reading. See HBASE-6758 + resetReader(); + readNextEntryAndSetPosition(); + if (currentEntry == null) { + if (checkAllBytesParsed()) { // now we're certain we're done with this log file + dequeueCurrentLog(); + if (openNextLog()) { + readNextEntryAndSetPosition(); + } + } + } + } // no other logs, we've simply hit the end of the current open log. Do nothing + } + } + // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) + } + + // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file + private boolean checkAllBytesParsed() throws IOException { + // -1 means the wal wasn't closed cleanly. + final long trailerSize = currentTrailerSize(); + FileStatus stat = null; + try { + stat = fs.getFileStatus(this.currentPath); + } catch (IOException exception) { + LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat()); + metrics.incrUnknownFileLengthForClosedWAL(); + } + if (stat != null) { + if (trailerSize < 0) { + if (currentPosition < stat.getLen()) { + final long skippedBytes = stat.getLen() - currentPosition; + LOG.info("Reached the end of WAL file '" + currentPath + + "'. It was not closed cleanly, so we did not parse " + skippedBytes + + " bytes of data."); + metrics.incrUncleanlyClosedWALs(); + metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); + } + } else if (currentPosition + trailerSize < stat.getLen()) { + LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + + ", which is too far away from reported file length " + stat.getLen() + + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat()); + setPosition(0); + resetReader(); + metrics.incrRestartedWALReading(); + metrics.incrRepeatedFileBytes(currentPosition); + return false; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + + (stat == null ? "N/A" : stat.getLen())); + } + metrics.incrCompletedWAL(); + return true; + } + + private void dequeueCurrentLog() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Reached the end of log " + currentPath); + } + closeReader(); + logQueue.remove(); + setPosition(0); + metrics.decrSizeOfLogQueue(); + } + + private void readNextEntryAndSetPosition() throws IOException { + Entry readEntry = reader.next(); + long readerPos = reader.getPosition(); + if (readEntry != null) { + metrics.incrLogEditsRead(); + metrics.incrLogReadInBytes(readerPos - currentPosition); + } + currentEntry = readEntry; // could be null + setPosition(readerPos); + } + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + // if we don't have a reader, open a reader on the next log + private boolean checkReader() throws IOException { + if (reader == null) { + return openNextLog(); + } + return true; + } + + // open a reader on the next log in queue + private boolean openNextLog() throws IOException { + Path nextPath = logQueue.peek(); + if (nextPath != null) { + openReader(nextPath); + if (reader != null) return true; + } + return false; + } + + private Path getArchivedLog(Path path) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } else { + LOG.error("Couldn't locate log: " + path); + return path; + } + } + + private void openReader(Path path) throws IOException { + try { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + if (reader == null || !getCurrentPath().equals(path)) { + closeReader(); + reader = WALFactory.createReader(fs, path, conf); + seek(); + setCurrentPath(path); + } else { + resetReader(); + } + } catch (FileNotFoundException fnfe) { + // If the log was archived, continue reading from there + Path archivedLog = getArchivedLog(path); + if (!path.equals(archivedLog)) { + openReader(archivedLog); + } else { + throw fnfe; + } + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup. + LOG.warn("Try to recover the WAL lease " + currentPath, lnre); + recoverLease(conf, currentPath); + reader = null; + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. + LOG.warn("Got NPE opening reader, will retry."); + reader = null; + } + } + + // For HBASE-15019 + private void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + private void resetReader() throws IOException { + try { + reader.reset(); + seek(); + } catch (FileNotFoundException fnfe) { + // If the log was archived, continue reading from there + Path archivedLog = getArchivedLog(currentPath); + if (!currentPath.equals(archivedLog)) { + openReader(archivedLog); + } else { + throw fnfe; + } + } catch (NullPointerException npe) { + throw new IOException("NPE resetting reader, likely HDFS-4380", npe); + } + } + + private void seek() throws IOException { + if (currentPosition != 0) { + reader.seek(currentPosition); + } + } + + private long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader) reader; + size = pblr.trailerSize(); + } + return size; + } + + @InterfaceAudience.Private + public static class WALEntryStreamRuntimeException extends RuntimeException { + private static final long serialVersionUID = -6298201811259982568L; + + public WALEntryStreamRuntimeException(Exception e) { + super(e); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java deleted file mode 100644 index a76cec98cde..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java +++ /dev/null @@ -1,238 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; -import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@Category({ReplicationTests.class, LargeTests.class}) -@RunWith(Parameterized.class) -public class TestReplicationWALReaderManager { - - private static HBaseTestingUtility TEST_UTIL; - private static Configuration conf; - private static FileSystem fs; - private static MiniDFSCluster cluster; - private static final TableName tableName = TableName.valueOf("tablename"); - private static final byte [] family = Bytes.toBytes("column"); - private static final byte [] qualifier = Bytes.toBytes("qualifier"); - private static final HRegionInfo info = new HRegionInfo(tableName, - HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false); - private static final HTableDescriptor htd = new HTableDescriptor(tableName); - private static NavigableMap scopes; - - private WAL log; - private ReplicationWALReaderManager logManager; - private PathWatcher pathWatcher; - private int nbRows; - private int walEditKVs; - private final AtomicLong sequenceId = new AtomicLong(1); - @Rule public TestName tn = new TestName(); - private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - - @Parameters - public static Collection parameters() { - // Try out different combinations of row count and KeyValue count - int[] NB_ROWS = { 1500, 60000 }; - int[] NB_KVS = { 1, 100 }; - // whether compression is used - Boolean[] BOOL_VALS = { false, true }; - List parameters = new ArrayList(); - for (int nbRows : NB_ROWS) { - for (int walEditKVs : NB_KVS) { - for (boolean b : BOOL_VALS) { - Object[] arr = new Object[3]; - arr[0] = nbRows; - arr[1] = walEditKVs; - arr[2] = b; - parameters.add(arr); - } - } - } - return parameters; - } - - public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) { - this.nbRows = nbRows; - this.walEditKVs = walEditKVs; - TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, - enableCompression); - mvcc.advanceTo(1); - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); - conf = TEST_UTIL.getConfiguration(); - TEST_UTIL.startMiniDFSCluster(3); - - cluster = TEST_UTIL.getDFSCluster(); - fs = cluster.getFileSystem(); - scopes = new TreeMap( - Bytes.BYTES_COMPARATOR); - for(byte[] fam : htd.getFamiliesKeys()) { - scopes.put(fam, 0); - } - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before - public void setUp() throws Exception { - logManager = new ReplicationWALReaderManager(fs, conf); - List listeners = new ArrayList(1); - pathWatcher = new PathWatcher(); - listeners.add(pathWatcher); - final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName()); - log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); - } - - @After - public void tearDown() throws Exception { - log.close(); - } - - @Test - public void test() throws Exception { - // Grab the path that was generated when the log rolled as part of its creation - Path path = pathWatcher.currentPath; - - assertEquals(0, logManager.getPosition()); - - appendToLog(); - - // There's one edit in the log, read it. Reading past it needs to return nulls - assertNotNull(logManager.openReader(path)); - logManager.seek(); - WAL.Entry entry = logManager.readNextAndSetPosition(); - assertNotNull(entry); - entry = logManager.readNextAndSetPosition(); - assertNull(entry); - logManager.closeReader(); - long oldPos = logManager.getPosition(); - - appendToLog(); - - // Read the newly added entry, make sure we made progress - assertNotNull(logManager.openReader(path)); - logManager.seek(); - entry = logManager.readNextAndSetPosition(); - assertNotEquals(oldPos, logManager.getPosition()); - assertNotNull(entry); - logManager.closeReader(); - oldPos = logManager.getPosition(); - - log.rollWriter(); - - // We rolled but we still should see the end of the first log and not get data - assertNotNull(logManager.openReader(path)); - logManager.seek(); - entry = logManager.readNextAndSetPosition(); - assertEquals(oldPos, logManager.getPosition()); - assertNull(entry); - logManager.finishCurrentFile(); - - path = pathWatcher.currentPath; - - for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); } - log.rollWriter(); - logManager.openReader(path); - logManager.seek(); - for (int i = 0; i < nbRows; i++) { - WAL.Entry e = logManager.readNextAndSetPosition(); - if (e == null) { - fail("Should have enough entries"); - } - } - } - - private void appendToLog() throws IOException { - appendToLogPlus(1); - } - - private void appendToLogPlus(int count) throws IOException { - final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); - log.sync(txid); - } - - private WALEdit getWALEdits(int count) { - WALEdit edit = new WALEdit(); - for (int i = 0; i < count; i++) { - edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, - System.currentTimeMillis(), qualifier)); - } - return edit; - } - - class PathWatcher extends WALActionsListener.Base { - - Path currentPath; - - @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - currentPath = newPath; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java new file mode 100644 index 00000000000..5337f381ccf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -0,0 +1,440 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestWALEntryStream { + + private static HBaseTestingUtility TEST_UTIL; + private static Configuration conf; + private static FileSystem fs; + private static MiniDFSCluster cluster; + private static final TableName tableName = TableName.valueOf("tablename"); + private static final byte[] family = Bytes.toBytes("column"); + private static final byte[] qualifier = Bytes.toBytes("qualifier"); + private static final HRegionInfo info = + new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false); + private static final HTableDescriptor htd = new HTableDescriptor(tableName); + private static NavigableMap scopes; + + private WAL log; + PriorityBlockingQueue walQueue; + private PathWatcher pathWatcher; + + @Rule + public TestName tn = new TestName(); + private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniDFSCluster(3); + + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + for (byte[] fam : htd.getFamiliesKeys()) { + scopes.put(fam, 0); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + walQueue = new PriorityBlockingQueue<>(); + List listeners = new ArrayList(); + pathWatcher = new PathWatcher(); + listeners.add(pathWatcher); + final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName()); + log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); + } + + @After + public void tearDown() throws Exception { + log.close(); + } + + // Try out different combinations of row count and KeyValue count + @Test + public void testDifferentCounts() throws Exception { + int[] NB_ROWS = { 1500, 60000 }; + int[] NB_KVS = { 1, 100 }; + // whether compression is used + Boolean[] BOOL_VALS = { false, true }; + // long lastPosition = 0; + for (int nbRows : NB_ROWS) { + for (int walEditKVs : NB_KVS) { + for (boolean isCompressionEnabled : BOOL_VALS) { + TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, + isCompressionEnabled); + mvcc.advanceTo(1); + + for (int i = 0; i < nbRows; i++) { + appendToLogPlus(walEditKVs); + } + + log.rollWriter(); + + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + int i = 0; + for (WAL.Entry e : entryStream) { + assertNotNull(e); + i++; + } + assertEquals(nbRows, i); + + // should've read all entries + assertFalse(entryStream.hasNext()); + } + // reset everything for next loop + log.close(); + setUp(); + } + } + } + } + + /** + * Tests basic reading of log appends + */ + @Test + public void testAppendsWithRolls() throws Exception { + appendToLog(); + + long oldPos; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + // There's one edit in the log, read it. Reading past it needs to throw exception + assertTrue(entryStream.hasNext()); + WAL.Entry entry = entryStream.next(); + assertNotNull(entry); + assertFalse(entryStream.hasNext()); + try { + entry = entryStream.next(); + fail(); + } catch (NoSuchElementException e) { + // expected + } + oldPos = entryStream.getPosition(); + } + + appendToLog(); + + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) { + // Read the newly added entry, make sure we made progress + WAL.Entry entry = entryStream.next(); + assertNotEquals(oldPos, entryStream.getPosition()); + assertNotNull(entry); + oldPos = entryStream.getPosition(); + } + + // We rolled but we still should see the end of the first log and get that item + appendToLog(); + log.rollWriter(); + appendToLog(); + + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) { + WAL.Entry entry = entryStream.next(); + assertNotEquals(oldPos, entryStream.getPosition()); + assertNotNull(entry); + + // next item should come from the new log + entry = entryStream.next(); + assertNotEquals(oldPos, entryStream.getPosition()); + assertNotNull(entry); + + // no more entries to read + assertFalse(entryStream.hasNext()); + oldPos = entryStream.getPosition(); + } + } + + /** + * Tests that if after a stream is opened, more entries come in and then the log is rolled, we + * don't mistakenly dequeue the current log thinking we're done with it + */ + @Test + public void testLogrollWhileStreaming() throws Exception { + appendToLog("1"); + appendToLog("2");// 2 + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + assertEquals("1", getRow(entryStream.next())); + + appendToLog("3"); // 3 - comes in after reader opened + log.rollWriter(); // log roll happening while we're reading + appendToLog("4"); // 4 - this append is in the rolled log + + assertEquals("2", getRow(entryStream.next())); + assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an + // entry in first log + assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 + // and 3 would be skipped + assertEquals("4", getRow(entryStream.next())); // 4 + assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly + assertFalse(entryStream.hasNext()); + } + } + + /** + * Tests that if writes come in while we have a stream open, we shouldn't miss them + */ + @Test + public void testNewEntriesWhileStreaming() throws Exception { + appendToLog("1"); + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + entryStream.next(); // we've hit the end of the stream at this point + + // some new entries come in while we're streaming + appendToLog("2"); + appendToLog("3"); + + // don't see them + assertFalse(entryStream.hasNext()); + + // But we do if we reset + entryStream.reset(); + assertEquals("2", getRow(entryStream.next())); + assertEquals("3", getRow(entryStream.next())); + assertFalse(entryStream.hasNext()); + } + } + + @Test + public void testResumeStreamingFromPosition() throws Exception { + long lastPosition = 0; + appendToLog("1"); + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + entryStream.next(); // we've hit the end of the stream at this point + appendToLog("2"); + appendToLog("3"); + lastPosition = entryStream.getPosition(); + } + // next stream should picks up where we left off + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + assertEquals("2", getRow(entryStream.next())); + assertEquals("3", getRow(entryStream.next())); + assertFalse(entryStream.hasNext()); // done + assertEquals(1, walQueue.size()); + } + } + + /** + * Tests that if we stop before hitting the end of a stream, we can continue where we left off + * using the last position + */ + @Test + public void testPosition() throws Exception { + long lastPosition = 0; + appendEntriesToLog(3); + // read only one element + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + entryStream.next(); + lastPosition = entryStream.getPosition(); + } + // there should still be two more entries from where we left off + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) { + assertNotNull(entryStream.next()); + assertNotNull(entryStream.next()); + assertFalse(entryStream.hasNext()); + } + } + + + @Test + public void testEmptyStream() throws Exception { + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) { + assertFalse(entryStream.hasNext()); + } + } + + @Test + public void testReplicationSourceWALReaderThread() throws Exception { + appendEntriesToLog(3); + // get ending position + long position; + try (WALEntryStream entryStream = + new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) { + entryStream.next(); + entryStream.next(); + entryStream.next(); + position = entryStream.getPosition(); + } + + // start up a batcher + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, + fs, conf, getDummyFilter(), new MetricsSource("1")); + Path walPath = walQueue.peek(); + batcher.start(); + WALEntryBatch entryBatch = batcher.take(); + + // should've batched up our entries + assertNotNull(entryBatch); + assertEquals(3, entryBatch.getWalEntries().size()); + assertEquals(position, entryBatch.getLastWalPosition()); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(3, entryBatch.getNbRowKeys()); + + appendToLog("foo"); + entryBatch = batcher.take(); + assertEquals(1, entryBatch.getNbEntries()); + assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo"); + } + + private String getRow(WAL.Entry entry) { + Cell cell = entry.getEdit().getCells().get(0); + return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + } + + private void appendToLog(String key) throws IOException { + final long txid = log.append(info, + new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), + getWALEdit(key), true); + log.sync(txid); + } + + private void appendEntriesToLog(int count) throws IOException { + for (int i = 0; i < count; i++) { + appendToLog(); + } + } + + private void appendToLog() throws IOException { + appendToLogPlus(1); + } + + private void appendToLogPlus(int count) throws IOException { + final long txid = log.append(info, + new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), + getWALEdits(count), true); + log.sync(txid); + } + + private WALEdit getWALEdits(int count) { + WALEdit edit = new WALEdit(); + for (int i = 0; i < count; i++) { + edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, + System.currentTimeMillis(), qualifier)); + } + return edit; + } + + private WALEdit getWALEdit(String row) { + WALEdit edit = new WALEdit(); + edit.add( + new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier)); + return edit; + } + + private WALEntryFilter getDummyFilter() { + return new WALEntryFilter() { + + @Override + public Entry filter(Entry entry) { + return entry; + } + }; + } + + private ReplicationQueueInfo getQueueInfo() { + return new ReplicationQueueInfo("1"); + } + + class PathWatcher extends WALActionsListener.Base { + + Path currentPath; + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + walQueue.add(newPath); + currentPath = newPath; + } + } + +}