HBASE-25065 WAL archival to be done by a separate thread (#2501)

* HBASE-25065 WAL archival can be batched/throttled and also done by a separate thread

* Fix checkstyle issues

* Address review comments

* checkstyle comments

* Addressing final review comments
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
ramkrish86 2020-10-11 10:46:06 +05:30 committed by GitHub
parent c367e91aff
commit accd9750aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 199 additions and 43 deletions

View File

@ -301,7 +301,7 @@ public final class MasterRegion {
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
walRoller.start();
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), false);
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
HRegion region;
if (fs.exists(tableDir)) {

View File

@ -1906,7 +1906,7 @@ public class HRegionServer extends Thread implements
boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
!LoadBalancer.isMasterCanHostUserRegions(conf);
WALFactory factory =
new WALFactory(conf, serverName.toString(), !isMasterNoTableOrSystemTableOnly);
new WALFactory(conf, serverName.toString(), this, !isMasterNoTableOrSystemTableOnly);
if (!isMasterNoTableOrSystemTableOnly) {
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);

View File

@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -84,8 +87,12 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@ -185,6 +192,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
protected final Configuration conf;
protected final Abortable abortable;
/** Listeners that are called on WAL events. */
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
@ -329,6 +338,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archiver-%d").build());
private final int archiveRetries;
public long getFilenum() {
return this.filenum.get();
}
@ -380,10 +394,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix)
throws FailedLogCloseException, IOException {
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}
protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
final String logDir, final String archiveDir, final Configuration conf,
final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
final String suffix)
throws FailedLogCloseException, IOException {
this.fs = fs;
this.walDir = new Path(rootDir, logDir);
this.walArchiveDir = new Path(rootDir, archiveDir);
this.conf = conf;
this.abortable = abortable;
if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
throw new IOException("Unable to mkdir " + walDir);
@ -482,6 +505,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
}
/**
@ -715,11 +740,39 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
regionsBlockingThisWal.clear();
}
}
if (logsToArchive != null) {
for (Pair<Path, Long> logAndSize : logsToArchive) {
this.totalLogSize.addAndGet(-logAndSize.getSecond());
archiveLogFile(logAndSize.getFirst());
this.walFile2Props.remove(logAndSize.getFirst());
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
// make it async
for (Pair<Path, Long> log : localLogsToArchive) {
logArchiveExecutor.execute(() -> {
archive(log);
});
this.walFile2Props.remove(log.getFirst());
}
}
}
protected void archive(final Pair<Path, Long> log) {
int retry = 1;
while (true) {
try {
archiveLogFile(log.getFirst());
totalLogSize.addAndGet(-log.getSecond());
// successful
break;
} catch (Throwable e) {
if (retry > archiveRetries) {
LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
if (this.abortable != null) {
this.abortable.abort("Failed log archiving", e);
break;
}
} else {
LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
e);
}
retry++;
}
}
}
@ -732,7 +785,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return new Path(archiveDir, p.getName());
}
private void archiveLogFile(final Path p) throws IOException {
@VisibleForTesting
protected void archiveLogFile(final Path p) throws IOException {
Path newPath = getWALArchivePath(this.walArchiveDir, p);
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
@ -907,6 +961,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
rollWriterLock.lock();
try {
doShutdown();
if (logArchiveExecutor != null) {
logArchiveExecutor.shutdownNow();
}
} finally {
rollWriterLock.unlock();
}

View File

@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
@ -60,7 +62,6 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
@ -68,6 +69,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
/**
* An asynchronous implementation of FSWAL.
* <p>
@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
eventLoopGroup, channelClass);
}
public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix);
this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask;

View File

@ -40,10 +40,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.trace.TraceUtil;
@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The default implementation of FSWAL.
*/
@ -168,7 +170,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final int waitOnShutdownInSeconds;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
@ -208,11 +210,25 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
}
@VisibleForTesting
public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir,
final Configuration conf) throws IOException {
this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
null);
}
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}
/**
* Create an edit log at the given <code>dir</code> location. You should never have to load an
* existing log. If there is a log at startup, it should have already been processed and deleted
* by the time the WAL object is started up.
* @param fs filesystem handle
* @param abortable Abortable - the server here
* @param rootDir path to where logs and oldlogs
* @param logDir dir where wals are stored
* @param archiveDir dir where wals are archived
@ -226,10 +242,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* @param suffix will be url encoded. null is treated as empty. non-empty must start with
* {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
*/
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir,
final String logDir, final String archiveDir, final Configuration conf,
final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
final String suffix) throws IOException {
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix);
this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@ -29,10 +30,12 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -88,6 +91,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
protected AtomicBoolean initialized = new AtomicBoolean(false);
// for default wal provider, logPrefix won't change
protected String logPrefix;
protected Abortable abortable;
/**
* We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
@ -102,7 +106,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* null
*/
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
@ -119,6 +124,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
}
logPrefix = sb.toString();
this.abortable = abortable;
doInit(conf);
}

View File

@ -65,11 +65,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId),
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup, channelClass);
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
channelClass);
}
@Override

View File

@ -25,8 +25,10 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
@ -55,7 +57,8 @@ class DisabledWALProvider implements WALProvider {
WAL disabled;
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
throws IOException {
if (null != disabled) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}

View File

@ -67,7 +67,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
* Public because of FSHLog. Should be package-private
*/
public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
final boolean overwritable, long blocksize) throws IOException {
final boolean overwritable, long blocksize) throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends Writer> logWriterClass =
conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
@Override
protected FSHLog createWAL() throws IOException {
return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId),
return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable,
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}

View File

