HBASE-25065 - WAL archival to be done by a separate thread (#2531)
* HBASE-25065 - WAL archival to be done by a separate thread * Fix checkstyle comments * Fix compile issue * Fix checkstyle and make the failing test more reliable * Remove unused import
This commit is contained in:
parent
3beae80c60
commit
ed7aa8e369
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
|
|||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
|
@ -1897,7 +1898,7 @@ public class HRegionServer extends Thread implements
|
|||
* be hooked up to WAL.
|
||||
*/
|
||||
private void setupWALAndReplication() throws IOException {
|
||||
WALFactory factory = new WALFactory(conf, serverName.toString());
|
||||
WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this);
|
||||
|
||||
// TODO Replication make assumptions here based on the default filesystem impl
|
||||
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
|
|
|
@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -86,6 +89,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
|
||||
|
@ -181,6 +185,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
*/
|
||||
protected final Configuration conf;
|
||||
|
||||
protected final Abortable abortable;
|
||||
|
||||
/** Listeners that are called on WAL events. */
|
||||
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
|
@ -313,6 +319,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
|
||||
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||
|
||||
private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
|
||||
|
||||
private final int archiveRetries;
|
||||
|
||||
public long getFilenum() {
|
||||
return this.filenum.get();
|
||||
}
|
||||
|
@ -364,10 +375,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
||||
final boolean failIfWALExists, final String prefix, final String suffix)
|
||||
throws FailedLogCloseException, IOException {
|
||||
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
}
|
||||
|
||||
protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
|
||||
final String logDir, final String archiveDir, final Configuration conf,
|
||||
final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
|
||||
final String suffix)
|
||||
throws FailedLogCloseException, IOException {
|
||||
this.fs = fs;
|
||||
this.walDir = new Path(rootDir, logDir);
|
||||
this.walArchiveDir = new Path(rootDir, archiveDir);
|
||||
this.conf = conf;
|
||||
this.abortable = abortable;
|
||||
|
||||
if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
|
||||
throw new IOException("Unable to mkdir " + walDir);
|
||||
|
@ -464,6 +484,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
};
|
||||
this.implClassName = getClass().getSimpleName();
|
||||
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
||||
archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -672,11 +693,39 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (logsToArchive != null) {
|
||||
for (Pair<Path, Long> logAndSize : logsToArchive) {
|
||||
this.totalLogSize.addAndGet(-logAndSize.getSecond());
|
||||
archiveLogFile(logAndSize.getFirst());
|
||||
this.walFile2Props.remove(logAndSize.getFirst());
|
||||
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
||||
// make it async
|
||||
for (Pair<Path, Long> log : localLogsToArchive) {
|
||||
logArchiveExecutor.execute(() -> {
|
||||
archive(log);
|
||||
});
|
||||
this.walFile2Props.remove(log.getFirst());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void archive(final Pair<Path, Long> log) {
|
||||
int retry = 1;
|
||||
while (true) {
|
||||
try {
|
||||
archiveLogFile(log.getFirst());
|
||||
totalLogSize.addAndGet(-log.getSecond());
|
||||
// successful
|
||||
break;
|
||||
} catch (Throwable e) {
|
||||
if (retry > archiveRetries) {
|
||||
LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
|
||||
if (this.abortable != null) {
|
||||
this.abortable.abort("Failed log archiving", e);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
|
||||
e);
|
||||
}
|
||||
retry++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -689,7 +738,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
return new Path(archiveDir, p.getName());
|
||||
}
|
||||
|
||||
private void archiveLogFile(final Path p) throws IOException {
|
||||
@VisibleForTesting
|
||||
protected void archiveLogFile(final Path p) throws IOException {
|
||||
Path newPath = getWALArchivePath(this.walArchiveDir, p);
|
||||
// Tell our listeners that a log is going to be archived.
|
||||
if (!this.listeners.isEmpty()) {
|
||||
|
@ -865,6 +915,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
rollWriterLock.lock();
|
||||
try {
|
||||
doShutdown();
|
||||
if (logArchiveExecutor != null) {
|
||||
logArchiveExecutor.shutdownNow();
|
||||
}
|
||||
} finally {
|
||||
rollWriterLock.unlock();
|
||||
}
|
||||
|
|
|
@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
||||
|
@ -57,17 +59,17 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
|||
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.htrace.core.TraceScope;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
|
||||
|
||||
|
||||
/**
|
||||
* An asynchronous implementation of FSWAL.
|
||||
* <p>
|
||||
|
@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
|
||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
|
||||
eventLoopGroup, channelClass);
|
||||
}
|
||||
|
||||
public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
|
||||
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
|
||||
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
|
||||
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
|
||||
suffix);
|
||||
this.eventLoopGroup = eventLoopGroup;
|
||||
this.channelClass = channelClass;
|
||||
Supplier<Boolean> hasConsumerTask;
|
||||
|
|
|
@ -40,10 +40,12 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
|
@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
|
||||
/**
|
||||
* The default implementation of FSWAL.
|
||||
*/
|
||||
|
@ -208,6 +210,19 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir,
|
||||
final Configuration conf) throws IOException {
|
||||
this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
|
||||
null);
|
||||
}
|
||||
|
||||
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
|
||||
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
||||
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
|
||||
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an edit log at the given <code>dir</code> location. You should never have to load an
|
||||
* existing log. If there is a log at startup, it should have already been processed and deleted
|
||||
|
@ -226,10 +241,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
* @param suffix will be url encoded. null is treated as empty. non-empty must start with
|
||||
* {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
|
||||
*/
|
||||
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
|
||||
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
||||
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
|
||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir,
|
||||
final String logDir, final String archiveDir, final Configuration conf,
|
||||
final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
|
||||
final String suffix) throws IOException {
|
||||
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
|
||||
suffix);
|
||||
this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
|
||||
CommonFSUtils.getDefaultReplication(fs, this.walDir));
|
||||
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
|
||||
|
|
|
@ -28,10 +28,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -88,6 +90,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
protected AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
// for default wal provider, logPrefix won't change
|
||||
protected String logPrefix;
|
||||
protected Abortable abortable;
|
||||
|
||||
/**
|
||||
* We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
|
||||
|
@ -102,7 +105,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
* null
|
||||
*/
|
||||
@Override
|
||||
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
|
||||
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
|
||||
throws IOException {
|
||||
if (!initialized.compareAndSet(false, true)) {
|
||||
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||
}
|
||||
|
@ -119,6 +123,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
}
|
||||
}
|
||||
logPrefix = sb.toString();
|
||||
this.abortable = abortable;
|
||||
doInit(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -67,11 +67,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
|||
private Class<? extends Channel> channelClass;
|
||||
@Override
|
||||
protected AsyncFSWAL createWAL() throws IOException {
|
||||
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId),
|
||||
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
|
||||
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
|
||||
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
|
||||
eventLoopGroup, channelClass);
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
|
||||
channelClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
|
@ -55,7 +56,8 @@ class DisabledWALProvider implements WALProvider {
|
|||
WAL disabled;
|
||||
|
||||
@Override
|
||||
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
|
||||
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
|
||||
throws IOException {
|
||||
if (null != disabled) {
|
||||
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||
}
|
||||
|
|
|
@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
|||
|
||||
@Override
|
||||
protected FSHLog createWAL() throws IOException {
|
||||
return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId),
|
||||
return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable,
|
||||
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
|
||||
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
// imports for classes still in regionserver.wal
|
||||
|
@ -137,7 +139,8 @@ public class RegionGroupingProvider implements WALProvider {
|
|||
private Class<? extends WALProvider> providerClass;
|
||||
|
||||
@Override
|
||||
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
|
||||
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
|
||||
throws IOException {
|
||||
if (null != strategy) {
|
||||
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||
}
|
||||
|
|
|
@ -21,9 +21,11 @@ import java.io.IOException;
|
|||
import java.io.InterruptedIOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
||||
|
@ -37,7 +39,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Entry point for users of the Write Ahead Log.
|
||||
* Acts as the shim between internal use and the particular WALProvider we use to handle wal
|
||||
|
@ -86,6 +87,7 @@ public class WALFactory {
|
|||
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
|
||||
|
||||
final String factoryId;
|
||||
final Abortable abortable;
|
||||
private final WALProvider provider;
|
||||
// The meta updates are written to a different wal. If this
|
||||
// regionserver holds meta regions, then this ref will be non-null.
|
||||
|
@ -119,6 +121,7 @@ public class WALFactory {
|
|||
// this instance can't create wals, just reader/writers.
|
||||
provider = null;
|
||||
factoryId = SINGLETON_ID;
|
||||
this.abortable = null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -160,7 +163,7 @@ public class WALFactory {
|
|||
LOG.info("Instantiating WALProvider of type " + clazz);
|
||||
try {
|
||||
final WALProvider result = clazz.getDeclaredConstructor().newInstance();
|
||||
result.init(this, conf, providerId);
|
||||
result.init(this, conf, providerId, this.abortable);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
|
||||
|
@ -180,13 +183,17 @@ public class WALFactory {
|
|||
return provider;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public WALFactory(Configuration conf, String factoryId) throws IOException {
|
||||
this(conf, factoryId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf must not be null, will keep a reference to read params in later reader/writer
|
||||
* instances.
|
||||
* @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
|
||||
* to make a directory
|
||||
* @param abortable the server to abort
|
||||
*/
|
||||
public WALFactory(Configuration conf, String factoryId) throws IOException {
|
||||
public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
|
||||
// until we've moved reader/writer construction down into providers, this initialization must
|
||||
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
|
||||
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
|
||||
|
@ -195,6 +202,7 @@ public class WALFactory {
|
|||
AbstractFSWALProvider.Reader.class);
|
||||
this.conf = conf;
|
||||
this.factoryId = factoryId;
|
||||
this.abortable = abortable;
|
||||
// end required early initialization
|
||||
if (conf.getBoolean(WAL_ENABLED, true)) {
|
||||
provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
|
||||
|
@ -202,7 +210,7 @@ public class WALFactory {
|
|||
// special handling of existing configuration behavior.
|
||||
LOG.warn("Running with WAL disabled.");
|
||||
provider = new DisabledWALProvider();
|
||||
provider.init(this, conf, factoryId);
|
||||
provider.init(this, conf, factoryId, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,9 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
|
@ -46,7 +48,8 @@ public interface WALProvider {
|
|||
* @param conf may not be null
|
||||
* @param providerId differentiate between providers from one factory. may be null
|
||||
*/
|
||||
void init(WALFactory factory, Configuration conf, String providerId) throws IOException;
|
||||
void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* @param region the region which we want to get a WAL for it. Could be null.
|
||||
|
|
|
@ -19,11 +19,9 @@ package org.apache.hadoop.hbase.master.region;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -88,12 +86,8 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
|
|||
region.requestRollAll();
|
||||
region.waitUntilWalRollFinished();
|
||||
// should have one
|
||||
FileStatus[] files = fs.listStatus(globalWALArchiveDir);
|
||||
assertEquals(1, files.length);
|
||||
Thread.sleep(2000);
|
||||
// should still be there
|
||||
assertTrue(fs.exists(files[0].getPath()));
|
||||
Thread.sleep(6000);
|
||||
|
||||
Thread.sleep(9000);
|
||||
// should have been cleaned
|
||||
assertEquals(0, fs.listStatus(globalWALArchiveDir).length);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
|||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
|
@ -116,10 +118,11 @@ public class TestFailedAppendAndSync {
|
|||
class DodgyFSLog extends FSHLog {
|
||||
volatile boolean throwSyncException = false;
|
||||
volatile boolean throwAppendException = false;
|
||||
volatile boolean throwArchiveException = false;
|
||||
|
||||
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||
throws IOException {
|
||||
super(fs, root, logDir, conf);
|
||||
public DodgyFSLog(FileSystem fs, Abortable abortable, Path root, String logDir,
|
||||
Configuration conf) throws IOException {
|
||||
super(fs, abortable, root, logDir, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -130,6 +133,18 @@ public class TestFailedAppendAndSync {
|
|||
return regions;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void archiveLogFile(Path p) throws IOException {
|
||||
if (throwArchiveException) {
|
||||
throw new IOException("throw archival exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void archive(Pair<Path, Long> localLogsToArchive) {
|
||||
super.archive(localLogsToArchive);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writer createWriterInstance(Path path) throws IOException {
|
||||
final Writer w = super.createWriterInstance(path);
|
||||
|
@ -177,7 +192,7 @@ public class TestFailedAppendAndSync {
|
|||
// the test.
|
||||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + getName());
|
||||
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
|
||||
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, services, rootDir, getName(), CONF);
|
||||
dodgyWAL.init();
|
||||
LogRoller logRoller = new LogRoller(services);
|
||||
logRoller.addWAL(dodgyWAL);
|
||||
|
@ -253,6 +268,27 @@ public class TestFailedAppendAndSync {
|
|||
Threads.sleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
dodgyWAL.throwAppendException = false;
|
||||
dodgyWAL.throwSyncException = false;
|
||||
dodgyWAL.throwArchiveException = true;
|
||||
Pair<Path, Long> pair = new Pair<Path, Long>();
|
||||
pair.setFirst(new Path("/a/b/"));
|
||||
pair.setSecond(100L);
|
||||
dodgyWAL.archive(pair);
|
||||
} catch (Throwable ioe) {
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
// one more abort needs to be called
|
||||
Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(),
|
||||
(Throwable) Mockito.anyObject());
|
||||
break;
|
||||
} catch (WantedButNotInvoked t) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// To stop logRoller, its server has to say it is stopped.
|
||||
Mockito.when(services.isStopped()).thenReturn(true);
|
||||
|
|
|
@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertLogFileSize(WAL log) {
|
||||
private void assertLogFileSize(WAL log) throws InterruptedException {
|
||||
if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
|
||||
assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
|
||||
} else {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
// imports for things that haven't moved from regionserver.wal yet.
|
||||
|
@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider {
|
|||
* null
|
||||
*/
|
||||
@Override
|
||||
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
|
||||
public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
|
||||
throws IOException {
|
||||
if (!initialized.compareAndSet(false, true)) {
|
||||
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue