HBASE-20147 Serial replication will be stuck if we create a table with serial replication but add it to a peer after there are region moves

This commit is contained in:
zhangduo 2018-03-21 21:03:14 +08:00
parent aadb2f0837
commit 64061f896f
16 changed files with 825 additions and 264 deletions

View File

@ -489,16 +489,17 @@ public class AsyncMetaTableAccessor {
QueryType type) {
return tableName.map((table) -> {
switch (type) {
case REGION:
byte[] startRow = new byte[table.getName().length + 2];
System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
startRow[startRow.length - 2] = HConstants.DELIMITER;
startRow[startRow.length - 1] = HConstants.DELIMITER;
return startRow;
case ALL:
case TABLE:
default:
return table.getName();
case REGION:
case REPLICATION:
byte[] startRow = new byte[table.getName().length + 2];
System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
startRow[startRow.length - 2] = HConstants.DELIMITER;
startRow[startRow.length - 1] = HConstants.DELIMITER;
return startRow;
case ALL:
case TABLE:
default:
return table.getName();
}
});
}
@ -512,20 +513,21 @@ public class AsyncMetaTableAccessor {
return tableName.map((table) -> {
final byte[] stopRow;
switch (type) {
case REGION:
stopRow = new byte[table.getName().length + 3];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 3] = ' ';
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
break;
case ALL:
case TABLE:
default:
stopRow = new byte[table.getName().length + 1];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 1] = ' ';
break;
case REGION:
case REPLICATION:
stopRow = new byte[table.getName().length + 3];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 3] = ' ';
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
break;
case ALL:
case TABLE:
default:
stopRow = new byte[table.getName().length + 1];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 1] = ' ';
break;
}
return stopRow;
});

View File

@ -192,7 +192,8 @@ public class MetaTableAccessor {
public enum QueryType {
ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY),
REGION(HConstants.CATALOG_FAMILY),
TABLE(HConstants.TABLE_FAMILY);
TABLE(HConstants.TABLE_FAMILY),
REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
private final byte[][] families;
@ -1168,8 +1169,9 @@ public class MetaTableAccessor {
final List<T> results = new ArrayList<>();
@Override
public boolean visit(Result r) throws IOException {
if (r == null || r.isEmpty()) return true;
add(r);
if (r != null && !r.isEmpty()) {
add(r);
}
return true;
}
@ -2108,6 +2110,24 @@ public class MetaTableAccessor {
}
}
/**
 * Scans meta with the {@link QueryType#REPLICATION} query type over the row range of the given
 * table and returns, for every row that carries a value under the replication barrier family and
 * the sequence number qualifier, the encoded region name paired with that value decoded as a long.
 * Rows without such a value are skipped.
 * @param conn connection used for the meta scan
 * @param tableName table whose rows are scanned
 * @return one (encoded region name, barrier) pair per row that had a value, in scan order
 * @throws IOException if the meta scan fails
 */
public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
    TableName tableName) throws IOException {
  List<Pair<String, Long>> barriers = new ArrayList<>();
  scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
    getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION, row -> {
      byte[] rawBarrier =
        row.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
      if (rawBarrier != null) {
        long barrier = Bytes.toLong(rawBarrier);
        String regionName = RegionInfo.encodeRegionName(row.getRow());
        barriers.add(Pair.newPair(regionName, barrier));
      }
      // Always continue the scan, whether or not this row contributed an entry.
      return true;
    });
  return barriers;
}
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;

View File

