From 4341c3f554cf85e73d3bb536bdda33a83f463f16 Mon Sep 17 00:00:00 2001 From: zhangduo <zhangduo@apache.org> Date: Thu, 14 Sep 2017 17:26:36 +0800 Subject: [PATCH] HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin --- .../org/apache/hadoop/hbase/util/Threads.java | 12 +- .../hbase/regionserver/HRegionServer.java | 71 +++++----- .../regionserver/ReplicationService.java | 17 ++- .../hbase/regionserver/wal/AbstractFSWAL.java | 35 ++++- .../hbase/regionserver/wal/AsyncFSWAL.java | 6 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 12 +- .../RecoveredReplicationSource.java | 11 +- .../replication/regionserver/Replication.java | 15 +- .../regionserver/ReplicationSource.java | 22 +-- .../ReplicationSourceInterface.java | 14 +- .../ReplicationSourceManager.java | 71 +++++----- .../ReplicationSourceWALReader.java | 25 ++-- .../regionserver/WALEntryStream.java | 130 ++++++++---------- .../regionserver/WALFileLengthProvider.java | 34 +++++ .../hbase/wal/AbstractFSWALProvider.java | 2 +- .../hadoop/hbase/wal/DisabledWALProvider.java | 8 +- .../hbase/wal/RegionGroupingProvider.java | 2 +- .../java/org/apache/hadoop/hbase/wal/WAL.java | 6 +- .../apache/hadoop/hbase/wal/WALFactory.java | 11 +- .../apache/hadoop/hbase/wal/WALProvider.java | 18 +-- .../replication/ReplicationSourceDummy.java | 13 +- .../replication/TestReplicationSource.java | 5 +- .../TestReplicationSourceManager.java | 10 +- .../regionserver/TestWALEntryStream.java | 87 +++++++----- .../hadoop/hbase/wal/IOTestProvider.java | 2 +- 25 files changed, 368 insertions(+), 271 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 35bf2b749bb..b39a5e8fd79 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -59,7 +59,7 @@ public class Threads { * @param t thread to run * @return Returns the passed Thread t. */ - public static Thread setDaemonThreadRunning(final Thread t) { + public static <T extends Thread> T setDaemonThreadRunning(T t) { return setDaemonThreadRunning(t, t.getName()); } @@ -69,8 +69,7 @@ public class Threads { * @param name new name * @return Returns the passed Thread t. */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name) { + public static <T extends Thread> T setDaemonThreadRunning(T t, String name) { return setDaemonThreadRunning(t, name, null); } @@ -78,12 +77,11 @@ public class Threads { * Utility method that sets name, daemon status and starts passed thread. * @param t thread to frob * @param name new name - * @param handler A handler to set on the thread. Pass null if want to - * use default handler. + * @param handler A handler to set on the thread. Pass null if you want to use the default handler. * @return Returns the passed Thread t. 
*/ - public static Thread setDaemonThreadRunning(final Thread t, - final String name, final UncaughtExceptionHandler handler) { + public static <T extends Thread> T setDaemonThreadRunning(T t, String name, + UncaughtExceptionHandler handler) { t.setName(name); if (handler != null) { t.setUncaughtExceptionHandler(handler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 62987c00c5b..f648c2fbff0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1586,7 +1586,7 @@ public class HRegionServer extends HasThread implements // Save it in a file, this will allow to see if we crash ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); - this.walFactory = setupWALAndReplication(); + setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); @@ -1855,13 +1855,12 @@ public class HRegionServer extends HasThread implements /** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. - * @return A WAL instance. * @throws IOException */ - private WALFactory setupWALAndReplication() throws IOException { + private void setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl - final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); Path logDir = new Path(walRootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); @@ -1875,7 +1874,7 @@ public class HRegionServer extends HasThread implements createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir); // listeners the wal factory will add to wals it creates. - final List<WALActionsListener> listeners = new ArrayList<>(); + List<WALActionsListener> listeners = new ArrayList<>(); listeners.add(new MetricsWAL()); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { @@ -1883,7 +1882,21 @@ public class HRegionServer extends HasThread implements listeners.add(this.replicationSourceHandler.getWALActionsListener()); } - return new WALFactory(conf, listeners, serverName.toString()); + // There is a cyclic dependency between ReplicationSourceHandler and WALFactory. + // We use WALActionsListener to get the newly rolled WALs, so we need to get the + // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then + // ReplicationSourceHandler needs to use WALFactory to get the length of the WAL file being + // written. So here we construct the WALFactory first, and then pass it to the initialize + // method of ReplicationSourceHandler. 
+ WALFactory factory = new WALFactory(conf, listeners, serverName.toString()); + this.walFactory = factory; + if (this.replicationSourceHandler != null) { + this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory); + } + if (this.replicationSinkHandler != null && + this.replicationSinkHandler != this.replicationSourceHandler) { + this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory); + } } public MetricsRegionServer getRegionServerMetrics() { @@ -2898,7 +2911,7 @@ public class HRegionServer extends HasThread implements /** * Load the replication service objects, if any */ - static private void createNewReplicationInstance(Configuration conf, + private static void createNewReplicationInstance(Configuration conf, HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{ if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || @@ -2908,47 +2921,41 @@ public class HRegionServer extends HasThread implements // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); // read in the name of the sink replication class from the config file. String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); // If both the sink and the source class names are the same, then instantiate // only one object. if (sourceClassname.equals(sinkClassname)) { - server.replicationSourceHandler = (ReplicationSourceService) - newReplicationInstance(sourceClassname, - conf, server, walFs, walDir, oldWALDir); - server.replicationSinkHandler = (ReplicationSinkService) - server.replicationSourceHandler; + server.replicationSourceHandler = + (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, + walDir, oldWALDir); + server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { - server.replicationSourceHandler = (ReplicationSourceService) - newReplicationInstance(sourceClassname, - conf, server, walFs, walDir, oldWALDir); - server.replicationSinkHandler = (ReplicationSinkService) - newReplicationInstance(sinkClassname, - conf, server, walFs, walDir, oldWALDir); + server.replicationSourceHandler = + (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs, + walDir, oldWALDir); + server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname, + conf, server, walFs, walDir, oldWALDir); } } - static private ReplicationService newReplicationInstance(String classname, - Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, - Path oldLogDir) throws IOException{ - - Class clazz = null; + private static ReplicationService newReplicationInstance(String classname, Configuration conf, + HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException { + Class clazz = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - clazz = Class.forName(classname, true, classLoader); + clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class); } catch (java.lang.ClassNotFoundException nfe) { throw new IOException("Could not find class for " + classname); } - // create an instance of the 
replication object. - ReplicationService service = (ReplicationService) - ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir); - return service; + // create an instance of the replication object, but do not initialize it here as we need to use + // WALFactory when initializing. + return ReflectionUtils.newInstance(clazz, conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index d88450a144e..f3bc18869a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -20,17 +20,17 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; /** - * Gateway to Cluster Replication. - * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. - * One such application is a cross-datacenter - * replication service that can keep two hbase clusters in sync. + * Gateway to Cluster Replication. Used by + * {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. One such application is a + * cross-datacenter replication service that can keep two hbase clusters in sync. */ @InterfaceAudience.Private public interface ReplicationService { @@ -39,9 +39,8 @@ public interface ReplicationService { * Initializes the replication service object. * @throws IOException */ - void initialize( - Server rs, FileSystem fs, Path logdir, Path oldLogDir - ) throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, + WALFileLengthProvider walFileLengthProvider) throws IOException; /** * Start replication services. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 8b996761554..815710879a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.*; +import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import com.lmax.disruptor.RingBuffer; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.management.MemoryType; @@ -29,6 +32,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -58,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -68,6 +73,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; @@ -75,9 +81,6 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.RingBuffer; - /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. @@ -105,7 +108,7 @@ import com.lmax.disruptor.RingBuffer; * (Need to keep our own file lengths, not rely on HDFS). */ @InterfaceAudience.Private -public abstract class AbstractFSWAL implements WAL { +public abstract class AbstractFSWAL implements WAL { private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class); @@ -983,6 +986,28 @@ public abstract class AbstractFSWAL implements WAL { + filenum + ")"; } + /** + * if the given {@code path} is being written currently, then return its length. + *
<p>
+ * This is used by replication to prevent replicating unacked log entries. See + * https://issues.apache.org/jira/browse/HBASE-14004 for more details. + */ + @Override + public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + rollWriterLock.lock(); + try { + Path currentPath = getOldPath(); + if (path.equals(currentPath)) { + W writer = this.writer; + return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); + } else { + return OptionalLong.empty(); + } + } finally { + rollWriterLock.unlock(); + } + } + /** * NOTE: This append, at a time that is usually after this call returns, starts an mvcc * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 7e91f8ccf3d..42183ec1c3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -707,8 +707,10 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected void doShutdown() throws IOException { waitForSafePoint(); - this.writer.close(); - this.writer = null; + if (this.writer != null) { + this.writer.close(); + this.writer = null; + } closeExecutor.shutdown(); IOException error = new IOException("WAL has been closed"); syncFutures.forEach(f -> f.done(f.getTxid(), error)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 729813729d7..7e0fc374110 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -46,8 +45,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; @@ -62,6 +65,9 @@ import org.apache.htrace.NullScope; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * The default implementation of FSWAL. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 35948686ebb..248a52a3093 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -50,13 +50,12 @@ public class RecoveredReplicationSource extends ReplicationSource { private String actualPeerId; @Override - public void init(final Configuration conf, final FileSystem fs, - final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) throws IOException { + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, + String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode, - clusterId, replicationEndpoint, metrics); + clusterId, replicationEndpoint, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 9fd1a87dcdd..d26f2536b29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -85,6 +86,7 @@ public class Replication extends WALActionsListener.Base implements private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + /** * Instantiate the replication management (if rep is enabled). 
* @param server Hosting server @@ -93,9 +95,8 @@ public class Replication extends WALActionsListener.Base implements * @param oldLogDir directory where logs are archived * @throws IOException */ - public Replication(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) throws IOException{ - initialize(server, fs, logDir, oldLogDir); + public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException { + initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty()); } /** @@ -104,8 +105,8 @@ public class Replication extends WALActionsListener.Base implements public Replication() { } - public void initialize(final Server server, final FileSystem fs, - final Path logDir, final Path oldLogDir) throws IOException { + public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, + WALFileLengthProvider walFileLengthProvider) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf); @@ -144,8 +145,8 @@ public class Replication extends WALActionsListener.Base implements throw new IOException("Could not read cluster id", ke); } this.replicationManager = - new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, - conf, this.server, fs, logDir, oldLogDir, clusterId); + new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf, + this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6c968524b81..d16a68fa2f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -62,6 +60,8 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + /** * Class that handles the source of a replication stream. @@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; * A stream is considered down when we cannot contact a region server on the * peer cluster for more than 55 seconds by default. *
</p>
- * */ @InterfaceAudience.Private public class ReplicationSource extends Thread implements ReplicationSourceInterface { @@ -123,6 +122,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private ReplicationThrottler throttler; private long defaultBandwidth; private long currentBandwidth; + private WALFileLengthProvider walFileLengthProvider; protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); @@ -147,12 +147,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf * @throws IOException */ @Override - public void init(final Configuration conf, final FileSystem fs, - final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) - throws IOException { + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, + String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -181,6 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); + this.walFileLengthProvider = walFileLengthProvider; LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + ", currentBandwidth=" + this.currentBandwidth); } @@ -560,4 +559,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf totalReplicatedEdits.addAndGet(entries.size()); totalBufferUsed.addAndGet(-batchSize); } + + @Override + public WALFileLengthProvider getWALFileLengthProvider() { + return walFileLengthProvider; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index da89aba3279..066f7996c4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -53,11 +53,10 @@ public interface ReplicationSourceInterface { * @param clusterId * @throws IOException */ - public void init(final Configuration conf, final FileSystem fs, - final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final Stoppable stopper, - final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) throws IOException; + void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, + String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate @@ 
-146,6 +145,11 @@ public interface ReplicationSourceInterface { */ ReplicationSourceManager getSourceManager(); + /** + * @return the wal file length provider + */ + WALFileLengthProvider getWALFileLengthProvider(); + /** * Try to throttle when the peer config with a bandwidth * @param batchSize entries size will be pushed diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5b54ce00d94..609274f3262 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -31,7 +28,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -40,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -68,10 +65,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * This class is responsible to manage all the replication * sources. 
There are two classes of sources: @@ -116,12 +116,12 @@ public class ReplicationSourceManager implements ReplicationListener { private final Path logDir; // Path to the wal archive private final Path oldLogDir; + private final WALFileLengthProvider walFileLengthProvider; // The number of ms that we wait before moving znodes, HBASE-3596 private final long sleepBeforeFailover; // Homemade executer service for replication private final ThreadPoolExecutor executor; - private final Random rand; private final boolean replicationForBulkLoadDataEnabled; private Connection connection; @@ -141,10 +141,10 @@ public class ReplicationSourceManager implements ReplicationListener { * @param oldLogDir the directory where old logs are archived * @param clusterId */ - public ReplicationSourceManager(final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, - final Configuration conf, final Server server, final FileSystem fs, final Path logDir, - final Path oldLogDir, final UUID clusterId) throws IOException { + public ReplicationSourceManager(ReplicationQueues replicationQueues, + ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, + Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, + WALFileLengthProvider walFileLengthProvider) throws IOException { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<>(); @@ -162,6 +162,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds this.clusterId = clusterId; + this.walFileLengthProvider = walFileLengthProvider; this.replicationTracker.registerListener(this); this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers @@ -175,8 +176,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.rand = new Random(); - this.latestPaths = Collections.synchronizedSet(new HashSet()); + this.latestPaths = new HashSet(); replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); @@ -243,7 +243,7 @@ public class ReplicationSourceManager implements ReplicationListener { * Adds a normal source per registered peer cluster and tries to process all * old region server wal queues */ - protected void init() throws IOException, ReplicationException { + void init() throws IOException, ReplicationException { for (String id : this.replicationPeers.getConnectedPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { @@ -267,13 +267,13 @@ public class ReplicationSourceManager implements ReplicationListener { * @return the source that was created * @throws IOException */ - protected ReplicationSourceInterface addSource(String id) throws IOException, - ReplicationException { + @VisibleForTesting + ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getConnectedPeer(id); - ReplicationSourceInterface src = - getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, server, id, this.clusterId, 
peerConfig, peer); + ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, + this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer, + walFileLengthProvider); synchronized (this.walsById) { this.sources.add(src); Map> walsByGroup = new HashMap<>(); @@ -330,7 +330,8 @@ public class ReplicationSourceManager implements ReplicationListener { * Get a copy of the wals of the first source on this rs * @return a sorted set of wal names */ - protected Map>> getWALs() { + @VisibleForTesting + Map>> getWALs() { return Collections.unmodifiableMap(walsById); } @@ -338,7 +339,8 @@ public class ReplicationSourceManager implements ReplicationListener { * Get a copy of the wals of the recovered sources on this rs * @return a sorted set of wal names */ - protected Map>> getWalsByIdRecoveredQueues() { + @VisibleForTesting + Map>> getWalsByIdRecoveredQueues() { return Collections.unmodifiableMap(walsByIdRecoveredQueues); } @@ -364,12 +366,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @return the normal source for the give peer if it exists, otherwise null. */ public ReplicationSourceInterface getSource(String peerId) { - for (ReplicationSourceInterface source: getSources()) { - if (source.getPeerId().equals(peerId)) { - return source; - } - } - return null; + return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null); } @VisibleForTesting @@ -466,12 +463,11 @@ public class ReplicationSourceManager implements ReplicationListener { * @return the created source * @throws IOException */ - protected ReplicationSourceInterface getReplicationSource(final Configuration conf, - final FileSystem fs, final ReplicationSourceManager manager, - final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, - final Server server, final String peerId, final UUID clusterId, - final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) - throws IOException { + private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, + ReplicationSourceManager manager, ReplicationQueues replicationQueues, + ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId, + ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer, + WALFileLengthProvider walFileLengthProvider) throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -507,8 +503,8 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, - clusterId, replicationEndpoint, metrics); + src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, + replicationEndpoint, walFileLengthProvider, metrics); // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), @@ -674,7 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Wait a bit before transferring the queues, we may be shutting down. // This sleep may not be enough in some cases. 
try { - Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover)); + Thread.sleep(sleepBeforeFailover + + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting before transferring a queue."); Thread.currentThread().interrupt(); @@ -688,7 +685,7 @@ public class ReplicationSourceManager implements ReplicationListener { List peers = rq.getUnClaimedQueueIds(rsZnode); while (peers != null && !peers.isEmpty()) { Pair> peer = this.rq.claimQueue(rsZnode, - peers.get(rand.nextInt(peers.size()))); + peers.get(ThreadLocalRandom.current().nextInt(peers.size()))); long sleep = sleepBeforeFailover/2; if (peer != null) { newQueues.put(peer.getFirst(), peer.getSecond()); @@ -748,7 +745,7 @@ public class ReplicationSourceManager implements ReplicationListener { // enqueue sources ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - server, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index bb5abe9c29d..bb993c657dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -37,18 +37,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue @@ -127,8 +127,8 @@ public class ReplicationSourceWALReader extends Thread { public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) { + try (WALEntryStream 
entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, + source.getWALFileLengthProvider(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!checkQuota()) { continue; @@ -147,7 +147,7 @@ currentPosition = entryStream.getPosition(); entryStream.reset(); // reuse stream } - } catch (IOException | WALEntryStreamRuntimeException e) { // stream related + } catch (IOException e) { // stream related if (sleepMultiplier < maxRetriesMultiplier) { LOG.debug("Failed to read stream of replication entries: " + e); sleepMultiplier++; @@ -202,8 +202,9 @@ // if we get an EOF due to a zero-length log, and there are other logs in queue // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is // enabled, then dump the log - private void handleEofException(Exception e) { - if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) { + private void handleEofException(IOException e) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && + logQueue.size() > 1 && this.eofAutoRecovery) { try { if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 54511aebcdb..3be4ca46faa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.OptionalLong; import java.util.concurrent.PriorityBlockingQueue; import org.apache.commons.logging.Log; @@ -50,7 +50,7 @@ import org.apache.hadoop.ipc.RemoteException; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> { +class WALEntryStream implements Closeable { private static final Log LOG = LogFactory.getLog(WALEntryStream.class); private Reader reader; @@ -59,24 +59,11 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> { - private PriorityBlockingQueue<Path> logQueue; - private FileSystem fs; - private Configuration conf; - private MetricsSource metrics; - - /** - * Create an entry stream over the given queue - * @param logQueue the queue of WAL paths - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream - * @param conf {@link Configuration} to use to create {@link Reader} for this stream - * @param metrics replication metrics - * @throws IOException - */ - public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, - MetricsSource metrics) - throws IOException { - this(logQueue, fs, conf, 0, metrics); - } + private final PriorityBlockingQueue<Path> logQueue; + private final FileSystem fs; + private final Configuration conf; + private final WALFileLengthProvider walFileLengthProvider; + private final MetricsSource metrics; /** * Create an entry stream over the given queue at the given start position @@ -88,51 +75,40 
@@ public class WALEntryStream implements Iterator, Closeable, Iterable logQueue, FileSystem fs, Configuration conf, - long startPosition, MetricsSource metrics) throws IOException { + long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.logQueue = logQueue; this.fs = fs; this.conf = conf; this.currentPosition = startPosition; + this.walFileLengthProvider = walFileLengthProvider; this.metrics = metrics; } /** * @return true if there is another WAL {@link Entry} - * @throws WALEntryStreamRuntimeException if there was an Exception while reading */ - @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { if (currentEntry == null) { - try { - tryAdvanceEntry(); - } catch (Exception e) { - throw new WALEntryStreamRuntimeException(e); - } + tryAdvanceEntry(); } return currentEntry != null; } /** * @return the next WAL entry in this stream - * @throws WALEntryStreamRuntimeException if there was an IOException + * @throws IOException * @throws NoSuchElementException if no more entries in the stream. */ - @Override - public Entry next() { - if (!hasNext()) throw new NoSuchElementException(); + public Entry next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } Entry save = currentEntry; currentEntry = null; // gets reloaded by hasNext() return save; } - /** - * Not supported. - */ - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - /** * {@inheritDoc} */ @@ -141,14 +117,6 @@ public class WALEntryStream implements Iterator, Closeable, Iterable iterator() { - return this; - } - /** * @return the position of the last Entry returned by next() */ @@ -195,24 +163,27 @@ public class WALEntryStream implements Iterator, Closeable, Iterable 1) { // log was rolled - // Before dequeueing, we should always get one more attempt at reading. - // This is in case more entries came in after we opened the reader, - // and a new log was enqueued while we were reading. See HBASE-6758 - resetReader(); - readNextEntryAndSetPosition(); - if (currentEntry == null) { - if (checkAllBytesParsed()) { // now we're certain we're done with this log file - dequeueCurrentLog(); - if (openNextLog()) { - readNextEntryAndSetPosition(); - } + boolean beingWritten = readNextEntryAndSetPosition(); + if (currentEntry == null && !beingWritten) { + // no more entries in this log file, and the file is already closed, i.e, rolled + // Before dequeueing, we should always get one more attempt at reading. + // This is in case more entries came in after we opened the reader, and the log is rolled + // while we were reading. See HBASE-6758 + resetReader(); + readNextEntryAndSetPosition(); + if (currentEntry == null) { + if (checkAllBytesParsed()) { // now we're certain we're done with this log file + dequeueCurrentLog(); + if (openNextLog()) { + readNextEntryAndSetPosition(); } } - } // no other logs, we've simply hit the end of the current open log. Do nothing + } } + // if currentEntry != null then just return + // if currentEntry == null but the file is still being written, then we should not switch to + // the next log either, just return here and try next time to see if there are more entries in + // the current file } // do nothing if we don't have a WAL Reader (e.g. 
if there's no logs in queue) } @@ -270,15 +241,30 @@ public class WALEntryStream implements Iterator, Closeable, Iterable fileLength.getAsLong()) { + // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted + // data, so we need to make sure that we do not read beyond the committed file length. + if (LOG.isDebugEnabled()) { + LOG.debug("The provider tells us the valid length for " + currentPath + " is " + + fileLength.getAsLong() + ", but we have advanced to " + readerPos); + } + resetReader(); + return true; + } if (readEntry != null) { metrics.incrLogEditsRead(); metrics.incrLogReadInBytes(readerPos - currentPosition); } currentEntry = readEntry; // could be null setPosition(readerPos); + return fileLength.isPresent(); } private void closeReader() throws IOException { @@ -301,7 +287,9 @@ public class WALEntryStream implements Iterator, Closeable, Iterable, Closeable, Iterable> implemen } @Override - public List getWALs() throws IOException { + public List getWALs() { if (wal == null) { return Collections.emptyList(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index c805ff3ef14..a6d43d61c76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,7 +67,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public List getWALs() throws IOException { + public List getWALs() { List wals = new ArrayList<>(1); wals.add(disabled); return wals; @@ -232,6 +233,11 @@ class DisabledWALProvider implements WALProvider { public String toString() { return "WAL disabled."; } + + @Override + public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + return OptionalLong.empty(); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 95b7daefbd2..ab3a7d94198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -169,7 +169,7 @@ public class RegionGroupingProvider implements WALProvider { } @Override - public List getWALs() throws IOException { + public List getWALs() { List wals = new ArrayList<>(); for (WALProvider provider : cached.values()) { wals.addAll(provider.getWALs()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index eede93707ab..9ec58abc08b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.wal; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - import java.io.Closeable; import java.io.IOException; import java.util.Map; @@ -35,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import 
org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface WAL extends Closeable { +public interface WAL extends Closeable, WALFileLengthProvider { /** * Registers WALActionsListener diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index aaa828fe43f..efb8e2d0346 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -38,6 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. */ @InterfaceAudience.Private -public class WALFactory { +public class WALFactory implements WALFileLengthProvider { private static final Log LOG = LogFactory.getLog(WALFactory.class); @@ -230,7 +232,7 @@ public class WALFactory { } } - public List getWALs() throws IOException { + public List getWALs() { return provider.getWALs(); } @@ -450,4 +452,9 @@ public class WALFactory { public final WALProvider getMetaWALProvider() { return this.metaProvider.get(); } + + @Override + public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index ffcfcd4ac08..c38f419d89f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -60,7 +60,7 @@ public interface WALProvider { /** @return the List of WALs that are used by this server */ - List getWALs() throws IOException; + List getWALs(); /** * persist outstanding WALs to storage and stop accepting new appends. @@ -76,18 +76,20 @@ public interface WALProvider { */ void close() throws IOException; - // Writers are used internally. Users outside of the WAL should be relying on the - // interface provided by WAL. 
- interface Writer extends Closeable { - void sync() throws IOException; - void append(WAL.Entry entry) throws IOException; + interface WriterBase extends Closeable { long getLength(); } - interface AsyncWriter extends Closeable { + // Writers are used internally. Users outside of the WAL should be relying on the + // interface provided by WAL. + interface Writer extends WriterBase { + void sync() throws IOException; + void append(WAL.Entry entry) throws IOException; + } + + interface AsyncWriter extends WriterBase { CompletableFuture sync(); void append(WAL.Entry entry); - long getLength(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index e23e15bc581..bfe17b5a021 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -42,16 +43,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { String peerClusterId; Path currentPath; MetricsSource metrics; + WALFileLengthProvider walFileLengthProvider; @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) - throws IOException { - + UUID clusterId, ReplicationEndpoint replicationEndpoint, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; this.metrics = metrics; + this.walFileLengthProvider = walFileLengthProvider; } @Override @@ -135,4 +137,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void postShipEdits(List entries, int batchSize) { } + + @Override + public WALFileLengthProvider getWALFileLengthProvider() { + return walFileLengthProvider; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 56a5bdc0578..ebb1bf8a554 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.io.IOException; +import java.util.OptionalLong; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -166,8 +167,8 @@ public class TestReplicationSource { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 
-    source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
-      null, replicationEndpoint, null);
+    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
+      replicationEndpoint, p -> OptionalLong.empty(), null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
     Future<?> future = executor.submit(new Runnable() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9804df4a59a..3934e05e972 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -61,9 +61,6 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -97,6 +94,9 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 
 /**
  * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
@@ -646,8 +646,8 @@ public abstract class TestReplicationSourceManager {
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
         ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-        UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
-        throws IOException {
+        UUID clusterId, ReplicationEndpoint replicationEndpoint,
+        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5f3452a64c9..d65054c505a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.NoSuchElementException;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -42,13 +43,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -67,11 +66,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
 
-@RunWith(MockitoJUnitRunner.class)
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestWALEntryStream {
 
@@ -84,8 +80,13 @@ public class TestWALEntryStream {
   private static final byte[] qualifier = Bytes.toBytes("qualifier");
   private static final HRegionInfo info = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW,
       HConstants.LAST_ROW, false);
-  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-  private static NavigableMap<byte[], Integer> scopes;
+  private static final NavigableMap<byte[], Integer> scopes = getScopes();
+
+  private static NavigableMap<byte[], Integer> getScopes() {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    scopes.put(family, 1);
+    return scopes;
+  }
 
   private WAL log;
   PriorityBlockingQueue<Path> walQueue;
@@ -103,10 +104,6 @@ public class TestWALEntryStream {
     cluster = TEST_UTIL.getDFSCluster();
     fs = cluster.getFileSystem();
-    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    for (byte[] fam : htd.getFamiliesKeys()) {
-      scopes.put(fam, 0);
-    }
   }
 
   @AfterClass
@@ -151,10 +148,10 @@
     log.rollWriter();
 
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       int i = 0;
-      for (WAL.Entry e : entryStream) {
-        assertNotNull(e);
+      while (entryStream.hasNext()) {
+        assertNotNull(entryStream.next());
         i++;
       }
       assertEquals(nbRows, i);
@@ -176,10 +173,9 @@
 
   @Test
   public void testAppendsWithRolls() throws Exception {
     appendToLog();
-    long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -196,8 +192,8 @@
 
     appendToLog();
 
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+      log, new MetricsSource("1"))) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -210,8 +206,8 @@
     log.rollWriter();
     appendToLog();
 
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+      log, new MetricsSource("1"))) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -236,7 +232,7 @@
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       assertEquals("1", getRow(entryStream.next()));
       appendToLog("3"); // 3 - comes in after reader opened
@@ -261,7 +257,7 @@
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -284,7 +280,7 @@
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -292,7 +288,7 @@
       lastPosition = entryStream.getPosition();
     }
     // next stream should picks up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -309,14 +305,14 @@
     long lastPosition = 0;
     appendEntriesToLog(3);
     // read only one element
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+      log, new MetricsSource("1"))) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -327,7 +323,7 @@
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -338,7 +334,7 @@
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -351,6 +347,7 @@
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+    when(source.getWALFileLengthProvider()).thenReturn(log);
     ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
         walQueue, 0, getDummyFilter(), source);
     Path walPath = walQueue.peek();
@@ -425,10 +422,6 @@
     };
   }
 
-  private ReplicationQueueInfo getQueueInfo() {
-    return new ReplicationQueueInfo("1");
-  }
-
   class PathWatcher extends WALActionsListener.Base {
 
     Path currentPath;
@@ -440,4 +433,30 @@
     }
   }
 
+  @Test
+  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
+    appendToLog("1");
+    appendToLog("2");
+    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+    AtomicLong fileLength = new AtomicLong(size - 1);
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+        p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
+      assertTrue(entryStream.hasNext());
+      assertNotNull(entryStream.next());
+      // can not get log 2
+      assertFalse(entryStream.hasNext());
+      Thread.sleep(1000);
+      entryStream.reset();
+      // still can not get log 2
+      assertFalse(entryStream.hasNext());
+
+      // can get log 2 now
+      fileLength.set(size);
+      entryStream.reset();
+      assertTrue(entryStream.hasNext());
+      assertNotNull(entryStream.next());
+
+      assertFalse(entryStream.hasNext());
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 041d8ae332b..944a4f168f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -107,7 +107,7 @@ public class IOTestProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>(1);
     wals.add(log);
     return wals;
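
Note on the contract the tests above exercise: WALEntryStream is handed a WALFileLengthProvider that maps a WAL path to the length the WAL has safely committed, and the stream must not surface an entry that lies beyond that length, even if the bytes are already visible in the file. The sketch below is illustrative only and is not part of the patch; the interface shape is inferred from its usage in the tests (the WAL itself is passed where a provider is expected, and lambdas such as p -> OptionalLong.empty() stand in for it), and the class and method names CommittedLengthGuard and canReadTo are hypothetical.

import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;

public class CommittedLengthGuard {

  // Shape inferred from the lambdas in the tests above: maps a WAL path to the
  // committed length of a file still being written, or empty if the file is closed.
  @FunctionalInterface
  public interface WALFileLengthProvider {
    OptionalLong getLogFileSizeIfBeingWritten(Path path);
  }

  private final WALFileLengthProvider provider;

  public CommittedLengthGuard(WALFileLengthProvider provider) {
    this.provider = provider;
  }

  // True if a reader may consume bytes of walFile up to readToPosition.
  // A file still being written is readable only up to the committed length;
  // an empty result means the file is closed and therefore fully readable.
  public boolean canReadTo(Path walFile, long readToPosition) {
    OptionalLong committed = provider.getLogFileSizeIfBeingWritten(walFile);
    return !committed.isPresent() || readToPosition <= committed.getAsLong();
  }
}

This is why testReadBeyondCommittedLength only sees the second entry once the reported length catches up: the stream treats the provider's answer, not the physical file size, as the read limit.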