HBASE-18170 Refactor ReplicationSourceWALReaderThread

This commit is contained in:
Guanghao Zhang 2017-06-17 00:45:52 +08:00
parent fa93c0f59a
commit 74c6f44877
7 changed files with 190 additions and 120 deletions

View File

@ -61,22 +61,33 @@ public class RecoveredReplicationSource extends ReplicationSource {
} }
@Override @Override
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) { protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final RecoveredReplicationSourceShipperThread worker = final RecoveredReplicationSourceShipper worker =
new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this, new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
this.replicationQueues); this.replicationQueues);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) { if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else { } else {
LOG.debug("Starting up worker for wal group " + walGroupId); LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler()); worker.startup(getUncaughtExceptionHandler());
worker.setWALReader( worker.setWALReader(
startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition())); startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker); workerThreads.put(walGroupId, worker);
} }
} }
@Override
protected ReplicationSourceWALReader startNewWALReader(String threadName,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler());
return walReader;
}
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException { public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false; boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths = PriorityBlockingQueue<Path> newPaths =
@ -161,7 +172,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
synchronized (workerThreads) { synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allTasksDone = true; boolean allTasksDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) { for (ReplicationSourceShipper worker : workerThreads.values()) {
if (!worker.isFinished()) { if (!worker.isFinished()) {
allTasksDone = false; allTasksDone = false;
break; break;

View File

@ -28,20 +28,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
/** /**
* Used by a {@link RecoveredReplicationSource}. * Used by a {@link RecoveredReplicationSource}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread { public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class); private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class);
protected final RecoveredReplicationSource source; protected final RecoveredReplicationSource source;
private final ReplicationQueues replicationQueues; private final ReplicationQueues replicationQueues;
public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId, public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source, PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationQueues replicationQueues) { ReplicationQueues replicationQueues) {
super(conf, walGroupId, queue, source); super(conf, walGroupId, queue, source);

View File

@ -0,0 +1,55 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
/**
* Used by a {@link RecoveredReplicationSourceShipper}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceWALReader.class);
public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
super(fs, conf, logQueue, startPosition, filter, source);
}
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, currentPath));
}
}

View File

@ -119,12 +119,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// ReplicationEndpoint which will handle the actual replication // ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint; private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries. // A filter (or a chain of filters) for the WAL entries.
private WALEntryFilter walEntryFilter; protected WALEntryFilter walEntryFilter;
// throttler // throttler
private ReplicationThrottler throttler; private ReplicationThrottler throttler;
private long defaultBandwidth; private long defaultBandwidth;
private long currentBandwidth; private long currentBandwidth;
protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads = protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed; private AtomicLong totalBufferUsed;
@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// new wal group observed after source startup, start a new worker thread to track it // new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread // notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker // still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipperThread(logPrefix, queue); tryStartNewShipper(logPrefix, queue);
} }
} }
queue.put(log); queue.put(log);
@ -255,15 +255,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
this.walEntryFilter = new ChainWALEntryFilter(filters);
int sleepMultiplier = 1; int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) { while (this.isSourceActive() && this.peerClusterId == null) {
@ -285,40 +276,50 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return; return;
} }
LOG.info("Replicating " + clusterId + " -> " + peerClusterId); LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers // start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey(); String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue(); PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipperThread(walGroupId, queue); tryStartNewShipper(walGroupId, queue);
} }
} }
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) { private void initializeWALEntryFilter() {
final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf, // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
this.walEntryFilter = new ChainWALEntryFilter(filters);
}
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
walGroupId, queue, this); walGroupId, queue, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) { if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else { } else {
LOG.debug("Starting up worker for wal group " + walGroupId); LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler()); worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
worker.getStartPosition())); worker.getStartPosition()));
workerThreads.put(walGroupId, worker); workerThreads.put(walGroupId, worker);
} }
} }
protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName, protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { PriorityBlockingQueue<Path> queue, long startPosition) {
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter, ReplicationSourceWALReader walReader =
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager, threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler()); getUncaughtExceptionHandler());
return walReader;
} }
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
@ -446,8 +447,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
+ " because an error occurred: " + reason, cause); + " because an error occurred: " + reason, cause);
} }
this.sourceRunning = false; this.sourceRunning = false;
Collection<ReplicationSourceShipperThread> workers = workerThreads.values(); Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipperThread worker : workers) { for (ReplicationSourceShipper worker : workers) {
worker.stopWorker(); worker.stopWorker();
worker.entryReader.interrupt(); worker.entryReader.interrupt();
worker.interrupt(); worker.interrupt();
@ -457,7 +458,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
future = this.replicationEndpoint.stop(); future = this.replicationEndpoint.stop();
} }
if (join) { if (join) {
for (ReplicationSourceShipperThread worker : workers) { for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries); Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
} }
@ -486,7 +487,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override @Override
public Path getCurrentPath() { public Path getCurrentPath() {
// only for testing // only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) { for (ReplicationSourceShipper worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath(); if (worker.getCurrentPath() != null) return worker.getCurrentPath();
} }
return null; return null;
@ -524,9 +525,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits) sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n"); .append(", current progress: \n");
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) { for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey(); String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue(); ReplicationSourceShipper worker = entry.getValue();
long position = worker.getCurrentPosition(); long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath(); Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: "); sb.append("walGroup [").append(walGroupId).append("]: ");

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -51,8 +51,8 @@ import com.google.common.cache.LoadingCache;
* ReplicationSourceWALReaderThread * ReplicationSourceWALReaderThread
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationSourceShipperThread extends Thread { public class ReplicationSourceShipper extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class); private static final Log LOG = LogFactory.getLog(ReplicationSourceShipper.class);
// Hold the state of a replication worker thread // Hold the state of a replication worker thread
public enum WorkerState { public enum WorkerState {
@ -72,7 +72,7 @@ public class ReplicationSourceShipperThread extends Thread {
protected volatile Path currentPath; protected volatile Path currentPath;
// Current state of the worker thread // Current state of the worker thread
private WorkerState state; private WorkerState state;
protected ReplicationSourceWALReaderThread entryReader; protected ReplicationSourceWALReader entryReader;
// How long should we sleep for each retry // How long should we sleep for each retry
protected final long sleepForRetries; protected final long sleepForRetries;
@ -90,7 +90,7 @@ public class ReplicationSourceShipperThread extends Thread {
} }
); );
public ReplicationSourceShipperThread(Configuration conf, String walGroupId, public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
this.conf = conf; this.conf = conf;
this.walGroupId = walGroupId; this.walGroupId = walGroupId;
@ -310,7 +310,7 @@ public class ReplicationSourceShipperThread extends Thread {
return this.lastLoggedPosition; return this.lastLoggedPosition;
} }
public void setWALReader(ReplicationSourceWALReaderThread entryReader) { public void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader; this.entryReader = entryReader;
} }

View File

@ -58,26 +58,28 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ReplicationSourceWALReaderThread extends Thread { public class ReplicationSourceWALReader extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class); private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReader.class);
private PriorityBlockingQueue<Path> logQueue; private final PriorityBlockingQueue<Path> logQueue;
private FileSystem fs; private final FileSystem fs;
private Configuration conf; private final Configuration conf;
private BlockingQueue<WALEntryBatch> entryBatchQueue; private final WALEntryFilter filter;
private final ReplicationSource source;
protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total // max (heap) size of each batch - multiply by number of batches in queue to get total
private long replicationBatchSizeCapacity; private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total // max count of each batch - multiply by number of batches in queue to get total
private int replicationBatchCountCapacity; protected final int replicationBatchCountCapacity;
// position in the WAL to start reading at // position in the WAL to start reading at
private long currentPosition; private long currentPosition;
private WALEntryFilter filter; private final long sleepForRetries;
private long sleepForRetries; private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
//Indicates whether this particular worker is running //Indicates whether this particular worker is running
private boolean isReaderRunning = true; private boolean isReaderRunning = true;
private ReplicationQueueInfo replicationQueueInfo;
private int maxRetriesMultiplier;
private MetricsSource metrics;
private AtomicLong totalBufferUsed; private AtomicLong totalBufferUsed;
private long totalBufferQuota; private long totalBufferQuota;
@ -85,42 +87,39 @@ public class ReplicationSourceWALReaderThread extends Thread {
/** /**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue. * entries, and puts them on a batch queue.
* @param manager replication manager
* @param replicationQueueInfo
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param fs the files system to use * @param fs the files system to use
* @param conf configuration to use * @param conf configuration to use
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param filter The filter to use while reading * @param filter The filter to use while reading
* @param metrics replication metrics * @param source replication source
*/ */
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue, PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
long startPosition, ReplicationSource source) {
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue; this.logQueue = logQueue;
this.currentPosition = startPosition; this.currentPosition = startPosition;
this.fs = fs; this.fs = fs;
this.conf = conf; this.conf = conf;
this.filter = filter; this.filter = filter;
this.source = source;
this.replicationBatchSizeCapacity = this.replicationBatchSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
// memory used will be batchSizeCapacity * (nb.batches + 1) // memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue // the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1); int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = manager.getTotalBufferUsed(); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.sleepForRetries = this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier = this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.metrics = metrics; this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode() LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+ ", replicationBatchQueueCapacity=" + batchCount); + ", replicationBatchQueueCapacity=" + batchCount);
@ -131,37 +130,12 @@ public class ReplicationSourceWALReaderThread extends Thread {
int sleepMultiplier = 1; int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) { new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!checkQuota()) { if (!checkQuota()) {
continue; continue;
} }
WALEntryBatch batch = null; WALEntryBatch batch = readWALEntries(entryStream);
while (entryStream.hasNext()) {
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
if (updateSerialReplPos(batch, entry)) {
batch.lastWalPosition = entryStream.getPosition();
break;
}
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
}
}
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication", LOG.trace(String.format("Read %s WAL entries eligible for replication",
@ -170,16 +144,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
entryBatchQueue.put(batch); entryBatchQueue.put(batch);
sleepMultiplier = 1; sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL } else { // got no entries and didn't advance position in WAL
LOG.trace("Didn't read any new entries from WAL"); handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
if (replicationQueueInfo.isQueueRecovered()) {
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
} else {
Thread.sleep(sleepForRetries);
}
} }
currentPosition = entryStream.getPosition(); currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream entryStream.reset(); // reuse stream
@ -200,12 +165,47 @@ public class ReplicationSourceWALReaderThread extends Thread {
} }
} }
private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
if (updateSerialReplPos(batch, entry)) {
batch.lastWalPosition = entryStream.getPosition();
break;
}
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
}
}
return batch;
}
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries);
}
// if we get an EOF due to a zero-length log, and there are other logs in queue // if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log // enabled, then dump the log
private void handleEofException(Exception e) { private void handleEofException(Exception e) {
if (e.getCause() instanceof EOFException && logQueue.size() > 1 if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
try { try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
@ -241,7 +241,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
private Entry filterEntry(Entry entry) { private Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry); Entry filtered = filter.filter(entry);
if (entry != null && filtered == null) { if (entry != null && filtered == null) {
metrics.incrLogEditsFiltered(); source.getSourceMetrics().incrLogEditsFiltered();
} }
return filtered; return filtered;
} }
@ -414,7 +414,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
* @param lastWalPath Path of the WAL the last entry in this batch was read from * @param lastWalPath Path of the WAL the last entry in this batch was read from
* @param lastWalPosition Position in the WAL the last entry in this batch was read from * @param lastWalPosition Position in the WAL the last entry in this batch was read from
*/ */
private WALEntryBatch(int maxNbEntries, Path lastWalPath) { WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries); this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath; this.lastWalPath = lastWalPath;
} }

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -348,8 +348,11 @@ public class TestWALEntryStream {
// start up a batcher // start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, ReplicationSource source = Mockito.mock(ReplicationSource.class);
fs, conf, getDummyFilter(), new MetricsSource("1")); when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek(); Path walPath = walQueue.peek();
batcher.start(); batcher.start();
WALEntryBatch entryBatch = batcher.take(); WALEntryBatch entryBatch = batcher.take();