@ -377,7 +377,11 @@ enum PeerModificationState {
PRE_PEER_MODIFICATION = 1;
UPDATE_PEER_STORAGE = 2;
REFRESH_PEER_ON_RS = 3;
POST_PEER_MODIFICATION = 4;
SERIAL_PEER_REOPEN_REGIONS = 4;
SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID = 5;
SERIAL_PEER_SET_PEER_ENABLED = 6;
SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS = 7;
POST_PEER_MODIFICATION = 8;
}
message PeerModificationStateData {
@ -415,4 +419,5 @@ message AddPeerStateData {
// State data serialized by the update-peer-config procedure.
message UpdatePeerConfigStateData {
// The peer configuration being applied by the procedure.
required ReplicationPeer peer_config = 1;
// The peer configuration that was in place before the update. Optional: the procedure only
// writes it when it has an old configuration to record.
optional ReplicationPeer old_peer_config = 2;
}

View File

@ -78,6 +78,14 @@ public interface ReplicationQueueStorage {
*/
long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
/**
* Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
* a serial replication peer.
* @param peerId peer id
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
* @throws ReplicationException if the sequence ids cannot be written to the backing storage
*/
void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver

View File

@ -111,13 +111,11 @@ public final class ReplicationUtils {
return true;
}
public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
ReplicationPeerConfig rpc2) {
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
return false;
}
if (rpc1.isSerial() != rpc2.isSerial()) {
return false;
}
if (rpc1.replicateAllUserTables()) {
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());

View File

@ -203,6 +203,24 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
/**
 * For every (encoded region name, sequence id) entry, makes sure the serial replication node of
 * that region for the peer exists and appends a set-data operation carrying the sequence id to
 * {@code listOfOps}. The caller is responsible for submitting {@code listOfOps}.
 * @param queueId queue id (or bare peer id) from which the peer id is derived
 * @param lastSeqIds map of encoded region name to the sequence id to persist
 * @param listOfOps list the set-data operations are appended to
 * @throws KeeperException if creating a node fails
 */
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
    List<ZKUtilOp> listOfOps) throws KeeperException {
  if (lastSeqIds.isEmpty()) {
    // Nothing to add; also keeps the empty case free of any queue id parsing, as before.
    return;
  }
  // The peer id depends only on the queue id, so derive it once instead of once per region.
  String peerId = new ReplicationQueueInfo(queueId).getPeerId();
  for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
    String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
    /*
     * Make sure the existence of path
     * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
     * multiOrSequential() method said, if received a NodeExistsException, all operations will
     * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
     * because only need to make sure that update file position and sequence id atomically.
     */
    ZKUtil.createWithParents(zookeeper, path);
    // Persist the max sequence id of region to zookeeper.
    listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
  }
}
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
@ -213,23 +231,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
ZKUtil.positionToByteArray(position)));
}
// Persist the max sequence id(s) of regions for serial replication atomically.
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
/*
* Make sure the existence of path
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
* multiOrSequential() method said, if received a NodeExistsException, all operations will
* fail. So create the path here, and in fact, no need to add this operation to listOfOps,
* because only need to make sure that update file position and sequence id atomically.
*/
ZKUtil.createWithParents(zookeeper, path);
// Persist the max sequence id of region to zookeeper.
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
if (!listOfOps.isEmpty()) {
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
}
addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
@ -256,6 +259,19 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
return HConstants.NO_SEQNUM;
}
/**
 * Persists the given last pushed sequence ids for a peer in a single multi operation.
 * @param peerId peer id the sequence ids belong to
 * @param lastSeqIds map of encoded region name to sequence id; an empty map is a no-op
 * @throws ReplicationException if the ZooKeeper update fails
 */
@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
    throws ReplicationException {
  if (lastSeqIds.isEmpty()) {
    // Nothing to persist, so do not issue an empty multi request to ZooKeeper.
    return;
  }
  try {
    List<ZKUtilOp> listOfOps = new ArrayList<>();
    // A bare peer id is passed where a queue id is expected; the helper derives the peer id
    // from it.
    addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
    ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
  } catch (KeeperException e) {
    throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
      ", lastSeqIds.size=" + lastSeqIds.size(), e);
  }
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {

View File

@ -56,6 +56,21 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
return PeerOperationType.ADD;
}
/**
 * Only a serial peer needs the extra serial replication states (reopen regions, update last
 * pushed sequence ids, enable the peer). A non-serial peer is stored with its requested enabled
 * state directly, so it can go straight to the post-modification step.
 * @return true if the peer being added is a serial replication peer
 */
@Override
protected boolean reopenRegionsAfterRefresh() {
  return peerConfig.isSerial();
}
/**
 * Consulted by the base procedure after the last pushed sequence ids have been updated, to
 * decide whether the peer must be enabled before the procedure finishes.
 * @return this procedure's {@code enabled} flag
 */
@Override
protected boolean enablePeerBeforeFinish() {
return enabled;
}
/**
 * @return the configuration of the peer being added; the base procedure uses it to select the
 *         tables covered by the peer
 */
@Override
protected ReplicationPeerConfig getNewPeerConfig() {
return peerConfig;
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
@ -68,11 +83,13 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled);
env.getReplicationPeerManager().addPeer(peerId, peerConfig,
peerConfig.isSerial() ? false : enabled);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();

View File

@ -18,11 +18,28 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +55,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
protected ModifyPeerProcedure() {
}
@ -73,6 +92,114 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
ProcedurePrepareLatch.releaseLatch(latch, this);
}
/**
* Hook that decides which state follows REFRESH_PEER_ON_RS. Implementation classes can override
* this method. The default return value is false, which means we jump to POST_PEER_MODIFICATION
* and finish the procedure. If it returns true, we jump to SERIAL_PEER_REOPEN_REGIONS.
* @return true to enter the serial replication states, false to go to POST_PEER_MODIFICATION
*/
protected boolean reopenRegionsAfterRefresh() {
return false;
}
/**
* The implementation class should override this method if the procedure may enter the serial
* related states. It is consulted after the last pushed sequence ids have been updated: true
* moves the procedure to SERIAL_PEER_SET_PEER_ENABLED, false to POST_PEER_MODIFICATION.
* @return whether the peer must be enabled before the procedure finishes
* @throws UnsupportedOperationException always, in this default implementation
*/
protected boolean enablePeerBeforeFinish() {
throw new UnsupportedOperationException();
}
/**
 * Schedules one {@link RefreshPeerProcedure} child per server currently in the online servers
 * list, each carrying this procedure's peer id and the given operation type.
 * @param env master procedure environment
 * @param type peer operation type passed to every child procedure
 */
