HBASE-19857 Complete the procedure for adding a sync replication peer

This commit is contained in:
zhangduo 2018-01-25 20:09:00 +08:00
parent 2acebac00e
commit d8842dc3d4
9 changed files with 60 additions and 44 deletions

View File

@ -53,6 +53,15 @@ public interface ReplicationPeer {
*/
PeerState getPeerState();
/**
* Returns the sync replication state of the peer by reading local cache.
* <p>
* If the peer is not a synchronous replication peer, a {@link SyncReplicationState#NONE} will be
* returned.
* @return the sync replication state
*/
SyncReplicationState getSyncReplicationState();
/**
* Test whether the peer is enabled.
* @return {@code true} if enabled, otherwise {@code false}.

View File

@ -36,6 +36,8 @@ public class ReplicationPeerImpl implements ReplicationPeer {
private volatile PeerState peerState;
private volatile SyncReplicationState syncReplicationState;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
/**
@ -45,12 +47,13 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
ReplicationPeerConfig peerConfig) {
public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
boolean peerState, SyncReplicationState syncReplicationState) {
this.conf = conf;
this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
this.syncReplicationState = syncReplicationState;
this.peerConfigListeners = new ArrayList<>();
}
@ -77,37 +80,26 @@ public class ReplicationPeerImpl implements ReplicationPeer {
return peerState;
}
/**
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
*/
@Override
public SyncReplicationState getSyncReplicationState() {
return syncReplicationState;
}
@Override
public ReplicationPeerConfig getPeerConfig() {
return peerConfig;
}
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
@Override
public Configuration getConfiguration() {
return conf;
}
/**
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
@Override
public Map<TableName, List<String>> getTableCFs() {
return this.peerConfig.getTableCFsMap();
}
/**
* Get replicable namespace set of this peer
* @return the replicable namespaces set
*/
@Override
public Set<String> getNamespaces() {
return this.peerConfig.getNamespaces();

View File

@ -134,7 +134,8 @@ public class ReplicationPeers {
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, enabled, peerConfig);
peerId, peerConfig, enabled, syncReplicationState);
}
}

View File

@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Get notification for replication peer events. Mainly used for telling the
* {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to close some WAL if not
* used any more.
* <p>
* TODO: Also need a synchronous peer state change notification.
* {@link org.apache.hadoop.hbase.wal.SyncReplicationWALProvider} to close some WAL if not used any
* more.
*/
@InterfaceAudience.Private
public interface PeerActionListener {
default void peerRemoved(String peerId) {}
default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to) {}
}

View File

@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Get the peer id and remote root dir if the region is synchronously replicated.
*/
@InterfaceAudience.Private
public interface SynchronousReplicationPeerProvider {
public interface SyncReplicationPeerProvider {
/**
* Return the peer id and remote WAL directory if the region is synchronously replicated.

View File

@ -37,8 +37,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@ -58,16 +59,15 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
* the request to the normal {@link WALProvider}.
*/
@InterfaceAudience.Private
public class SynchronousReplicationWALProvider implements WALProvider, PeerActionListener {
public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
private static final Logger LOG =
LoggerFactory.getLogger(SynchronousReplicationWALProvider.class);
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
private static final String LOG_SUFFIX = ".syncrep";
private final WALProvider provider;
private final SynchronousReplicationPeerProvider peerProvider;
private final SyncReplicationPeerProvider peerProvider;
private WALFactory factory;
@ -85,8 +85,7 @@ public class SynchronousReplicationWALProvider implements WALProvider, PeerActio
private final KeyLocker<String> createLock = new KeyLocker<>();
SynchronousReplicationWALProvider(WALProvider provider,
SynchronousReplicationPeerProvider peerProvider) {
SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) {
this.provider = provider;
this.peerProvider = peerProvider;
}
@ -100,7 +99,7 @@ public class SynchronousReplicationWALProvider implements WALProvider, PeerActio
this.conf = conf;
this.factory = factory;
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
@ -140,7 +139,7 @@ public class SynchronousReplicationWALProvider implements WALProvider, PeerActio
@Override
public WAL getWAL(RegionInfo region) throws IOException {
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerProvider.getPeerIdAndRemoteWALDir(region);
peerProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
return getWAL(pair.getFirst(), pair.getSecond());
@ -205,9 +204,7 @@ public class SynchronousReplicationWALProvider implements WALProvider, PeerActio
provider.getLogFileSize();
}
@Override
public void peerRemoved(String peerId) {
WAL wal = peerId2WAL.remove(peerId);
private void safeClose(WAL wal) {
if (wal != null) {
try {
wal.close();
@ -222,4 +219,16 @@ public class SynchronousReplicationWALProvider implements WALProvider, PeerActio
listeners.add(listener);
provider.addWALActionsListener(listener);
}
@Override
public void peerRemoved(String peerId) {
safeClose(peerId2WAL.remove(peerId));
}
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to) {
assert to == SyncReplicationState.DOWNGRADE_ACTIVE;
safeClose(peerId2WAL.remove(peerId));
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -188,7 +188,7 @@ public class WALFactory {
* Remove it once we can integrate the synchronous replication logic in RS.
*/
@VisibleForTesting
WALFactory(Configuration conf, String factoryId, SynchronousReplicationPeerProvider peerProvider)
WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider)
throws IOException {
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
@ -197,9 +197,9 @@ public class WALFactory {
this.conf = conf;
this.factoryId = factoryId;
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
this.provider = new SynchronousReplicationWALProvider(provider, peerProvider);
this.provider.addWALActionsListener(new MetricsWAL());
this.provider = new SyncReplicationWALProvider(provider, peerProvider);
this.provider.init(this, conf, null);
this.provider.addWALActionsListener(new MetricsWAL());
}
/**

View File

@ -173,6 +173,9 @@ public abstract class TestReplicationSourceManager {
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
Bytes.toBytes(SyncReplicationState.NONE.ordinal()));
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

View File

@ -45,7 +45,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSynchronousReplicationWALProvider {
public class TestSyncReplicationWALProvider {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -75,7 +75,7 @@ public class TestSynchronousReplicationWALProvider {
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniDFSCluster(3);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir);
TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir);
UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
}
@ -145,8 +145,8 @@ public class TestSynchronousReplicationWALProvider {
DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
assertEquals(2, FACTORY.getWALs().size());
testReadWrite(wal);
SynchronousReplicationWALProvider walProvider =
(SynchronousReplicationWALProvider) FACTORY.getWALProvider();
SyncReplicationWALProvider walProvider =
(SyncReplicationWALProvider) FACTORY.getWALProvider();
walProvider.peerRemoved(PEER_ID);
assertEquals(1, FACTORY.getWALs().size());
}