HBASE-20576 Check remote WAL directory when creating peer and transiting peer to A

This commit is contained in:
zhangduo 2018-05-15 15:07:40 +08:00
parent 8a264dfc00
commit 603110719d
3 changed files with 110 additions and 39 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
@ -31,6 +32,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -193,9 +194,9 @@ public class ReplicationPeerManager {
} }
/** /**
* @return the old state, and whether the peer is enabled. * @return the old description of the peer
*/ */
Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId, ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws DoNotRetryIOException { SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState(); SyncReplicationState fromState = desc.getSyncReplicationState();
@ -204,7 +205,7 @@ public class ReplicationPeerManager {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId); " to " + state + " for peer id=" + peerId);
} }
return Pair.newPair(fromState, desc.isEnabled()); return desc;
} }
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@ -384,6 +385,16 @@ public class ReplicationPeerManager {
"Only support replicated table config for sync replication peer"); "Only support replicated table config for sync replication peer");
} }
} }
Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
if (!remoteWALDir.isAbsolute()) {
throw new DoNotRetryIOException(
"The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
}
URI remoteWALDirUri = remoteWALDir.toUri();
if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() +
" is not qualified, you must provide scheme and authority");
}
} }
/** /**

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@ -31,9 +32,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -113,10 +114,20 @@ public class TransitPeerSyncReplicationStateProcedure
if (cpHost != null) { if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState); cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
} }
Pair<SyncReplicationState, Boolean> pair = ReplicationPeerDescription desc =
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
fromState = pair.getFirst(); if (toState == SyncReplicationState.ACTIVE) {
enabled = pair.getSecond(); Path remoteWALDirForPeer =
ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
// check whether the remote wal directory is present
if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
.exists(remoteWALDirForPeer)) {
throw new DoNotRetryIOException(
"The remote WAL directory " + remoteWALDirForPeer + " does not exist");
}
}
fromState = desc.getSyncReplicationState();
enabled = desc.isEnabled();
} }
private void postTransit(MasterProcedureEnv env) throws IOException { private void postTransit(MasterProcedureEnv env) throws IOException {
@ -152,6 +163,36 @@ public class TransitPeerSyncReplicationStateProcedure
} }
} }
/**
 * Picks the state to execute after the first batch of refresh-peer child procedures.
 * <p>
 * When the peer is leaving ACTIVE or DOWNGRADE_ACTIVE, a transition to STANDBY continues with
 * removing the replication queues and any other target continues with reopening regions. For
 * every other source state the target is asserted to be DOWNGRADE_ACTIVE and the procedure
 * continues with replaying the remote WAL.
 */
private void setNextStateAfterRefreshBegin() {
  boolean leavingActiveOrDowngradeActive = fromState.equals(SyncReplicationState.ACTIVE)
    || fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
  if (!leavingActiveOrDowngradeActive) {
    // Only remaining source state handled here; the target must be DOWNGRADE_ACTIVE.
    assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
    setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    return;
  }
  if (toState.equals(SyncReplicationState.STANDBY)) {
    setNextState(PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER);
  } else {
    setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
  }
}
/**
 * Picks the state to execute after the second batch of refresh-peer child procedures.
 * <p>
 * Any target other than STANDBY goes straight to the post-transition step. For STANDBY, an
 * enabled peer continues with the set-peer-enabled step, while a disabled peer continues with
 * creating the directory for the remote WAL.
 */
