HBASE-19747 Introduce a special WALProvider for synchronous replication

This commit is contained in:
zhangduo 2018-01-19 18:38:39 +08:00
parent b4a1dbf768
commit 274b813e12
28 changed files with 659 additions and 154 deletions

View File

@ -434,6 +434,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.implClassName = getClass().getSimpleName();
}
/**
* Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
*/
public void init() throws IOException {
rollWriter();
}
@Override
public void registerWALActionsListener(WALActionsListener listener) {
this.listeners.add(listener);

View File

@ -248,7 +248,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
rollWriter();
}
private static boolean waitingRoll(int epochAndState) {

View File

@ -38,14 +38,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
private final Path remoteWalDir;
public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteRootDir,
public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
eventLoopGroup, channelClass);
this.remoteFs = remoteFs;
this.remoteWalDir = new Path(remoteRootDir, logDir);
this.remoteWalDir = remoteWalDir;
}
@Override

View File

@ -214,12 +214,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
5);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
// rollWriter sets this.hdfs_out if it can.
rollWriter();
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Get notification for replication peer events. Mainly used for telling the
* {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to close some WAL if not
* used any more.
* <p>
* TODO: Also need a synchronous peer state change notification.
*/
@InterfaceAudience.Private
public interface PeerActionListener {
default void peerRemoved(String peerId) {}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Get the peer id and remote root dir if the region is synchronously replicated.
*/
@InterfaceAudience.Private
public interface SynchronousReplicationPeerProvider {
/**
* Return the peer id and remote WAL directory if the region is synchronously replicated.
*/
Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
}

View File

@ -137,6 +137,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
if (walCopy == null) {
walCopy = createWAL();
wal = walCopy;
walCopy.init();
}
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -32,12 +31,10 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
/**
* A WAL provider that use {@link AsyncFSWAL}.
@ -62,6 +59,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
private EventLoopGroup eventLoopGroup;
private Class<? extends Channel> channelClass;
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
@ -74,15 +72,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
@Override
protected void doInit(Configuration conf) throws IOException {
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
if (eventLoopGroupAndChannelClass != null) {
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
} else {
eventLoopGroup = new NioEventLoopGroup(1,
new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
channelClass = NioSocketChannel.class;
}
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
/**

View File

@ -27,6 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
/**
* Helper class for passing netty event loop config to {@link AsyncFSWALProvider}.
@ -57,7 +60,10 @@ public final class NettyAsyncFSWALConfigHelper {
static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) {
String name = conf.get(EVENT_LOOP_CONFIG);
if (StringUtils.isBlank(name)) {
return null;
// create new event loop group if config is empty
return Pair.<EventLoopGroup, Class<? extends Channel>> newPair(
new NioEventLoopGroup(0, new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)),
NioSocketChannel.class);
}
return EVENT_LOOP_CONFIG_MAP.get(name);
}

View File

@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
@ -132,6 +133,7 @@ public class RegionGroupingProvider implements WALProvider {
private RegionGroupingStrategy strategy;
private WALFactory factory;
private Configuration conf;
private List<WALActionsListener> listeners = new ArrayList<>();
private String providerId;
private Class<? extends WALProvider> providerClass;
@ -141,6 +143,7 @@ public class RegionGroupingProvider implements WALProvider {
if (null != strategy) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
this.conf = conf;
this.factory = factory;
StringBuilder sb = new StringBuilder().append(factory.factoryId);
if (providerId != null) {
@ -156,11 +159,11 @@ public class RegionGroupingProvider implements WALProvider {
}
private WALProvider createProvider(String group) throws IOException {
if (META_WAL_PROVIDER_ID.equals(providerId)) {
return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
} else {
return factory.createProvider(providerClass, group);
}
WALProvider provider = WALFactory.createProvider(providerClass);
provider.init(factory, conf,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group);
provider.addWALActionsListener(new MetricsWAL());
return provider;
}
@Override

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
/**
* The special {@link WALProvider} for synchronous replication.
* <p>
* It works like an interceptor, when getting WAL, first it will check if the given region should be
* replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate
* the request to the normal {@link WALProvider}.
*/
@InterfaceAudience.Private
public class SynchronousReplicationWALProvider implements WALProvider, PeerActionListener {
private static final Logger LOG =
LoggerFactory.getLogger(SynchronousReplicationWALProvider.class);
private static final String LOG_SUFFIX = ".syncrep";
private final WALProvider provider;
private final SynchronousReplicationPeerProvider peerProvider;
private WALFactory factory;
private Configuration conf;
private List<WALActionsListener> listeners = new ArrayList<>();
private EventLoopGroup eventLoopGroup;
private Class<? extends Channel> channelClass;
private AtomicBoolean initialized = new AtomicBoolean(false);
private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>();
private final KeyLocker<String> createLock = new KeyLocker<>();
SynchronousReplicationWALProvider(WALProvider provider,
SynchronousReplicationPeerProvider peerProvider) {
this.provider = provider;
this.peerProvider = peerProvider;
}
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
provider.init(factory, conf, providerId);
this.conf = conf;
this.factory = factory;
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
private String getLogPrefix(String peerId) {
return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
Path remoteWALDirPath = new Path(remoteWALDir);
FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
DualAsyncFSWAL wal = peerId2WAL.get(peerId);
if (wal != null) {
return wal;
}
Lock lock = createLock.acquireLock(peerId);
try {
wal = peerId2WAL.get(peerId);
if (wal == null) {
wal = createWAL(peerId, remoteWALDir);
peerId2WAL.put(peerId, wal);
wal.init();
}
return wal;
} finally {
lock.unlock();
}
}
@Override
public WAL getWAL(RegionInfo region) throws IOException {
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
return getWAL(pair.getFirst(), pair.getSecond());
} else {
return provider.getWAL(region);
}
}
private Stream<WAL> getWALStream() {
return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream());
}
@Override
public List<WAL> getWALs() {
return getWALStream().collect(Collectors.toList());
}
@Override
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (DualAsyncFSWAL wal : peerId2WAL.values()) {
try {
wal.shutdown();
} catch (IOException e) {
LOG.error("Shutdown WAL failed", e);
failure = e;
}
}
provider.shutdown();
if (failure != null) {
throw failure;
}
}
@Override
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (DualAsyncFSWAL wal : peerId2WAL.values()) {
try {
wal.close();
} catch (IOException e) {
LOG.error("Close WAL failed", e);
failure = e;
}
}
provider.close();
if (failure != null) {
throw failure;
}
}
@Override
public long getNumLogFiles() {
return peerId2WAL.size() + provider.getNumLogFiles();
}
@Override
public long getLogFileSize() {
return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
provider.getLogFileSize();
}
@Override
public void peerRemoved(String peerId) {
WAL wal = peerId2WAL.remove(peerId);
if (wal != null) {
try {
wal.close();
} catch (IOException e) {
LOG.error("Close WAL failed", e);
}
}
}
@Override
public void addWALActionsListener(WALActionsListener listener) {
listeners.add(listener);
provider.addWALActionsListener(listener);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -132,13 +133,10 @@ public class WALFactory {
}
}
WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
throws IOException {
LOG.info("Instantiating WALProvider of type " + clazz);
static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
LOG.info("Instantiating WALProvider of type {}", clazz);
try {
final WALProvider result = clazz.getDeclaredConstructor().newInstance();
result.init(this, conf, providerId);
return result;
return clazz.newInstance();
} catch (Exception e) {
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
LOG.debug("Exception details for failure to load WALProvider.", e);
@ -150,9 +148,10 @@ public class WALFactory {
* instantiate a provider from a config property. requires conf to have already been set (as well
* as anything the provider might need to read).
*/
WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
WALProvider provider = createProvider(clazz, providerId);
private WALProvider getProvider(String key, String defaultValue, String providerId)
throws IOException {
WALProvider provider = createProvider(getProviderClass(key, defaultValue));
provider.init(this, conf, providerId);
provider.addWALActionsListener(new MetricsWAL());
return provider;
}
@ -183,6 +182,26 @@ public class WALFactory {
}
}
/**
* A temporary constructor for testing synchronous replication.
* <p>
* Remove it once we can integrate the synchronous replication logic in RS.
*/
@VisibleForTesting
WALFactory(Configuration conf, String factoryId, SynchronousReplicationPeerProvider peerProvider)
throws IOException {
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
AbstractFSWALProvider.Reader.class);
this.conf = conf;
this.factoryId = factoryId;
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
this.provider = new SynchronousReplicationWALProvider(provider, peerProvider);
this.provider.addWALActionsListener(new MetricsWAL());
this.provider.init(this, conf, null);
}
/**
* Shutdown all WALs and clean up any underlying storage.
* Use only when you will not need to replay and edits that have gone to any wals from this

View File

@ -127,13 +127,21 @@ public class WALKeyImpl implements WALKey {
}
@VisibleForTesting
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
long logSeqNum,
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, null);
}
@VisibleForTesting
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, UUID clusterId, MultiVersionConcurrencyControl mvcc) {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, null);
}
// TODO: Fix being able to pass in sequenceid.

View File

@ -100,6 +100,7 @@ public class TestCompactionPolicy {
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
hlog = new FSHLog(fs, basedir, logName, conf);
hlog.init();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
region.close();

View File

@ -102,6 +102,56 @@ public class TestFailedAppendAndSync {
return name.getMethodName();
}
// Dodgy WAL. Will throw exceptions when flags set.
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
volatile boolean throwAppendException = false;
final AtomicLong rolls = new AtomicLong(0);
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
byte[][] regions = super.rollWriter(force);
rolls.getAndIncrement();
return regions;
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
}
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}
@Override
public long getLength() {
return w.getLength();
}
};
}
}
/**
* Reproduce locking up that happens when we get an exceptions appending and syncing.
* See HBASE-14317.
@ -110,57 +160,6 @@ public class TestFailedAppendAndSync {
*/
@Test
public void testLockupAroundBadAssignSync() throws IOException {
final AtomicLong rolls = new AtomicLong(0);
// Dodgy WAL. Will throw exceptions when flags set.
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
volatile boolean throwAppendException = false;
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
byte [][] regions = super.rollWriter(force);
rolls.getAndIncrement();
return regions;
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
}
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}
@Override
public long getLength() {
return w.getLength();
}
};
}
}
// Make up mocked server and services.
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(CONF);
@ -172,6 +171,7 @@ public class TestFailedAppendAndSync {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL);
logRoller.start();
@ -192,7 +192,7 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
fail();
}
long rollsCount = rolls.get();
long rollsCount = dodgyWAL.rolls.get();
try {
dodgyWAL.throwAppendException = true;
dodgyWAL.throwSyncException = false;
@ -202,8 +202,10 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
threwOnAppend = true;
}
while (rollsCount == rolls.get()) Threads.sleep(100);
rollsCount = rolls.get();
while (rollsCount == dodgyWAL.rolls.get()) {
Threads.sleep(100);
}
rollsCount = dodgyWAL.rolls.get();
// When we get to here.. we should be ok. A new WAL has been put in place. There were no
// appends to sync. We should be able to continue.
@ -217,14 +219,16 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
threwOnBoth = true;
}
while (rollsCount == rolls.get()) Threads.sleep(100);
while (rollsCount == dodgyWAL.rolls.get()) {
Threads.sleep(100);
}
// Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
// to just continue.
// So, should be no abort at this stage. Verify.
Mockito.verify(server, Mockito.atLeast(0)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
Mockito.verify(server, Mockito.atLeast(0)).abort(Mockito.anyString(),
Mockito.any(Throwable.class));
try {
dodgyWAL.throwAppendException = false;
dodgyWAL.throwSyncException = true;
@ -239,8 +243,8 @@ public class TestFailedAppendAndSync {
// happens. If it don't we'll timeout the whole test. That is fine.
while (true) {
try {
Mockito.verify(server, Mockito.atLeast(1)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
Mockito.verify(server, Mockito.atLeast(1)).abort(Mockito.anyString(),
Mockito.any(Throwable.class));
break;
} catch (WantedButNotInvoked t) {
Threads.sleep(1);

View File

@ -216,7 +216,6 @@ public class TestHRegion {
protected static HBaseTestingUtility TEST_UTIL;
public static Configuration CONF ;
private String dir;
private static FileSystem FILESYSTEM;
private final int MAX_VERSIONS = 2;
// Test names
@ -238,7 +237,6 @@ public class TestHRegion {
@Before
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
FILESYSTEM = TEST_UTIL.getTestFileSystem();
CONF = TEST_UTIL.getConfiguration();
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
method = name.getMethodName();
@ -341,6 +339,7 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
faultyLog.init();
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
COLUMN_FAMILY_BYTES);
@ -352,7 +351,6 @@ public class TestHRegion {
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
boolean threwIOE = false;
try {
region.put(put);
@ -389,6 +387,7 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + testName);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
hLog.init();
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES);
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
@ -1164,6 +1163,7 @@ public class TestHRegion {
FailAppendFlushMarkerWAL wal =
new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
try {
@ -1195,7 +1195,7 @@ public class TestHRegion {
wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
region.put(put);
@ -2447,6 +2447,7 @@ public class TestHRegion {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
hLog.init();
// This chunk creation is done throughout the code base. Do we want to move it into core?
// It is missing from this test. W/o it we NPE.
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
@ -2499,9 +2500,9 @@ public class TestHRegion {
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
// Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
// do below format (from Mockito doc).
Mockito.doAnswer(new Answer() {
Mockito.doAnswer(new Answer<Void>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
public Void answer(InvocationOnMock invocation) throws Throwable {
MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
mb.addOperationsFromCP(0, new Mutation[]{addPut});
return null;
@ -3795,9 +3796,12 @@ public class TestHRegion {
boolean previousEmpty = res.isEmpty();
res.clear();
InternalScanner scanner = region.getScanner(scan);
while (scanner.next(res))
;
try (InternalScanner scanner = region.getScanner(scan)) {
boolean moreRows;
do {
moreRows = scanner.next(res);
} while (moreRows);
}
if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
assertEquals("i=" + i, expectedCount, res.size());
long timestamp = res.get(0).getTimestamp();
@ -3893,7 +3897,7 @@ public class TestHRegion {
region.put(put);
numPutsFinished++;
if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
System.out.println("put iteration = " + numPutsFinished);
LOG.debug("put iteration = {}", numPutsFinished);
Delete delete = new Delete(row, (long) numPutsFinished - 30);
region.delete(delete);
}

View File

@ -27,25 +27,18 @@ import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A test similar to TestHRegion, but with in-memory flush families.
* Also checks wal truncation after in-memory compaction.
*/
@Category({VerySlowRegionServerTests.class, LargeTests.class})
@SuppressWarnings("deprecation")
public class TestHRegionWithInMemoryFlush extends TestHRegion {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHRegionWithInMemoryFlush.class);
// Do not spin up clusters in here. If you need to spin up a cluster, do it
// over in TestHRegionOnCluster.
private static final Logger LOG = LoggerFactory.getLogger(TestHRegionWithInMemoryFlush.class);
/**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Scan;
@ -36,7 +37,6 @@ import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -81,12 +81,12 @@ public class TestRegionIncrement {
}
private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
TEST_UTIL.getDataTestDir().toString(), conf);
FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
TEST_UTIL.getDataTestDir().toString(), conf);
wal.init();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
}
private void closeRegion(final HRegion region) throws IOException {
@ -170,8 +170,6 @@ public class TestRegionIncrement {
/**
* Have each thread update its own Cell. Avoid contention with another thread.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testUnContendedSingleCellIncrement()
@ -209,13 +207,9 @@ public class TestRegionIncrement {
/**
* Have each thread update its own Cell. Avoid contention with another thread.
* This is
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testContendedAcrossCellsIncrement()
throws IOException, InterruptedException {
public void testContendedAcrossCellsIncrement() throws IOException, InterruptedException {
final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
long startTime = System.currentTimeMillis();

View File

@ -215,6 +215,7 @@ public class TestWALLockup {
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
dodgyWAL.init();
Path originalWAL = dodgyWAL.getCurrentFileName();
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);

View File

@ -1097,6 +1097,7 @@ public abstract class AbstractTestWALReplay {
private MockWAL createMockWAL() throws IOException {
MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
wal.init();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -41,7 +42,7 @@ import org.apache.hadoop.hbase.wal.WALProvider;
/**
* Helper class for testing protobuf log.
*/
final class ProtobufLogTestHelper {
public final class ProtobufLogTestHelper {
private ProtobufLogTestHelper() {
}
@ -54,17 +55,22 @@ final class ProtobufLogTestHelper {
return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build();
}
private static WAL.Entry generateEdit(int i, RegionInfo hri, TableName tableName, byte[] row,
int columnCount, long timestamp, MultiVersionConcurrencyControl mvcc) {
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
HConstants.DEFAULT_CLUSTER_ID, mvcc);
WALEdit edit = new WALEdit();
int prefix = i;
IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
.map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add);
return new WAL.Entry(key, edit);
}
public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName,
int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
RegionInfo hri = toRegionInfo(tableName);
for (int i = 0; i < recordCount; i++) {
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
int prefix = i;
IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
.map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add);
writer.append(new WAL.Entry(key, edit));
writer.append(generateEdit(i, hri, tableName, row, columnCount, timestamp, null));
}
writer.sync(false);
if (withTrailer) {
@ -72,14 +78,24 @@ final class ProtobufLogTestHelper {
}
}
public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName,
int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
public static void doWrite(WAL wal, RegionInfo hri, TableName tableName, int columnCount,
int recordCount, byte[] row, long timestamp, MultiVersionConcurrencyControl mvcc)
throws IOException {
for (int i = 0; i < recordCount; i++) {
WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc);
wal.append(hri, entry.getKey(), entry.getEdit(), true);
}
wal.sync();
}
public static void doRead(ProtobufLogReader reader, boolean withTrailer, RegionInfo hri,
TableName tableName, int columnCount, int recordCount, byte[] row, long timestamp)
throws IOException {
if (withTrailer) {
assertNotNull(reader.trailer);
} else {
assertNull(reader.trailer);
}
RegionInfo hri = toRegionInfo(tableName);
for (int i = 0; i < recordCount; ++i) {
WAL.Entry entry = reader.next();
assertNotNull(entry);
@ -96,4 +112,10 @@ final class ProtobufLogTestHelper {
}
assertNull(reader.next());
}
public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName,
int columnCount, int recordCount, byte[] row, long timestamp) throws IOException {
doRead(reader, withTrailer, toRegionInfo(tableName), tableName, columnCount, recordCount, row,
timestamp);
}
}

View File

@ -67,8 +67,10 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP, CHANNEL_CLASS);
AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS);
wal.init();
return wal;
}
@Override
@ -76,15 +78,16 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP, CHANNEL_CLASS) {
AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
};
wal.init();
return wal;
}
}

View File

@ -66,7 +66,9 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
AsyncFSWAL wal = new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
wal.init();
return wal;
}
}

View File

@ -77,8 +77,7 @@ public class TestCombinedAsyncWriter {
CHANNEL_CLASS = NioSocketChannel.class;
UTIL.startMiniDFSCluster(3);
UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS());
WALS =
new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName());
WALS = new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName());
}
@AfterClass

View File

@ -74,8 +74,10 @@ public class TestFSHLog extends AbstractTestFSWAL {
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix);
FSHLog wal =
new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
wal.init();
return wal;
}
@Override
@ -83,8 +85,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix) {
FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
prefix, suffix) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
@ -92,6 +94,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
super.atHeadOfRingBufferEventHandlerAppend();
}
};
wal.init();
return wal;
}
@Test
@ -100,6 +104,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
final String name = this.name.getMethodName();
FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
CONF, null, true, null, null);
log.init();
try {
Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
ringBufferEventHandlerField.setAccessible(true);
@ -142,7 +147,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
try (FSHLog log =
new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
null, true, null, null)) {
log.init();
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)