private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
  RefreshPeerProcedure[] perServerRefreshes =
    env.getMasterServices().getServerManager().getOnlineServersList().stream()
      .map(server -> new RefreshPeerProcedure(peerId, type, server))
      .toArray(RefreshPeerProcedure[]::new);
  addChildProcedure(perServerRefreshes);
}
/**
 * @return the peer configuration that was in place before this modification, or null (the
 *         default) when there is none; used to leave out tables that an old serial
 *         configuration already covered
 */
protected ReplicationPeerConfig getOldPeerConfig() {
return null;
}
/**
 * Implementation classes that may enter the serial related states must override this method,
 * since those states use the new configuration to select the affected tables.
 * @return the peer configuration being applied
 * @throws UnsupportedOperationException always, in this default implementation
 */
protected ReplicationPeerConfig getNewPeerConfig() {
throw new UnsupportedOperationException();
}
/**
 * Selects the tables affected by this peer modification: tables with global replication scope
 * that the new peer configuration contains. When there is an old peer configuration and it is
 * serial, tables that the old configuration already contained are left out.
 * @param env master procedure environment
 * @return a lazily evaluated stream of the selected table descriptors
 * @throws IOException if the table descriptors cannot be loaded
 */
private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
  ReplicationPeerConfig newConfig = getNewPeerConfig();
  Stream<TableDescriptor> selected =
    env.getMasterServices().getTableDescriptors().getAll().values().stream()
      .filter(TableDescriptor::hasGlobalReplicationScope)
      .filter(desc -> ReplicationUtils.contains(newConfig, desc.getTableName()));
  ReplicationPeerConfig previousConfig = getOldPeerConfig();
  if (previousConfig == null || !previousConfig.isSerial()) {
    return selected;
  }
  return selected.filter(desc -> !ReplicationUtils.contains(previousConfig, desc.getTableName()));
}
/**
 * Schedules reopen child procedures for the regions of every enabled table selected by
 * {@link #getTables(MasterProcedureEnv)}. Tables that are not enabled, or whose state cannot be
 * found, are skipped.
 * @param env master procedure environment
 * @throws IOException if a table state lookup or the creation of the reopen procedures fails
 */
private void reopenRegions(MasterProcedureEnv env) throws IOException {
  Stream<TableDescriptor> stream = getTables(env);
  TableStateManager tsm = env.getMasterServices().getTableStateManager();
  // A plain loop instead of Stream.filter/forEach, so that IOException propagates as declared
  // rather than being wrapped in an UncheckedIOException.
  for (TableDescriptor td : (Iterable<TableDescriptor>) stream::iterator) {
    boolean tableEnabled;
    try {
      tableEnabled = tsm.getTableState(td.getTableName()).isEnabled();
    } catch (TableStateNotFoundException e) {
      // No state recorded for this table; treat it like a table that is not enabled.
      tableEnabled = false;
    }
    if (!tableEnabled) {
      continue;
    }
    addChildProcedure(env.getAssignmentManager().createReopenProcedures(
      env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
  }
}
/**
 * Records a non-negative barrier for a region in the pending batch. Once the batch reaches
 * SET_LAST_SEQ_ID_BATCH_SIZE entries it is written to the queue storage for this peer and then
 * emptied. Negative barriers are ignored.
 * @param lastSeqIds pending batch of encoded region name to sequence id; mutated by this call
 * @param encodedRegionName encoded name of the region
 * @param barrier sequence id to record; ignored when negative
 * @param queueStorage storage the full batch is flushed to
 * @throws ReplicationException if flushing the batch fails
 */
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
    ReplicationQueueStorage queueStorage) throws ReplicationException {
  if (barrier < 0) {
    return;
  }
  lastSeqIds.put(encodedRegionName, barrier);
  if (lastSeqIds.size() < SET_LAST_SEQ_ID_BATCH_SIZE) {
    return;
  }
  queueStorage.setLastSequenceIds(peerId, lastSeqIds);
  lastSeqIds.clear();
}
/**
 * Writes the last pushed sequence ids of this peer for the regions of every table selected by
 * {@link #getTables(MasterProcedureEnv)}.
 * <ul>
 * <li>For an enabled table, each region's value is its last replication barrier read from meta,
 * minus one.</li>
 * <li>Otherwise, each region's value is the max region sequence id found under the region
 * directory on the file system.</li>
 * </ul>
 * Negative values are skipped and writes are batched, see {@code addToMap}.
 * @param env master procedure environment
 * @throws IOException if a table state, meta, or file system lookup fails
 * @throws ReplicationException if writing the sequence ids to the queue storage fails
 */
private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
    throws IOException, ReplicationException {
  Stream<TableDescriptor> stream = getTables(env);
  TableStateManager tsm = env.getMasterServices().getTableStateManager();
  ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
  Connection conn = env.getMasterServices().getConnection();
  RegionStates regionStates = env.getAssignmentManager().getRegionStates();
  MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
  Map<String, Long> lastSeqIds = new HashMap<>();
  // A plain loop instead of Stream.forEach, so that the declared checked exceptions propagate
  // as themselves rather than wrapped in a RuntimeException.
  for (TableDescriptor td : (Iterable<TableDescriptor>) stream::iterator) {
    if (tsm.getTableState(td.getTableName()).isEnabled()) {
      for (Pair<String, Long> name2Barrier : MetaTableAccessor
        .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
          queueStorage);
      }
    } else {
      for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
        long maxSequenceId =
          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
      }
    }
  }
  // Flush whatever is left over from the last, partially filled batch.
  if (!lastSeqIds.isEmpty()) {
    queueStorage.setLastSequenceIds(peerId, lastSeqIds);
  }
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
@ -104,9 +231,42 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
.toArray(RefreshPeerProcedure[]::new));
refreshPeer(env, getPeerOperationType());
setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_REOPEN_REGIONS:
try {
reopenRegions(env);
} catch (Exception e) {
LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
setLastSequenceIdForSerialPeer(env);
} catch (Exception e) {
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
}
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_SET_PEER_ENABLED:
try {
env.getReplicationPeerManager().enablePeer(peerId);
} catch (ReplicationException e) {
LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
peerId, e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
refreshPeer(env, PeerOperationType.ENABLE);
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:

View File

@ -85,7 +85,7 @@ public class ReplicationPeerManager {
}
}
public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException, ReplicationException {
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
@ -109,43 +109,47 @@ public class ReplicationPeerManager {
return desc;
}
public void preRemovePeer(String peerId) throws DoNotRetryIOException {
void preRemovePeer(String peerId) throws DoNotRetryIOException {
checkPeerExists(peerId);
}
public void preEnablePeer(String peerId) throws DoNotRetryIOException {
void preEnablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
}
}
public void preDisablePeer(String peerId) throws DoNotRetryIOException {
void preDisablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (!desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
}
}
public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
/**
* Return the old peer description. Can never be null.
*/
ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
checkPeerConfig(peerConfig);
ReplicationPeerDescription desc = checkPeerExists(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
throw new DoNotRetryIOException(
"Changing the cluster key on an existing peer is not allowed. Existing key '" +
oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
peerConfig.getClusterKey() + "'");
"Changing the cluster key on an existing peer is not allowed. Existing key '" +
oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
peerConfig.getClusterKey() + "'");
}
if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
oldPeerConfig.getReplicationEndpointImpl())) {
throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
"on an existing peer is not allowed. Existing class '" +
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
"on an existing peer is not allowed. Existing class '" +
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
}
return desc;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@ -216,7 +220,7 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
// scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@ -340,7 +344,7 @@ public class ReplicationPeerManager {
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
@ -348,7 +352,7 @@ public class ReplicationPeerManager {
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}
/**

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,6 +42,10 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
private ReplicationPeerConfig peerConfig;
private ReplicationPeerConfig oldPeerConfig;
private boolean enabled;
public UpdatePeerConfigProcedure() {
}
@ -53,22 +59,54 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
return PeerOperationType.UPDATE_CONFIG;
}
/**
 * The extra serial replication states are needed only when the new configuration is serial and
 * either the old one was not serial or the namespaces/table-CFs differ between the two.
 */
@Override
protected boolean reopenRegionsAfterRefresh() {
  // If we remove some tables from the peer config then we do not need to enter the extra states
  // for serial replication. Could try to optimize later since it is not easy to determine this...
  if (!peerConfig.isSerial()) {
    return false;
  }
  if (!oldPeerConfig.isSerial()) {
    return true;
  }
  return !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig);
}
/**
 * @return the enabled state the peer had when this procedure captured it in the pre-modification
 *         step, so that a peer disabled for the serial states is enabled again before finishing
 */
@Override
protected boolean enablePeerBeforeFinish() {
// do not need to test reopenRegionsAfterRefresh since we can only enter here if
// reopenRegionsAfterRefresh returns true.
// NOTE(review): the serializeStateData visible in this class writes only the new and old peer
// configs, not this flag - confirm that 'enabled' survives a reload of the procedure state.
return enabled;
}
/**
 * @return the peer configuration that was in place before this update; may be null when the
 *         procedure state was loaded without one
 */
@Override
protected ReplicationPeerConfig getOldPeerConfig() {
return oldPeerConfig;
}
/**
 * @return the peer configuration this procedure is applying
 */