@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@ -137,14 +139,17 @@ public class RegionGroupingProvider implements WALProvider {
private List<WALActionsListener> listeners = new ArrayList<>();
private String providerId;
private Class<? extends WALProvider> providerClass;
private Abortable abortable;
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
throws IOException {
if (null != strategy) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
this.conf = conf;
this.factory = factory;
this.abortable = abortable;
if (META_WAL_PROVIDER_ID.equals(providerId)) {
// do not change the provider id if it is for meta
@ -171,7 +176,7 @@ public class RegionGroupingProvider implements WALProvider {
private WALProvider createProvider(String group) throws IOException {
WALProvider provider = WALFactory.createProvider(providerClass);
provider.init(factory, conf,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group);
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
return provider;
}

View File

@ -35,7 +35,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
@ -108,11 +110,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
provider.init(factory, conf, providerId);
provider.init(factory, conf, providerId, abortable);
this.conf = conf;
this.factory = factory;
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =

View File

@ -21,9 +21,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
@ -35,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@ -86,6 +87,7 @@ public class WALFactory {
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
final String factoryId;
final Abortable abortable;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
// regionserver holds meta regions, then this ref will be non-null.
@ -119,6 +121,7 @@ public class WALFactory {
// this instance can't create wals, just reader/writers.
provider = null;
factoryId = SINGLETON_ID;
this.abortable = null;
}
@VisibleForTesting
@ -175,7 +178,7 @@ public class WALFactory {
public WALFactory(Configuration conf, String factoryId) throws IOException {
// default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
// for HMaster or HRegionServer which take system table only. See HBASE-19999
this(conf, factoryId, true);
this(conf, factoryId, null, true);
}
/**
@ -183,11 +186,12 @@ public class WALFactory {
* instances.
* @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
* to make a directory
* @param abortable the server associated with this WAL file
* @param enableSyncReplicationWALProvider whether wrap the wal provider to a
* {@link SyncReplicationWALProvider}
*/
public WALFactory(Configuration conf, String factoryId, boolean enableSyncReplicationWALProvider)
throws IOException {
public WALFactory(Configuration conf, String factoryId, Abortable abortable,
boolean enableSyncReplicationWALProvider) throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
@ -196,20 +200,21 @@ public class WALFactory {
AbstractFSWALProvider.Reader.class);
this.conf = conf;
this.factoryId = factoryId;
this.abortable = abortable;
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
if (enableSyncReplicationWALProvider) {
provider = new SyncReplicationWALProvider(provider);
}
provider.init(this, conf, null);
provider.init(this, conf, null, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
this.provider = provider;
} else {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
provider = new DisabledWALProvider();
provider.init(this, conf, factoryId);
provider.init(this, conf, factoryId, null);
}
}
@ -274,7 +279,7 @@ public class WALFactory {
clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
}
provider = createProvider(clz);
provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
if (metaProvider.compareAndSet(null, provider)) {
return provider;

View File

@ -23,7 +23,9 @@ import java.io.IOException;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -46,7 +48,8 @@ public interface WALProvider {
* @param conf may not be null
* @param providerId differentiate between providers from one factory. may be null
*/
void init(WALFactory factory, Configuration conf, String providerId) throws IOException;
void init(WALFactory factory, Configuration conf, String providerId, Abortable server)
throws IOException;
/**
* @param region the region which we want to get a WAL for it. Could be null.

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
@ -107,11 +110,13 @@ public class TestFailedAppendAndSync {
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
volatile boolean throwAppendException = false;
volatile boolean throwArchiveException = false;
final AtomicLong rolls = new AtomicLong(0);
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
public DodgyFSLog(FileSystem fs, Server server, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
super(fs, server, root, logDir, conf);
}
@Override
@ -122,6 +127,18 @@ public class TestFailedAppendAndSync {
return regions;
}
@Override
protected void archiveLogFile(Path p) throws IOException {
if (throwArchiveException) {
throw new IOException("throw archival exception");
}
}
@Override
protected void archive(Pair<Path, Long> localLogsToArchive) {
super.archive(localLogsToArchive);
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
@ -176,7 +193,7 @@ public class TestFailedAppendAndSync {
// the test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, (Server)services, rootDir, getName(), CONF);
dodgyWAL.init();
LogRoller logRoller = new LogRoller(services);
logRoller.addWAL(dodgyWAL);
@ -256,6 +273,27 @@ public class TestFailedAppendAndSync {
Threads.sleep(1);
}
}
try {
dodgyWAL.throwAppendException = false;
dodgyWAL.throwSyncException = false;
dodgyWAL.throwArchiveException = true;
Pair<Path, Long> pair = new Pair<Path, Long>();
pair.setFirst(new Path("/a/b/"));
pair.setSecond(100L);
dodgyWAL.archive(pair);
} catch (Throwable ioe) {
}
while (true) {
try {
// one more abort needs to be called
Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(),
(Throwable) Mockito.anyObject());
break;
} catch (WantedButNotInvoked t) {
Threads.sleep(1);
}
}
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(services.isStopped()).thenReturn(true);

View File

@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling {
}
}
private void assertLogFileSize(WAL log) {
private void assertLogFileSize(WAL log) throws InterruptedException {
if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
} else {
for (int i = 0; i < 10; i++) {
if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
Thread.sleep(10);
}
}
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
}
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
// imports for things that haven't moved from regionserver.wal yet.
@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider {
* null
*/
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}

View File

@ -687,7 +687,7 @@ public class TestWALFactory {
assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
// if providers are not set and do not enable SyncReplicationWALProvider
walFactory = new WALFactory(conf, this.currentServername.toString(), false);
walFactory = new WALFactory(conf, this.currentServername.toString(), null, false);
assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
}