View File

@ -48,6 +48,7 @@ public class TestWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
wal.init();
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);

View File

@ -103,8 +103,6 @@ public class IOTestProvider implements WALProvider {
this.factory = factory;
this.conf = conf;
this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
}
@Override

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSynchronousReplicationWALProvider {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static String REMOTE_WAL_DIR = "/RemoteWAL";
private static TableName TABLE = TableName.valueOf("table");
private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
private static WALFactory FACTORY;
private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
if (info.getTable().equals(TABLE)) {
return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
} else {
return Optional.empty();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniDFSCluster(3);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir);
UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
}
@AfterClass
public static void tearDownAfterClass() throws IOException {
FACTORY.close();
UTIL.shutdownMiniDFSCluster();
}
private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
int recordCount = 100;
int columnCount = 10;
byte[] row = Bytes.toBytes("testRow");
long timestamp = System.currentTimeMillis();
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
mvcc);
Path localFile = wal.getCurrentFileName();
Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
try (ProtobufLogReader reader =
(ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
try (ProtobufLogReader reader =
(ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
wal.rollWriter();
DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
}
@Override
public String explainFailure() throws Exception {
StringBuilder sb = new StringBuilder();
if (!dfs.isFileClosed(localFile)) {
sb.append(localFile + " has not been closed yet.");
}
if (!dfs.isFileClosed(remoteFile)) {
sb.append(remoteFile + " has not been closed yet.");
}
return sb.toString();
}
});
try (ProtobufLogReader reader =
(ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
try (ProtobufLogReader reader =
(ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
}
@Test
public void test() throws Exception {
WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
assertEquals(2, FACTORY.getWALs().size());
testReadWrite(wal);
SynchronousReplicationWALProvider walProvider =
(SynchronousReplicationWALProvider) FACTORY.getWALProvider();
walProvider.peerRemoved(PEER_ID);
assertEquals(1, FACTORY.getWALs().size());
}
}