diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a80881e5689..d2b40f5cae2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableDescriptors; @@ -1897,7 +1898,7 @@ public class HRegionServer extends Thread implements * be hooked up to WAL. */ private void setupWALAndReplication() throws IOException { - WALFactory factory = new WALFactory(conf, serverName.toString()); + WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this); // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 3b688d52e3b..ee8790f5852 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -86,6 +89,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one @@ -181,6 +185,8 @@ public abstract class AbstractFSWAL implements WAL { */ protected final Configuration conf; + protected final Abortable abortable; + /** Listeners that are called on WAL events. */ protected final List listeners = new CopyOnWriteArrayList<>(); @@ -313,6 +319,11 @@ public abstract class AbstractFSWAL implements WAL { protected final AtomicBoolean rollRequested = new AtomicBoolean(false); + private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build()); + + private final int archiveRetries; + public long getFilenum() { return this.filenum.get(); } @@ -364,10 +375,19 @@ public abstract class AbstractFSWAL implements WAL { final String archiveDir, final Configuration conf, final List listeners, final boolean failIfWALExists, final String prefix, final String suffix) throws FailedLogCloseException, IOException { + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + + protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, + final String logDir, final String archiveDir, final Configuration conf, + final List listeners, final boolean failIfWALExists, final String prefix, + final String suffix) + throws FailedLogCloseException, IOException { this.fs = fs; this.walDir = new Path(rootDir, logDir); this.walArchiveDir = new Path(rootDir, archiveDir); this.conf = conf; + this.abortable = abortable; if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { throw new IOException("Unable to mkdir " + walDir); @@ -464,6 +484,7 @@ public abstract class AbstractFSWAL implements WAL { }; this.implClassName = getClass().getSimpleName(); this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); + archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0); } /** @@ -672,11 +693,39 @@ public abstract class AbstractFSWAL implements WAL { } } } + if (logsToArchive != null) { - for (Pair logAndSize : logsToArchive) { - this.totalLogSize.addAndGet(-logAndSize.getSecond()); - archiveLogFile(logAndSize.getFirst()); - this.walFile2Props.remove(logAndSize.getFirst()); + final List> localLogsToArchive = logsToArchive; + // make it async + for (Pair log : localLogsToArchive) { + logArchiveExecutor.execute(() -> { + archive(log); + }); + this.walFile2Props.remove(log.getFirst()); + } + } + } + + protected void archive(final Pair log) { + int retry = 1; + while (true) { + try { + archiveLogFile(log.getFirst()); + totalLogSize.addAndGet(-log.getSecond()); + // successful + break; + } catch (Throwable e) { + if (retry > archiveRetries) { + LOG.error("Failed log archiving for the log {},", log.getFirst(), e); + if (this.abortable != null) { + this.abortable.abort("Failed log archiving", e); + break; + } + } else { + LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, + e); + } + retry++; } } } @@ -689,7 +738,8 @@ public abstract class AbstractFSWAL implements WAL { return new Path(archiveDir, p.getName()); } - private void archiveLogFile(final Path p) throws IOException { + @VisibleForTesting + protected void archiveLogFile(final Path p) throws IOException { Path newPath = getWALArchivePath(this.walArchiveDir, p); // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { @@ -865,6 +915,9 @@ public abstract class AbstractFSWAL implements WAL { rollWriterLock.lock(); try { doShutdown(); + if (logArchiveExecutor != null) { + logArchiveExecutor.shutdownNow(); + } } finally { rollWriterLock.unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 3c799bf887e..66149a38434 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; @@ -57,17 +59,17 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; + /** * An asynchronous implementation of FSWAL. *

@@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL { Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { - super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, + eventLoopGroup, channelClass); + } + + public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, + String archiveDir, Configuration conf, List listeners, + boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class channelClass) throws FailedLogCloseException, IOException { + super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, + suffix); this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; Supplier hasConsumerTask; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 2227da703cf..f2ee0aee888 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -40,10 +40,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The default implementation of FSWAL. */ @@ -208,6 +210,19 @@ public class FSHLog extends AbstractFSWAL { this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); } + @VisibleForTesting + public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir, + final Configuration conf) throws IOException { + this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, + null); + } + + public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, + final String archiveDir, final Configuration conf, final List listeners, + final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + /** * Create an edit log at the given dir location. You should never have to load an * existing log. If there is a log at startup, it should have already been processed and deleted @@ -226,10 +241,12 @@ public class FSHLog extends AbstractFSWAL { * @param suffix will be url encoded. null is treated as empty. non-empty must start with * {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} */ - public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, - final String archiveDir, final Configuration conf, final List listeners, - final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { - super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir, + final String logDir, final String archiveDir, final Configuration conf, + final List listeners, final boolean failIfWALExists, final String prefix, + final String suffix) throws IOException { + super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, + suffix); this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir)); this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index d3bb0d92756..6fcebb0d7bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -28,10 +28,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -88,6 +90,7 @@ public abstract class AbstractFSWALProvider> implemen protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change protected String logPrefix; + protected Abortable abortable; /** * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs @@ -102,7 +105,8 @@ public abstract class AbstractFSWALProvider> implemen * null */ @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } @@ -119,6 +123,7 @@ public abstract class AbstractFSWALProvider> implemen } } logPrefix = sb.toString(); + this.abortable = abortable; doInit(conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 88b0140b37b..377f6a49bfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -67,11 +67,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { private Class channelClass; @Override protected AsyncFSWAL createWAL() throws IOException { - return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), - getWALDirectoryName(factory.factoryId), + return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, + CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, - META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, - eventLoopGroup, channelClass); + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, + channelClass); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 0ff2195eaa0..6c215f84ed4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -55,7 +56,8 @@ class DisabledWALProvider implements WALProvider { WAL disabled; @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (null != disabled) { throw new IllegalStateException("WALProvider.init should only be called once."); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 3b91c2475cf..8f2ca0753e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider { @Override protected FSHLog createWAL() throws IOException { - return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), - getWALDirectoryName(factory.factoryId), + return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable, + CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 2fd828898ba..4a2c2204b81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; // imports for classes still in regionserver.wal @@ -137,7 +139,8 @@ public class RegionGroupingProvider implements WALProvider { private Class providerClass; @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 30bb77ef66e..3b7f31106d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.List; import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; @@ -37,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** * Entry point for users of the Write Ahead Log. * Acts as the shim between internal use and the particular WALProvider we use to handle wal @@ -86,6 +87,7 @@ public class WALFactory { public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; final String factoryId; + final Abortable abortable; private final WALProvider provider; // The meta updates are written to a different wal. If this // regionserver holds meta regions, then this ref will be non-null. @@ -119,6 +121,7 @@ public class WALFactory { // this instance can't create wals, just reader/writers. provider = null; factoryId = SINGLETON_ID; + this.abortable = null; } @VisibleForTesting @@ -160,7 +163,7 @@ public class WALFactory { LOG.info("Instantiating WALProvider of type " + clazz); try { final WALProvider result = clazz.getDeclaredConstructor().newInstance(); - result.init(this, conf, providerId); + result.init(this, conf, providerId, this.abortable); return result; } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); @@ -180,13 +183,17 @@ public class WALFactory { return provider; } + @VisibleForTesting + public WALFactory(Configuration conf, String factoryId) throws IOException { + this(conf, factoryId, null); + } + /** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. - * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations - * to make a directory + * @param abortable the server to abort */ - public WALFactory(Configuration conf, String factoryId) throws IOException { + public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); @@ -195,6 +202,7 @@ public class WALFactory { AbstractFSWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; + this.abortable = abortable; // end required early initialization if (conf.getBoolean(WAL_ENABLED, true)) { provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); @@ -202,7 +210,7 @@ public class WALFactory { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); - provider.init(this, conf, factoryId); + provider.init(this, conf, factoryId, null); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index c3bd1499507..0a3123a8104 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -23,7 +23,9 @@ import java.io.IOException; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -46,7 +48,8 @@ public interface WALProvider { * @param conf may not be null * @param providerId differentiate between providers from one factory. may be null */ - void init(WALFactory factory, Configuration conf, String providerId) throws IOException; + void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException; /** * @param region the region which we want to get a WAL for it. Could be null. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java index 08b5f995190..fb1d0ac90f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java @@ -19,11 +19,9 @@ package org.apache.hadoop.hbase.master.region; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -88,12 +86,8 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase { region.requestRollAll(); region.waitUntilWalRollFinished(); // should have one - FileStatus[] files = fs.listStatus(globalWALArchiveDir); - assertEquals(1, files.length); - Thread.sleep(2000); - // should still be there - assertTrue(fs.exists(files[0].getPath())); - Thread.sleep(6000); + + Thread.sleep(9000); // should have been cleaned assertEquals(0, fs.listStatus(globalWALArchiveDir).length); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 67e31344df7..a37f1f24a1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALProvider.Writer; @@ -116,10 +118,11 @@ public class TestFailedAppendAndSync { class DodgyFSLog extends FSHLog { volatile boolean throwSyncException = false; volatile boolean throwAppendException = false; + volatile boolean throwArchiveException = false; - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) - throws IOException { - super(fs, root, logDir, conf); + public DodgyFSLog(FileSystem fs, Abortable abortable, Path root, String logDir, + Configuration conf) throws IOException { + super(fs, abortable, root, logDir, conf); } @Override @@ -130,6 +133,18 @@ public class TestFailedAppendAndSync { return regions; } + @Override + protected void archiveLogFile(Path p) throws IOException { + if (throwArchiveException) { + throw new IOException("throw archival exception"); + } + } + + @Override + protected void archive(Pair localLogsToArchive) { + super.archive(localLogsToArchive); + } + @Override protected Writer createWriterInstance(Path path) throws IOException { final Writer w = super.createWriterInstance(path); @@ -177,7 +192,7 @@ public class TestFailedAppendAndSync { // the test. FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + getName()); - DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, services, rootDir, getName(), CONF); dodgyWAL.init(); LogRoller logRoller = new LogRoller(services); logRoller.addWAL(dodgyWAL); @@ -253,6 +268,27 @@ public class TestFailedAppendAndSync { Threads.sleep(1); } } + + try { + dodgyWAL.throwAppendException = false; + dodgyWAL.throwSyncException = false; + dodgyWAL.throwArchiveException = true; + Pair pair = new Pair(); + pair.setFirst(new Path("/a/b/")); + pair.setSecond(100L); + dodgyWAL.archive(pair); + } catch (Throwable ioe) { + } + while (true) { + try { + // one more abort needs to be called + Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(), + (Throwable) Mockito.anyObject()); + break; + } catch (WantedButNotInvoked t) { + Threads.sleep(1); + } + } } finally { // To stop logRoller, its server has to say it is stopped. Mockito.when(services.isStopped()).thenReturn(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 83bd9abecd0..44ca988fb58 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling { } } - private void assertLogFileSize(WAL log) { + private void assertLogFileSize(WAL log) throws InterruptedException { if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) { assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0); } else { + for (int i = 0; i < 10; i++) { + if (AbstractFSWALProvider.getLogFileSize(log) != 0) { + Thread.sleep(10); + } + } assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 384a293ad41..e624e6fe264 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; // imports for things that haven't moved from regionserver.wal yet. @@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider { * null */ @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); }