HBASE-20206 WALEntryStream should not switch WAL file silently

zhangduo 2018-03-18 18:09:45 +08:00
parent 644bfe36b2
commit 16a4dd6b8f
15 changed files with 390 additions and 227 deletions


@ -63,7 +63,7 @@ public interface ReplicationQueueStorage {
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
* @param position the current position in the file
* @param position the current position in the file. Will be ignored if it is less than or equal to 0.
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
*/
void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
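
The updated javadoc changes the method's contract: a position of zero or less is now ignored rather than persisted. A minimal caller-side sketch of that contract against the interface shown above; the queue id, file name, and region name are hypothetical, and rqs and serverName are assumed to be supplied by the caller:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

class SetWALPositionSketch {
  static void persistSeqIdsOnly(ReplicationQueueStorage rqs, ServerName serverName)
      throws ReplicationException {
    Map<String, Long> lastSeqIds = new HashMap<>();
    lastSeqIds.put("encodedRegionName", 42L);
    // Under the new contract a position <= 0 is ignored, so the file node is
    // left untouched while the per-region sequence ids are still persisted.
    rqs.setWALPosition(serverName, "queue-1", "wal-file-1", 0L, lastSeqIds);
  }
}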


@ -193,27 +193,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
Map<String, Long> lastSeqIds) throws ReplicationException {
try {
List<ZKUtilOp> listOfOps = new ArrayList<>();
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position)));
// Persist the max sequence id(s) of regions for serial replication atomically.
if (lastSeqIds != null && lastSeqIds.size() > 0) {
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
/*
* Make sure that the path
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc
* of the multiOrSequential() method says, if a NodeExistsException is received, all
* operations will fail. So create the path here; in fact, there is no need to add this
* operation to listOfOps, because we only need to update the file position and the
* sequence ids atomically.
*/
ZKUtil.createWithParents(zookeeper, path);
// Persist the max sequence id of region to zookeeper.
listOfOps
.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
if (position > 0) {
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position)));
}
// Persist the max sequence id(s) of regions for serial replication atomically.
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
/*
* Make sure that the path
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc
* of the multiOrSequential() method says, if a NodeExistsException is received, all
* operations will fail. So create the path here; in fact, there is no need to add this
* operation to listOfOps, because we only need to update the file position and the
* sequence ids atomically.
*/
ZKUtil.createWithParents(zookeeper, path);
// Persist the max sequence id of region to zookeeper.
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
if (!listOfOps.isEmpty()) {
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
}
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);


@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
@ -127,7 +128,7 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());


@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@ -136,9 +137,10 @@ public class TestZKReplicationQueueStorage {
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
Collections.emptyMap());
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
null);
Collections.emptyMap());
}
for (int i = 0; i < 10; i++) {


@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
}
private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourselves down
reader.setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(new WALEntryBatch(0, currentPath));
}
@Override
protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
if (replicationPeer.getPeerConfig().isSerial()) {
return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
this) {
@Override
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
}
};
} else {
return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
@Override
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
}
};
}
}
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =


@ -48,13 +48,10 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
protected void postShipEdits(WALEntryBatch entryBatch) {
if (entryBatch.getWalEntries().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
protected void noMoreData() {
LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
@Override
@ -63,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
public long getStartPosition() {
long getStartPosition() {
long startPosition = getRecoveredQueueStartPos();
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {


@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
}
protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)


@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -82,25 +83,28 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* operations.</li>
* <li>Need to synchronize on {@link #walsById}. There are four methods which modify it:
* {@link #addPeer(String)}, {@link #removePeer(String)},
* {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById}
* is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So
* there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}.
* {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
* So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no
* race with {@link #removePeer(String)}. The only case need synchronized is
* {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li>
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
* {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
* {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
* {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
* called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
* {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
* remove the wals from {@link #walsByIdRecoveredQueues}. And
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to
* {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
* there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and
* {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
* synchronized on {@link #walsByIdRecoveredQueues}.</li>
* remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
* case that needs synchronization is between {@link #cleanOldLogs(NavigableSet, String, boolean, String)}
* and {@link #preLogRoll(Path)}.</li>
* <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it: {@link #removePeer(String)},
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
* {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
* will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
* {@link ReplicationSourceInterface}. So there is no race here. For
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)},
* there is already synchronization on {@link #oldsources}, so there is no need to synchronize
* on {@link #walsByIdRecoveredQueues}.</li>
* <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new log.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
* to-be-removed peer.</li>
@ -124,11 +128,11 @@ public class ReplicationSourceManager implements ReplicationListener {
// All logs we are currently tracking
// Index structure of the map is: queue_id->logPrefix/logGroup->logs
// For normal replication source, the peer id is same with the queue id
private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
// the map is: queue_id->logPrefix/logGroup->logs
// For recovered source, the queue id's format is peer_id-servername-*
private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
@ -335,14 +339,14 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on latestPaths to avoid missing the new log
synchronized (this.latestPaths) {
this.sources.put(peerId, src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
if (this.latestPaths.size() > 0) {
for (Path logPath : latestPaths) {
String name = logPath.getName();
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
SortedSet<String> logs = new TreeSet<>();
NavigableSet<String> logs = new TreeSet<>();
logs.add(name);
walsByGroup.put(walPrefix, logs);
// Abort the RS and throw an exception to make the add peer operation fail
@ -474,50 +478,51 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* This method will log the current position to storage and also clean old logs from the
* replication queue.
* @param log Path to the log currently being replicated
* @param queueId id of the replication queue
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param entryBatch the wal entry batch we just shipped
*/
public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
Map<String, Long> lastSeqIds, boolean queueRecovered) {
String fileName = log.getName();
public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
position, lastSeqIds));
cleanOldLogs(fileName, queueId, queueRecovered);
entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
}
/**
* Cleans a log file and all older logs from the replication queue. Called when we are sure that a log
* file is closed and has no more entries.
* @param log Path to the log
* @param inclusive whether we should also remove the given log file
* @param queueId id of the replication queue
* @param queueRecovered Whether this is a recovered queue
*/
@VisibleForTesting
void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (queueRecovered) {
SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
if (wals != null && !wals.first().equals(log)) {
cleanOldLogs(wals, log, queueId);
NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
if (wals != null) {
cleanOldLogs(wals, log, inclusive, queueId);
}
} else {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
if (wals != null && !wals.first().equals(log)) {
cleanOldLogs(wals, log, queueId);
cleanOldLogs(wals, log, inclusive, queueId);
}
}
}
}
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
NavigableSet<String> walSet = wals.headSet(key, inclusive);
if (walSet.isEmpty()) {
return;
}
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
for (String wal : walSet) {
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
}
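
The switch from SortedSet to NavigableSet is what enables the new inclusive parameter: SortedSet.headSet(key) is always exclusive of the key, while NavigableSet.headSet(key, true) can also cover the finished WAL itself once its end has been reached. A small standalone illustration of the difference:

import java.util.NavigableSet;
import java.util.TreeSet;

class HeadSetSketch {
  public static void main(String[] args) {
    NavigableSet<String> wals = new TreeSet<>();
    wals.add("wal.1");
    wals.add("wal.2");
    wals.add("wal.3");
    // SortedSet.headSet(key) is always exclusive of key:
    System.out.println(wals.headSet("wal.2"));       // [wal.1]
    // NavigableSet.headSet(key, inclusive) can include the finished WAL itself:
    System.out.println(wals.headSet("wal.2", true)); // [wal.1, wal.2]
  }
}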
@ -542,11 +547,12 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on walsById to avoid race with cleanOldLogs
synchronized (this.walsById) {
// Update walsById map
for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
.entrySet()) {
String peerId = entry.getKey();
Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
boolean existingPrefix = false;
for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
SortedSet<String> wals = walsEntry.getValue();
if (this.sources.isEmpty()) {
// If there are no slaves, we don't need to keep the old wals since
@ -560,8 +566,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
if (!existingPrefix) {
// The new log belongs to a new group, add it into this peer
LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
SortedSet<String> wals = new TreeSet<>();
LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
NavigableSet<String> wals = new TreeSet<>();
wals.add(logName);
walsByPrefix.put(logPrefix, wals);
}
@ -700,11 +706,11 @@ public class ReplicationSourceManager implements ReplicationListener {
continue;
}
// track sources in walsByIdRecoveredQueues
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
SortedSet<String> wals = walsByGroup.get(walPrefix);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
@ -749,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
Map<String, Map<String, SortedSet<String>>> getWALs() {
Map<String, Map<String, NavigableSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@ -758,7 +764,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}


@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -52,17 +51,18 @@ public class ReplicationSourceShipper extends Thread {
FINISHED, // The worker is done processing a recovered queue
}
protected final Configuration conf;
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
protected final ReplicationSourceInterface source;
private final ReplicationSourceInterface source;
// Last position in the log that we sent to ZooKeeper
protected long lastLoggedPosition = -1;
// It will be accessed by the stats thread so make it volatile
private volatile long currentPosition = -1;
// Path of the current log
protected volatile Path currentPath;
private Path currentPath;
// Current state of the worker thread
private WorkerState state;
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
// How long should we sleep for each retry
@ -97,8 +97,12 @@ public class ReplicationSourceShipper extends Thread {
}
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
postShipEdits(entryBatch);
// the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
} else {
shipEdits(entryBatch);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
@ -113,7 +117,7 @@ public class ReplicationSourceShipper extends Thread {
}
// To be implemented by recovered shipper
protected void postShipEdits(WALEntryBatch entryBatch) {
protected void noMoreData() {
}
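
noMoreData() is only reached through the NO_MORE_DATA sentinel that the reader enqueues once a recovered queue is fully drained; the shipper compares by identity, so no real batch can be mistaken for the marker. A minimal sketch of this producer/consumer sentinel pattern (not HBase code, just the shape of the mechanism):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class SentinelSketch {
  // one shared instance, like WALEntryBatch.NO_MORE_DATA
  static final Object NO_MORE_DATA = new Object();

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4);
    queue.put("batch-1");
    queue.put(NO_MORE_DATA);
    for (;;) {
      Object batch = queue.take();
      if (batch == NO_MORE_DATA) { // identity check, as in the shipper's run loop
        System.out.println("queue drained, shutting down");
        break;
      }
      System.out.println("shipping " + batch);
    }
  }
}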
// To be implemented by recovered shipper
@ -123,14 +127,11 @@ public class ReplicationSourceShipper extends Thread {
/**
* Do the shipping logic
*/
protected final void shipEdits(WALEntryBatch entryBatch) {
private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
if (updateLogPosition(entryBatch)) {
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@ -168,16 +169,12 @@ public class ReplicationSourceShipper extends Thread {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
if (this.lastLoggedPosition != lastReadPosition) {
// Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
// Log and clean up WAL logs
updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
source.postShipEdits(entries, currentSize);
// FIXME check relationship between wal group and overall
@ -224,10 +221,29 @@ public class ReplicationSourceShipper extends Thread {
}
}
private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, lastSeqIds, source.isRecovered());
lastLoggedPosition = lastReadPosition;
private boolean updateLogPosition(WALEntryBatch batch) {
boolean updated = false;
// if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
// record on zk, so let's call it. The last wal position may be zero if end of file is true and
// there is no entry in the batch. That is OK because the queue storage will ignore a zero
// position and the file will be removed soon in cleanOldLogs.
if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
batch.getLastWalPosition() != currentPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
source.isRecovered(), batch);
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.
// the only exception is for a recovered queue: if we reach the end of the queue, then there
// will be no more files, so the currentPath here may be null.
if (batch.isEndOfFile()) {
currentPath = entryReader.getCurrentPath();
currentPosition = 0L;
} else {
currentPath = batch.getLastWalPath();
currentPosition = batch.getLastWalPosition();
}
return updated;
}
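
The condition guarding the storage update above boils down to three triggers: the batch finished a file, the batch moved to a different file, or the position advanced within the same file. A self-contained restatement of that predicate:

class UpdateConditionSketch {
  static boolean shouldUpdate(boolean endOfFile, String batchPath, long batchPos,
      String currentPath, long currentPos) {
    // persist when a file was finished, the file changed, or the position moved
    return endOfFile || !batchPath.equals(currentPath) || batchPos != currentPos;
  }

  public static void main(String[] args) {
    System.out.println(shouldUpdate(false, "wal.1", 100, "wal.1", 100)); // false: nothing new
    System.out.println(shouldUpdate(false, "wal.1", 200, "wal.1", 100)); // true: advanced
    System.out.println(shouldUpdate(true, "wal.1", 0, "wal.1", 0));      // true: end of file
  }
}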
public void startup(UncaughtExceptionHandler handler) {
@ -236,39 +252,31 @@ public class ReplicationSourceShipper extends Thread {
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
}
public PriorityBlockingQueue<Path> getLogQueue() {
return this.queue;
Path getCurrentPath() {
return entryReader.getCurrentPath();
}
public Path getCurrentPath() {
return this.entryReader.getCurrentPath();
long getCurrentPosition() {
return currentPosition;
}
public long getCurrentPosition() {
return this.lastLoggedPosition;
}
public void setWALReader(ReplicationSourceWALReader entryReader) {
void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}
public long getStartPosition() {
long getStartPosition() {
return 0;
}
protected final boolean isActive() {
private boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
public void setWorkerState(WorkerState state) {
protected final void setWorkerState(WorkerState state) {
this.state = state;
}
public WorkerState getWorkerState() {
return state;
}
public void stopWorker() {
void stopWorker() {
setWorkerState(WorkerState.STOPPED);
}


@ -59,7 +59,7 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;
protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
private final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
@ -130,6 +130,7 @@ class ReplicationSourceWALReader extends Thread {
continue;
}
WALEntryBatch batch = readWALEntries(entryStream);
currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even if it has no entries since it may carry the last
// sequence id information for serial replication.
@ -138,9 +139,8 @@ class ReplicationSourceWALReader extends Thread {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
entryStream.reset(); // reuse stream
}
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
}
} catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
@ -173,13 +173,31 @@ class ReplicationSourceWALReader extends Thread {
batch.getNbEntries() >= replicationBatchCountCapacity;
}
protected static final boolean switched(WALEntryStream entryStream, Path path) {
return !path.equals(entryStream.getCurrentPath());
}
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
return null;
// check whether we have switched to a new file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
} else {
// when reading from the entry stream for the first time we will enter here
currentPath = entryStream.getCurrentPath();
}
WALEntryBatch batch = createBatch(entryStream);
do {
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
entry = filterEntry(entry);
@ -188,13 +206,29 @@ class ReplicationSourceWALReader extends Thread {
break;
}
}
} while (entryStream.hasNext());
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file
if (switched(entryStream, currentPath)) {
batch.setEndOfFile(true);
break;
}
if (!hasNext) {
break;
}
}
return batch;
}
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries);
if (source.isRecovered()) {
// we're done with queue recovery, shut ourselves down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
} else {
Thread.sleep(sleepForRetries);
}
}
// if we get an EOF due to a zero-length log, and there are other logs in queue
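
The new switched() check is the heart of this fix: readWALEntries() remembers which file the stream was on before reading and, if the stream has moved on afterwards, emits an explicit end-of-file batch instead of silently carrying a position that belongs to another file. A toy model of that detection (not HBase code), with a plain queue standing in for WALEntryStream:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class SwitchDetectionSketch {
  public static void main(String[] args) {
    Queue<String> stream = new ArrayDeque<>(Arrays.asList("wal.1", "wal.2"));
    String pathBefore = stream.peek(); // remember the file we started on
    stream.poll();                     // the stream quietly moves to the next file
    boolean switched = !pathBefore.equals(stream.peek());
    System.out.println(switched
        ? "emit endOfFile batch for " + pathBefore
        : "keep reading " + pathBefore);
  }
}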


@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
return null;
// check whether we have switched to a new file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
} else {
// when reading from the entry stream for the first time we will enter here
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
do {
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
if (firstCellInEntryBeforeFiltering == null) {
@ -99,7 +113,16 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
}
} while (entryStream.hasNext());
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file.
if (switched(entryStream, currentPath)) {
batch.setEndOfFile(true);
break;
}
if (!hasNext) {
break;
}
}
return batch;
}


@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
class WALEntryBatch {
// used by a recovered replication queue to indicate that all the entries have been read.
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
@ -43,6 +47,8 @@ class WALEntryBatch {
private long heapSize = 0;
// save the last sequenceid for each region if the table has serial-replication scope
private Map<String, Long> lastSeqIds = new HashMap<>();
// indicate that this is the end of the current file
private boolean endOfFile;
/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
@ -52,6 +58,14 @@ class WALEntryBatch {
this.lastWalPath = lastWalPath;
}
static WALEntryBatch endOfFile(Path lastWalPath) {
WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
batch.setLastWalPosition(-1L);
batch.setEndOfFile(true);
return batch;
}
public void addEntry(Entry entry) {
walEntries.add(entry);
}
@ -120,6 +134,14 @@ class WALEntryBatch {
return lastSeqIds;
}
public boolean isEndOfFile() {
return endOfFile;
}
public void setEndOfFile(boolean endOfFile) {
this.endOfFile = endOfFile;
}
public void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
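
With these additions a consumer can see three batch shapes: an ordinary batch with entries, an empty marker batch with endOfFile set (built by the endOfFile(Path) factory above), and the NO_MORE_DATA sentinel. A standalone sketch of the resulting dispatch, with a local Batch class standing in for the package-private WALEntryBatch:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BatchDispatchSketch {
  static class Batch {
    final List<String> entries;
    final boolean endOfFile;
    Batch(List<String> entries, boolean endOfFile) {
      this.entries = entries;
      this.endOfFile = endOfFile;
    }
  }

  // shared sentinel, compared by identity like WALEntryBatch.NO_MORE_DATA
  static final Batch NO_MORE_DATA = new Batch(Collections.emptyList(), false);

  static String dispatch(Batch batch) {
    if (batch == NO_MORE_DATA) {
      return "recovered queue drained, shut down the shipper";
    } else if (batch.endOfFile) {
      return "file finished, advance to the next WAL";
    } else {
      return "ship " + batch.entries.size() + " entries";
    }
  }

  public static void main(String[] args) {
    System.out.println(dispatch(NO_MORE_DATA));
    System.out.println(dispatch(new Batch(Collections.emptyList(), true)));
    System.out.println(dispatch(new Batch(Arrays.asList("e1", "e2"), false)));
  }
}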


@ -155,7 +155,6 @@ class WALEntryStream implements Closeable {
/**
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
* false)
* @throws IOException
*/
public void reset() throws IOException {
if (reader != null && currentPath != null) {
@ -304,6 +303,9 @@ class WALEntryStream implements Closeable {
if (reader != null) {
return true;
}
} else {
// no more files in the queue, this can only happen for a recovered queue.
setCurrentPath(null);
}
return false;
}
@ -394,6 +396,7 @@ class WALEntryStream implements Closeable {
private void resetReader() throws IOException {
try {
currentEntry = null;
reader.reset();
seek();
} catch (FileNotFoundException fnfe) {


@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
@ -308,25 +309,25 @@ public abstract class TestReplicationSourceManager {
for (int i = 0; i < 3; i++) {
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit, true);
edit, true);
}
wal.sync();
int logNumber = 0;
for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
.entrySet()) {
logNumber += entry.getValue().size();
}
assertEquals(6, logNumber);
wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, null, false);
manager.logPositionAndCleanOldLogs("1", false,
new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit, true);
wal.sync();
assertEquals(1, manager.getWALs().size());
@ -396,7 +397,7 @@ public abstract class TestReplicationSourceManager {
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
manager.cleanOldLogs(file2, id, true);
manager.cleanOldLogs(file2, false, id, true);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}


@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@ -75,7 +76,7 @@ public class TestWALEntryStream {
HBaseClassTestRule.forClass(TestWALEntryStream.class);
private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static Configuration CONF;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
@ -102,7 +103,7 @@ public class TestWALEntryStream {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf = TEST_UTIL.getConfiguration();
CONF = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
@ -118,7 +119,7 @@ public class TestWALEntryStream {
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
pathWatcher = new PathWatcher();
final WALFactory wals = new WALFactory(conf, tn.getMethodName());
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
@ -144,13 +145,13 @@ public class TestWALEntryStream {
mvcc.advanceTo(1);
for (int i = 0; i < nbRows; i++) {
appendToLogPlus(walEditKVs);
appendToLogAndSync(walEditKVs);
}
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@ -174,10 +175,10 @@ public class TestWALEntryStream {
*/
@Test
public void testAppendsWithRolls() throws Exception {
appendToLog();
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@ -189,9 +190,9 @@ public class TestWALEntryStream {
oldPos = entryStream.getPosition();
}
appendToLog();
appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
log, null, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
@ -201,11 +202,11 @@ public class TestWALEntryStream {
}
// We rolled but we still should see the end of the first log and get that item
appendToLog();
appendToLogAndSync();
log.rollWriter();
appendToLog();
appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
log, null, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@ -231,7 +232,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@ -256,7 +257,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@ -279,7 +280,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@ -287,7 +288,7 @@ public class TestWALEntryStream {
}
// next stream should pick up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@ -302,16 +303,16 @@ public class TestWALEntryStream {
@Test
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLog(3);
appendEntriesToLogAndSync(3);
// read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
log, null, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@ -322,38 +323,44 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
reader.start();
return reader;
}
@Test
public void testReplicationSourceWALReaderThread() throws Exception {
appendEntriesToLog(3);
public void testReplicationSourceWALReader() throws Exception {
appendEntriesToLogAndSync(3);
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer= Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
// start up a reader
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
// should've batched up our entries
assertNotNull(entryBatch);
@ -363,11 +370,96 @@ public class TestWALEntryStream {
assertEquals(3, entryBatch.getNbRowKeys());
appendToLog("foo");
entryBatch = batcher.take();
entryBatch = reader.take();
assertEquals(1, entryBatch.getNbEntries());
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
}
@Test
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
Path walPath = walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(5);
log.shutdown();
Configuration conf = new Configuration(CONF);
conf.setInt("replication.source.nb.capacity", 10);
ReplicationSourceWALReader reader = createReader(true, conf);
WALEntryBatch batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(10, batch.getNbEntries());
assertFalse(batch.isEndOfFile());
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(0, batch.getNbEntries());
assertTrue(batch.isEndOfFile());
walPath = walQueue.peek();
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
// Actually this should be true but we haven't handled this yet since for a normal queue the
// last one is always open... Not a big deal for now.
assertFalse(batch.isEndOfFile());
assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
}
// Testcase for HBASE-20206
@Test
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
Path walPath = walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(20);
TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return fs.getFileStatus(walPath).getLen() > 0;
}
@Override
public String explainFailure() throws Exception {
return walPath + " has not been closed yet";
}
});
long walLength = fs.getFileStatus(walPath).getLen();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
assertEquals(walPath, entryBatch.getLastWalPath());
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
walLength, entryBatch.getLastWalPosition() <= walLength);
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
Path walPath2 = walQueue.peek();
entryBatch = reader.take();
assertEquals(walPath2, entryBatch.getLastWalPath());
assertEquals(20, entryBatch.getNbEntries());
assertFalse(entryBatch.isEndOfFile());
log.rollWriter();
appendEntriesToLogAndSync(10);
entryBatch = reader.take();
assertEquals(walPath2, entryBatch.getLastWalPath());
assertEquals(0, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
Path walPath3 = walQueue.peek();
entryBatch = reader.take();
assertEquals(walPath3, entryBatch.getLastWalPath());
assertEquals(10, entryBatch.getNbEntries());
assertFalse(entryBatch.isEndOfFile());
}
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
@ -380,22 +472,28 @@ public class TestWALEntryStream {
log.sync(txid);
}
private void appendEntriesToLog(int count) throws IOException {
private void appendEntriesToLogAndSync(int count) throws IOException {
long txid = -1L;
for (int i = 0; i < count; i++) {
appendToLog();
txid = appendToLog(1);
}
}
private void appendToLog() throws IOException {
appendToLogPlus(1);
}
private void appendToLogPlus(int count) throws IOException {
final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
log.sync(txid);
}
private void appendToLogAndSync() throws IOException {
appendToLogAndSync(1);
}
private void appendToLogAndSync(int count) throws IOException {
long txid = appendToLog(count);
log.sync(txid);
}
private long appendToLog(int count) throws IOException {
return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
}
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
@ -439,7 +537,7 @@ public class TestWALEntryStream {
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());