@Override
protected ReplicationPeerConfig getNewPeerConfig() {
return peerConfig;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
ReplicationPeerDescription desc =
env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
oldPeerConfig = desc.getPeerConfig();
enabled = desc.isEnabled();
}
/**
 * Stores the new peer configuration. If the peer is currently enabled and the serial replication
 * states will be entered, the peer is additionally disabled in storage.
 * @param env master procedure environment
 * @throws ReplicationException if updating the config or disabling the peer fails
 */
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
  env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
  if (!enabled) {
    return;
  }
  if (reopenRegionsAfterRefresh()) {
    env.getReplicationPeerManager().disablePeer(peerId);
  }
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
@ -79,14 +117,23 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(UpdatePeerConfigStateData.newBuilder()
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder()
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
if (oldPeerConfig != null) {
builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig));
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
peerConfig = ReplicationPeerConfigUtil
.convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class);
peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
if (data.hasOldPeerConfig()) {
oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig());
} else {
oldPeerConfig = null;
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
@ -99,19 +100,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
ReplicationPeerImpl peer = null;
ReplicationPeerConfig oldConfig = null;
PeerState oldState = null;
boolean success = false;
try {
peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
peer = peers.getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
oldConfig = peer.getPeerConfig();
ReplicationPeerConfig newConfig =
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
oldState = peer.getPeerState();
ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
// also need to refresh peer state here. When updating a serial replication peer we may
// disable it first and then enable it.
PeerState newState = peers.refreshPeerState(peerId);
// RS need to start work with the new replication config change
if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
oldConfig.isSerial() != newConfig.isSerial() ||
(oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
replicationSourceManager.refreshSources(peerId);
}
success = true;
@ -119,6 +127,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
if (!success && peer != null) {
// Reset peer config if refresh source failed
peer.setPeerConfig(oldConfig);
peer.setPeerState(oldState.equals(PeerState.ENABLED));
}
peerLock.unlock();
}

View File

@ -510,7 +510,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
if (wals != null && !wals.first().equals(log)) {
if (wals != null) {
cleanOldLogs(wals, log, inclusive, queueId);
}
}
@ -755,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
Map<String, Map<String, NavigableSet<String>>> getWALs() {
public Map<String, Map<String, NavigableSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}

View File

@ -157,4 +157,12 @@ class WALEntryBatch {
public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}
/**
 * @return a single-line dump of this batch's fields, intended for logging and debugging
 */
@Override
public String toString() {
  StringBuilder sb = new StringBuilder("WALEntryBatch [walEntries=");
  sb.append(walEntries);
  sb.append(", lastWalPath=").append(lastWalPath);
  sb.append(", lastWalPosition=").append(lastWalPosition);
  sb.append(", nbRowKeys=").append(nbRowKeys);
  sb.append(", nbHFiles=").append(nbHFiles);
  sb.append(", heapSize=").append(heapSize);
  sb.append(", lastSeqIds=").append(lastSeqIds);
  sb.append(", endOfFile=").append(endOfFile);
  return sb.append("]").toString();
}
}

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
/**
* Base class for testing serial replication.
*/
public class SerialReplicationTestBase {
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected static String PEER_ID = "1";
protected static byte[] CF = Bytes.toBytes("CF");
protected static byte[] CQ = Bytes.toBytes("CQ");
protected static FileSystem FS;
protected static Path LOG_DIR;
protected static WALProvider.Writer WRITER;
@Rule
public final TestName name = new TestName();
protected Path logPath;
/**
 * Replication endpoint for tests that, instead of shipping entries to a remote cluster, appends
 * every replicated WAL entry to the shared static {@code WRITER} and syncs it.
 */
public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
// One random UUID shared by all instances of this endpoint class.
private static final UUID PEER_UUID = UUID.randomUUID();
@Override
public UUID getPeerUUID() {
return PEER_UUID;
}
/**
 * Appends all entries of the batch to {@code WRITER} and syncs the writer.
 * @return always true
 * @throws UncheckedIOException if appending or syncing fails
 */
@Override
public boolean replicate(ReplicateContext replicateContext) {
// NOTE(review): WRITER is a mutable static that is set to null in tearDown; this assumes a test
// has assigned it before any entry is replicated - confirm.
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
WRITER.append(entry);
}
WRITER.sync(false);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return true;
}
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
// Nothing to set up: report the started state right away.
@Override
protected void doStart() {
notifyStarted();
}
// Nothing to tear down: report the stopped state right away.
@Override
protected void doStop() {
notifyStopped();
}
}
/**
 * Starts a three-node mini cluster with the balancer switched off and creates the directory that
 * will hold the per-test "replicated" logs.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
  UTIL.startMiniCluster(3);
  // disable balancer
  UTIL.getAdmin().balancerSwitch(false, true);
  final Path replicatedDir = UTIL.getDataTestDirOnTestFS("replicated");
  LOG_DIR = replicatedDir;
  final FileSystem testFs = UTIL.getTestFileSystem();
  FS = testFs;
  testFs.mkdirs(replicatedDir);
}
/**
 * Shuts down the mini cluster that {@code setUpBeforeClass()} started.
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
/**
 * Per-test cleanup: removes the replication peer, rolls the WALs of all live region servers and
 * finally closes the shared {@code WRITER}.
 * <p>
 * The writer is released in a {@code finally} block so that a failure while removing the peer or
 * rolling the WALs does not leave the log file open or a stale writer behind.
 * @throws Exception if removing the peer, rolling the WALs or closing the writer fails
 */
