HBASE-19079 Support setting up two clusters with A and S state

zhangduo 2018-04-10 22:35:19 +08:00
parent c7d1085fa2
commit 2389c09d75
16 changed files with 400 additions and 167 deletions

View File

@ -68,8 +68,9 @@ public class ReplicationPeerManager {
private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
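The map above defines the allowed sync replication state transitions after this change: ACTIVE may move to DOWNGRADE_ACTIVE or STANDBY, STANDBY may move to DOWNGRADE_ACTIVE, and DOWNGRADE_ACTIVE may move to STANDBY or ACTIVE. A minimal sketch of how a requested transition could be validated against such a map is shown below; the method name and error message are illustrative assumptions, not the actual ReplicationPeerManager API.

// Hedged sketch: reject any transition not listed in allowedTransition.
private void checkTransitionAllowed(SyncReplicationState from, SyncReplicationState to)
    throws DoNotRetryIOException {
  EnumSet<SyncReplicationState> allowed = allowedTransition.get(from);
  if (allowed == null || !allowed.contains(to)) {
    throw new DoNotRetryIOException(
      "Can not transit current cluster state from " + from + " to " + to);
  }
}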

View File

@ -171,7 +171,7 @@ public class TransitPeerSyncReplicationStateProcedure
}
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
addChildProcedure(new RecoverStandbyProcedure(peerId));
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:

View File

@ -38,6 +38,8 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
private final Path remoteWalDir;
private volatile boolean skipRemoteWal = false;
public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
@ -51,6 +53,9 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
if (skipRemoteWal) {
return localWriter;
}
AsyncWriter remoteWriter;
boolean succ = false;
try {
@ -64,4 +69,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
localWriter);
}
// Allow temporarily skipping the creation of the remote writer. When we fail to write to the
// remote dfs cluster, we need to reopen the regions and switch back to the original wal writer.
// But we also need to write a close marker when closing a region, and if that write fails the
// whole region server will abort. So here we skip creating the remote writer so that the region
// close marker can still be written.
public void skipRemoteWal() {
this.skipRemoteWal = true;
}
}
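The comment above explains the failure mode this hook covers. Below is a minimal sketch of how a caller might use it once writes to the remote cluster start failing, assuming the rollWriter(boolean) method inherited from AbstractFSWAL; the helper name is an illustrative assumption and not part of this patch.

// Hedged sketch (not part of this patch): stop creating remote writers and roll the
// WAL so that createWriterInstance() above returns a local-only writer.
static void downgradeToLocalOnly(DualAsyncFSWAL wal) throws IOException {
  wal.skipRemoteWal();  // subsequent writer instances skip the remote cluster
  wal.rollWriter(true); // force a roll so the current combined writer is replaced
}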

View File

@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -34,7 +36,9 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@ -179,4 +183,23 @@ public class WALUtil {
return conf.getLong("hbase.regionserver.hlog.blocksize",
CommonFSUtils.getDefaultBlockSize(fs, dir) * 2);
}
public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
ArrayList<Cell> cells = edit.getCells();
int size = cells.size();
int newSize = 0;
for (int i = 0; i < size; i++) {
Cell cell = mapper.apply(cells.get(i));
if (cell != null) {
cells.set(newSize, cell);
newSize++;
}
}
for (int i = size - 1; i >= newSize; i--) {
cells.remove(i);
}
if (newSize < size / 2) {
cells.trimToSize();
}
}
}

View File

@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A {@link WALEntryFilter} which contains multiple filters and applies them
@ -82,22 +82,16 @@ public class ChainWALEntryFilter implements WALEntryFilter {
if (entry == null || cellFilters.length == 0) {
return;
}
WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c));
}
private Cell filterCell(Entry entry, Cell cell) {
for (WALCellFilter filter : cellFilters) {
cell = filter.filterCell(entry, cell);
if (cell == null) {
break;
}
}
return cell;
}
}

View File

@ -21,21 +21,23 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -129,20 +131,31 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
}
}
// Filter out cells in the meta family and return whether the entry still has anything left to replay.
private boolean filter(Entry entry) {
WALEdit edit = entry.getEdit();
WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
return !edit.isEmpty();
}
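// Read entries from the WAL until the accumulated edit heap size exceeds batchSize, skipping
// entries whose cells are all filtered out above.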
private List<Entry> readWALEntries(Reader reader) throws IOException {
List<Entry> entries = new ArrayList<>();
if (reader == null) {
return entries;
}
long size = 0;
for (;;) {
Entry entry = reader.next();
if (entry == null) {
break;
}
if (filter(entry)) {
entries.add(entry);
size += entry.getEdit().heapSize();
if (size > batchSize) {
break;
}
}
}
return entries;
}

View File

@ -54,8 +54,10 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
Pair<SyncReplicationState, SyncReplicationState> states =
peer.getSyncReplicationStateAndNewState();
if ((states.getFirst() == SyncReplicationState.ACTIVE &&
states.getSecond() == SyncReplicationState.NONE) ||
(states.getFirst() == SyncReplicationState.DOWNGRADE_ACTIVE &&
states.getSecond() == SyncReplicationState.ACTIVE)) {
return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
} else {
return Optional.empty();

View File

@ -136,8 +136,16 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
walCopy = wal;
if (walCopy == null) {
walCopy = createWAL();
boolean succ = false;
try {
walCopy.init();
succ = true;
} finally {
if (!succ) {
walCopy.close();
}
}
wal = walCopy;
}
}
}

View File

@ -69,7 +69,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final WALProvider provider;
private SyncReplicationPeerInfoProvider peerInfoProvider =
new DefaultSyncReplicationPeerInfoProvider();
private WALFactory factory;
@ -83,7 +83,11 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private AtomicBoolean initialized = new AtomicBoolean(false);
// When switching from A to DA, we put an Optional.empty() into this map if there is no WAL for
// the peer yet. When a get on this map returns an empty Optional, the caller knows it should not
// use DualAsyncFSWAL any more.
private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL =
new ConcurrentHashMap<>();
private final KeyLocker<String> createLock = new KeyLocker<>();
@ -123,18 +127,27 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
if (opt != null) {
return opt.orElse(null);
}
Lock lock = createLock.acquireLock(peerId);
try {
opt = peerId2WAL.get(peerId);
if (opt != null) {
return opt.orElse(null);
}
DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir);
boolean succ = false;
try {
wal.init();
succ = true;
} finally {
if (!succ) {
wal.close();
}
}
peerId2WAL.put(peerId, Optional.of(wal));
return wal;
} finally {
lock.unlock();
@ -146,18 +159,20 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
if (region == null) {
return provider.getWAL(null);
}
WAL wal = null;
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerInfoProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
wal = getWAL(pair.getFirst(), pair.getSecond());
}
return wal != null ? wal : provider.getWAL(region);
}
private Stream<WAL> getWALStream() {
return Streams.concat(
peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
provider.getWALs().stream());
}
@Override
@ -169,12 +184,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
if (wal.isPresent()) {
try {
wal.get().shutdown();
} catch (IOException e) {
LOG.error("Shutdown WAL failed", e);
failure = e;
}
}
}
provider.shutdown();
@ -187,12 +204,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
if (wal.isPresent()) {
try {
wal.get().close();
} catch (IOException e) {
LOG.error("Close WAL failed", e);
failure = e;
}
}
}
provider.close();
@ -208,8 +227,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public long getLogFileSize() {
return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
.mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize();
}
private void safeClose(WAL wal) {
@ -231,10 +250,23 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to, int stage) {
if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
if (stage == 0) {
Lock lock = createLock.acquireLock(peerId);
try {
Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
if (opt != null) {
opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
} else {
// Add a placeholder to tell the getWAL caller not to use DualAsyncFSWAL any more.
peerId2WAL.put(peerId, Optional.empty());
}
} finally {
lock.unlock();
}
} else if (stage == 1) {
peerId2WAL.remove(peerId).ifPresent(this::safeClose);
}
}
}
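An ACTIVE to DOWNGRADE_ACTIVE transition is therefore driven against the provider in two stages. A hedged sketch of the expected call sequence follows; the procedure that issues these calls, and the exact point at which regions are reopened, are assumptions here.

// Hedged sketch: driving sequence for an ACTIVE -> DOWNGRADE_ACTIVE transition.
provider.peerSyncReplicationStateChange(peerId, SyncReplicationState.ACTIVE,
  SyncReplicationState.DOWNGRADE_ACTIVE, 0); // stage 0: stop writing the remote WAL
// ... regions of this peer are reopened so they pick up a plain local WAL ...
provider.peerSyncReplicationStateChange(peerId, SyncReplicationState.ACTIVE,
  SyncReplicationState.DOWNGRADE_ACTIVE, 1); // stage 1: remove and close the dual WAL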

View File

@ -24,12 +24,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@ -37,15 +36,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
/**
* Used in HBase's transaction log (WAL) to represent a collection of edits (Cell/KeyValue objects)
* that came in as a single transaction. All the edits for a given transaction are written out as a
@ -62,7 +60,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC })
public class WALEdit implements HeapSize {
private static final Logger LOG = LoggerFactory.getLogger(WALEdit.class);
// TODO: Get rid of this; see HBASE-8457
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");

View File

@ -135,7 +135,7 @@ public class WALFactory {
static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
LOG.info("Instantiating WALProvider of type {}", clazz);
try {
return clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
LOG.debug("Exception details for failure to load WALProvider.", e);

View File

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -259,9 +260,11 @@ public class TestReplicationAdmin {
TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE));
builder.setClusterKey(KEY_ONE);
builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
@ -1081,10 +1084,12 @@ public class TestReplicationAdmin {
// OK
}
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
@ -1105,13 +1110,18 @@ public class TestReplicationAdmin {
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
assertEquals(SyncReplicationState.STANDBY,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);
@ -1121,7 +1131,6 @@ public class TestReplicationAdmin {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
assertEquals(SyncReplicationState.STANDBY,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
fail("Can't transit cluster state from STANDBY to ACTIVE");

View File

@ -104,6 +104,7 @@ public class TestWALDurability {
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
HRegion region = initHRegion(tableName, null, null, customFSLog);
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
@ -118,6 +119,7 @@ public class TestWALDurability {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
customFSLog.init();
region = initHRegion(tableName, null, null, customFSLog);
customFSLog.resetSyncFlag();

View File

@ -17,69 +17,51 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
 * Base class for testing sync replication.
 */
public class SyncReplicationTestBase {
protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
protected static byte[] CF = Bytes.toBytes("cf");
protected static byte[] CQ = Bytes.toBytes("cq");
protected static String PEER_ID = "1";
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
@ -108,18 +90,18 @@ public class TestSyncReplication {
UTIL1.startMiniCluster(3);
UTIL2.startMiniCluster(3);
TableDescriptor td =
TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
UTIL1.getAdmin().createTable(td);
UTIL2.getAdmin().createTable(td);
FileSystem fs1 = UTIL1.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem();
Path remoteWALDir1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
Path remoteWALDir2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
UTIL1.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(false)
@ -139,60 +121,47 @@ public class TestSyncReplication {
ZK_UTIL.shutdownMiniZKCluster();
}
protected final void write(HBaseTestingUtility util, int start, int end) throws IOException {
try (Table table = util.getConnection().getTable(TABLE_NAME)) {
for (int i = start; i < end; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
}
protected final void verify(HBaseTestingUtility util, int start, int end) throws IOException {
try (Table table = util.getConnection().getTable(TABLE_NAME)) {
for (int i = start; i < end; i++) {
assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
}
}
}
protected final void verifyThroughRegion(HBaseTestingUtility util, int start, int end)
throws IOException {
HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
for (int i = start; i < end; i++) {
assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
}
}
protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtility util, int start,
int end) throws IOException {
HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
for (int i = start; i < end; i++) {
assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
}
}
protected final void waitUntilReplicationDone(HBaseTestingUtility util, int end)
throws Exception {
// The reject check is in RSRpcService so we can still read through HRegion
HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
util.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
}
@Override
@ -200,8 +169,17 @@ public class TestSyncReplication {
return "Replication has not been catched up yet";
}
});
}
protected final void writeAndVerifyReplication(HBaseTestingUtility util1,
HBaseTestingUtility util2, int start, int end) throws Exception {
write(util1, start, end);
waitUntilReplicationDone(util2, end);
verifyThroughRegion(util2, start, end);
}
protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
return new Path(remoteWALDir, peerId);
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationActive extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
@Test
public void testActive() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 0, 100);
Thread.sleep(2000);
// peer is disabled so no data has been replicated yet
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// confirm that the data is there after we convert the peer to DA
verify(UTIL2, 0, 100);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
// shutdown the cluster completely
UTIL1.shutdownMiniCluster();
// confirm that we can convert to DA even if the remote slave cluster is down
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
write(UTIL2, 200, 300);
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationStandBy.class);
@FunctionalInterface
private interface TableAction {
void call(Table table) throws IOException;
}
private void assertDisallow(Table table, TableAction action) throws IOException {
try {
action.call(table);
} catch (DoNotRetryIOException | RetriesExhaustedException e) {
// expected
assertThat(e.getMessage(), containsString("STANDBY"));
}
}
@Test
public void testStandby() throws Exception {
MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
assertDisallow(table,
t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
assertDisallow(table,
t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
assertDisallow(table,
t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
assertDisallow(table,
t -> t
.put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
.add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
}
// We should still allow replication writes
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
}
}