HBASE-23205 Correctly update the position of WALs currently being replicated (#944)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
1451063115
commit
e0c419d76d
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
|
@ -439,14 +440,30 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
}
|
||||
|
||||
@Override
|
||||
@VisibleForTesting
|
||||
public Path getCurrentPath() {
|
||||
// only for testing
|
||||
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
|
||||
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Path getLastLoggedPath() {
|
||||
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
|
||||
return worker.getLastLoggedPath();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getLastLoggedPosition() {
|
||||
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
|
||||
return worker.getLastLoggedPosition();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private boolean isSourceActive() {
|
||||
return !this.stopper.isStopped() && this.sourceRunning;
|
||||
}
|
||||
|
@ -481,8 +498,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
|
||||
String walGroupId = entry.getKey();
|
||||
ReplicationSourceShipperThread worker = entry.getValue();
|
||||
long position = worker.getCurrentPosition();
|
||||
Path currentPath = worker.getCurrentPath();
|
||||
long position = worker.getLastLoggedPosition();
|
||||
Path currentPath = worker.getLastLoggedPath();
|
||||
sb.append("walGroup [").append(walGroupId).append("]: ");
|
||||
if (currentPath != null) {
|
||||
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
|
||||
|
@ -517,7 +534,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
int queueSize = queues.get(walGroupId).size();
|
||||
replicationDelay =
|
||||
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
|
||||
Path currentPath = worker.getCurrentPath();
|
||||
Path currentPath = worker.getLastLoggedPath();
|
||||
fileSize = -1;
|
||||
if (currentPath != null) {
|
||||
try {
|
||||
|
@ -535,7 +552,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
.withQueueSize(queueSize)
|
||||
.withWalGroup(walGroupId)
|
||||
.withCurrentPath(currentPath)
|
||||
.withCurrentPosition(worker.getCurrentPosition())
|
||||
.withCurrentPosition(worker.getLastLoggedPosition())
|
||||
.withFileSize(fileSize)
|
||||
.withAgeOfLastShippedOp(ageOfLastShippedOp)
|
||||
.withReplicationDelay(replicationDelay);
|
||||
|
@ -555,7 +572,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
// Last position in the log that we sent to ZooKeeper
|
||||
private long lastLoggedPosition = -1;
|
||||
// Path of the current log
|
||||
private volatile Path currentPath;
|
||||
private volatile Path lastLoggedPath;
|
||||
// Current state of the worker thread
|
||||
private WorkerState state;
|
||||
ReplicationSourceWALReaderThread entryReader;
|
||||
|
@ -600,13 +617,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
WALEntryBatch entryBatch = entryReader.take();
|
||||
shipEdits(entryBatch);
|
||||
releaseBufferQuota((int) entryBatch.getHeapSize());
|
||||
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
|
||||
&& entryBatch.getLastSeqIds().isEmpty()) {
|
||||
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
|
||||
+ peerClusterZnode);
|
||||
if (!entryBatch.hasMoreEntries()) {
|
||||
LOG.debug("Finished recovering queue for group "
|
||||
+ walGroupId + " of peer " + peerClusterZnode);
|
||||
metrics.incrCompletedRecoveryQueue();
|
||||
setWorkerState(WorkerState.FINISHED);
|
||||
continue;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.trace("Interrupted while waiting for next replication entry batch", e);
|
||||
|
@ -614,7 +629,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
}
|
||||
}
|
||||
|
||||
if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
|
||||
if (getWorkerState() == WorkerState.FINISHED) {
|
||||
// use synchronize to make sure one last thread will clean the queue
|
||||
synchronized (this) {
|
||||
Threads.sleep(100);// wait a short while for other worker thread to fully exit
|
||||
|
@ -694,15 +709,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
protected void shipEdits(WALEntryBatch entryBatch) {
|
||||
List<Entry> entries = entryBatch.getWalEntries();
|
||||
long lastReadPosition = entryBatch.getLastWalPosition();
|
||||
currentPath = entryBatch.getLastWalPath();
|
||||
lastLoggedPath = entryBatch.getLastWalPath();
|
||||
int sleepMultiplier = 0;
|
||||
if (entries.isEmpty()) {
|
||||
if (lastLoggedPosition != lastReadPosition) {
|
||||
updateLogPosition(lastReadPosition);
|
||||
// if there was nothing to ship and it's not an error
|
||||
// set "ageOfLastShippedOp" to <now> to indicate that we're current
|
||||
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
|
||||
}
|
||||
updateLogPosition(lastReadPosition);
|
||||
// if there was nothing to ship and it's not an error
|
||||
// set "ageOfLastShippedOp" to <now> to indicate that we're current
|
||||
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
|
||||
return;
|
||||
}
|
||||
int currentSize = (int) entryBatch.getHeapSize();
|
||||
|
@ -787,8 +800,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
}
|
||||
|
||||
private void updateLogPosition(long lastReadPosition) {
|
||||
manager.setPendingShipment(false);
|
||||
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
|
||||
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
|
||||
this.replicationQueueInfo.isQueueRecovered(), false);
|
||||
lastLoggedPosition = lastReadPosition;
|
||||
}
|
||||
|
@ -800,7 +812,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
public void uncaughtException(final Thread t, final Throwable e) {
|
||||
RSRpcServices.exitIfOOME(e);
|
||||
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
|
||||
+ getCurrentPath(), e);
|
||||
+ getLastLoggedPath(), e);
|
||||
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
|
||||
}
|
||||
};
|
||||
|
@ -941,8 +953,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
return this.entryReader.getCurrentPath();
|
||||
}
|
||||
|
||||
public long getCurrentPosition() {
|
||||
return this.lastLoggedPosition;
|
||||
public Path getLastLoggedPath() {
|
||||
return lastLoggedPath;
|
||||
}
|
||||
|
||||
public long getLastLoggedPosition() {
|
||||
return lastLoggedPosition;
|
||||
}
|
||||
|
||||
private boolean isWorkerActive() {
|
||||
|
|
|
@ -123,8 +123,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
private AtomicLong totalBufferUsed = new AtomicLong();
|
||||
|
||||
private boolean pendingShipment;
|
||||
|
||||
/**
|
||||
* Creates a replication manager and sets the watch on all the other registered region servers
|
||||
* @param replicationQueues the interface for manipulating replication queues
|
||||
|
@ -191,19 +189,13 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* @param holdLogInZK if true then the log is retained in ZK
|
||||
*/
|
||||
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
|
||||
boolean queueRecovered, boolean holdLogInZK) {
|
||||
if (!this.pendingShipment) {
|
||||
String fileName = log.getName();
|
||||
this.replicationQueues.setLogPosition(id, fileName, position);
|
||||
if (holdLogInZK) {
|
||||
return;
|
||||
}
|
||||
cleanOldLogs(fileName, id, queueRecovered);
|
||||
boolean queueRecovered, boolean holdLogInZK) {
|
||||
String fileName = log.getName();
|
||||
this.replicationQueues.setLogPosition(id, fileName, position);
|
||||
if (holdLogInZK) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void setPendingShipment(boolean pendingShipment) {
|
||||
this.pendingShipment = pendingShipment;
|
||||
cleanOldLogs(fileName, id, queueRecovered);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
@ -69,7 +67,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
// max count of each batch - multiply by number of batches in queue to get total
|
||||
private int replicationBatchCountCapacity;
|
||||
// position in the WAL to start reading at
|
||||
private long currentPosition;
|
||||
private long lastReadPosition;
|
||||
private Path lastReadPath;
|
||||
private WALEntryFilter filter;
|
||||
private long sleepForRetries;
|
||||
//Indicates whether this particular worker is running
|
||||
|
@ -81,8 +80,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
private AtomicLong totalBufferUsed;
|
||||
private long totalBufferQuota;
|
||||
|
||||
private ReplicationSourceManager replicationSourceManager;
|
||||
|
||||
/**
|
||||
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
|
||||
* entries, and puts them on a batch queue.
|
||||
|
@ -101,7 +98,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
|
||||
this.replicationQueueInfo = replicationQueueInfo;
|
||||
this.logQueue = logQueue;
|
||||
this.currentPosition = startPosition;
|
||||
this.lastReadPath = logQueue.peek();
|
||||
this.lastReadPosition = startPosition;
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
this.filter = filter;
|
||||
|
@ -111,7 +109,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
// memory used will be batchSizeCapacity * (nb.batches + 1)
|
||||
// the +1 is for the current thread reading before placing onto the queue
|
||||
int batchCount = conf.getInt("replication.source.nb.batches", 1);
|
||||
this.replicationSourceManager = manager;
|
||||
this.totalBufferUsed = manager.getTotalBufferUsed();
|
||||
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
|
||||
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
|
||||
|
@ -133,61 +130,45 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
int sleepMultiplier = 1;
|
||||
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
|
||||
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
|
||||
while (isReaderRunning()) { // loop here to keep reusing stream while we can
|
||||
if (!checkQuota()) {
|
||||
continue;
|
||||
}
|
||||
WALEntryBatch batch = null;
|
||||
while (entryStream.hasNext()) {
|
||||
if (batch == null) {
|
||||
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
|
||||
}
|
||||
WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
|
||||
boolean hasNext;
|
||||
while ((hasNext = entryStream.hasNext()) == true) {
|
||||
Entry entry = entryStream.next();
|
||||
entry = filterEntry(entry);
|
||||
if (entry != null) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
if (edit != null && !edit.isEmpty()) {
|
||||
long entrySize = getEntrySizeIncludeBulkLoad(entry);
|
||||
long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
|
||||
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
|
||||
batch.addEntry(entry);
|
||||
replicationSourceManager.setPendingShipment(true);
|
||||
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
|
||||
// Stop if too many entries or too big
|
||||
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|
||||
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
|
||||
this.replicationQueueInfo.getPeerClusterZnode(),
|
||||
entryStream.getPosition(),
|
||||
this.replicationQueueInfo.isQueueRecovered(), false);
|
||||
}
|
||||
}
|
||||
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(String.format("Read %s WAL entries eligible for replication",
|
||||
batch.getNbEntries()));
|
||||
}
|
||||
entryBatchQueue.put(batch);
|
||||
|
||||
updateBatch(entryStream, batch, hasNext);
|
||||
if (isShippable(batch)) {
|
||||
sleepMultiplier = 1;
|
||||
} else { // got no entries and didn't advance position in WAL
|
||||
LOG.trace("Didn't read any new entries from WAL");
|
||||
if (replicationQueueInfo.isQueueRecovered()) {
|
||||
// we're done with queue recovery, shut ourself down
|
||||
entryBatchQueue.put(batch);
|
||||
if (!batch.hasMoreEntries()) {
|
||||
// we're done with queue recovery, shut ourselves down
|
||||
setReaderRunning(false);
|
||||
// shuts down shipper thread immediately
|
||||
entryBatchQueue.put(batch != null ? batch
|
||||
: new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
|
||||
} else {
|
||||
Thread.sleep(sleepForRetries);
|
||||
}
|
||||
} else {
|
||||
Thread.sleep(sleepForRetries);
|
||||
}
|
||||
currentPosition = entryStream.getPosition();
|
||||
entryStream.reset(); // reuse stream
|
||||
resetStream(entryStream);
|
||||
}
|
||||
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
|
||||
if (sleepMultiplier < maxRetriesMultiplier) {
|
||||
|
@ -205,6 +186,38 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
|
||||
logMessage(batch);
|
||||
batch.updatePosition(entryStream);
|
||||
batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
|
||||
}
|
||||
|
||||
private void logMessage(WALEntryBatch batch) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
if (batch.isEmpty()) {
|
||||
LOG.trace("Didn't read any new entries from WAL");
|
||||
} else {
|
||||
LOG.trace(String.format("Read %s WAL entries eligible for replication",
|
||||
batch.getNbEntries()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isShippable(WALEntryBatch batch) {
|
||||
return !batch.isEmpty() || checkIfWALRolled(batch) || !batch.hasMoreEntries();
|
||||
}
|
||||
|
||||
private boolean checkIfWALRolled(WALEntryBatch batch) {
|
||||
return lastReadPath == null && batch.lastWalPath != null
|
||||
|| lastReadPath != null && !lastReadPath.equals(batch.lastWalPath);
|
||||
}
|
||||
|
||||
private void resetStream(WALEntryStream stream) throws IOException {
|
||||
lastReadPosition = stream.getPosition();
|
||||
lastReadPath = stream.getCurrentPath();
|
||||
stream.reset(); // reuse stream
|
||||
}
|
||||
|
||||
// if we get an EOF due to a zero-length log, and there are other logs in queue
|
||||
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
|
||||
// enabled, then dump the log
|
||||
|
@ -214,8 +227,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
try {
|
||||
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
|
||||
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
|
||||
logQueue.remove();
|
||||
currentPosition = 0;
|
||||
lastReadPath = logQueue.remove();
|
||||
lastReadPosition = 0;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get file length information about log " + logQueue.peek());
|
||||
|
@ -224,12 +237,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
}
|
||||
|
||||
public Path getCurrentPath() {
|
||||
// if we've read some WAL entries, get the Path we read from
|
||||
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
|
||||
if (batchQueueHead != null) {
|
||||
return batchQueueHead.lastWalPath;
|
||||
}
|
||||
// otherwise, we must be currently reading from the head of the log queue
|
||||
return logQueue.peek();
|
||||
}
|
||||
|
||||
|
@ -380,6 +387,10 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
this.isReaderRunning = readerRunning;
|
||||
}
|
||||
|
||||
public long getLastReadPosition() {
|
||||
return this.lastReadPosition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds a batch of WAL entries to replicate, along with some statistics
|
||||
*
|
||||
|
@ -396,17 +407,14 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
private int nbHFiles = 0;
|
||||
// heap size of data we need to replicate
|
||||
private long heapSize = 0;
|
||||
// save the last sequenceid for each region if the table has serial-replication scope
|
||||
private Map<String, Long> lastSeqIds = new HashMap<>();
|
||||
// whether more entries to read exist in WALs or not
|
||||
private boolean moreEntries = true;
|
||||
|
||||
/**
|
||||
* @param walEntries
|
||||
* @param lastWalPath Path of the WAL the last entry in this batch was read from
|
||||
* @param lastWalPosition Position in the WAL the last entry in this batch was read from
|
||||
* @param maxNbEntries the number of entries a batch can have
|
||||
*/
|
||||
private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
|
||||
private WALEntryBatch(int maxNbEntries) {
|
||||
this.walEntries = new ArrayList<>(maxNbEntries);
|
||||
this.lastWalPath = lastWalPath;
|
||||
}
|
||||
|
||||
public void addEntry(Entry entry) {
|
||||
|
@ -466,13 +474,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
return heapSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the last sequenceid for each region if the table has serial-replication scope
|
||||
*/
|
||||
public Map<String, Long> getLastSeqIds() {
|
||||
return lastSeqIds;
|
||||
}
|
||||
|
||||
private void incrementNbRowKeys(int increment) {
|
||||
nbRowKeys += increment;
|
||||
}
|
||||
|
@ -484,5 +485,22 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
private void incrementHeapSize(long increment) {
|
||||
heapSize += increment;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return walEntries.isEmpty();
|
||||
}
|
||||
|
||||
public void updatePosition(WALEntryStream entryStream) {
|
||||
lastWalPath = entryStream.getCurrentPath();
|
||||
lastWalPosition = entryStream.getPosition();
|
||||
}
|
||||
|
||||
public boolean hasMoreEntries() {
|
||||
return moreEntries;
|
||||
}
|
||||
|
||||
public void setMoreEntries(boolean moreEntries) {
|
||||
this.moreEntries = moreEntries;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,13 +16,30 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -34,36 +51,38 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestReplicationSource {
|
||||
|
||||
|
@ -92,6 +111,32 @@ public class TestReplicationSource {
|
|||
if (FS.exists(logDir)) FS.delete(logDir, true);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
if (!FS.exists(logDir)) {
|
||||
FS.mkdirs(logDir);
|
||||
}
|
||||
if (!FS.exists(oldLogDir)) {
|
||||
FS.mkdirs(oldLogDir);
|
||||
}
|
||||
|
||||
ReplicationEndpointForTest.contructedCount.set(0);
|
||||
ReplicationEndpointForTest.startedCount.set(0);
|
||||
ReplicationEndpointForTest.replicateCount.set(0);
|
||||
ReplicationEndpointForTest.stoppedCount.set(0);
|
||||
ReplicationEndpointForTest.lastEntries = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (FS.exists(oldLogDir)) {
|
||||
FS.delete(oldLogDir, true);
|
||||
}
|
||||
if (FS.exists(logDir)) {
|
||||
FS.delete(logDir, true);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL_PEER.shutdownMiniHBaseCluster();
|
||||
|
@ -108,8 +153,6 @@ public class TestReplicationSource {
|
|||
@Test
|
||||
public void testLogMoving() throws Exception{
|
||||
Path logPath = new Path(logDir, "log");
|
||||
if (!FS.exists(logDir)) FS.mkdirs(logDir);
|
||||
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
|
||||
WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
|
||||
TEST_UTIL.getConfiguration());
|
||||
for(int i = 0; i < 3; i++) {
|
||||
|
@ -166,7 +209,6 @@ public class TestReplicationSource {
|
|||
Configuration testConf = HBaseConfiguration.create();
|
||||
testConf.setInt("replication.source.maxretriesmultiplier", 1);
|
||||
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
|
||||
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
|
||||
source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
|
||||
null, replicationEndpoint, null);
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
|
@ -189,10 +231,184 @@ public class TestReplicationSource {
|
|||
|
||||
}
|
||||
|
||||
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
|
||||
for (int i = 0; i < numEntries; i++) {
|
||||
byte[] b = Bytes.toBytes(Integer.toString(i));
|
||||
KeyValue kv = new KeyValue(b,b,b);
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(kv);
|
||||
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
|
||||
HConstants.DEFAULT_CLUSTER_ID);
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
||||
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
key.setScopes(scopes);
|
||||
writer.append(new WAL.Entry(key, edit));
|
||||
writer.sync(false);
|
||||
}
|
||||
writer.close();
|
||||
}
|
||||
|
||||
private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOException {
|
||||
WAL.Reader reader = wals.createReader(FS, log2);
|
||||
for (int i = 0; i < numEntries; i++) {
|
||||
reader.next();
|
||||
}
|
||||
return reader.getPosition();
|
||||
}
|
||||
|
||||
private static final class Mocks {
|
||||
private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
|
||||
private final ReplicationQueues queues = mock(ReplicationQueues.class);
|
||||
private final ReplicationPeers peers = mock(ReplicationPeers.class);
|
||||
private final MetricsSource metrics = mock(MetricsSource.class);
|
||||
private final ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||
private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
|
||||
|
||||
private Mocks() {
|
||||
when(peers.getStatusOfPeer(anyString())).thenReturn(true);
|
||||
when(context.getReplicationPeer()).thenReturn(peer);
|
||||
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
}
|
||||
|
||||
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
|
||||
throws IOException {
|
||||
final ReplicationSource source = new ReplicationSource();
|
||||
endpoint.init(context);
|
||||
source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
|
||||
"testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
|
||||
return source;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
|
||||
final int numWALEntries = 5;
|
||||
conf.setInt("replication.source.nb.capacity", numWALEntries);
|
||||
|
||||
Mocks mocks = new Mocks();
|
||||
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
|
||||
@Override
|
||||
public WALEntryFilter getWALEntryfilter() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
|
||||
final Path log1 = new Path(logDir, "log.1");
|
||||
final Path log2 = new Path(logDir, "log.2");
|
||||
|
||||
WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
|
||||
WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
|
||||
|
||||
appendEntries(writer1, 3);
|
||||
appendEntries(writer2, 2);
|
||||
|
||||
long pos = getPosition(wals, log2, 2);
|
||||
|
||||
final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
|
||||
source.run();
|
||||
|
||||
source.enqueueLog(log1);
|
||||
// log rolled
|
||||
source.enqueueLog(log2);
|
||||
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
return endpoint.replicateCount.get() > 0;
|
||||
}
|
||||
});
|
||||
|
||||
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
|
||||
verify(mocks.manager, times(1))
|
||||
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
|
||||
anyBoolean(), anyBoolean());
|
||||
assertTrue(endpoint.lastEntries.size() == 5);
|
||||
assertThat(pathCaptor.getValue(), is(log2));
|
||||
assertThat(positionCaptor.getValue(), is(pos));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
|
||||
Mocks mocks = new Mocks();
|
||||
|
||||
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
|
||||
final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
|
||||
WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
|
||||
|
||||
final Path log1 = new Path(logDir, "log.1");
|
||||
final Path log2 = new Path(logDir, "log.2");
|
||||
|
||||
WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close();
|
||||
WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close();
|
||||
final long startPos = getPosition(wals, log2, 0);
|
||||
|
||||
source.run();
|
||||
source.enqueueLog(log1);
|
||||
source.enqueueLog(log2);
|
||||
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
return log2.equals(source.getLastLoggedPath())
|
||||
&& source.getLastLoggedPosition() >= startPos;
|
||||
}
|
||||
});
|
||||
|
||||
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
|
||||
|
||||
verify(mocks.manager, times(1))
|
||||
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
|
||||
anyBoolean(), anyBoolean());
|
||||
assertThat(pathCaptor.getValue(), is(log2));
|
||||
assertThat(positionCaptor.getValue(), is(startPos));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exception {
|
||||
Mocks mocks = new Mocks();
|
||||
// set table cfs to filter all cells out
|
||||
final TableName replicatedTable = TableName.valueOf("replicated_table");
|
||||
final Map<TableName, List<String>> cfs =
|
||||
Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
|
||||
when(mocks.peer.getTableCFs()).thenReturn(cfs);
|
||||
|
||||
WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
|
||||
final Path log1 = new Path(logDir, "log.1");
|
||||
final Path log2 = new Path(logDir, "log.2");
|
||||
|
||||
WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
|
||||
WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
|
||||
|
||||
appendEntries(writer1, 3);
|
||||
appendEntries(writer2, 2);
|
||||
final long pos = getPosition(wals, log2, 2);
|
||||
|
||||
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
|
||||
final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
|
||||
source.enqueueLog(log1);
|
||||
source.enqueueLog(log2);
|
||||
source.run();
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
// wait until reader read all cells
|
||||
return log2.equals(source.getLastLoggedPath()) && source.getLastLoggedPosition() >= pos;
|
||||
}
|
||||
});
|
||||
|
||||
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
|
||||
|
||||
// all old wals should be removed by updating wal position, even if all cells are filtered out.
|
||||
verify(mocks.manager, times(1))
|
||||
.logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
|
||||
anyBoolean(), anyBoolean());
|
||||
assertThat(pathCaptor.getValue(), is(log2));
|
||||
assertThat(positionCaptor.getValue(), is(pos));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that recovered queues are preserved on a regionserver shutdown.
|
||||
* See HBASE-18192
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testServerShutdownRecoveredQueue() throws Exception {
|
||||
|
|
|
@ -23,17 +23,15 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -47,17 +45,22 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -77,7 +80,6 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
|
@ -358,8 +360,9 @@ public class TestWALEntryStream {
|
|||
// start up a batcher
|
||||
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
|
||||
fs, conf, getDummyFilter(), new MetricsSource("1"));
|
||||
ReplicationSourceWALReaderThread batcher =
|
||||
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
|
||||
fs, conf, getDummyFilter(), new MetricsSource("1"));
|
||||
Path walPath = walQueue.peek();
|
||||
batcher.start();
|
||||
WALEntryBatch entryBatch = batcher.take();
|
||||
|
@ -378,37 +381,36 @@ public class TestWALEntryStream {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
|
||||
public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
|
||||
appendEntriesToLog(3);
|
||||
// get ending position
|
||||
log.rollWriter();
|
||||
appendEntriesToLog(2);
|
||||
|
||||
long position;
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
|
||||
try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
|
||||
fs, conf, new MetricsSource("1"))) {
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
position = entryStream.getPosition();
|
||||
}
|
||||
// start up a readerThread with a WALEntryFilter that always filter the entries
|
||||
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
||||
|
||||
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread(
|
||||
mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() {
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
return null;
|
||||
}
|
||||
}, new MetricsSource("1"));
|
||||
readerThread.start();
|
||||
Thread.sleep(100);
|
||||
ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
|
||||
verify(mockSourceManager, times(3))
|
||||
.logPositionAndCleanOldLogs(any(Path.class),
|
||||
anyString(),
|
||||
positionCaptor.capture(),
|
||||
anyBoolean(),
|
||||
anyBoolean());
|
||||
assertEquals(position, positionCaptor.getValue().longValue());
|
||||
ReplicationSourceWALReaderThread reader =
|
||||
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
|
||||
walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
|
||||
Path walPath = walQueue.toArray(new Path[2])[1];
|
||||
reader.start();
|
||||
WALEntryBatch entryBatch = reader.take();
|
||||
|
||||
assertNotNull(entryBatch);
|
||||
assertEquals(5, entryBatch.getWalEntries().size());
|
||||
assertEquals(position, entryBatch.getLastWalPosition());
|
||||
assertEquals(walPath, entryBatch.getLastWalPath());
|
||||
assertFalse(entryBatch.hasMoreEntries());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -436,6 +438,96 @@ public class TestWALEntryStream {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
|
||||
final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
|
||||
final Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
|
||||
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
|
||||
// add filterable entries
|
||||
appendToLogPlus(3, notReplicatedCf);
|
||||
appendToLogPlus(3, notReplicatedCf);
|
||||
appendToLogPlus(3, notReplicatedCf);
|
||||
|
||||
// add non filterable entries
|
||||
appendEntriesToLog(2);
|
||||
|
||||
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
final ReplicationSourceWALReaderThread reader =
|
||||
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
|
||||
0, fs, conf, filter, new MetricsSource("1"));
|
||||
reader.start();
|
||||
|
||||
WALEntryBatch entryBatch = reader.take();
|
||||
|
||||
assertNotNull(entryBatch);
|
||||
assertFalse(entryBatch.isEmpty());
|
||||
List<Entry> walEntries = entryBatch.getWalEntries();
|
||||
assertEquals(2, walEntries.size());
|
||||
for (Entry entry : walEntries) {
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
assertTrue(cells.size() == 1);
|
||||
assertTrue(CellUtil.matchingFamily(cells.get(0), family));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() throws Exception {
|
||||
final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
|
||||
final Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
|
||||
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
|
||||
appendToLogPlus(3, notReplicatedCf);
|
||||
|
||||
Path firstWAL = walQueue.peek();
|
||||
final long eof = getPosition(firstWAL);
|
||||
|
||||
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
final ReplicationSourceWALReaderThread reader =
|
||||
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
|
||||
0, fs, conf, filter, new MetricsSource("1"));
|
||||
reader.start();
|
||||
|
||||
// reader won't put any batch, even if EOF reached.
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() {
|
||||
return reader.getLastReadPosition() >= eof;
|
||||
}
|
||||
});
|
||||
assertNull(reader.poll(0));
|
||||
|
||||
log.rollWriter();
|
||||
|
||||
// should get empty batch with current wal position, after wal rolled
|
||||
WALEntryBatch entryBatch = reader.take();
|
||||
|
||||
Path lastWAL= walQueue.peek();
|
||||
long positionToBeLogged = getPosition(lastWAL);
|
||||
|
||||
assertNotNull(entryBatch);
|
||||
assertTrue(entryBatch.isEmpty());
|
||||
assertEquals(1, walQueue.size());
|
||||
assertNotEquals(firstWAL, entryBatch.getLastWalPath());
|
||||
assertEquals(lastWAL, entryBatch.getLastWalPath());
|
||||
assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
|
||||
}
|
||||
|
||||
private long getPosition(Path walPath) throws IOException {
|
||||
WALEntryStream entryStream =
|
||||
new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)),
|
||||
fs, conf, new MetricsSource("1"));
|
||||
entryStream.hasNext();
|
||||
return entryStream.getPosition();
|
||||
}
|
||||
|
||||
private String getRow(WAL.Entry entry) {
|
||||
Cell cell = entry.getEdit().getCells().get(0);
|
||||
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
|
@ -459,17 +551,25 @@ public class TestWALEntryStream {
|
|||
}
|
||||
|
||||
private void appendToLogPlus(int count) throws IOException {
|
||||
appendToLogPlus(count, family, qualifier);
|
||||
}
|
||||
|
||||
private void appendToLogPlus(int count, byte[] cf) throws IOException {
|
||||
appendToLogPlus(count, cf, qualifier);
|
||||
}
|
||||
|
||||
private void appendToLogPlus(int count, byte[] cf, byte[] cq) throws IOException {
|
||||
final long txid = log.append(htd, info,
|
||||
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
|
||||
getWALEdits(count), true);
|
||||
getWALEdits(count, cf, cq), true);
|
||||
log.sync(txid);
|
||||
}
|
||||
|
||||
private WALEdit getWALEdits(int count) {
|
||||
private WALEdit getWALEdits(int count, byte[] cf, byte[] cq) {
|
||||
WALEdit edit = new WALEdit();
|
||||
for (int i = 0; i < count; i++) {
|
||||
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
|
||||
System.currentTimeMillis(), qualifier));
|
||||
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), cf, cq,
|
||||
System.currentTimeMillis(), cq));
|
||||
}
|
||||
return edit;
|
||||
}
|
||||
|
@ -491,8 +591,16 @@ public class TestWALEntryStream {
|
|||
};
|
||||
}
|
||||
|
||||
private ReplicationQueueInfo getRecoveredQueueInfo() {
|
||||
return getQueueInfo("1-1");
|
||||
}
|
||||
|
||||
private ReplicationQueueInfo getQueueInfo() {
|
||||
return new ReplicationQueueInfo("1");
|
||||
return getQueueInfo("1");
|
||||
}
|
||||
|
||||
private ReplicationQueueInfo getQueueInfo(String znode) {
|
||||
return new ReplicationQueueInfo(znode);
|
||||
}
|
||||
|
||||
class PathWatcher extends WALActionsListener.Base {
|
||||
|
@ -505,5 +613,4 @@ public class TestWALEntryStream {
|
|||
currentPath = newPath;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue