HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

zhangduo 2018-03-12 12:21:44 +08:00
parent d5aaeee88b
commit 6060d3ba56
7 changed files with 218 additions and 229 deletions
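
In outline, the change splits the serial-replication logic out of ReplicationSourceWALReader into a SerialReplicationSourceWALReader subclass, and turns the points the recovered source used to override wholesale (tryStartNewShipper(), run()) into small factory and post-processing hooks. A minimal, self-contained sketch of the resulting shape; WalReader, Source and startReader are simplified stand-ins, not the real HBase classes:

// Factory-method sketch: the base class owns the start-up sequence;
// subclasses and per-peer configuration only decide WHICH reader to build.
abstract class WalReader implements Runnable {
  @Override
  public void run() {
    // read WAL entries, batch them, hand them to the shipper
  }
}

class DefaultWalReader extends WalReader {
}

// serial-peer ordering checks live in a dedicated subclass instead of
// behind if (isSerial) branches on the hot path
class SerialWalReader extends WalReader {
}

class Source {
  private final boolean serialPeer;

  Source(boolean serialPeer) {
    this.serialPeer = serialPeer;
  }

  // single customization point; a recovered source overrides this to add
  // its own empty-batch handling on top of whichever reader is chosen here
  protected WalReader createNewWALReader() {
    return serialPeer ? new SerialWalReader() : new DefaultWalReader();
  }

  // fixed skeleton: only the base class names and starts the daemon thread now
  final void startReader(String walGroupId) {
    Thread t = new Thread(createNewWALReader(),
        Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId);
    t.setDaemon(true);
    t.start();
  }
}

The payoff is that thread naming and start-up order live in exactly one place, and per-peer behavior becomes a type decision made once instead of a flag tested on every entry.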

RecoveredReplicationSource.java

@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource {
   }

   @Override
-  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    final RecoveredReplicationSourceShipper worker =
-        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-            this.queueStorage);
-    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
-    if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
-    } else {
-      LOG.debug("Starting up worker for wal group " + walGroupId);
-      worker.startup(this::uncaughtException);
-      worker.setWALReader(
-        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
-      workerThreads.put(walGroupId, worker);
-    }
+  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
+      PriorityBlockingQueue<Path> queue) {
+    return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+      BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
+    LOG.trace("Didn't read any new entries from WAL");
+    // we're done with queue recovery, shut ourself down
+    reader.setReaderRunning(false);
+    // shuts down shipper thread immediately
+    entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }

   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    ReplicationSourceWALReader walReader =
-      new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
-    Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
-      this::uncaughtException);
-    return walReader;
+    if (replicationPeer.getPeerConfig().isSerial()) {
+      return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
+          this) {
+
+        @Override
+        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+        }
+      };
+    } else {
+      return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
+        @Override
+        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+        }
+      };
+    }
   }

   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
@@ -166,21 +176,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
     return path;
   }

-  public void tryFinish() {
+  void tryFinish() {
     // use synchronize to make sure one last thread will clean the queue
     synchronized (workerThreads) {
       Threads.sleep(100);// wait a short while for other worker thread to fully exit
-      boolean allTasksDone = true;
-      for (ReplicationSourceShipper worker : workerThreads.values()) {
-        if (!worker.isFinished()) {
-          allTasksDone = false;
-          break;
-        }
-      }
+      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
       if (allTasksDone) {
         manager.removeRecoveredSource(this);
-        LOG.info("Finished recovering queue " + queueId + " with the following stats: "
-            + getStats());
+        LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
       }
     }
   }
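
Two details are easy to miss in the hunks above: the marker batch's count argument drops from replicationBatchCountCapacity to 0, since the batch only signals end-of-recovery and never holds entries, and the signal travels through the same entryBatchQueue the shipper already blocks on, so no extra flag has to be polled. A minimal, runnable sketch of that poison-pill handshake, with invented names (Batch, PoisonPillDemo) rather than the HBase classes:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Poison-pill sketch: an empty batch on the queue doubles as the shutdown signal.
public class PoisonPillDemo {

  static final class Batch {
    final List<String> entries;
    Batch(List<String> entries) { this.entries = entries; }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();
    Thread shipper = new Thread(() -> {
      try {
        while (true) {
          Batch batch = queue.take(); // blocks until the reader hands over work
          if (batch.entries.isEmpty()) {
            System.out.println("empty batch -> queue recovered, shutting down");
            return; // the real shipper flips its WorkerState to FINISHED here
          }
          System.out.println("shipping " + batch.entries.size() + " entries");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    shipper.start();
    queue.put(new Batch(Arrays.asList("edit-1", "edit-2")));
    queue.put(new Batch(Collections.emptyList())); // reader done: wake the shipper at once
    shipper.join();
  }
}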

RecoveredReplicationSourceShipper.java

@@ -48,46 +48,18 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }

   @Override
-  public void run() {
-    setWorkerState(WorkerState.RUNNING);
-    // Loop until we close down
-    while (isActive()) {
-      int sleepMultiplier = 1;
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      while (entryReader == null) {
-        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-      try {
-        WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        if (entryBatch.getWalEntries().isEmpty()) {
-          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-              + source.getQueueId());
-          source.getSourceMetrics().incrCompletedRecoveryQueue();
-          setWorkerState(WorkerState.FINISHED);
-          continue;
-        }
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while waiting for next replication entry batch", e);
-        Thread.currentThread().interrupt();
-      }
+  protected void postShipEdits(WALEntryBatch entryBatch) {
+    if (entryBatch.getWalEntries().isEmpty()) {
+      LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+          + source.getQueueId());
+      source.getSourceMetrics().incrCompletedRecoveryQueue();
+      setWorkerState(WorkerState.FINISHED);
     }
+  }
+
+  @Override
+  protected void postFinish() {
     source.tryFinish();
-    // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (!isFinished()) {
-      setWorkerState(WorkerState.STOPPED);
-    }
   }

   @Override

RecoveredReplicationSourceWALReader.java (deleted)

@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.concurrent.PriorityBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-
-/**
- * Used by a {@link RecoveredReplicationSourceShipper}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
-
-  public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
-      ReplicationSource source) {
-    super(fs, conf, logQueue, startPosition, filter, source);
-  }
-
-  @Override
-  protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-    LOG.trace("Didn't read any new entries from WAL");
-    // we're done with queue recovery, shut ourself down
-    setReaderRunning(false);
-    // shuts down shipper thread immediately
-    entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
-  }
-}

ReplicationSource.java

@@ -84,7 +84,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   // per group queue size, keep no more than this number of logs in each wal group
   protected int queueSizePerGroup;
   protected ReplicationQueueStorage queueStorage;
-  private ReplicationPeer replicationPeer;
+  protected ReplicationPeer replicationPeer;

   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
@@ -284,26 +284,32 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.walEntryFilter = new ChainWALEntryFilter(filters);
   }

-  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
+  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+      LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
     } else {
-      LOG.debug("Starting up worker for wal group " + walGroupId);
+      LOG.debug("Starting up worker for wal group {}", walGroupId);
+      ReplicationSourceWALReader walReader =
+        createNewWALReader(walGroupId, queue, worker.getStartPosition());
+      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
+        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
+      worker.setWALReader(walReader);
       worker.startup(this::uncaughtException);
-      worker.setWALReader(
-        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
     }
   }

-  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+  protected ReplicationSourceShipper createNewShipper(String walGroupId,
+      PriorityBlockingQueue<Path> queue) {
+    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
+  }
+
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    ReplicationSourceWALReader walReader =
-        new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
-    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-      this::uncaughtException);
+    return replicationPeer.getPeerConfig().isSerial()
+      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
+      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }

   protected final void uncaughtException(Thread t, Throwable e) {
@@ -382,10 +388,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return replicationPeer.isPeerEnabled();
   }

-  public boolean isSerial() {
-    return replicationPeer.getPeerConfig().isSerial();
-  }
-
   private void initialize() {
     int sleepMultiplier = 1;
     while (this.isSourceActive()) {

ReplicationSourceShipper.java

@@ -83,7 +83,7 @@ public class ReplicationSourceShipper extends Thread {
   }

   @Override
-  public void run() {
+  public final void run() {
     setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
@@ -95,28 +95,31 @@ public class ReplicationSourceShipper extends Thread {
         }
         continue;
       }
-      while (entryReader == null) {
-        if (sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
       try {
         WALEntryBatch entryBatch = entryReader.take();
         shipEdits(entryBatch);
+        postShipEdits(entryBatch);
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while waiting for next replication entry batch", e);
         Thread.currentThread().interrupt();
       }
     }
     // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (state != WorkerState.FINISHED) {
+    if (!isFinished()) {
       setWorkerState(WorkerState.STOPPED);
+    } else {
+      postFinish();
     }
   }

+  // To be implemented by recovered shipper
+  protected void postShipEdits(WALEntryBatch entryBatch) {
+  }
+
+  // To be implemented by recovered shipper
+  protected void postFinish() {
+  }
+
   /**
    * Do the shipping logic
    */
@@ -229,8 +232,8 @@ public class ReplicationSourceShipper extends Thread {
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
-    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
-      + source.getQueueId(), handler);
+    Threads.setDaemonThreadRunning(this,
+      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
   }

   public PriorityBlockingQueue<Path> getLogQueue() {
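
Making run() final while adding the two empty postShipEdits()/postFinish() hooks is a textbook template method: the retry/take/ship loop is written once here, and RecoveredReplicationSourceShipper above plugs in only its recovery bookkeeping. A compact sketch of the shape, with simplified invented names rather than the real classes:

// Template-method sketch of the shipper lifecycle.
abstract class BaseShipper extends Thread {

  @Override
  public final void run() { // final: subclasses can no longer replace the whole loop
    while (isActive()) {
      String batch = takeBatch();
      ship(batch);
      postShipEdits(batch); // hook, no-op by default
    }
    if (isFinishedCleanly()) {
      postFinish(); // hook, no-op by default
    }
  }

  // the recovered shipper overrides these two instead of run()
  protected void postShipEdits(String batch) {}
  protected void postFinish() {}

  protected abstract boolean isActive();
  protected abstract boolean isFinishedCleanly();
  protected abstract String takeBatch();
  protected abstract void ship(String batch);
}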

ReplicationSourceWALReader.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -51,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ReplicationSourceWALReader extends Thread {
+class ReplicationSourceWALReader extends Thread {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

   private final PriorityBlockingQueue<Path> logQueue;
@@ -64,28 +63,19 @@ public class ReplicationSourceWALReader extends Thread {
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
-  protected final int replicationBatchCountCapacity;
+  private final int replicationBatchCountCapacity;
   // position in the WAL to start reading at
   private long currentPosition;
   private final long sleepForRetries;
   private final int maxRetriesMultiplier;
   private final boolean eofAutoRecovery;

-  // used to store the first cell in an entry before filtering. This is because that if serial
-  // replication is enabled, we may find out that an entry can not be pushed after filtering. And
-  // when we try the next time, the cells maybe null since the entry has already been filtered,
-  // especially for region event wal entries. And this can also used to determine whether we can
-  // skip filtering.
-  private Cell firstCellInEntryBeforeFiltering;
-
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;

   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;

-  private final SerialReplicationChecker serialReplicationChecker;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -120,7 +110,6 @@ public class ReplicationSourceWALReader extends Thread {
       this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
-    this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
     LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -169,75 +158,35 @@
     }
   }

-  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
-      throws IOException {
-    entryStream.next();
-    firstCellInEntryBeforeFiltering = null;
-    batch.setLastWalPosition(entryStream.getPosition());
+  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
+  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
+    WALEdit edit = entry.getEdit();
+    if (edit == null || edit.isEmpty()) {
+      return false;
+    }
+    long entrySize = getEntrySize(entry);
+    batch.addEntry(entry);
+    updateBatchStats(batch, entry, entrySize);
+    boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+    // Stop if too many entries or too big
+    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
+      batch.getNbEntries() >= replicationBatchCountCapacity;
   }

-  private WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
     if (!entryStream.hasNext()) {
       return null;
     }
-    long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch =
-      new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+    WALEntryBatch batch = createBatch(entryStream);
     do {
-      Entry entry = entryStream.peek();
-      boolean isSerial = source.isSerial();
-      boolean doFiltering = true;
-      if (isSerial) {
-        if (firstCellInEntryBeforeFiltering == null) {
-          assert !entry.getEdit().isEmpty() : "should not write empty edits";
-          // Used to locate the region record in meta table. In WAL we only have the table name and
-          // encoded region name which can not be mapping to region name without scanning all the
-          // records for a table, so we need a start key, just like what we have done at client side
-          // when locating a region. For the markers, we will use the start key of the region as the
-          // row key for the edit. And we need to do this before filtering since all the cells may
-          // be filtered out, especially that for the markers.
-          firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
-        } else {
-          // if this is not null then we know that the entry has already been filtered.
-          doFiltering = false;
-        }
-      }
-      if (doFiltering) {
-        entry = filterEntry(entry);
-      }
+      Entry entry = entryStream.next();
+      batch.setLastWalPosition(entryStream.getPosition());
+      entry = filterEntry(entry);
       if (entry != null) {
-        if (isSerial) {
-          if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
-            if (batch.getLastWalPosition() > positionBefore) {
-              // we have something that can push, break
-              break;
-            } else {
-              serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
-            }
-          }
-          // arrive here means we can push the entry, record the last sequence id
-          batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
-            entry.getKey().getSequenceId());
-        }
-        // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
-        WALEdit edit = entry.getEdit();
-        if (edit != null && !edit.isEmpty()) {
-          long entrySize = getEntrySize(entry);
-          batch.addEntry(entry);
-          updateBatchStats(batch, entry, entrySize);
-          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
-          // Stop if too many entries or too big
-          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
-            batch.getNbEntries() >= replicationBatchCountCapacity) {
-            break;
-          }
+        if (addEntryToBatch(batch, entry)) {
+          break;
         }
-      } else {
-        // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
       }
     } while (entryStream.hasNext());
     return batch;
@@ -286,7 +235,11 @@
     return true;
   }

-  private Entry filterEntry(Entry entry) {
+  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
+    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+  }
+
+  protected final Entry filterEntry(Entry entry) {
     Entry filtered = filter.filter(entry);
     if (entry != null && filtered == null) {
       source.getSourceMetrics().incrLogEditsFiltered();
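
The extracted addEntryToBatch() gathers the three batch-closing conditions in one place: the global buffer quota is exhausted, the batch heap size reaches replicationBatchSizeCapacity, or the entry count reaches replicationBatchCountCapacity. A small runnable sketch of that predicate; BatchLimits is an invented name, and the 64 MB / 25000 figures assume the usual defaults for replication.source.size.capacity and replication.source.nb.capacity:

// Sketch of the batch-closing predicate (mirrors addEntryToBatch above, simplified).
final class BatchLimits {
  static final long SIZE_CAPACITY = 64L * 1024 * 1024; // assumed replication.source.size.capacity
  static final int COUNT_CAPACITY = 25000;             // assumed replication.source.nb.capacity

  static boolean batchFull(long heapSizeAfterAdd, int entriesAfterAdd, boolean quotaExceeded) {
    return quotaExceeded
        || heapSizeAfterAdd >= SIZE_CAPACITY
        || entriesAfterAdd >= COUNT_CAPACITY;
  }

  public static void main(String[] args) {
    long entrySize = 3L * 1024 * 1024; // pretend every entry weighs 3 MB
    int entries = 0;
    long heap = 0;
    while (!batchFull(heap + entrySize, entries + 1, false)) {
      heap += entrySize;
      entries++;
    }
    // In the real code the closing entry is still added to the batch before the
    // reader stops filling it; here 22 * 3 MB is the first total to cross 64 MB.
    System.out.println("batch closes on entry " + (entries + 1));
  }
}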

SerialReplicationSourceWALReader.java (new file)

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * WAL reader for a serial replication peer.
+ */
+@InterfaceAudience.Private
+public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
+
+  // Used to store the first cell in an entry before filtering. This is because, if serial
+  // replication is enabled, we may find out that an entry can not be pushed after filtering, and
+  // when we try the next time, the cells may be null since the entry has already been filtered,
+  // especially for region event wal entries. This is also used to determine whether we can skip
+  // filtering.
+  private Cell firstCellInEntryBeforeFiltering;
+
+  private final SerialReplicationChecker checker;
+
+  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
+      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source) {
+    super(fs, conf, logQueue, startPosition, filter, source);
+    checker = new SerialReplicationChecker(conf, source);
+  }
+
+  @Override
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+      throws IOException, InterruptedException {
+    if (!entryStream.hasNext()) {
+      return null;
+    }
+    long positionBefore = entryStream.getPosition();
+    WALEntryBatch batch = createBatch(entryStream);
+    do {
+      Entry entry = entryStream.peek();
+      boolean doFiltering = true;
+      if (firstCellInEntryBeforeFiltering == null) {
+        assert !entry.getEdit().isEmpty() : "should not write empty edits";
+        // Used to locate the region record in the meta table. In the WAL we only have the table
+        // name and encoded region name, which can not be mapped to a region name without scanning
+        // all the records for a table, so we need a start key, just like what we have done at the
+        // client side when locating a region. For the markers, we will use the start key of the
+        // region as the row key for the edit. And we need to do this before filtering, since all
+        // the cells may be filtered out, especially those for the markers.
+        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
+      } else {
+        // if this is not null then we know that the entry has already been filtered.
+        doFiltering = false;
+      }
+      if (doFiltering) {
+        entry = filterEntry(entry);
+      }
+      if (entry != null) {
+        if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+          if (batch.getLastWalPosition() > positionBefore) {
+            // we already have something to push, break
+            break;
+          } else {
+            checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
+          }
+        }
+        // arriving here means we can push the entry, so record the last sequence id
+        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+          entry.getKey().getSequenceId());
+        // actually remove the entry.
+        removeEntryFromStream(entryStream, batch);
+        if (addEntryToBatch(batch, entry)) {
+          break;
+        }
+      } else {
+        // actually remove the entry.
+        removeEntryFromStream(entryStream, batch);
+      }
+    } while (entryStream.hasNext());
+    return batch;
+  }
+
+  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+      throws IOException {
+    entryStream.next();
+    firstCellInEntryBeforeFiltering = null;
+    batch.setLastWalPosition(entryStream.getPosition());
+  }
+}
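
One detail worth calling out: unlike the base reader, which now consumes each entry with entryStream.next() up front, the serial reader keeps the peek()-then-removeEntryFromStream() protocol, so an entry that checker.canPush() rejects stays at the head of the stream and is retried from the same WAL position; firstCellInEntryBeforeFiltering is cached across those retries because filtering may strip every cell (region event markers especially) while the checker still needs the original row. A toy, runnable illustration of that peek-before-consume loop, with invented names (PeekConsumeDemo, canPush), not the HBase API:

import java.util.ArrayDeque;
import java.util.Queue;

// Peek-before-consume sketch: an entry that cannot be pushed yet stays at the
// head of the stream and is re-examined on the next pass.
public class PeekConsumeDemo {
  public static void main(String[] args) {
    Queue<String> stream = new ArrayDeque<>();
    stream.add("region-A:seq1");
    stream.add("region-B:seq9"); // pretend B's predecessor has not replicated yet
    int attempts = 0;
    while (!stream.isEmpty() && attempts < 5) {
      String entry = stream.peek();                    // inspect without consuming
      if (canPush(entry, ++attempts)) {
        System.out.println("pushed " + stream.poll()); // consume only on success
      } else {
        System.out.println("blocked on " + entry + ", will retry");
      }
    }
  }

  // stand-in for SerialReplicationChecker.canPush(): B becomes pushable later
  static boolean canPush(String entry, int attempt) {
    return !entry.startsWith("region-B") || attempt >= 3;
  }
}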
}