@After
public void tearDown() throws Exception {
  try {
    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
    rollAllWALs();
  } finally {
    // WRITER is null when a test failed before setupWALWriter() could create it.
    if (WRITER != null) {
      WRITER.close();
      WRITER = null;
    }
  }
}
/**
 * Requests a move of {@code region} to {@code rs} through the admin API and then waits, for at
 * most 30 seconds, until {@code rs} returns a non-null region for the region's encoded name.
 * @param region the region to move
 * @param rs the destination region server
 */
protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
  final byte[] encodedRegionName = region.getEncodedNameAsBytes();
  final byte[] destinationServer = Bytes.toBytes(rs.getServerName().getServerName());
  UTIL.getAdmin().move(encodedRegionName, destinationServer);

  ExplainingPredicate<Exception> regionOpenedOnTarget = new ExplainingPredicate<Exception>() {
    @Override
    public String explainFailure() throws Exception {
      return region + " is still not on " + rs;
    }

    @Override
    public boolean evaluate() throws Exception {
      String encodedName = region.getEncodedName();
      return null != rs.getRegion(encodedName);
    }
  };
  UTIL.waitFor(30000, regionOpenedOnTarget);
}
/**
 * Requests a WAL roll on every live region server and then waits, for at most 30 seconds, until
 * each of them reports that its roll request has finished.
 */
protected static void rollAllWALs() throws Exception {
  for (RegionServerThread rst : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
    HRegionServer server = rst.getRegionServer();
    server.getWalRoller().requestRollAll();
  }

  ExplainingPredicate<Exception> allRollsFinished = new ExplainingPredicate<Exception>() {
    @Override
    public String explainFailure() throws Exception {
      return "Log roll has not finished yet";
    }

    @Override
    public boolean evaluate() throws Exception {
      // Stop at the first server that has not finished yet.
      for (RegionServerThread rst : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
        if (!rst.getRegionServer().walRollRequestFinished()) {
          return false;
        }
      }
      return true;
    }
  };
  UTIL.waitFor(30000, allRollsFinished);
}
/**
 * Points {@code logPath} at a file under {@code LOG_DIR} named after the running test method and
 * stores a WAL writer for that file in {@code WRITER}.
 * @throws IOException if the writer cannot be created
 */
protected final void setupWALWriter() throws IOException {
  final Path perTestLog = new Path(LOG_DIR, name.getMethodName());
  logPath = perTestLog;
  WRITER = WALFactory.createWALWriter(FS, perTestLog, UTIL.getConfiguration());
}
/**
 * Waits, for at most 30 seconds, until the log at {@code logPath} contains at least
 * {@code expectedEntries} entries.
 * @param expectedEntries minimum number of entries that must be readable from the log
 */
protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
  ExplainingPredicate<Exception> enoughEntriesWritten = new ExplainingPredicate<Exception>() {
    @Override
    public String explainFailure() throws Exception {
      return "Not enough entries replicated";
    }

    @Override
    public boolean evaluate() throws Exception {
      try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
        int seen = 0;
        for (Entry next = reader.next(); next != null; next = reader.next()) {
          seen++;
        }
        return expectedEntries <= seen;
      } catch (IOException ioe) {
        // A log that cannot be opened or read is treated as "not done yet".
        return false;
      }
    }
  };
  UTIL.waitFor(30000, enoughEntriesWritten);
}
/**
 * Adds the test peer {@code PEER_ID} as a serial replication peer whose endpoint is
 * {@link LocalReplicationEndpoint}.
 * @param enabled whether the peer is added in the enabled state
 * @throws IOException if the admin call fails
 */
protected final void addPeer(boolean enabled) throws IOException {
  ReplicationPeerConfig serialPeerConfig = ReplicationPeerConfig.newBuilder()
    .setClusterKey("127.0.0.1:2181:/hbase")
    .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
    .setSerial(true)
    .build();
  UTIL.getAdmin().addReplicationPeer(PEER_ID, serialPeerConfig, enabled);
}
/**
 * Reads the log at {@code logPath} back and asserts that the sequence ids of its entries never
 * decrease and that it holds exactly {@code expectedEntries} entries.
 * @param expectedEntries exact number of entries the log must contain
 * @throws IOException if the log cannot be opened or read
 */
