HBASE-18170 Refactor ReplicationSourceWALReaderThread

This commit is contained in:
Guanghao Zhang 2017-06-17 00:45:52 +08:00
parent d49208b056
commit c6e71f159c
7 changed files with 190 additions and 120 deletions

View File

@ -61,22 +61,33 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
@Override
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
final RecoveredReplicationSourceShipperThread worker =
new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final RecoveredReplicationSourceShipper worker =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
this.replicationQueues);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(
startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
@Override
protected ReplicationSourceWALReader startNewWALReader(String threadName,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler());
return walReader;
}
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
@ -161,7 +172,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allTasksDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
for (ReplicationSourceShipper worker : workerThreads.values()) {
if (!worker.isFinished()) {
allTasksDone = false;
break;

View File

@ -28,20 +28,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
/**
* Used by a {@link RecoveredReplicationSource}.
*/
@InterfaceAudience.Private
public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class);
protected final RecoveredReplicationSource source;
private final ReplicationQueues replicationQueues;
public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationQueues replicationQueues) {
super(conf, walGroupId, queue, source);

View File

@ -0,0 +1,55 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
/**
* Used by a {@link RecoveredReplicationSourceShipper}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceWALReader.class);
public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
super(fs, conf, logQueue, startPosition, filter, source);
}
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, currentPath));
}
}

View File

@ -119,12 +119,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
private WALEntryFilter walEntryFilter;
protected WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed;
@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipperThread(logPrefix, queue);
tryStartNewShipper(logPrefix, queue);
}
}
queue.put(log);
@ -255,15 +255,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
throw new RuntimeException(ex);
}
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
this.walEntryFilter = new ChainWALEntryFilter(filters);
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
@ -285,40 +276,50 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipperThread(walGroupId, queue);
tryStartNewShipper(walGroupId, queue);
}
}
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
private void initializeWALEntryFilter() {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
this.walEntryFilter = new ChainWALEntryFilter(filters);
}
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
walGroupId, queue, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader =
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler());
return walReader;
}
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
@ -446,8 +447,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
+ " because an error occurred: " + reason, cause);
}
this.sourceRunning = false;
Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
for (ReplicationSourceShipperThread worker : workers) {
Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
worker.entryReader.interrupt();
worker.interrupt();
@ -457,7 +458,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
future = this.replicationEndpoint.stop();
}
if (join) {
for (ReplicationSourceShipperThread worker : workers) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
}
@ -486,7 +487,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
for (ReplicationSourceShipper worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
@ -524,9 +525,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n");
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
ReplicationSourceShipper worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: ");

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
@ -51,8 +51,8 @@ import com.google.common.cache.LoadingCache;
* ReplicationSourceWALReaderThread
*/
@InterfaceAudience.Private
public class ReplicationSourceShipperThread extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
public class ReplicationSourceShipper extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipper.class);
// Hold the state of a replication worker thread
public enum WorkerState {
@ -72,7 +72,7 @@ public class ReplicationSourceShipperThread extends Thread {
protected volatile Path currentPath;
// Current state of the worker thread
private WorkerState state;
protected ReplicationSourceWALReaderThread entryReader;
protected ReplicationSourceWALReader entryReader;
// How long should we sleep for each retry
protected final long sleepForRetries;
@ -90,7 +90,7 @@ public class ReplicationSourceShipperThread extends Thread {
}
);
public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
this.conf = conf;
this.walGroupId = walGroupId;
@ -310,7 +310,7 @@ public class ReplicationSourceShipperThread extends Thread {
return this.lastLoggedPosition;
}
public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
public void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}

View File

@ -58,26 +58,28 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReaderThread extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
public class ReplicationSourceWALReader extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReader.class);
private PriorityBlockingQueue<Path> logQueue;
private FileSystem fs;
private Configuration conf;
private BlockingQueue<WALEntryBatch> entryBatchQueue;
private final PriorityBlockingQueue<Path> logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;
private final ReplicationSource source;
protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private long replicationBatchSizeCapacity;
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
private int replicationBatchCountCapacity;
protected final int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private WALEntryFilter filter;
private long sleepForRetries;
private final long sleepForRetries;
private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
private ReplicationQueueInfo replicationQueueInfo;
private int maxRetriesMultiplier;
private MetricsSource metrics;
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
@ -85,42 +87,39 @@ public class ReplicationSourceWALReaderThread extends Thread {
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
* @param manager replication manager
* @param replicationQueueInfo
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param fs the files system to use
* @param conf configuration to use
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param filter The filter to use while reading
* @param metrics replication metrics
* @param source replication source
*/
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
long startPosition,
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
this.replicationQueueInfo = replicationQueueInfo;
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
this.fs = fs;
this.conf = conf;
this.filter = filter;
this.source = source;
this.replicationBatchSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.metrics = metrics;
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+ ", replicationBatchQueueCapacity=" + batchCount);
@ -131,37 +130,12 @@ public class ReplicationSourceWALReaderThread extends Thread {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!checkQuota()) {
continue;
}
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
if (updateSerialReplPos(batch, entry)) {
batch.lastWalPosition = entryStream.getPosition();
break;
}
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
}
}
WALEntryBatch batch = readWALEntries(entryStream);
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication",
@ -170,16 +144,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
LOG.trace("Didn't read any new entries from WAL");
if (replicationQueueInfo.isQueueRecovered()) {
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
} else {
Thread.sleep(sleepForRetries);
}
handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
}
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
@ -200,12 +165,47 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
}
private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
if (updateSerialReplPos(batch, entry)) {
batch.lastWalPosition = entryStream.getPosition();
break;
}
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
}
}
return batch;
}
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries);
}
// if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(Exception e) {
if (e.getCause() instanceof EOFException && logQueue.size() > 1
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
@ -241,7 +241,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
private Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry);
if (entry != null && filtered == null) {
metrics.incrLogEditsFiltered();
source.getSourceMetrics().incrLogEditsFiltered();
}
return filtered;
}
@ -414,7 +414,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* @param lastWalPath Path of the WAL the last entry in this batch was read from
* @param lastWalPosition Position in the WAL the last entry in this batch was read from
*/
private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -348,8 +348,11 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();