From 16a4dd6b8f98cb1116007764cb86f6835a7ca84f Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Sun, 18 Mar 2018 18:09:45 +0800
Subject: [PATCH] HBASE-20206 WALEntryStream should not switch WAL file silently

---
 .../replication/ReplicationQueueStorage.java | 2 +-
 .../ZKReplicationQueueStorage.java | 41 ++--
 .../TestReplicationStateBasic.java | 3 +-
 .../TestZKReplicationQueueStorage.java | 6 +-
 .../RecoveredReplicationSource.java | 33 ---
 .../RecoveredReplicationSourceShipper.java | 13 +-
 .../regionserver/ReplicationSource.java | 2 +-
 .../ReplicationSourceManager.java | 102 ++++-----
 .../ReplicationSourceShipper.java | 96 +++++----
 .../ReplicationSourceWALReader.java | 50 ++++-
 .../SerialReplicationSourceWALReader.java | 29 ++-
 .../regionserver/WALEntryBatch.java | 22 ++
 .../regionserver/WALEntryStream.java | 5 +-
 .../TestReplicationSourceManager.java | 17 +-
 .../regionserver/TestWALEntryStream.java | 196 +++++++++++++-----
 15 files changed, 390 insertions(+), 227 deletions(-)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 4c93da68389..cfe9c9c8e80 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -63,7 +63,7 @@ public interface ReplicationQueueStorage {
   * @param serverName the name of the regionserver
   * @param queueId a String that identifies the queue
   * @param fileName name of the WAL
-  * @param position the current position in the file
+  * @param position the current position in the file. Will be ignored if less than or equal to 0.
   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
   */
  void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index adbf2598e94..63f43e85347 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -193,27 +193,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
      Map<String, Long> lastSeqIds) throws ReplicationException {
    try {
      List<ZKUtilOp> listOfOps = new ArrayList<>();
-      listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
-        ZKUtil.positionToByteArray(position)));
-      // Persist the max sequence id(s) of regions for serial replication atomically.
-      if (lastSeqIds != null && lastSeqIds.size() > 0) {
-        for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-          String peerId = new ReplicationQueueInfo(queueId).getPeerId();
-          String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
-          /*
-           * Make sure the path /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>
-           * exists. As the javadoc of multiOrSequential() says, if a NodeExistsException is
-           * received, all operations will fail, so create the path here instead. There is no need
-           * to add the creation to listOfOps, because we only need the file position and the
-           * sequence ids to be updated atomically.
-           */
-          ZKUtil.createWithParents(zookeeper, path);
-          // Persist the max sequence id of the region to zookeeper.
-          listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
-        }
+      if (position > 0) {
+        listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+          ZKUtil.positionToByteArray(position)));
+      }
+      // Persist the max sequence id(s) of regions for serial replication atomically.
+      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+        String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+        /*
+         * Make sure the path /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>
+         * exists. As the javadoc of multiOrSequential() says, if a NodeExistsException is
+         * received, all operations will fail, so create the path here instead. There is no need
+         * to add the creation to listOfOps, because we only need the file position and the
+         * sequence ids to be updated atomically.
+         */
+        ZKUtil.createWithParents(zookeeper, path);
+        // Persist the max sequence id of the region to zookeeper.
+        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+      }
+      if (!listOfOps.isEmpty()) {
+        ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+      }
-    }
-    ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to set log position (serverName=" + serverName
          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 5999c1ffb00..21b09aa89b4 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;

 import org.apache.hadoop.fs.Path;
@@ -127,7 +128,7 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
     assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
-    rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
+    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
     assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 8ff52f3c16b..c8138703c00 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -136,9 +137,10 @@ public class TestZKReplicationQueueStorage {
    for (int i = 0; i < 10; i++) {
      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
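      // Note: both setWALPosition calls below pass Collections.emptyMap() rather than null;
      // the storage contract now expects a non-null map of last sequence ids for serial
      // replication, and positions <= 0 are ignored by the implementation.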
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null); + STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, + Collections.emptyMap()); STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, - null); + Collections.emptyMap()); } for (int i = 0; i < 10; i++) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 169b4691b61..f1ad99d3049 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, - BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { - LOG.trace("Didn't read any new entries from WAL"); - // we're done with queue recovery, shut ourself down - reader.setReaderRunning(false); - // shuts down shipper thread immediately - entryBatchQueue.put(new WALEntryBatch(0, currentPath)); - } - - @Override - protected ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { - if (replicationPeer.getPeerConfig().isSerial()) { - return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, - this) { - - @Override - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { - handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); - } - }; - } else { - return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) { - - @Override - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { - handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); - } - }; - } - } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { boolean hasPathChanged = false; PriorityBlockingQueue newPaths = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 1ae5cb9e9af..d74211ea374 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -48,13 +48,10 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper } @Override - protected void postShipEdits(WALEntryBatch entryBatch) { - if (entryBatch.getWalEntries().isEmpty()) { - LOG.debug("Finished 
recovering queue for group " + walGroupId + " of peer " - + source.getQueueId()); - source.getSourceMetrics().incrCompletedRecoveryQueue(); - setWorkerState(WorkerState.FINISHED); - } + protected void noMoreData() { + LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId()); + source.getSourceMetrics().incrCompletedRecoveryQueue(); + setWorkerState(WorkerState.FINISHED); } @Override @@ -63,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper } @Override - public long getStartPosition() { + long getStartPosition() { long startPosition = getRecoveredQueueStartPos(); int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 348091951e0..236c575e40b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } - protected ReplicationSourceWALReader createNewWALReader(String walGroupId, + private ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 06fe9776992..23e1115bb96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -82,25 +83,28 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * operations. *
 • Need synchronized on {@link #walsById}. There are four methods which modify it,
   {@link #addPeer(String)}, {@link #removePeer(String)},
-  {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.
-  {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
-  {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
-  {@link #removePeer(String)}. {@link #cleanOldLogs(SortedSet, String, String)} is called by
-  {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
-  {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
-  remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
-  The only case that needs synchronized is between
-  {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.
+  {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
+  {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
+  {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
+  {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
+  called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
+  {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
+  remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
+  The only case that needs synchronized is between
+  {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 • No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
   modify it, {@link #removePeer(String)},
-  {@link #cleanOldLogs(SortedSet, String, String)} and
+  {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
   {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
-  {@link #cleanOldLogs(SortedSet, String, String)} is called by
+  {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
   {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
   {@link ReplicationSourceInterface} first, then remove the wals from
   {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
   will add the wals to {@link #walsByIdRecoveredQueues} first, then start up a
   {@link ReplicationSourceInterface}, so there is no race here. For
   {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)},
   there is already synchronized on {@link #oldsources}, so there is no need for synchronized on
   {@link #walsByIdRecoveredQueues}.
 • Need synchronized on {@link #latestPaths} to avoid a newly opened source missing new logs.
 • Need synchronized on {@link #oldsources} to avoid adding a recovered source for a
   to-be-removed peer.
 */
@@ -124,11 +128,11 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All logs we are currently tracking
   // Index structure of the map is: queue_id->logPrefix/logGroup->logs
   // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
+  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
   // the map is: queue_id->logPrefix/logGroup->logs
   // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
+  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

   private final Configuration conf;
   private final FileSystem fs;
@@ -335,14 +339,14 @@
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
       this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (this.latestPaths.size() > 0) {
         for (Path logPath : latestPaths) {
           String name = logPath.getName();
           String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
-          SortedSet<String> logs = new TreeSet<>();
+          NavigableSet<String> logs = new TreeSet<>();
           logs.add(name);
           walsByGroup.put(walPrefix, logs);
           // Abort RS and throw exception to make add peer failed
@@ -474,50 +478,51 @@
   /**
    * This method will log the current position to storage, and also clean old logs from the
    * replication queue.
-   * @param log Path to the log currently being replicated
    * @param queueId id of the replication queue
-   * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
+   * @param entryBatch the wal entry batch we just shipped
    */
-  public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
-      Map<String, Long> lastSeqIds, boolean queueRecovered) {
-    String fileName = log.getName();
+  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
+      WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
     abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      position, lastSeqIds));
-    cleanOldLogs(fileName, queueId, queueRecovered);
+      entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
   }

   /**
    * Cleans a log file and all older logs from the replication queue. Called when we are sure
    * that a log file is closed and has no more entries.
   * @param log Path to the log
+   * @param inclusive whether we should also remove the given log file
    * @param queueId id of the replication queue
    * @param queueRecovered Whether this is a recovered queue
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
+  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
-      if (wals != null && !wals.first().equals(log)) {
-        cleanOldLogs(wals, log, queueId);
+      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+      if (wals != null) {
+        cleanOldLogs(wals, log, inclusive, queueId);
       }
     } else {
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
+        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
         if (wals != null && !wals.first().equals(log)) {
-          cleanOldLogs(wals, log, queueId);
+          cleanOldLogs(wals, log, inclusive, queueId);
         }
       }
     }
   }

-  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
-    SortedSet<String> walSet = wals.headSet(key);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
+    NavigableSet<String> walSet = wals.headSet(key, inclusive);
+    if (walSet.isEmpty()) {
+      return;
     }
+    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
     for (String wal : walSet) {
       abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
     }
@@ -542,11 +547,12 @@
     // synchronized on walsById to avoid race with cleanOldLogs
     synchronized (this.walsById) {
       // Update walsById map
-      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+      for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
+        .entrySet()) {
         String peerId = entry.getKey();
-        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+        Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
         boolean existingPrefix = false;
-        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+        for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
           SortedSet<String> wals = walsEntry.getValue();
           if (this.sources.isEmpty()) {
             // If there's no slaves, don't need to keep the old wals since
@@ -560,8 +566,8 @@
         }
         if (!existingPrefix) {
           // The new log belongs to a new group, add it into this peer
-          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
-          SortedSet<String> wals = new TreeSet<>();
+          LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
+          NavigableSet<String> wals = new TreeSet<>();
           wals.add(logName);
           walsByPrefix.put(logPrefix, wals);
         }
@@ -700,11 +706,11 @@
         continue;
       }
       // track sources in walsByIdRecoveredQueues
-      Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
       walsByIdRecoveredQueues.put(queueId, walsByGroup);
       for (String wal : walsSet) {
         String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-        SortedSet<String> wals = walsByGroup.get(walPrefix);
+        NavigableSet<String> wals = walsByGroup.get(walPrefix);
         if (wals == null) {
           wals = new TreeSet<>();
           walsByGroup.put(walPrefix, wals);
@@ -749,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWALs() {
+  Map<String, Map<String, NavigableSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }

@@ -758,7 +764,7 @@
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index aa5251e4fe4..2097d00817d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;

 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -52,17 +51,18 @@ public class ReplicationSourceShipper extends Thread {
     FINISHED,  // The worker is done processing a recovered queue
   }

-  protected final Configuration conf;
+  private final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
-  protected final ReplicationSourceInterface source;
+  private final ReplicationSourceInterface source;

   // Last position in the log that we sent to ZooKeeper
-  protected long lastLoggedPosition = -1;
+  // It will be accessed by the stats thread so make it volatile
+  private volatile long currentPosition = -1;

   // Path of the current log
-  protected volatile Path currentPath;
+  private Path currentPath;

   // Current state of the worker thread
-  private WorkerState state;
+  private volatile WorkerState state;

   protected ReplicationSourceWALReader entryReader;

   // How long should we sleep for each retry
@@ -97,8 +97,12 @@
       }
       try {
         WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        postShipEdits(entryBatch);
+        // the NO_MORE_DATA instance has no path so do not call shipEdits
+        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+          noMoreData();
+        } else {
+          shipEdits(entryBatch);
+        }
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while waiting for next replication entry batch", e);
         Thread.currentThread().interrupt();
@@ -113,7 +117,7 @@
   }

   // To be implemented by recovered shipper
-  protected void postShipEdits(WALEntryBatch entryBatch) {
+  protected void noMoreData() {
   }

   // To be implemented by recovered shipper
@@ -123,14 +127,11 @@
   /**
    * Do the shipping logic
    */
-  protected final void shipEdits(WALEntryBatch entryBatch) {
+  private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
-    long lastReadPosition = entryBatch.getLastWalPosition();
-    currentPath = entryBatch.getLastWalPath();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      if (lastLoggedPosition != lastReadPosition) {
-        updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+      if (updateLogPosition(entryBatch)) {
         // if there was nothing to ship and it's not an error
         // set "ageOfLastShippedOp" to indicate that we're current
         source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
           walGroupId);
@@ -168,16 +169,12 @@
     } else {
       sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
     }
-
-    if (this.lastLoggedPosition != lastReadPosition) {
-      // Clean up hfile references
-      int size = entries.size();
-      for (int i = 0; i < size; i++) {
-        cleanUpHFileRefs(entries.get(i).getEdit());
-      }
-      // Log and clean up WAL logs
-      updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+    // Clean up hfile references
+    for (Entry entry : entries) {
+      cleanUpHFileRefs(entry.getEdit());
     }
+    // Log and clean up WAL logs
+    updateLogPosition(entryBatch);

     source.postShipEdits(entries, currentSize);
     // FIXME check relationship between wal group and overall
@@ -224,10 +221,29 @@
     }
   }

-  private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
-      lastReadPosition, lastSeqIds, source.isRecovered());
-    lastLoggedPosition = lastReadPosition;
+  private boolean updateLogPosition(WALEntryBatch batch) {
+    boolean updated = false;
+    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
+    // record on zk, so let's call it. The last wal position may be zero if end of file is true
+    // and there is no entry in the batch. That is OK because the queue storage will ignore a
+    // zero position and the file will be removed soon in cleanOldLogs.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+      batch.getLastWalPosition() != currentPosition) {
+      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
+        source.isRecovered(), batch);
+      updated = true;
+    }
+    // if end of file is true, then we can just skip to the next file in the queue.
+    // the only exception is a recovered queue: if we reach the end of the queue, there will be
+    // no more files, so the currentPath here may be null.
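+    // In short: on endOfFile we advance currentPath to the reader's next file and restart
+    // currentPosition at 0, since the finished file needs no further tracking; otherwise we
+    // remember this batch's path and position as the new bookmark.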
+    if (batch.isEndOfFile()) {
+      currentPath = entryReader.getCurrentPath();
+      currentPosition = 0L;
+    } else {
+      currentPath = batch.getLastWalPath();
+      currentPosition = batch.getLastWalPosition();
+    }
+    return updated;
   }

   public void startup(UncaughtExceptionHandler handler) {
@@ -236,39 +252,31 @@
       name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
   }

-  public PriorityBlockingQueue<Path> getLogQueue() {
-    return this.queue;
+  Path getCurrentPath() {
+    return entryReader.getCurrentPath();
   }

-  public Path getCurrentPath() {
-    return this.entryReader.getCurrentPath();
+  long getCurrentPosition() {
+    return currentPosition;
   }

-  public long getCurrentPosition() {
-    return this.lastLoggedPosition;
-  }
-
-  public void setWALReader(ReplicationSourceWALReader entryReader) {
+  void setWALReader(ReplicationSourceWALReader entryReader) {
     this.entryReader = entryReader;
   }

-  public long getStartPosition() {
+  long getStartPosition() {
     return 0;
   }

-  protected final boolean isActive() {
+  private boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }

-  public void setWorkerState(WorkerState state) {
+  protected final void setWorkerState(WorkerState state) {
     this.state = state;
   }

-  public WorkerState getWorkerState() {
-    return state;
-  }
-
-  public void stopWorker() {
+  void stopWorker() {
     setWorkerState(WorkerState.STOPPED);
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b125133864c..21548566cee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -59,7 +59,7 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;

-  protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
@@ -130,6 +130,7 @@
           continue;
         }
         WALEntryBatch batch = readWALEntries(entryStream);
+        currentPosition = entryStream.getPosition();
         if (batch != null) {
           // need to propagate the batch even if it has no entries since it may carry the last
           // sequence id information for serial replication.
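With this change, a batch handed over entryBatchQueue takes one of three shapes: a normal batch of entries, an empty batch whose endOfFile flag marks a finished WAL file, or the NO_MORE_DATA sentinel that marks a fully drained recovered queue. The following self-contained sketch illustrates that hand-off protocol; the Batch class and queue wiring are illustrative stand-ins, not the real WALEntryBatch and shipper code.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BatchProtocolDemo {
  // Illustrative stand-in for WALEntryBatch: an empty batch flagged with endOfFile marks a
  // finished WAL file, while one shared sentinel instance marks a drained recovered queue.
  static final class Batch {
    static final Batch NO_MORE_DATA = new Batch(List.of(), false);
    final List<String> entries;
    final boolean endOfFile;
    Batch(List<String> entries, boolean endOfFile) {
      this.entries = entries;
      this.endOfFile = endOfFile;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Batch> queue = new ArrayBlockingQueue<>(4);
    queue.put(new Batch(List.of("edit-1", "edit-2"), false)); // normal batch
    queue.put(new Batch(List.of(), true));                    // end-of-file marker
    queue.put(Batch.NO_MORE_DATA);                            // recovered queue drained
    for (;;) {
      Batch batch = queue.take();
      if (batch == Batch.NO_MORE_DATA) { // reference comparison, as in the patch
        System.out.println("no more data, shutting down the shipper");
        break;
      }
      System.out.println("ship " + batch.entries
          + (batch.endOfFile ? ", then advance to the next WAL file" : ""));
    }
  }
}

Using one shared sentinel instance keeps the check a cheap reference comparison and avoids overloading null, which the queue cannot carry.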
@@ -138,9 +139,8 @@ class ReplicationSourceWALReader extends Thread { sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL handleEmptyWALEntryBatch(entryStream.getCurrentPath()); + entryStream.reset(); // reuse stream } - currentPosition = entryStream.getPosition(); - entryStream.reset(); // reuse stream } } catch (IOException e) { // stream related if (sleepMultiplier < maxRetriesMultiplier) { @@ -173,13 +173,31 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } + protected static final boolean switched(WALEntryStream entryStream, Path path) { + return !path.equals(entryStream.getCurrentPath()); + } + protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { + Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { - return null; + // check whether we have switched a file + if (currentPath != null && switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } else { + return null; + } + } + if (currentPath != null) { + if (switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } + } else { + // when reading from the entry stream first time we will enter here + currentPath = entryStream.getCurrentPath(); } WALEntryBatch batch = createBatch(entryStream); - do { + for (;;) { Entry entry = entryStream.next(); batch.setLastWalPosition(entryStream.getPosition()); entry = filterEntry(entry); @@ -188,13 +206,29 @@ class ReplicationSourceWALReader extends Thread { break; } } - } while (entryStream.hasNext()); + boolean hasNext = entryStream.hasNext(); + // always return if we have switched to a new file + if (switched(entryStream, currentPath)) { + batch.setEndOfFile(true); + break; + } + if (!hasNext) { + break; + } + } return batch; } - protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); - Thread.sleep(sleepForRetries); + if (source.isRecovered()) { + // we're done with queue recovery, shut ourself down + setReaderRunning(false); + // shuts down shipper thread immediately + entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); + } else { + Thread.sleep(sleepForRetries); + } } // if we get an EOF due to a zero-length log, and there are other logs in queue diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 5e9a9f627ea..9edcc8a17a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { + Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { - return null; + // check whether we have switched a file + if (currentPath != null && switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } else { + return null; + } + } + if (currentPath != null) { + if 
(switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } + } else { + // when reading from the entry stream first time we will enter here + currentPath = entryStream.getCurrentPath(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); - do { + for (;;) { Entry entry = entryStream.peek(); boolean doFiltering = true; if (firstCellInEntryBeforeFiltering == null) { @@ -99,7 +113,16 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader // actually remove the entry. removeEntryFromStream(entryStream, batch); } - } while (entryStream.hasNext()); + boolean hasNext = entryStream.hasNext(); + // always return if we have switched to a new file. + if (switched(entryStream, currentPath)) { + batch.setEndOfFile(true); + break; + } + if (!hasNext) { + break; + } + } return batch; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 31c3ac74c91..960d47341c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private class WALEntryBatch { + + // used by recovered replication queue to indicate that all the entries have been read. + public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null); + private List walEntries; // last WAL that was read private Path lastWalPath; @@ -43,6 +47,8 @@ class WALEntryBatch { private long heapSize = 0; // save the last sequenceid for each region if the table has serial-replication scope private Map lastSeqIds = new HashMap<>(); + // indicate that this is the end of the current file + private boolean endOfFile; /** * @param lastWalPath Path of the WAL the last entry in this batch was read from @@ -52,6 +58,14 @@ class WALEntryBatch { this.lastWalPath = lastWalPath; } + + static WALEntryBatch endOfFile(Path lastWalPath) { + WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); + batch.setLastWalPosition(-1L); + batch.setEndOfFile(true); + return batch; + } + public void addEntry(Entry entry) { walEntries.add(entry); } @@ -120,6 +134,14 @@ class WALEntryBatch { return lastSeqIds; } + public boolean isEndOfFile() { + return endOfFile; + } + + public void setEndOfFile(boolean endOfFile) { + this.endOfFile = endOfFile; + } + public void incrementNbRowKeys(int increment) { nbRowKeys += increment; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index c639a483361..b2c199e2433 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -155,7 +155,6 @@ class WALEntryStream implements Closeable { /** * Should be called if the stream is to be reused (i.e. 
used again after hasNext() has returned * false) - * @throws IOException */ public void reset() throws IOException { if (reader != null && currentPath != null) { @@ -304,6 +303,9 @@ class WALEntryStream implements Closeable { if (reader != null) { return true; } + } else { + // no more files in queue, this could only happen for recovered queue. + setCurrentPath(null); } return false; } @@ -394,6 +396,7 @@ class WALEntryStream implements Closeable { private void resetReader() throws IOException { try { + currentEntry = null; reader.reset(); seek(); } catch (FileNotFoundException fnfe) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 6d75fec9fdd..eb46cd77f4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.NavigableSet; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; @@ -308,25 +309,25 @@ public abstract class TestReplicationSourceManager { for (int i = 0; i < 3; i++) { wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, true); + edit, true); } wal.sync(); int logNumber = 0; - for (Map.Entry> entry : manager.getWALs().get(slaveId).entrySet()) { + for (Map.Entry> entry : manager.getWALs().get(slaveId) + .entrySet()) { logNumber += entry.getValue().size(); } assertEquals(6, logNumber); wal.rollWriter(); - manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, null, false); + manager.logPositionAndCleanOldLogs("1", false, + new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); wal.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, - true); + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + edit, true); wal.sync(); assertEquals(1, manager.getWALs().size()); @@ -396,7 +397,7 @@ public abstract class TestReplicationSourceManager { assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); - manager.cleanOldLogs(file2, id, true); + manager.cleanOldLogs(file2, false, id, true); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index eb7d5a07537..2670756ef1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import 
org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; @@ -75,7 +76,7 @@ public class TestWALEntryStream { HBaseClassTestRule.forClass(TestWALEntryStream.class); private static HBaseTestingUtility TEST_UTIL; - private static Configuration conf; + private static Configuration CONF; private static FileSystem fs; private static MiniDFSCluster cluster; private static final TableName tableName = TableName.valueOf("tablename"); @@ -102,7 +103,7 @@ public class TestWALEntryStream { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); - conf = TEST_UTIL.getConfiguration(); + CONF = TEST_UTIL.getConfiguration(); TEST_UTIL.startMiniDFSCluster(3); cluster = TEST_UTIL.getDFSCluster(); @@ -118,7 +119,7 @@ public class TestWALEntryStream { public void setUp() throws Exception { walQueue = new PriorityBlockingQueue<>(); pathWatcher = new PathWatcher(); - final WALFactory wals = new WALFactory(conf, tn.getMethodName()); + final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); wals.getWALProvider().addWALActionsListener(pathWatcher); log = wals.getWAL(info); } @@ -144,13 +145,13 @@ public class TestWALEntryStream { mvcc.advanceTo(1); for (int i = 0; i < nbRows; i++) { - appendToLogPlus(walEditKVs); + appendToLogAndSync(walEditKVs); } log.rollWriter(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -174,10 +175,10 @@ public class TestWALEntryStream { */ @Test public void testAppendsWithRolls() throws Exception { - appendToLog(); + appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. 
Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -189,9 +190,9 @@ public class TestWALEntryStream { oldPos = entryStream.getPosition(); } - appendToLog(); + appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, log, null, new MetricsSource("1"))) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -201,11 +202,11 @@ public class TestWALEntryStream { } // We rolled but we still should see the end of the first log and get that item - appendToLog(); + appendToLogAndSync(); log.rollWriter(); - appendToLog(); + appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, log, null, new MetricsSource("1"))) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -231,7 +232,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -256,7 +257,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -279,7 +280,7 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -287,7 +288,7 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -302,16 +303,16 @@ public class TestWALEntryStream { @Test public void testPosition() throws Exception { long lastPosition = 0; - appendEntriesToLog(3); + appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new 
MetricsSource("1"))) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -322,38 +323,44 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { assertFalse(entryStream.hasNext()); } } + private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + Server mockServer = Mockito.mock(Server.class); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.getSourceManager()).thenReturn(mockSourceManager); + when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); + when(source.getWALFileLengthProvider()).thenReturn(log); + when(source.getServer()).thenReturn(mockServer); + when(source.isRecovered()).thenReturn(recovered); + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); + reader.start(); + return reader; + } + @Test - public void testReplicationSourceWALReaderThread() throws Exception { - appendEntriesToLog(3); + public void testReplicationSourceWALReader() throws Exception { + appendEntriesToLogAndSync(3); // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); entryStream.next(); entryStream.next(); position = entryStream.getPosition(); } - // start up a batcher - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - Server mockServer= Mockito.mock(Server.class); - ReplicationSource source = Mockito.mock(ReplicationSource.class); - when(source.getSourceManager()).thenReturn(mockSourceManager); - when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); - when(source.getWALFileLengthProvider()).thenReturn(log); - when(source.getServer()).thenReturn(mockServer); - ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, - walQueue, 0, getDummyFilter(), source); + // start up a reader Path walPath = walQueue.peek(); - batcher.start(); - WALEntryBatch entryBatch = batcher.take(); + ReplicationSourceWALReader reader = createReader(false, CONF); + WALEntryBatch entryBatch = reader.take(); // should've batched up our entries assertNotNull(entryBatch); @@ -363,11 +370,96 @@ public class TestWALEntryStream { assertEquals(3, entryBatch.getNbRowKeys()); appendToLog("foo"); - entryBatch = batcher.take(); + entryBatch = reader.take(); assertEquals(1, entryBatch.getNbEntries()); assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); } + @Test + public void testReplicationSourceWALReaderRecovered() throws Exception { + appendEntriesToLogAndSync(10); + Path walPath = walQueue.peek(); + log.rollWriter(); + appendEntriesToLogAndSync(5); + log.shutdown(); + + Configuration conf = new Configuration(CONF); + conf.setInt("replication.source.nb.capacity", 10); + + ReplicationSourceWALReader reader = createReader(true, conf); + + WALEntryBatch batch = reader.take(); + 
assertEquals(walPath, batch.getLastWalPath()); + assertEquals(10, batch.getNbEntries()); + assertFalse(batch.isEndOfFile()); + + batch = reader.take(); + assertEquals(walPath, batch.getLastWalPath()); + assertEquals(0, batch.getNbEntries()); + assertTrue(batch.isEndOfFile()); + + walPath = walQueue.peek(); + batch = reader.take(); + assertEquals(walPath, batch.getLastWalPath()); + assertEquals(5, batch.getNbEntries()); + // Actually this should be true but we haven't handled this yet since for a normal queue the + // last one is always open... Not a big deal for now. + assertFalse(batch.isEndOfFile()); + + assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); + } + + // Testcase for HBASE-20206 + @Test + public void testReplicationSourceWALReaderWrongPosition() throws Exception { + appendEntriesToLogAndSync(1); + Path walPath = walQueue.peek(); + log.rollWriter(); + appendEntriesToLogAndSync(20); + TEST_UTIL.waitFor(5000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return fs.getFileStatus(walPath).getLen() > 0; + } + + @Override + public String explainFailure() throws Exception { + return walPath + " has not been closed yet"; + } + + }); + long walLength = fs.getFileStatus(walPath).getLen(); + + ReplicationSourceWALReader reader = createReader(false, CONF); + + WALEntryBatch entryBatch = reader.take(); + assertEquals(walPath, entryBatch.getLastWalPath()); + assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + + walLength, entryBatch.getLastWalPosition() <= walLength); + assertEquals(1, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + Path walPath2 = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(20, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + + log.rollWriter(); + appendEntriesToLogAndSync(10); + entryBatch = reader.take(); + assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(0, entryBatch.getNbEntries()); + assertTrue(entryBatch.isEndOfFile()); + + Path walPath3 = walQueue.peek(); + entryBatch = reader.take(); + assertEquals(walPath3, entryBatch.getLastWalPath()); + assertEquals(10, entryBatch.getNbEntries()); + assertFalse(entryBatch.isEndOfFile()); + } + private String getRow(WAL.Entry entry) { Cell cell = entry.getEdit().getCells().get(0); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); @@ -380,22 +472,28 @@ public class TestWALEntryStream { log.sync(txid); } - private void appendEntriesToLog(int count) throws IOException { + private void appendEntriesToLogAndSync(int count) throws IOException { + long txid = -1L; for (int i = 0; i < count; i++) { - appendToLog(); + txid = appendToLog(1); } - } - - private void appendToLog() throws IOException { - appendToLogPlus(1); - } - - private void appendToLogPlus(int count) throws IOException { - final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); log.sync(txid); } + private void appendToLogAndSync() throws IOException { + appendToLogAndSync(1); + } + + private void appendToLogAndSync(int count) throws IOException { + long txid = appendToLog(count); + log.sync(txid); + } + + private long appendToLog(int count) throws IOException { + return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), 
true); + } + private WALEdit getWALEdits(int count) { WALEdit edit = new WALEdit(); for (int i = 0; i < count; i++) { @@ -439,7 +537,7 @@ public class TestWALEntryStream { appendToLog("2"); long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0, p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next());