protected final void checkOrder(int expectedEntries) throws IOException {
  try (WAL.Reader reader =
    WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
    long seqId = -1L;
    int count = 0;
    for (Entry entry = reader.next(); entry != null; entry = reader.next()) {
      long currentSeqId = entry.getKey().getSequenceId();
      assertTrue("Sequence id go backwards from " + seqId + " to " + currentSeqId,
        currentSeqId >= seqId);
      // Record the id just checked; without this every entry is compared against the initial -1
      // and the ordering assertion can never fail.
      seqId = currentSeqId;
      count++;
    }
    assertEquals(expectedEntries, count);
  }
}
}

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
* Testcase for HBASE-20147.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
// JUnit class rule created for this test class through HBaseClassTestRule.forClass.
// NOTE(review): presumably applies class-level limits such as a timeout — confirm against
// HBaseClassTestRule.
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
/**
 * Creates the per-test WAL writer before each test. No replication peer is added here; every
 * test method adds its own peer at the point it needs one.
 */
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
setupWALWriter();
}
/**
 * Moves {@code region} to {@code rs} and then rolls the WALs of all live region servers.
 * <p>
 * This makes sure that replication starts from the sequence ids written after the move, which is
 * what we want to test here.
 */
private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
moveRegion(region, rs);
rollAllWALs();
}
/**
 * Waits, for at most 30 seconds, until the replication source manager of {@code rs} tracks
 * exactly one WAL for {@code PEER_ID} under the prefix of the server's current WAL file.
 * NOTE(review): presumably the remaining WAL is the current one, i.e. all older files have been
 * replicated — confirm against ReplicationSourceManager.
 * @param rs the region server whose replication progress is awaited
 */
private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception {
  AbstractFSWAL<?> wal = (AbstractFSWAL<?>) rs.getWAL(null);
  Path currentWalFile = wal.getCurrentFileName();
  String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalFile.getName());

  ExplainingPredicate<Exception> singleWalLeft = new ExplainingPredicate<Exception>() {
    @Override
    public String explainFailure() throws Exception {
      return "Still not replicated to the current WAL file yet";
    }

    @Override
    public boolean evaluate() throws Exception {
      Replication replication = (Replication) rs.getReplicationSourceService();
      ReplicationSourceManager sourceManager = replication.getReplicationManager();
      return 1 == sourceManager.getWALs().get(PEER_ID).get(walPrefix).size();
    }
  };
  UTIL.waitFor(30000, singleWalLeft);
}
/**
 * Creates a globally scoped table and writes 100 rows, moves its region to another server and
 * rolls the WALs, and only then adds the enabled serial peer. After 100 more rows the replicated
 * log must contain exactly 100 entries.
 */
@Test
public void testAddPeer() throws Exception {
  TableName tableName = TableName.valueOf(name.getMethodName());
  ColumnFamilyDescriptorBuilder familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(CF);
  familyBuilder.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
  tableBuilder.setColumnFamily(familyBuilder.build());
  UTIL.getAdmin().createTable(tableBuilder.build());
  UTIL.waitTableAvailable(tableName);

  // First batch: written while no peer exists.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
  HRegionServer currentServer = UTIL.getRSForFirstRegionInTable(tableName);
  HRegionServer targetServer = UTIL.getOtherRegionServer(currentServer);
  moveRegionAndArchiveOldWals(region, targetServer);
  addPeer(true);

  // Second batch: written after the serial peer has been added.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  waitUntilReplicationDone(100);
  checkOrder(100);
}
/**
 * Starts with an enabled, non-serial peer, writes 100 rows, moves the region and rolls the WALs,
 * and waits until those 100 entries are replicated and the source server has reached its current
 * WAL file. The peer is then disabled, switched to serial and re-enabled. After 100 more rows
 * the replicated log must contain exactly 200 entries.
 */
@Test
public void testChangeToSerial() throws Exception {
  ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
    .setClusterKey("127.0.0.1:2181:/hbase")
    .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
    .build();
  UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

  TableName tableName = TableName.valueOf(name.getMethodName());
  ColumnFamilyDescriptorBuilder familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(CF);
  familyBuilder.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
  tableBuilder.setColumnFamily(familyBuilder.build());
  UTIL.getAdmin().createTable(tableBuilder.build());
  UTIL.waitTableAvailable(tableName);

  // First batch: replicated through the non-serial peer.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
  HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
  HRegionServer targetRs = UTIL.getOtherRegionServer(srcRs);
  moveRegionAndArchiveOldWals(region, targetRs);
  waitUntilReplicationDone(100);
  waitUntilReplicatedToTheCurrentWALFile(srcRs);

  // Switch the existing peer to serial while it is disabled.
  UTIL.getAdmin().disableReplicationPeer(PEER_ID);
  ReplicationPeerConfig serialConfig =
    ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build();
  UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, serialConfig);
  UTIL.getAdmin().enableReplicationPeer(PEER_ID);

  // Second batch: replicated through the now-serial peer.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  waitUntilReplicationDone(200);
  checkOrder(200);
}
/**
 * Starts with an enabled serial peer that does not replicate all user tables, writes 100 rows,
 * moves the region and rolls the WALs, and waits until the destination server has reached its
 * current WAL file. The peer is then disabled, the table is added to its table-CFs map and the
 * peer is re-enabled. After 100 more rows the replicated log must contain exactly 100 entries.
 */
