diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d3bcff11e21..158330e8a9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -61,22 +61,33 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue queue) { - final RecoveredReplicationSourceShipperThread worker = - new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this, + protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + final RecoveredReplicationSourceShipper worker = + new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, this.replicationQueues); - ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); + ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); worker.startup(getUncaughtExceptionHandler()); worker.setWALReader( - startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition())); + startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); workerThreads.put(walGroupId, worker); } } + @Override + protected ReplicationSourceWALReader startNewWALReader(String threadName, + String walGroupId, PriorityBlockingQueue queue, long startPosition) { + ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, + conf, queue, startPosition, walEntryFilter, this); + Threads.setDaemonThreadRunning(walReader, threadName + + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, + getUncaughtExceptionHandler()); + return walReader; + } + public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { boolean hasPathChanged = false; PriorityBlockingQueue newPaths = @@ -161,7 +172,7 @@ public class RecoveredReplicationSource extends ReplicationSource { synchronized (workerThreads) { Threads.sleep(100);// wait a short while for other worker thread to fully exit boolean allTasksDone = true; - for (ReplicationSourceShipperThread worker : workerThreads.values()) { + for (ReplicationSourceShipper worker : workerThreads.values()) { if (!worker.isFinished()) { allTasksDone = false; break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java similarity index 95% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 65aeb2f88bd..a737910a795 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -28,20 +28,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.util.Threads; /** * Used by a {@link RecoveredReplicationSource}. */ @InterfaceAudience.Private -public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread { +public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper { - private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class); + private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class); protected final RecoveredReplicationSource source; private final ReplicationQueues replicationQueues; - public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId, + public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, RecoveredReplicationSource source, ReplicationQueues replicationQueues) { super(conf, walGroupId, queue, source); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java new file mode 100644 index 00000000000..6462a2a7d31 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java @@ -0,0 +1,55 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.concurrent.PriorityBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.replication.WALEntryFilter; + +/** + * Used by a {@link RecoveredReplicationSourceShipper}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader { + private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceWALReader.class); + + public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + ReplicationSource source) { + super(fs, conf, logQueue, startPosition, filter, source); + } + + protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) + throws InterruptedException { + LOG.trace("Didn't read any new entries from WAL"); + // we're done with queue recovery, shut ourself down + setReaderRunning(false); + // shuts down shipper thread immediately + entryBatchQueue.put(batch != null ? batch + : new WALEntryBatch(replicationBatchCountCapacity, currentPath)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1dbf07f117f..3d4353fbfc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -119,12 +119,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // ReplicationEndpoint which will handle the actual replication private ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. - private WALEntryFilter walEntryFilter; + protected WALEntryFilter walEntryFilter; // throttler private ReplicationThrottler throttler; private long defaultBandwidth; private long currentBandwidth; - protected final ConcurrentHashMap workerThreads = + protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); private AtomicLong totalBufferUsed; @@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker - tryStartNewShipperThread(logPrefix, queue); + tryStartNewShipper(logPrefix, queue); } } queue.put(log); @@ -255,15 +255,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf throw new RuntimeException(ex); } - // get the WALEntryFilter from ReplicationEndpoint and add it to default filters - ArrayList filters = Lists.newArrayList( - (WALEntryFilter)new SystemTableWALEntryFilter()); - WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); - if (filterFromEndpoint != null) { - filters.add(filterFromEndpoint); - } - this.walEntryFilter = new ChainWALEntryFilter(filters); - int sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isSourceActive() && this.peerClusterId == null) { @@ -285,40 +276,50 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + + initializeWALEntryFilter(); // start workers for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue queue = entry.getValue(); - tryStartNewShipperThread(walGroupId, queue); + tryStartNewShipper(walGroupId, queue); } } - protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue queue) { - final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf, + private void initializeWALEntryFilter() { + // get the WALEntryFilter from ReplicationEndpoint and add it to default filters + ArrayList filters = Lists.newArrayList( + (WALEntryFilter)new SystemTableWALEntryFilter()); + WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); + if (filterFromEndpoint != null) { + filters.add(filterFromEndpoint); + } + filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); + this.walEntryFilter = new ChainWALEntryFilter(filters); + } + + protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this); - ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); + ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); worker.startup(getUncaughtExceptionHandler()); - worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue, + worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); workerThreads.put(walGroupId, worker); } } - protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName, - String walGroupId, PriorityBlockingQueue queue, long startPosition) { - ArrayList filters = Lists.newArrayList(walEntryFilter, - new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); - ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters); - ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager, - replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics); - Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, + protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + PriorityBlockingQueue queue, long startPosition) { + ReplicationSourceWALReader walReader = + new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, + threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode, getUncaughtExceptionHandler()); - return walReader; } public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { @@ -446,8 +447,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf + " because an error occurred: " + reason, cause); } this.sourceRunning = false; - Collection workers = workerThreads.values(); - for (ReplicationSourceShipperThread worker : workers) { + Collection workers = workerThreads.values(); + for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); worker.entryReader.interrupt(); worker.interrupt(); @@ -457,7 +458,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf future = this.replicationEndpoint.stop(); } if (join) { - for (ReplicationSourceShipperThread worker : workers) { + for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated"); } @@ -486,7 +487,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public Path getCurrentPath() { // only for testing - for (ReplicationSourceShipperThread worker : workerThreads.values()) { + for (ReplicationSourceShipper worker : workerThreads.values()) { if (worker.getCurrentPath() != null) return worker.getCurrentPath(); } return null; @@ -524,9 +525,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf StringBuilder sb = new StringBuilder(); sb.append("Total replicated edits: ").append(totalReplicatedEdits) .append(", current progress: \n"); - for (Map.Entry entry : workerThreads.entrySet()) { + for (Map.Entry entry : workerThreads.entrySet()) { String walGroupId = entry.getKey(); - ReplicationSourceShipperThread worker = entry.getValue(); + ReplicationSourceShipper worker = entry.getValue(); long position = worker.getCurrentPosition(); Path currentPath = worker.getCurrentPath(); sb.append("walGroup [").append(walGroupId).append("]: "); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java similarity index 97% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 6807da27b5e..3e1e50be6c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.util.Bytes; @@ -51,8 +51,8 @@ import com.google.common.cache.LoadingCache; * ReplicationSourceWALReaderThread */ @InterfaceAudience.Private -public class ReplicationSourceShipperThread extends Thread { - private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class); +public class ReplicationSourceShipper extends Thread { + private static final Log LOG = LogFactory.getLog(ReplicationSourceShipper.class); // Hold the state of a replication worker thread public enum WorkerState { @@ -72,7 +72,7 @@ public class ReplicationSourceShipperThread extends Thread { protected volatile Path currentPath; // Current state of the worker thread private WorkerState state; - protected ReplicationSourceWALReaderThread entryReader; + protected ReplicationSourceWALReader entryReader; // How long should we sleep for each retry protected final long sleepForRetries; @@ -90,7 +90,7 @@ public class ReplicationSourceShipperThread extends Thread { } ); - public ReplicationSourceShipperThread(Configuration conf, String walGroupId, + public ReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, ReplicationSourceInterface source) { this.conf = conf; this.walGroupId = walGroupId; @@ -310,7 +310,7 @@ public class ReplicationSourceShipperThread extends Thread { return this.lastLoggedPosition; } - public void setWALReader(ReplicationSourceWALReaderThread entryReader) { + public void setWALReader(ReplicationSourceWALReader entryReader) { this.entryReader = entryReader; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java similarity index 83% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index c1af6e675dc..04b596c6a1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -58,26 +58,28 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ReplicationSourceWALReaderThread extends Thread { - private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class); +public class ReplicationSourceWALReader extends Thread { + private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReader.class); - private PriorityBlockingQueue logQueue; - private FileSystem fs; - private Configuration conf; - private BlockingQueue entryBatchQueue; + private final PriorityBlockingQueue logQueue; + private final FileSystem fs; + private final Configuration conf; + private final WALEntryFilter filter; + private final ReplicationSource source; + + protected final BlockingQueue entryBatchQueue; // max (heap) size of each batch - multiply by number of batches in queue to get total - private long replicationBatchSizeCapacity; + private final long replicationBatchSizeCapacity; // max count of each batch - multiply by number of batches in queue to get total - private int replicationBatchCountCapacity; + protected final int replicationBatchCountCapacity; // position in the WAL to start reading at private long currentPosition; - private WALEntryFilter filter; - private long sleepForRetries; + private final long sleepForRetries; + private final int maxRetriesMultiplier; + private final boolean eofAutoRecovery; + //Indicates whether this particular worker is running private boolean isReaderRunning = true; - private ReplicationQueueInfo replicationQueueInfo; - private int maxRetriesMultiplier; - private MetricsSource metrics; private AtomicLong totalBufferUsed; private long totalBufferQuota; @@ -85,42 +87,39 @@ public class ReplicationSourceWALReaderThread extends Thread { /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. - * @param manager replication manager - * @param replicationQueueInfo - * @param logQueue The WAL queue to read off of - * @param startPosition position in the first WAL to start reading from * @param fs the files system to use * @param conf configuration to use + * @param logQueue The WAL queue to read off of + * @param startPosition position in the first WAL to start reading from * @param filter The filter to use while reading - * @param metrics replication metrics + * @param source replication source */ - public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, - ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue logQueue, - long startPosition, - FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) { - this.replicationQueueInfo = replicationQueueInfo; + public ReplicationSourceWALReader(FileSystem fs, Configuration conf, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; this.fs = fs; this.conf = conf; this.filter = filter; + this.source = source; this.replicationBatchSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - this.totalBufferUsed = manager.getTotalBufferUsed(); + this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per - this.metrics = metrics; + this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); - LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode() - + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId() + LOG.info("peerClusterZnode=" + source.getPeerClusterZnode() + + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity + ", replicationBatchQueueCapacity=" + batchCount); @@ -131,37 +130,12 @@ public class ReplicationSourceWALReaderThread extends Thread { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) { + new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!checkQuota()) { continue; } - WALEntryBatch batch = null; - while (entryStream.hasNext()) { - if (batch == null) { - batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); - } - Entry entry = entryStream.next(); - if (updateSerialReplPos(batch, entry)) { - batch.lastWalPosition = entryStream.getPosition(); - break; - } - entry = filterEntry(entry); - if (entry != null) { - WALEdit edit = entry.getEdit(); - if (edit != null && !edit.isEmpty()) { - long entrySize = getEntrySize(entry); - batch.addEntry(entry); - updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySize); - // Stop if too many entries or too big - if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity - || batch.getNbEntries() >= replicationBatchCountCapacity) { - break; - } - } - } - } + WALEntryBatch batch = readWALEntries(entryStream); if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { if (LOG.isTraceEnabled()) { LOG.trace(String.format("Read %s WAL entries eligible for replication", @@ -170,16 +144,7 @@ public class ReplicationSourceWALReaderThread extends Thread { entryBatchQueue.put(batch); sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL - LOG.trace("Didn't read any new entries from WAL"); - if (replicationQueueInfo.isQueueRecovered()) { - // we're done with queue recovery, shut ourself down - setReaderRunning(false); - // shuts down shipper thread immediately - entryBatchQueue.put(batch != null ? batch - : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath())); - } else { - Thread.sleep(sleepForRetries); - } + handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath()); } currentPosition = entryStream.getPosition(); entryStream.reset(); // reuse stream @@ -200,12 +165,47 @@ public class ReplicationSourceWALReaderThread extends Thread { } } + private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException { + WALEntryBatch batch = null; + while (entryStream.hasNext()) { + if (batch == null) { + batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + } + Entry entry = entryStream.next(); + if (updateSerialReplPos(batch, entry)) { + batch.lastWalPosition = entryStream.getPosition(); + break; + } + entry = filterEntry(entry); + if (entry != null) { + WALEdit edit = entry.getEdit(); + if (edit != null && !edit.isEmpty()) { + long entrySize = getEntrySize(entry); + batch.addEntry(entry); + updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); + boolean totalBufferTooLarge = acquireBufferQuota(entrySize); + // Stop if too many entries or too big + if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity + || batch.getNbEntries() >= replicationBatchCountCapacity) { + break; + } + } + } + } + return batch; + } + + protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) + throws InterruptedException { + LOG.trace("Didn't read any new entries from WAL"); + Thread.sleep(sleepForRetries); + } + // if we get an EOF due to a zero-length log, and there are other logs in queue // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // enabled, then dump the log private void handleEofException(Exception e) { - if (e.getCause() instanceof EOFException && logQueue.size() > 1 - && conf.getBoolean("replication.source.eof.autorecovery", false)) { + if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) { try { if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); @@ -241,7 +241,7 @@ public class ReplicationSourceWALReaderThread extends Thread { private Entry filterEntry(Entry entry) { Entry filtered = filter.filter(entry); if (entry != null && filtered == null) { - metrics.incrLogEditsFiltered(); + source.getSourceMetrics().incrLogEditsFiltered(); } return filtered; } @@ -414,7 +414,7 @@ public class ReplicationSourceWALReaderThread extends Thread { * @param lastWalPath Path of the WAL the last entry in this batch was read from * @param lastWalPosition Position in the WAL the last entry in this batch was read from */ - private WALEntryBatch(int maxNbEntries, Path lastWalPath) { + WALEntryBatch(int maxNbEntries, Path lastWalPath) { this.walEntries = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 5337f381ccf..ebbdef116ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -348,8 +348,11 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0, - fs, conf, getDummyFilter(), new MetricsSource("1")); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + when(source.getSourceManager()).thenReturn(mockSourceManager); + when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); + ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, + walQueue, 0, getDummyFilter(), source); Path walPath = walQueue.peek(); batcher.start(); WALEntryBatch entryBatch = batcher.take();