HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin

This commit is contained in:
zhangduo 2017-09-14 17:26:36 +08:00
parent e7e43255b0
commit d90f77ab7d
25 changed files with 368 additions and 271 deletions

View File

@ -59,7 +59,7 @@ public class Threads {
* @param t thread to run
* @return Returns the passed Thread <code>t</code>.
*/
public static Thread setDaemonThreadRunning(final Thread t) {
public static <T extends Thread> T setDaemonThreadRunning(T t) {
return setDaemonThreadRunning(t, t.getName());
}
@ -69,8 +69,7 @@ public class Threads {
* @param name new name
* @return Returns the passed Thread <code>t</code>.
*/
public static Thread setDaemonThreadRunning(final Thread t,
final String name) {
public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
return setDaemonThreadRunning(t, name, null);
}
@ -78,12 +77,11 @@ public class Threads {
* Utility method that sets name, daemon status and starts passed thread.
* @param t thread to frob
* @param name new name
* @param handler A handler to set on the thread. Pass null if want to
* use default handler.
* @param handler A handler to set on the thread. Pass null if want to use default handler.
* @return Returns the passed Thread <code>t</code>.
*/
public static Thread setDaemonThreadRunning(final Thread t,
final String name, final UncaughtExceptionHandler handler) {
public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
UncaughtExceptionHandler handler) {
t.setName(name);
if (handler != null) {
t.setUncaughtExceptionHandler(handler);

View File

@ -1586,7 +1586,7 @@ public class HRegionServer extends HasThread implements
// Save it in a file, this will allow to see if we crash
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
this.walFactory = setupWALAndReplication();
setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
@ -1855,13 +1855,12 @@ public class HRegionServer extends HasThread implements
/**
* Setup WAL log and replication if enabled.
* Replication setup is done in here because it wants to be hooked up to WAL.
* @return A WAL instance.
* @throws IOException
*/
private WALFactory setupWALAndReplication() throws IOException {
private void setupWALAndReplication() throws IOException {
// TODO Replication make assumptions here based on the default filesystem impl
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Path logDir = new Path(walRootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
@ -1875,7 +1874,7 @@ public class HRegionServer extends HasThread implements
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
// listeners the wal factory will add to wals it creates.
final List<WALActionsListener> listeners = new ArrayList<>();
List<WALActionsListener> listeners = new ArrayList<>();
listeners.add(new MetricsWAL());
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler.getWALActionsListener() != null) {
@ -1883,7 +1882,21 @@ public class HRegionServer extends HasThread implements
listeners.add(this.replicationSourceHandler.getWALActionsListener());
}
return new WALFactory(conf, listeners, serverName.toString());
// There is a cyclic dependency between ReplicationSourceHandler and WALFactory.
// We use WALActionsListener to get the newly rolled WALs, so we need to get the
// WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then
// ReplicationSourceHandler need to use WALFactory get the length of the wal file being written.
// So we here we need to construct WALFactory first, and then pass it to the initialize method
// of ReplicationSourceHandler.
WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
this.walFactory = factory;
if (this.replicationSourceHandler != null) {
this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory);
}
if (this.replicationSinkHandler != null &&
this.replicationSinkHandler != this.replicationSourceHandler) {
this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory);
}
}
public MetricsRegionServer getRegionServerMetrics() {
@ -2898,7 +2911,7 @@ public class HRegionServer extends HasThread implements
/**
* Load the replication service objects, if any
*/
static private void createNewReplicationInstance(Configuration conf,
private static void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
@ -2908,47 +2921,41 @@ public class HRegionServer extends HasThread implements
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSourceHandler;
server.replicationSourceHandler =
(ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
} else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
conf, server, walFs, walDir, oldWALDir);
server.replicationSourceHandler =
(ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname,
conf, server, walFs, walDir, oldWALDir);
}
}
static private ReplicationService newReplicationInstance(String classname,
Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
Path oldLogDir) throws IOException{
Class<?> clazz = null;
private static ReplicationService newReplicationInstance(String classname, Configuration conf,
HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException {
Class<? extends ReplicationService> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
clazz = Class.forName(classname, true, classLoader);
clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Could not find class for " + classname);
}
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, walFs, logDir, oldLogDir);
return service;
// create an instance of the replication object, but do not initialize it here as we need to use
// WALFactory when initializing.
return ReflectionUtils.newInstance(clazz, conf);
}
/**

View File

@ -20,17 +20,17 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
/**
* Gateway to Cluster Replication.
* Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
* One such application is a cross-datacenter
* replication service that can keep two hbase clusters in sync.
* Gateway to Cluster Replication. Used by
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. One such application is a
* cross-datacenter replication service that can keep two hbase clusters in sync.
*/
@InterfaceAudience.Private
public interface ReplicationService {
@ -39,9 +39,8 @@ public interface ReplicationService {
* Initializes the replication service object.
* @throws IOException
*/
void initialize(
Server rs, FileSystem fs, Path logdir, Path oldLogDir
) throws IOException;
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
WALFileLengthProvider walFileLengthProvider) throws IOException;
/**
* Start replication services.

View File

@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.*;
import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import com.lmax.disruptor.RingBuffer;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
@ -29,6 +32,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -58,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
@ -68,6 +73,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.NullScope;
@ -75,9 +81,6 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.RingBuffer;
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
* WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@ -105,7 +108,7 @@ import com.lmax.disruptor.RingBuffer;
* (Need to keep our own file lengths, not rely on HDFS).
*/
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W> implements WAL {
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class);
@ -983,6 +986,28 @@ public abstract class AbstractFSWAL<W> implements WAL {
+ filenum + ")";
}
/**
* if the given {@code path} is being written currently, then return its length.
* <p>
* This is used by replication to prevent replicating unacked log entries. See
* https://issues.apache.org/jira/browse/HBASE-14004 for more details.
*/
@Override
public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
rollWriterLock.lock();
try {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
} finally {
rollWriterLock.unlock();
}
}
/**
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment

View File

@ -707,8 +707,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
this.writer.close();
this.writer = null;
if (this.writer != null) {
this.writer.close();
this.writer = null;
}
closeExecutor.shutdown();
IOException error = new IOException("WAL has been closed");
syncFutures.forEach(f -> f.done(f.getTxid(), error));

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
@ -46,8 +45,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -62,6 +65,9 @@ import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* The default implementation of FSWAL.

View File

@ -50,13 +50,12 @@ public class RecoveredReplicationSource extends ReplicationSource {
private String actualPeerId;
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics) throws IOException {
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
clusterId, replicationEndpoint, metrics);
clusterId, replicationEndpoint, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -85,6 +86,7 @@ public class Replication extends WALActionsListener.Base implements
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
/**
* Instantiate the replication management (if rep is enabled).
* @param server Hosting server
@ -93,9 +95,8 @@ public class Replication extends WALActionsListener.Base implements
* @param oldLogDir directory where logs are archived
* @throws IOException
*/
public Replication(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException{
initialize(server, fs, logDir, oldLogDir);
public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty());
}
/**
@ -104,8 +105,8 @@ public class Replication extends WALActionsListener.Base implements
public Replication() {
}
public void initialize(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir) throws IOException {
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
WALFileLengthProvider walFileLengthProvider) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
@ -144,8 +145,8 @@ public class Replication extends WALActionsListener.Base implements
throw new IOException("Could not read cluster id", ke);
}
this.replicationManager =
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
conf, this.server, fs, logDir, oldLogDir, clusterId);
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);

View File

@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -62,6 +60,8 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
* Class that handles the source of a replication stream.
@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
* </p>
*
*/
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {
@ -123,6 +122,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
private WALFileLengthProvider walFileLengthProvider;
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
@ -147,12 +147,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @throws IOException
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics)
throws IOException {
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
@ -181,6 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@ -560,4 +559,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
totalReplicatedEdits.addAndGet(entries.size());
totalBufferUsed.addAndGet(-batchSize);
}
@Override
public WALFileLengthProvider getWALFileLengthProvider() {
return walFileLengthProvider;
}
}

View File

@ -53,11 +53,10 @@ public interface ReplicationSourceInterface {
* @param clusterId
* @throws IOException
*/
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics) throws IOException;
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
@ -146,6 +145,11 @@ public interface ReplicationSourceInterface {
*/
ReplicationSourceManager getSourceManager();
/**
* @return the wal file length provider
*/
WALFileLengthProvider getWALFileLengthProvider();
/**
* Try to throttle when the peer config with a bandwidth
* @param batchSize entries size will be pushed

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -31,7 +28,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -40,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -68,10 +65,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
@ -116,12 +116,12 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Path logDir;
// Path to the wal archive
private final Path oldLogDir;
private final WALFileLengthProvider walFileLengthProvider;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
private Connection connection;
@ -141,10 +141,10 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) throws IOException {
public ReplicationSourceManager(ReplicationQueues replicationQueues,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<>();
@ -162,6 +162,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sleepBeforeFailover =
conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationTracker.registerListener(this);
this.replicationPeers.getAllPeerIds();
// It's preferable to failover 1 RS at a time, but with good zk servers
@ -175,8 +176,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@ -243,7 +243,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Adds a normal source per registered peer cluster and tries to process all
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
void init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@ -267,13 +267,13 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the source that was created
* @throws IOException
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
@VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
walFileLengthProvider);
synchronized (this.walsById) {
this.sources.add(src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@ -330,7 +330,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the first source on this rs
* @return a sorted set of wal names
*/
protected Map<String, Map<String, SortedSet<String>>> getWALs() {
@VisibleForTesting
Map<String, Map<String, SortedSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@ -338,7 +339,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the recovered sources on this rs
* @return a sorted set of wal names
*/
protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
@VisibleForTesting
Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}
@ -364,12 +366,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
for (ReplicationSourceInterface source: getSources()) {
if (source.getPeerId().equals(peerId)) {
return source;
}
}
return null;
return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
}
@VisibleForTesting
@ -466,12 +463,11 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the created source
* @throws IOException
*/
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Server server, final String peerId, final UUID clusterId,
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
throws IOException {
private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
ReplicationSourceManager manager, ReplicationQueues replicationQueues,
ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
WALFileLengthProvider walFileLengthProvider) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@ -507,8 +503,8 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
clusterId, replicationEndpoint, metrics);
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
replicationEndpoint, walFileLengthProvider, metrics);
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
@ -674,7 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
Thread.sleep(sleepBeforeFailover +
(long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
@ -688,7 +685,7 @@ public class ReplicationSourceManager implements ReplicationListener {
List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
while (peers != null && !peers.isEmpty()) {
Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
peers.get(rand.nextInt(peers.size())));
peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
long sleep = sleepBeforeFailover/2;
if (peer != null) {
newQueues.put(peer.getFirst(), peer.getSecond());
@ -748,7 +745,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// enqueue sources
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
server, peerId, this.clusterId, peerConfig, peer);
server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {

View File

@ -37,18 +37,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@ -127,8 +127,8 @@ public class ReplicationSourceWALReader extends Thread {
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
source.getWALFileLengthProvider(), source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!checkQuota()) {
continue;
@ -147,7 +147,7 @@ public class ReplicationSourceWALReader extends Thread {
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
}
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
} catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
@ -202,8 +202,9 @@ public class ReplicationSourceWALReader extends Thread {
// if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(Exception e) {
if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
private void handleEofException(IOException e) {
if (e instanceof EOFException ||
e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
@ -50,7 +50,7 @@ import org.apache.hadoop.ipc.RemoteException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
class WALEntryStream implements Closeable {
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
private Reader reader;
@ -59,24 +59,11 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
private PriorityBlockingQueue<Path> logQueue;
private FileSystem fs;
private Configuration conf;
private MetricsSource metrics;
/**
* Create an entry stream over the given queue
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics)
throws IOException {
this(logQueue, fs, conf, 0, metrics);
}
private final PriorityBlockingQueue<Path> logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;
private final MetricsSource metrics;
/**
* Create an entry stream over the given queue at the given start position
@ -88,51 +75,40 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
long startPosition, MetricsSource metrics) throws IOException {
long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
this.metrics = metrics;
}
/**
* @return true if there is another WAL {@link Entry}
* @throws WALEntryStreamRuntimeException if there was an Exception while reading
*/
@Override
public boolean hasNext() {
public boolean hasNext() throws IOException {
if (currentEntry == null) {
try {
tryAdvanceEntry();
} catch (Exception e) {
throw new WALEntryStreamRuntimeException(e);
}
tryAdvanceEntry();
}
return currentEntry != null;
}
/**
* @return the next WAL entry in this stream
* @throws WALEntryStreamRuntimeException if there was an IOException
* @throws IOException
* @throws NoSuchElementException if no more entries in the stream.
*/
@Override
public Entry next() {
if (!hasNext()) throw new NoSuchElementException();
public Entry next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save;
}
/**
* Not supported.
*/
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
* {@inheritDoc}
*/
@ -141,14 +117,6 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
closeReader();
}
/**
* @return the iterator over WAL entries in the queue.
*/
@Override
public Iterator<Entry> iterator() {
return this;
}
/**
* @return the position of the last Entry returned by next()
*/
@ -195,24 +163,27 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
readNextEntryAndSetPosition();
if (currentEntry == null) { // no more entries in this log file - see if log was rolled
if (logQueue.size() > 1) { // log was rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader,
// and a new log was enqueued while we were reading. See HBASE-6758
resetReader();
readNextEntryAndSetPosition();
if (currentEntry == null) {
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
dequeueCurrentLog();
if (openNextLog()) {
readNextEntryAndSetPosition();
}
boolean beingWritten = readNextEntryAndSetPosition();
if (currentEntry == null && !beingWritten) {
// no more entries in this log file, and the file is already closed, i.e, rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader, and the log is rolled
// while we were reading. See HBASE-6758
resetReader();
readNextEntryAndSetPosition();
if (currentEntry == null) {
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
dequeueCurrentLog();
if (openNextLog()) {
readNextEntryAndSetPosition();
}
}
} // no other logs, we've simply hit the end of the current open log. Do nothing
}
}
// if currentEntry != null then just return
// if currentEntry == null but the file is still being written, then we should not switch to
// the next log either, just return here and try next time to see if there are more entries in
// the current file
}
// do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
}
@ -270,15 +241,30 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
metrics.decrSizeOfLogQueue();
}
private void readNextEntryAndSetPosition() throws IOException {
/**
* Returns whether the file is opened for writing.
*/
private boolean readNextEntryAndSetPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
// see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
// data, so we need to make sure that we do not read beyond the committed file length.
if (LOG.isDebugEnabled()) {
LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
fileLength.getAsLong() + ", but we have advanced to " + readerPos);
}
resetReader();
return true;
}
if (readEntry != null) {
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPosition);
}
currentEntry = readEntry; // could be null
setPosition(readerPos);
return fileLength.isPresent();
}
private void closeReader() throws IOException {
@ -301,7 +287,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
Path nextPath = logQueue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) return true;
if (reader != null) {
return true;
}
}
return false;
}
@ -408,14 +396,4 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
}
return size;
}
@InterfaceAudience.Private
public static class WALEntryStreamRuntimeException extends RuntimeException {
private static final long serialVersionUID = -6298201811259982568L;
public WALEntryStreamRuntimeException(Exception e) {
super(e);
}
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.OptionalLong;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used by replication to prevent replicating unacked log entries. See
* https://issues.apache.org/jira/browse/HBASE-14004 for more details.
*/
@InterfaceAudience.Private
@FunctionalInterface
public interface WALFileLengthProvider {
OptionalLong getLogFileSizeIfBeingWritten(Path path);
}

View File

@ -115,7 +115,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
@Override
public List<WAL> getWALs() throws IOException {
public List<WAL> getWALs() {
if (wal == null) {
return Collections.emptyList();
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@ -66,7 +67,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public List<WAL> getWALs() throws IOException {
public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>(1);
wals.add(disabled);
return wals;
@ -232,6 +233,11 @@ class DisabledWALProvider implements WALProvider {
public String toString() {
return "WAL disabled.";
}
@Override
public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
return OptionalLong.empty();
}
}
@Override

View File

@ -169,7 +169,7 @@ public class RegionGroupingProvider implements WALProvider {
}
@Override
public List<WAL> getWALs() throws IOException {
public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>();
for (WALProvider provider : cached.values()) {
wals.addAll(provider.getWALs());

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
@ -35,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface WAL extends Closeable {
public interface WAL extends Closeable, WALFileLengthProvider {
/**
* Registers WALActionsListener

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@ -38,6 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
* Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
*/
@InterfaceAudience.Private
public class WALFactory {
public class WALFactory implements WALFileLengthProvider {
private static final Log LOG = LogFactory.getLog(WALFactory.class);
@ -230,7 +232,7 @@ public class WALFactory {
}
}
public List<WAL> getWALs() throws IOException {
public List<WAL> getWALs() {
return provider.getWALs();
}
@ -450,4 +452,9 @@ public class WALFactory {
public final WALProvider getMetaWALProvider() {
return this.metaProvider.get();
}
@Override
public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
}
}

View File

@ -60,7 +60,7 @@ public interface WALProvider {
/** @return the List of WALs that are used by this server
*/
List<WAL> getWALs() throws IOException;
List<WAL> getWALs();
/**
* persist outstanding WALs to storage and stop accepting new appends.
@ -76,18 +76,20 @@ public interface WALProvider {
*/
void close() throws IOException;
// Writers are used internally. Users outside of the WAL should be relying on the
// interface provided by WAL.
interface Writer extends Closeable {
void sync() throws IOException;
void append(WAL.Entry entry) throws IOException;
interface WriterBase extends Closeable {
long getLength();
}
interface AsyncWriter extends Closeable {
// Writers are used internally. Users outside of the WAL should be relying on the
// interface provided by WAL.
interface Writer extends WriterBase {
void sync() throws IOException;
void append(WAL.Entry entry) throws IOException;
}
interface AsyncWriter extends WriterBase {
CompletableFuture<Long> sync();
void append(WAL.Entry entry);
long getLength();
}
/**

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -42,16 +43,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
String peerClusterId;
Path currentPath;
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
throws IOException {
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
}
@Override
@ -135,4 +137,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
}
@Override
public WALFileLengthProvider getWALFileLengthProvider() {
return walFileLengthProvider;
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -166,8 +167,8 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
null, replicationEndpoint, null);
source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
replicationEndpoint, p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {

View File

@ -61,9 +61,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -97,6 +94,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
/**
* An abstract class that tests ReplicationSourceManager. Classes that extend this class should
@ -646,8 +646,8 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
throws IOException {
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");
}
}

View File

@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@ -42,13 +43,10 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -56,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -67,11 +66,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@ -84,8 +80,13 @@ public class TestWALEntryStream {
private static final byte[] qualifier = Bytes.toBytes("qualifier");
private static final HRegionInfo info =
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
private static NavigableMap<byte[], Integer> scopes;
private static final NavigableMap<byte[], Integer> scopes = getScopes();
private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
return scopes;
}
private WAL log;
PriorityBlockingQueue<Path> walQueue;
@ -103,10 +104,6 @@ public class TestWALEntryStream {
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
}
@AfterClass
@ -151,10 +148,10 @@ public class TestWALEntryStream {
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
int i = 0;
for (WAL.Entry e : entryStream) {
assertNotNull(e);
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
@ -176,10 +173,9 @@ public class TestWALEntryStream {
@Test
public void testAppendsWithRolls() throws Exception {
appendToLog();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();
@ -196,8 +192,8 @@ public class TestWALEntryStream {
appendToLog();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@ -210,8 +206,8 @@ public class TestWALEntryStream {
log.rollWriter();
appendToLog();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@ -236,7 +232,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@ -261,7 +257,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@ -284,7 +280,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@ -292,7 +288,7 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@ -309,14 +305,14 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendEntriesToLog(3);
// read only one element
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
log, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@ -327,7 +323,7 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
@ -338,7 +334,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
@ -351,6 +347,7 @@ public class TestWALEntryStream {
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();
@ -425,10 +422,6 @@ public class TestWALEntryStream {
};
}
private ReplicationQueueInfo getQueueInfo() {
return new ReplicationQueueInfo("1");
}
class PathWatcher extends WALActionsListener.Base {
Path currentPath;
@ -440,4 +433,30 @@ public class TestWALEntryStream {
}
}
@Test
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2
assertFalse(entryStream.hasNext());
Thread.sleep(1000);
entryStream.reset();
// still can not get log 2
assertFalse(entryStream.hasNext());
// can get log 2 now
fileLength.set(size);
entryStream.reset();
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
}
}
}

View File

@ -107,7 +107,7 @@ public class IOTestProvider implements WALProvider {
}
@Override
public List<WAL> getWALs() throws IOException {
public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>(1);
wals.add(log);
return wals;