@Test
public void testAddToSerialPeer() throws Exception {
  ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
    .setClusterKey("127.0.0.1:2181:/hbase")
    .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
    .setReplicateAllUserTables(false)
    .setSerial(true)
    .build();
  UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);

  TableName tableName = TableName.valueOf(name.getMethodName());
  ColumnFamilyDescriptorBuilder familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(CF);
  familyBuilder.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
  tableBuilder.setColumnFamily(familyBuilder.build());
  UTIL.getAdmin().createTable(tableBuilder.build());
  UTIL.waitTableAvailable(tableName);

  // First batch: written while the table is not part of the peer.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
  HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
  HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
  moveRegionAndArchiveOldWals(region, rs);
  waitUntilReplicatedToTheCurrentWALFile(rs);

  // Add the table to the peer while the peer is disabled.
  UTIL.getAdmin().disableReplicationPeer(PEER_ID);
  ReplicationPeerConfig configWithTable = ReplicationPeerConfig.newBuilder(peerConfig)
    .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList()))
    .build();
  UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, configWithTable);
  UTIL.getAdmin().enableReplicationPeer(PEER_ID);

  // Second batch: written after the table has been added to the peer.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  waitUntilReplicationDone(100);
  checkOrder(100);
}
/**
 * Creates a globally scoped table, writes 100 rows, disables the table and rolls the WALs, then
 * adds the enabled serial peer and re-enables the table. After 100 more rows the replicated log
 * must contain exactly 100 entries.
 */
@Test
public void testDisabledTable() throws Exception {
  TableName tableName = TableName.valueOf(name.getMethodName());
  ColumnFamilyDescriptorBuilder familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(CF);
  familyBuilder.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
  tableBuilder.setColumnFamily(familyBuilder.build());
  UTIL.getAdmin().createTable(tableBuilder.build());
  UTIL.waitTableAvailable(tableName);

  // First batch: written while no peer exists.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  // The peer is added while the table is disabled.
  UTIL.getAdmin().disableTable(tableName);
  rollAllWALs();
  addPeer(true);
  UTIL.getAdmin().enableTable(tableName);

  // Second batch: written after the table is back online.
  try (Table table = UTIL.getConnection().getTable(tableName)) {
    for (int row = 0; row < 100; row++) {
      Put put = new Put(Bytes.toBytes(row));
      put.addColumn(CF, CQ, Bytes.toBytes(row));
      table.put(put);
    }
  }

  waitUntilReplicationDone(100);
  checkOrder(100);
}
}

View File

@ -23,211 +23,49 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialReplication {
@Category({ ReplicationTests.class, MediumTests.class })
public class TestSerialReplication extends SerialReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplication.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static byte[] CF = Bytes.toBytes("CF");
private static byte[] CQ = Bytes.toBytes("CQ");
private static FileSystem FS;
private static Path LOG_DIR;
private static WALProvider.Writer WRITER;
public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
private static final UUID PEER_UUID = UUID.randomUUID();
@Override
public UUID getPeerUUID() {
return PEER_UUID;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
WRITER.append(entry);
}
WRITER.sync(false);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return true;
}
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
notifyStopped();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
UTIL.startMiniCluster(3);
// disable balancer
UTIL.getAdmin().balancerSwitch(false, true);
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Rule
public final TestName name = new TestName();
private Path logPath;
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
logPath = new Path(LOG_DIR, name.getMethodName());
WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
setupWALWriter();
// add in disable state, so later when enabling it all sources will start push together.
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
.build(),
false);
}
@After
public void tearDown() throws Exception {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
t.getRegionServer().getWalRoller().requestRollAll();
}
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
}
@Override
public String explainFailure() throws Exception {
return "Log roll has not finished yet";
}
});
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
t.getRegionServer().getWalRoller().requestRollAll();
}
if (WRITER != null) {
WRITER.close();
WRITER = null;
}
}
private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(rs.getServerName().getServerName()));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return rs.getRegion(region.getEncodedName()) != null;
}
@Override
public String explainFailure() throws Exception {
return region + " is still not on " + rs;
}
});
addPeer(false);
}
private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= expectedEntries;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
waitUntilReplicationDone(expectedEntries);
}
@Test
@ -251,22 +89,7 @@ public class TestSerialReplication {
}
}
enablePeerAndWaitUntilReplicationDone(200);
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
assertTrue(
"Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
entry.getKey().getSequenceId() >= seqId);
count++;
}
assertEquals(200, count);
}
checkOrder(200);
}
@Test