private void setNextStateAfterRefreshEnd() {
  if (toState != SyncReplicationState.STANDBY) {
    setNextState(
      PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    return;
  }
  PeerSyncReplicationStateTransitionState standbyNext;
  if (enabled) {
    standbyNext = PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED;
  } else {
    standbyNext = PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL;
  }
  setNextState(standbyNext);
}
/**
 * Registers a single {@link RecoverStandbyProcedure} for this peer as a child procedure.
 */
private void replayRemoteWAL() {
  // The child is handed over as an explicit one-element array rather than a bare argument.
  // NOTE(review): presumably this sidesteps generic varargs array creation - confirm.
  RecoverStandbyProcedure[] children = { new RecoverStandbyProcedure(peerId) };
  addChildProcedure(children);
}
@Override @Override
protected Flow executeFromState(MasterProcedureEnv env, protected Flow executeFromState(MasterProcedureEnv env,
PeerSyncReplicationStateTransitionState state) PeerSyncReplicationStateTransitionState state)
@ -191,21 +232,10 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
.toArray(RefreshPeerProcedure[]::new)); .toArray(RefreshPeerProcedure[]::new));
if (fromState.equals(SyncReplicationState.ACTIVE)) { setNextStateAfterRefreshBegin();
setNextState(toState.equals(SyncReplicationState.STANDBY)
? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
setNextState(toState.equals(SyncReplicationState.STANDBY)
? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else {
assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
}
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER: case REPLAY_REMOTE_WAL_IN_PEER:
addChildProcedure(new RecoverStandbyProcedure(peerId)); replayRemoteWAL();
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
@ -248,14 +278,7 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new)); .toArray(RefreshPeerProcedure[]::new));
if (toState == SyncReplicationState.STANDBY) { setNextStateAfterRefreshEnd();
setNextState(
enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
: PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
}
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case SYNC_REPLICATION_SET_PEER_ENABLED: case SYNC_REPLICATION_SET_PEER_ENABLED:
try { try {

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest; import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
@ -981,34 +982,37 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertNull(rpc.getRemoteWALDir()); assertNull(rpc.getRemoteWALDir());
builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
try { try {
builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change remote wal dir is not allowed"); fail("Change remote wal dir is not allowed");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
builder = ReplicationPeerConfig.newBuilder(); builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND); builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir); builder.setRemoteWALDir("whatever");
try { try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication"); fail("Only support replicated table config for sync replication");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
builder.setReplicateAllUserTables(false); builder.setReplicateAllUserTables(false);
Set<String> namespaces = new HashSet<String>();
namespaces.add("ns1");
builder.setNamespaces(namespaces);
try { try {
Set<String> namespaces = new HashSet<String>();
namespaces.add("ns1");
builder.setNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication"); fail("Only support replicated table config for sync replication");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
builder.setNamespaces(null); builder.setNamespaces(null);
@ -1017,21 +1021,41 @@ public class TestReplicationAdmin {
fail("Only support replicated table config for sync replication, and tables can't be empty"); fail("Only support replicated table config for sync replication, and tables can't be empty");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
Map<TableName, List<String>> tableCfs = new HashMap<>(); Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, Arrays.asList("cf1"));
builder.setTableCFsMap(tableCfs);
try { try {
tableCfs.put(tableName, Arrays.asList("cf1"));
builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication"); fail("Only support replicated table config for sync replication");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>()); tableCfs.put(tableName, new ArrayList<>());
builder.setTableCFsMap(tableCfs); builder.setTableCFsMap(tableCfs);
try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("The remote WAL dir must be absolute");
} catch (Exception e) {
// OK
LOG.info("Expected error:", e);
}
builder.setRemoteWALDir("/hbase/remoteWALs");
try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("The remote WAL dir must be qualified");
} catch (Exception e) {
// OK
LOG.info("Expected error:", e);
}
builder.setRemoteWALDir(rootDir);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
assertEquals(rootDir, rpc.getRemoteWALDir()); assertEquals(rootDir, rpc.getRemoteWALDir());
@ -1042,6 +1066,7 @@ public class TestReplicationAdmin {
fail("Change remote wal dir is not allowed"); fail("Change remote wal dir is not allowed");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
try { try {
@ -1050,6 +1075,7 @@ public class TestReplicationAdmin {
fail("Change remote wal dir is not allowed"); fail("Change remote wal dir is not allowed");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
try { try {
@ -1062,6 +1088,7 @@ public class TestReplicationAdmin {
"Change replicated table config on an existing synchronous peer is not allowed"); "Change replicated table config on an existing synchronous peer is not allowed");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
} }
@ -1079,13 +1106,13 @@ public class TestReplicationAdmin {
try { try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
SyncReplicationState.DOWNGRADE_ACTIVE); SyncReplicationState.DOWNGRADE_ACTIVE);
fail("Can't transit cluster state if replication peer don't config remote wal dir"); fail("Can't transit sync replication state if replication peer don't config remote wal dir");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
builder = ReplicationPeerConfig.newBuilder(); builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND); builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
@ -1106,6 +1133,15 @@ public class TestReplicationAdmin {
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
fail("Can't transit sync replication state to ACTIVE if remote wal dir does not exist");
} catch (Exception e) {
// OK
LOG.info("Expected error:", e);
}
TEST_UTIL.getTestFileSystem()
.mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE, assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
@ -1133,9 +1169,10 @@ public class TestReplicationAdmin {
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try { try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
fail("Can't transit cluster state from STANDBY to ACTIVE"); fail("Can't transit sync replication state from STANDBY to ACTIVE");
} catch (Exception e) { } catch (Exception e) {
// OK // OK
LOG.info("Expected error:", e);
} }
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE); SyncReplicationState.DOWNGRADE_ACTIVE);