HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin
parent f7a986cb67
commit 4341c3f554
@@ -59,7 +59,7 @@ public class Threads {
    * @param t thread to run
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t) {
     return setDaemonThreadRunning(t, t.getName());
   }
 
@@ -69,8 +69,7 @@ public class Threads {
    * @param name new name
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t,
-      final String name) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
     return setDaemonThreadRunning(t, name, null);
   }
 
@@ -78,12 +77,11 @@ public class Threads {
    * Utility method that sets name, daemon status and starts passed thread.
    * @param t thread to frob
    * @param name new name
-   * @param handler A handler to set on the thread. Pass null if want to
-   *          use default handler.
+   * @param handler A handler to set on the thread. Pass null if want to use default handler.
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t,
-      final String name, final UncaughtExceptionHandler handler) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
+      UncaughtExceptionHandler handler) {
     t.setName(name);
     if (handler != null) {
       t.setUncaughtExceptionHandler(handler);
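The move to a generic <T extends Thread> signature means callers get back the same concrete subtype they passed in, with no cast. A minimal, self-contained sketch of the idea (names are illustrative, not from this commit):

public class GenericThreadStarter {
  // Same shape as the new Threads.setDaemonThreadRunning: the concrete
  // subtype flows through, so no cast is needed at the call site.
  public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
    t.setName(name);
    t.setDaemon(true); // mark as daemon before starting
    t.start();
    return t; // same concrete type as passed in
  }

  static class LogRollerThread extends Thread {
    @Override
    public void run() { /* roll WALs periodically */ }
    void requestRollAll() { /* illustrative subtype-only method */ }
  }

  public static void main(String[] args) {
    // With the old Thread-returning signature this line would need a cast:
    LogRollerThread roller =
        setDaemonThreadRunning(new LogRollerThread(), "log-roller");
    roller.requestRollAll();
  }
}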
@@ -1586,7 +1586,7 @@ public class HRegionServer extends HasThread implements
     // Save it in a file, this will allow to see if we crash
     ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
 
-    this.walFactory = setupWALAndReplication();
+    setupWALAndReplication();
     // Init in here rather than in constructor after thread name has been set
     this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
     this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
@@ -1855,13 +1855,12 @@ public class HRegionServer extends HasThread implements
   /**
    * Setup WAL log and replication if enabled.
    * Replication setup is done in here because it wants to be hooked up to WAL.
-   * @return A WAL instance.
    * @throws IOException
    */
-  private WALFactory setupWALAndReplication() throws IOException {
+  private void setupWALAndReplication() throws IOException {
     // TODO Replication make assumptions here based on the default filesystem impl
-    final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
+    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
 
     Path logDir = new Path(walRootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
@@ -1875,7 +1874,7 @@ public class HRegionServer extends HasThread implements
     createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
 
     // listeners the wal factory will add to wals it creates.
-    final List<WALActionsListener> listeners = new ArrayList<>();
+    List<WALActionsListener> listeners = new ArrayList<>();
     listeners.add(new MetricsWAL());
     if (this.replicationSourceHandler != null &&
         this.replicationSourceHandler.getWALActionsListener() != null) {
@@ -1883,7 +1882,21 @@ public class HRegionServer extends HasThread implements
       listeners.add(this.replicationSourceHandler.getWALActionsListener());
     }
 
-    return new WALFactory(conf, listeners, serverName.toString());
+    // There is a cyclic dependency between ReplicationSourceHandler and WALFactory.
+    // We use WALActionsListener to get the newly rolled WALs, so we need to get the
+    // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then
+    // ReplicationSourceHandler needs to use WALFactory to get the length of the wal file being
+    // written. So here we need to construct WALFactory first, and then pass it to the initialize
+    // method of ReplicationSourceHandler.
+    WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
+    this.walFactory = factory;
+    if (this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+    }
+    if (this.replicationSinkHandler != null &&
+        this.replicationSinkHandler != this.replicationSourceHandler) {
+      this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+    }
   }
 
   public MetricsRegionServer getRegionServerMetrics() {
@@ -2898,7 +2911,7 @@ public class HRegionServer extends HasThread implements
   /**
    * Load the replication service objects, if any
    */
-  static private void createNewReplicationInstance(Configuration conf,
+  private static void createNewReplicationInstance(Configuration conf,
       HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
 
     if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
@@ -2908,47 +2921,41 @@ public class HRegionServer extends HasThread implements
 
     // read in the name of the source replication class from the config file.
     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
-      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
 
     // read in the name of the sink replication class from the config file.
     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
-      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
 
     // If both the sink and the source class names are the same, then instantiate
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
-      server.replicationSourceHandler = (ReplicationSourceService)
-          newReplicationInstance(sourceClassname,
-          conf, server, walFs, walDir, oldWALDir);
-      server.replicationSinkHandler = (ReplicationSinkService)
-          server.replicationSourceHandler;
+      server.replicationSourceHandler =
+          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+            walDir, oldWALDir);
+      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
     } else {
-      server.replicationSourceHandler = (ReplicationSourceService)
-          newReplicationInstance(sourceClassname,
-          conf, server, walFs, walDir, oldWALDir);
-      server.replicationSinkHandler = (ReplicationSinkService)
-          newReplicationInstance(sinkClassname,
-          conf, server, walFs, walDir, oldWALDir);
+      server.replicationSourceHandler =
+          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+            walDir, oldWALDir);
+      server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname,
+        conf, server, walFs, walDir, oldWALDir);
     }
   }
 
-  static private ReplicationService newReplicationInstance(String classname,
-      Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
-      Path oldLogDir) throws IOException{
-
-    Class<?> clazz = null;
+  private static ReplicationService newReplicationInstance(String classname, Configuration conf,
+      HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException {
+    Class<? extends ReplicationService> clazz = null;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-      clazz = Class.forName(classname, true, classLoader);
+      clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class);
     } catch (java.lang.ClassNotFoundException nfe) {
       throw new IOException("Could not find class for " + classname);
     }
 
-    // create an instance of the replication object.
-    ReplicationService service = (ReplicationService)
-        ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir);
-    return service;
+    // create an instance of the replication object, but do not initialize it here as we need to use
+    // WALFactory when initializing.
+    return ReflectionUtils.newInstance(clazz, conf);
   }
 
   /**
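The comment block added above documents a construct-then-inject pattern: build the replication service first, harvest its WAL listeners, build the WALFactory with those listeners, then hand the factory back through initialize(). A hedged sketch of that wiring with hypothetical stand-in types (none of these are the real HBase classes):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

class TwoPhaseWiring {
  interface LengthProvider { OptionalLong lengthIfBeingWritten(String path); }

  static class ReplicationServiceStub {
    List<String> listeners() { return Arrays.asList("replication-wal-listener"); }
    void initialize(LengthProvider provider) { /* phase 2: dependency arrives late */ }
  }

  static class WalFactoryStub implements LengthProvider {
    WalFactoryStub(List<String> listeners) { /* listeners registered up front */ }
    @Override
    public OptionalLong lengthIfBeingWritten(String path) { return OptionalLong.empty(); }
  }

  public static void main(String[] args) {
    ReplicationServiceStub replication = new ReplicationServiceStub(); // phase 1: construct only
    List<String> listeners = new ArrayList<>(replication.listeners()); // needed to build the factory
    WalFactoryStub factory = new WalFactoryStub(listeners);
    replication.initialize(factory); // phase 2 closes the cycle
  }
}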
@@ -20,17 +20,17 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 
 /**
- * Gateway to Cluster Replication.
- * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
- * One such application is a cross-datacenter
- * replication service that can keep two hbase clusters in sync.
+ * Gateway to Cluster Replication. Used by
+ * {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. One such application is a
+ * cross-datacenter replication service that can keep two hbase clusters in sync.
  */
 @InterfaceAudience.Private
 public interface ReplicationService {
@@ -39,9 +39,8 @@ public interface ReplicationService {
    * Initializes the replication service object.
    * @throws IOException
    */
-  void initialize(
-    Server rs, FileSystem fs, Path logdir, Path oldLogDir
-  ) throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
+      WALFileLengthProvider walFileLengthProvider) throws IOException;
 
   /**
    * Start replication services.
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
+import com.lmax.disruptor.RingBuffer;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.management.MemoryType;
@@ -29,6 +32,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -58,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -68,6 +73,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.NullScope;
@@ -75,9 +81,6 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.RingBuffer;
-
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
  * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@@ -105,7 +108,7 @@ import com.lmax.disruptor.RingBuffer;
  * (Need to keep our own file lengths, not rely on HDFS).
  */
 @InterfaceAudience.Private
-public abstract class AbstractFSWAL<W> implements WAL {
+public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class);
 
@@ -983,6 +986,28 @@ public abstract class AbstractFSWAL<W> implements WAL {
       + filenum + ")";
   }
 
+  /**
+   * if the given {@code path} is being written currently, then return its length.
+   * <p>
+   * This is used by replication to prevent replicating unacked log entries. See
+   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+   */
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    rollWriterLock.lock();
+    try {
+      Path currentPath = getOldPath();
+      if (path.equals(currentPath)) {
+        W writer = this.writer;
+        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+      } else {
+        return OptionalLong.empty();
+      }
+    } finally {
+      rollWriterLock.unlock();
+    }
+  }
+
   /**
    * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
    * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
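The OptionalLong contract introduced here is: empty means the queried file is not the one currently being written (it has been rolled, so everything in it is safe to read), while a present value is the number of bytes known to be acked and durable. A small illustrative consumer under those assumptions (LengthProvider is a hypothetical stand-in, not the HBase interface):

import java.util.OptionalLong;

class UncommittedDataGuard {
  interface LengthProvider { OptionalLong getLogFileSizeIfBeingWritten(String path); }

  // Decide whether a reader positioned at readerPos may ship what it just read.
  static boolean safeToShip(LengthProvider provider, String path, long readerPos) {
    OptionalLong len = provider.getLogFileSizeIfBeingWritten(path);
    if (!len.isPresent()) {
      return true; // file already rolled/closed: everything in it is durable
    }
    // file still open: only bytes up to the reported length are acked
    return readerPos <= len.getAsLong();
  }
}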
@@ -707,8 +707,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    this.writer.close();
-    this.writer = null;
+    if (this.writer != null) {
+      this.writer.close();
+      this.writer = null;
+    }
     closeExecutor.shutdown();
     IOException error = new IOException("WAL has been closed");
     syncFutures.forEach(f -> f.done(f.getTxid(), error));
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -46,8 +45,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -62,6 +65,9 @@ import org.apache.htrace.NullScope;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * The default implementation of FSWAL.
@@ -50,13 +50,12 @@ public class RecoveredReplicationSource extends ReplicationSource {
   private String actualPeerId;
 
   @Override
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics) throws IOException {
+  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
-      clusterId, replicationEndpoint, metrics);
+      clusterId, replicationEndpoint, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -85,6 +86,7 @@ public class Replication extends WALActionsListener.Base implements
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+
   /**
    * Instantiate the replication management (if rep is enabled).
    * @param server Hosting server
@@ -93,9 +95,8 @@ public class Replication extends WALActionsListener.Base implements
    * @param oldLogDir directory where logs are archived
    * @throws IOException
    */
-  public Replication(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException{
-    initialize(server, fs, logDir, oldLogDir);
+  public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
+    initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty());
   }
 
   /**
@@ -104,8 +105,8 @@ public class Replication extends WALActionsListener.Base implements
   public Replication() {
   }
 
-  public void initialize(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException {
+  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
@@ -144,8 +145,8 @@ public class Replication extends WALActionsListener.Base implements
       throw new IOException("Could not read cluster id", ke);
     }
     this.replicationManager =
-        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
-            conf, this.server, fs, logDir, oldLogDir, clusterId);
+        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
+            this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
     this.statsThreadPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
     LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,6 +60,8 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
 /**
  * Class that handles the source of a replication stream.
@@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  * A stream is considered down when we cannot contact a region server on the
  * peer cluster for more than 55 seconds by default.
  * </p>
- *
  */
 @InterfaceAudience.Private
 public class ReplicationSource extends Thread implements ReplicationSourceInterface {
@@ -123,6 +122,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
+  private WALFileLengthProvider walFileLengthProvider;
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
@@ -147,12 +147,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @throws IOException
    */
   @Override
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics)
-          throws IOException {
+  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.stopper = stopper;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
@@ -181,6 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
+    this.walFileLengthProvider = walFileLengthProvider;
     LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
         + ", currentBandwidth=" + this.currentBandwidth);
   }
@@ -560,4 +559,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     totalReplicatedEdits.addAndGet(entries.size());
     totalBufferUsed.addAndGet(-batchSize);
   }
+
+  @Override
+  public WALFileLengthProvider getWALFileLengthProvider() {
+    return walFileLengthProvider;
+  }
 }
@@ -53,11 +53,10 @@ public interface ReplicationSourceInterface {
    * @param clusterId
    * @throws IOException
    */
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -146,6 +145,11 @@ public interface ReplicationSourceInterface {
    */
   ReplicationSourceManager getSourceManager();
 
+  /**
+   * @return the wal file length provider
+   */
+  WALFileLengthProvider getWALFileLengthProvider();
+
   /**
    * Try to throttle when the peer config with a bandwidth
    * @param batchSize entries size will be pushed
@@ -19,9 +19,6 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,7 +28,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -40,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,10 +65,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -116,12 +116,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Path logDir;
   // Path to the wal archive
   private final Path oldLogDir;
+  private final WALFileLengthProvider walFileLengthProvider;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
   private final ThreadPoolExecutor executor;
 
-  private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
   private Connection connection;
@@ -141,10 +141,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param oldLogDir the directory where old logs are archived
    * @param clusterId
    */
-  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
-      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) throws IOException {
+  public ReplicationSourceManager(ReplicationQueues replicationQueues,
+      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
+      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<>();
@@ -162,6 +162,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.sleepBeforeFailover =
         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
     this.clusterId = clusterId;
+    this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
     this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
@@ -175,8 +176,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
-    this.rand = new Random();
-    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+    this.latestPaths = new HashSet<Path>();
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@@ -243,7 +243,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Adds a normal source per registered peer cluster and tries to process all
    * old region server wal queues
    */
-  protected void init() throws IOException, ReplicationException {
+  void init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getConnectedPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
@@ -267,13 +267,13 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the source that was created
    * @throws IOException
    */
-  protected ReplicationSourceInterface addSource(String id) throws IOException,
-      ReplicationException {
+  @VisibleForTesting
+  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
-    ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
+    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
+      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
+      walFileLengthProvider);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -330,7 +330,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the first source on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -338,7 +339,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
   */
-  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -364,12 +366,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the normal source for the give peer if it exists, otherwise null.
    */
   public ReplicationSourceInterface getSource(String peerId) {
-    for (ReplicationSourceInterface source: getSources()) {
-      if (source.getPeerId().equals(peerId)) {
-        return source;
-      }
-    }
-    return null;
+    return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
   }
 
   @VisibleForTesting
@@ -466,12 +463,11 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the created source
    * @throws IOException
   */
-  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
-      final FileSystem fs, final ReplicationSourceManager manager,
-      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
-      final Server server, final String peerId, final UUID clusterId,
-      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
-          throws IOException {
+  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
+      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
+      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
+      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -507,8 +503,8 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
-      clusterId, replicationEndpoint, metrics);
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+      replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
@@ -674,7 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       // Wait a bit before transferring the queues, we may be shutting down.
       // This sleep may not be enough in some cases.
       try {
-        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
+        Thread.sleep(sleepBeforeFailover +
+          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
       } catch (InterruptedException e) {
         LOG.warn("Interrupted while waiting before transferring a queue.");
         Thread.currentThread().interrupt();
@@ -688,7 +685,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
       while (peers != null && !peers.isEmpty()) {
         Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
-          peers.get(rand.nextInt(peers.size())));
+          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
         long sleep = sleepBeforeFailover/2;
         if (peer != null) {
           newQueues.put(peer.getFirst(), peer.getSecond());
@@ -748,7 +745,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         // enqueue sources
         ReplicationSourceInterface src =
             getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-              server, peerId, this.clusterId, peerConfig, peer);
+              server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
         // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
         // see removePeer
         synchronized (oldsources) {
@@ -37,18 +37,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
 /**
  * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@@ -127,8 +127,8 @@ public class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
+      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
+          source.getWALFileLengthProvider(), source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!checkQuota()) {
             continue;
@@ -147,7 +147,7 @@ public class ReplicationSourceWALReader extends Thread {
           currentPosition = entryStream.getPosition();
           entryStream.reset(); // reuse stream
         }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+      } catch (IOException e) { // stream related
         if (sleepMultiplier < maxRetriesMultiplier) {
           LOG.debug("Failed to read stream of replication entries: " + e);
           sleepMultiplier++;
@@ -202,8 +202,9 @@ public class ReplicationSourceWALReader extends Thread {
   // if we get an EOF due to a zero-length log, and there are other logs in queue
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
-  private void handleEofException(Exception e) {
-    if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
+  private void handleEofException(IOException e) {
+    if (e instanceof EOFException ||
+        e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -50,7 +50,7 @@ import org.apache.hadoop.ipc.RemoteException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+class WALEntryStream implements Closeable {
   private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
 
   private Reader reader;
@@ -59,24 +59,11 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
   private Entry currentEntry;
   // position after reading current entry
   private long currentPosition = 0;
-  private PriorityBlockingQueue<Path> logQueue;
-  private FileSystem fs;
-  private Configuration conf;
-  private MetricsSource metrics;
-
-  /**
-   * Create an entry stream over the given queue
-   * @param logQueue the queue of WAL paths
-   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
-   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
-   * @param metrics replication metrics
-   * @throws IOException
-   */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      MetricsSource metrics)
-      throws IOException {
-    this(logQueue, fs, conf, 0, metrics);
-  }
+  private final PriorityBlockingQueue<Path> logQueue;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final WALFileLengthProvider walFileLengthProvider;
+  private final MetricsSource metrics;
 
   /**
    * Create an entry stream over the given queue at the given start position
@@ -88,51 +75,40 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    * @throws IOException
   */
   public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, MetricsSource metrics) throws IOException {
+      long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+      throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
     this.currentPosition = startPosition;
+    this.walFileLengthProvider = walFileLengthProvider;
     this.metrics = metrics;
   }
 
   /**
    * @return true if there is another WAL {@link Entry}
-   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
   */
-  @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     if (currentEntry == null) {
-      try {
-        tryAdvanceEntry();
-      } catch (Exception e) {
-        throw new WALEntryStreamRuntimeException(e);
-      }
+      tryAdvanceEntry();
     }
     return currentEntry != null;
   }
 
   /**
    * @return the next WAL entry in this stream
-   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws IOException
+   * @throws NoSuchElementException if no more entries in the stream.
   */
-  @Override
-  public Entry next() {
-    if (!hasNext()) throw new NoSuchElementException();
+  public Entry next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
     Entry save = currentEntry;
     currentEntry = null; // gets reloaded by hasNext()
     return save;
   }
 
-  /**
-   * Not supported.
-   */
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * {@inheritDoc}
   */
@@ -141,14 +117,6 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     closeReader();
   }
 
-  /**
-   * @return the iterator over WAL entries in the queue.
-   */
-  @Override
-  public Iterator<Entry> iterator() {
-    return this;
-  }
-
   /**
    * @return the position of the last Entry returned by next()
   */
@@ -195,24 +163,27 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
 
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
-      readNextEntryAndSetPosition();
-      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
-        if (logQueue.size() > 1) { // log was rolled
-          // Before dequeueing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader,
-          // and a new log was enqueued while we were reading. See HBASE-6758
-          resetReader();
-          readNextEntryAndSetPosition();
-          if (currentEntry == null) {
-            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
-              dequeueCurrentLog();
-              if (openNextLog()) {
-                readNextEntryAndSetPosition();
-              }
-            }
-          }
-        } // no other logs, we've simply hit the end of the current open log. Do nothing
-      }
+      boolean beingWritten = readNextEntryAndSetPosition();
+      if (currentEntry == null && !beingWritten) {
+        // no more entries in this log file, and the file is already closed, i.e., rolled
+        // Before dequeueing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758
+        resetReader();
+        readNextEntryAndSetPosition();
+        if (currentEntry == null) {
+          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+            dequeueCurrentLog();
+            if (openNextLog()) {
+              readNextEntryAndSetPosition();
+            }
+          }
+        }
+      }
+      // if currentEntry != null then just return
+      // if currentEntry == null but the file is still being written, then we should not switch to
+      // the next log either, just return here and try next time to see if there are more entries in
+      // the current file
     }
     // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
   }
@@ -270,15 +241,30 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     metrics.decrSizeOfLogQueue();
   }
 
-  private void readNextEntryAndSetPosition() throws IOException {
+  /**
+   * Returns whether the file is opened for writing.
+   */
+  private boolean readNextEntryAndSetPosition() throws IOException {
     Entry readEntry = reader.next();
     long readerPos = reader.getPosition();
+    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+      // data, so we need to make sure that we do not read beyond the committed file length.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+          fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+      }
+      resetReader();
+      return true;
+    }
     if (readEntry != null) {
       metrics.incrLogEditsRead();
       metrics.incrLogReadInBytes(readerPos - currentPosition);
     }
     currentEntry = readEntry; // could be null
     setPosition(readerPos);
+    return fileLength.isPresent();
   }
 
   private void closeReader() throws IOException {
@@ -301,7 +287,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     Path nextPath = logQueue.peek();
     if (nextPath != null) {
       openReader(nextPath);
-      if (reader != null) return true;
+      if (reader != null) {
+        return true;
+      }
     }
     return false;
   }
@@ -408,14 +396,4 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     }
     return size;
   }
-
-  @InterfaceAudience.Private
-  public static class WALEntryStreamRuntimeException extends RuntimeException {
-    private static final long serialVersionUID = -6298201811259982568L;
-
-    public WALEntryStreamRuntimeException(Exception e) {
-      super(e);
-    }
-  }
-
 }
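The rewritten tryAdvanceEntry distinguishes three outcomes instead of two: an entry was read, the log was rolled so the stream may dequeue it and move on, or the file is still being written so the stream must stay put and retry later. A condensed sketch of that decision (hypothetical names, not the actual class):

class AdvanceDecisionSketch {
  enum Next { RETURN_ENTRY, RETRY_SAME_FILE_LATER, SWITCH_TO_NEXT_FILE }

  // entry == null means the reader found nothing more at the current position;
  // beingWritten mirrors walFileLengthProvider.getLogFileSizeIfBeingWritten(path).isPresent()
  static Next decide(Object entry, boolean beingWritten) {
    if (entry != null) {
      return Next.RETURN_ENTRY;          // normal case
    }
    if (beingWritten) {
      return Next.RETRY_SAME_FILE_LATER; // writer may still append acked entries
    }
    return Next.SWITCH_TO_NEXT_FILE;     // file is rolled: safe to dequeue and move on
  }
}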
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.OptionalLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used by replication to prevent replicating unacked log entries. See
+ * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ */
+@InterfaceAudience.Private
+@FunctionalInterface
+public interface WALFileLengthProvider {
+
+  OptionalLong getLogFileSizeIfBeingWritten(Path path);
+}
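Since the new interface is a @FunctionalInterface with a single method, call sites that have no live WAL can satisfy it with a lambda, exactly as the Replication constructor above does with p -> OptionalLong.empty(). An illustrative stand-alone sketch (it mirrors the interface with String instead of Path only to stay self-contained):

import java.util.OptionalLong;

public class ProviderLambdaDemo {
  // Stand-in for the real interface; same single-method shape.
  @FunctionalInterface
  interface WALFileLengthProvider {
    OptionalLong getLogFileSizeIfBeingWritten(String path);
  }

  public static void main(String[] args) {
    // "No file is ever being written" - the degenerate provider used where
    // replication runs without a live WAL (e.g. some tests).
    WALFileLengthProvider none = p -> OptionalLong.empty();

    // A provider that pretends one file is open with 4096 committed bytes.
    WALFileLengthProvider oneOpenFile =
        p -> p.equals("wal.1") ? OptionalLong.of(4096L) : OptionalLong.empty();

    System.out.println(none.getLogFileSizeIfBeingWritten("wal.1"));        // OptionalLong.empty
    System.out.println(oneOpenFile.getLogFileSizeIfBeingWritten("wal.1")); // OptionalLong[4096]
  }
}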
@@ -115,7 +115,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     if (wal == null) {
       return Collections.emptyList();
     }
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +67,7 @@ class DisabledWALProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>(1);
     wals.add(disabled);
     return wals;
@@ -232,6 +233,11 @@ class DisabledWALProvider implements WALProvider {
     public String toString() {
       return "WAL disabled.";
     }
+
+    @Override
+    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+      return OptionalLong.empty();
+    }
   }
 
   @Override
@@ -169,7 +169,7 @@ public class RegionGroupingProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>();
     for (WALProvider provider : cached.values()) {
       wals.addAll(provider.getWALs());
@@ -19,8 +19,6 @@
 
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
@@ -35,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL extends Closeable {
+public interface WAL extends Closeable, WALFileLengthProvider {
 
   /**
    * Registers WALActionsListener
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
  * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
  */
 @InterfaceAudience.Private
-public class WALFactory {
+public class WALFactory implements WALFileLengthProvider {
 
   private static final Log LOG = LogFactory.getLog(WALFactory.class);
 
@@ -230,7 +232,7 @@ public class WALFactory {
     }
   }
 
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     return provider.getWALs();
   }
 
@@ -450,4 +452,9 @@ public class WALFactory {
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
+
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
+  }
 }
@@ -60,7 +60,7 @@ public interface WALProvider {

  /** @return the List of WALs that are used by this server
   */
  List<WAL> getWALs() throws IOException;
  List<WAL> getWALs();

  /**
   * persist outstanding WALs to storage and stop accepting new appends.

@@ -76,18 +76,20 @@ public interface WALProvider {
   */
  void close() throws IOException;

  // Writers are used internally. Users outside of the WAL should be relying on the
  // interface provided by WAL.
  interface Writer extends Closeable {
    void sync() throws IOException;
    void append(WAL.Entry entry) throws IOException;
  interface WriterBase extends Closeable {
    long getLength();
  }

  interface AsyncWriter extends Closeable {
  // Writers are used internally. Users outside of the WAL should be relying on the
  // interface provided by WAL.
  interface Writer extends WriterBase {
    void sync() throws IOException;
    void append(WAL.Entry entry) throws IOException;
  }

  interface AsyncWriter extends WriterBase {
    CompletableFuture<Long> sync();
    void append(WAL.Entry entry);
    long getLength();
  }

  /**
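With getLength() hoisted from AsyncWriter into the shared WriterBase, synchronous and asynchronous writers can be measured uniformly. A minimal sketch under that assumption (the wrapper class is illustrative only):

import org.apache.hadoop.hbase.wal.WALProvider;

final class WriterProgress {
  // Works for both Writer (synchronous) and AsyncWriter (asynchronous)
  // implementations, since both now extend WriterBase.
  static long committedBytes(WALProvider.WriterBase writer) {
    return writer.getLength();
  }
}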
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;

@@ -42,16 +43,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
  String peerClusterId;
  Path currentPath;
  MetricsSource metrics;
  WALFileLengthProvider walFileLengthProvider;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
      UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
      throws IOException {
      UUID clusterId, ReplicationEndpoint replicationEndpoint,
      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
    this.manager = manager;
    this.peerClusterId = peerClusterId;
    this.metrics = metrics;
    this.walFileLengthProvider = walFileLengthProvider;
  }

  @Override

@@ -135,4 +137,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
  @Override
  public void postShipEdits(List<Entry> entries, int batchSize) {
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }
}
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@@ -166,8 +167,8 @@ public class TestReplicationSource {
    testConf.setInt("replication.source.maxretriesmultiplier", 1);
    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
    source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
        null, replicationEndpoint, null);
    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
        replicationEndpoint, p -> OptionalLong.empty(), null);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> future = executor.submit(new Runnable() {
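Because WALFileLengthProvider is effectively single-method, the test supplies it as a lambda: `p -> OptionalLong.empty()` means "no file is under active write", so the stream can trust the on-disk length. Two obvious test stubs, sketched under that interface-shape assumption (the holder class and cappedAt helper are hypothetical):

import java.util.OptionalLong;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

final class TestLengthProviders {
  // Nothing is being written: readers may consume files to their full length.
  static final WALFileLengthProvider NONE = p -> OptionalLong.empty();

  // Every file is capped at a fixed committed length (hypothetical stub).
  static WALFileLengthProvider cappedAt(long length) {
    return p -> OptionalLong.of(length);
  }
}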
@@ -61,9 +61,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;

@@ -97,6 +94,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should

@@ -646,8 +646,8 @@ public abstract class TestReplicationSourceManager {
    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
        ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
        UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
        throws IOException {
        UUID clusterId, ReplicationEndpoint replicationEndpoint,
        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
      throw new IOException("Failing deliberately");
    }
  }
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

@@ -42,13 +43,10 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;

@@ -56,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;

@@ -67,11 +66,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@@ -84,8 +80,13 @@ public class TestWALEntryStream {
  private static final byte[] qualifier = Bytes.toBytes("qualifier");
  private static final HRegionInfo info =
      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
  private static NavigableMap<byte[], Integer> scopes;
  private static final NavigableMap<byte[], Integer> scopes = getScopes();

  private static NavigableMap<byte[], Integer> getScopes() {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(family, 1);
    return scopes;
  }

  private WAL log;
  PriorityBlockingQueue<Path> walQueue;
@@ -103,10 +104,6 @@

    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();
    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
  }

  @AfterClass
@@ -151,10 +148,10 @@
    log.rollWriter();

    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      int i = 0;
      for (WAL.Entry e : entryStream) {
        assertNotNull(e);
      while (entryStream.hasNext()) {
        assertNotNull(entryStream.next());
        i++;
      }
      assertEquals(nbRows, i);

@@ -176,10 +173,9 @@
  @Test
  public void testAppendsWithRolls() throws Exception {
    appendToLog();

    long oldPos;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      // There's one edit in the log, read it. Reading past it needs to throw exception
      assertTrue(entryStream.hasNext());
      WAL.Entry entry = entryStream.next();

@@ -196,8 +192,8 @@

    appendToLog();

    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
        log, new MetricsSource("1"))) {
      // Read the newly added entry, make sure we made progress
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());

@@ -210,8 +206,8 @@
    log.rollWriter();
    appendToLog();

    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
        log, new MetricsSource("1"))) {
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

@@ -236,7 +232,7 @@
    appendToLog("1");
    appendToLog("2"); // 2
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      assertEquals("1", getRow(entryStream.next()));

      appendToLog("3"); // 3 - comes in after reader opened

@@ -261,7 +257,7 @@
  public void testNewEntriesWhileStreaming() throws Exception {
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point

      // some new entries come in while we're streaming

@@ -284,7 +280,7 @@
    long lastPosition = 0;
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point
      appendToLog("2");
      appendToLog("3");

@@ -292,7 +288,7 @@
    }
    // next stream should pick up where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
      assertEquals("2", getRow(entryStream.next()));
      assertEquals("3", getRow(entryStream.next()));
      assertFalse(entryStream.hasNext()); // done

@@ -309,14 +305,14 @@
    long lastPosition = 0;
    appendEntriesToLog(3);
    // read only one element
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
        log, new MetricsSource("1"))) {
      entryStream.next();
      lastPosition = entryStream.getPosition();
    }
    // there should still be two more entries from where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
      assertNotNull(entryStream.next());
      assertNotNull(entryStream.next());
      assertFalse(entryStream.hasNext());

@@ -327,7 +323,7 @@
  @Test
  public void testEmptyStream() throws Exception {
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      assertFalse(entryStream.hasNext());
    }
  }

@@ -338,7 +334,7 @@
    // get ending position
    long position;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      entryStream.next();
      entryStream.next();
      entryStream.next();

@@ -351,6 +347,7 @@
    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    when(source.getSourceManager()).thenReturn(mockSourceManager);
    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
    when(source.getWALFileLengthProvider()).thenReturn(log);
    ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
        walQueue, 0, getDummyFilter(), source);
    Path walPath = walQueue.peek();
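Every WALEntryStream call site above now passes an explicit start position plus a WALFileLengthProvider; note that the tests hand in `log` directly, which compiles only because WAL itself extends WALFileLengthProvider after this change. A tiny sketch of that relationship (the adapter class is illustrative, not part of the patch):

import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.wal.WAL;

final class ProviderFromWal {
  // A WAL is itself a WALFileLengthProvider after this change, so the
  // return below needs no adapter or wrapper.
  static WALFileLengthProvider of(WAL wal) {
    return wal;
  }
}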
@@ -425,10 +422,6 @@
    };
  }

  private ReplicationQueueInfo getQueueInfo() {
    return new ReplicationQueueInfo("1");
  }

  class PathWatcher extends WALActionsListener.Base {

    Path currentPath;
@@ -440,4 +433,30 @@
    }
  }

  @Test
  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
    appendToLog("1");
    appendToLog("2");
    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
    AtomicLong fileLength = new AtomicLong(size - 1);
    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
        p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
      assertTrue(entryStream.hasNext());
      assertNotNull(entryStream.next());
      // cannot read entry 2 yet: it lies beyond the reported committed length
      assertFalse(entryStream.hasNext());
      Thread.sleep(1000);
      entryStream.reset();
      // still cannot read entry 2
      assertFalse(entryStream.hasNext());

      // can read entry 2 now
      fileLength.set(size);
      entryStream.reset();
      assertTrue(entryStream.hasNext());
      assertNotNull(entryStream.next());

      assertFalse(entryStream.hasNext());
    }
  }
}
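This new test pins down the core of the fix: the reader must never surface an entry whose bytes lie beyond the length the writer has committed, even if those bytes are already visible in the file. A compact restatement of the invariant the test exercises, with hypothetical names (a sketch, not the stream's actual internals):

import java.util.OptionalLong;

final class CommittedLengthGate {
  // Invariant checked by testReadBeyondCommittedLength: an entry ending at
  // entryEndOffset may be read only once the provider's committed length
  // has reached it, or the file is no longer being written (empty result).
  static boolean mayRead(OptionalLong committedLength, long entryEndOffset) {
    return !committedLength.isPresent()
        || committedLength.getAsLong() >= entryEndOffset;
  }
}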
@@ -107,7 +107,7 @@ public class IOTestProvider implements WALProvider {
  }

  @Override
  public List<WAL> getWALs() throws IOException {
  public List<WAL> getWALs() {
    List<WAL> wals = new ArrayList<>(1);
    wals.add(log);
    return wals;