HBASE-27311 The implementation of syncReplicationPeerLock is incomplete (#4715)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
parent
00a719e76f
commit
950ad8dd3e
|
@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -91,13 +92,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void prePeerModification(MasterProcedureEnv env)
|
protected void prePeerModification(MasterProcedureEnv env)
|
||||||
throws IOException, ReplicationException, InterruptedException {
|
throws IOException, ReplicationException, ProcedureSuspendedException {
|
||||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||||
if (cpHost != null) {
|
if (cpHost != null) {
|
||||||
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
||||||
}
|
}
|
||||||
if (peerConfig.isSyncReplication()) {
|
if (peerConfig.isSyncReplication()) {
|
||||||
env.getReplicationPeerManager().acquireSyncReplicationPeerLock();
|
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
|
||||||
|
throw suspend(env.getMasterConfiguration(),
|
||||||
|
backoff -> LOG.warn(
|
||||||
|
"Can not acquire sync replication peer lock for peer {}, sleep {} secs", peerId,
|
||||||
|
backoff / 1000));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
|
env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
|
||||||
}
|
}
|
||||||
|
@ -119,6 +125,20 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void afterReplay(MasterProcedureEnv env) {
|
||||||
|
if (getCurrentState() == getInitialState()) {
|
||||||
|
// will try to acquire the lock when executing the procedure, no need to acquire it here
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (peerConfig.isSyncReplication()) {
|
||||||
|
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Can not acquire sync replication peer lock for peer " + peerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
super.serializeStateData(serializer);
|
super.serializeStateData(serializer);
|
||||||
|
|
|
@ -60,7 +60,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
* all checks passes then the procedure can not be rolled back any more.
|
* all checks passes then the procedure can not be rolled back any more.
|
||||||
*/
|
*/
|
||||||
protected abstract void prePeerModification(MasterProcedureEnv env)
|
protected abstract void prePeerModification(MasterProcedureEnv env)
|
||||||
throws IOException, ReplicationException, InterruptedException;
|
throws IOException, ReplicationException, ProcedureSuspendedException;
|
||||||
|
|
||||||
protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
|
protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
|
||||||
|
|
||||||
|
|
|
@ -587,8 +587,8 @@ public class ReplicationPeerManager {
|
||||||
return s1.equals(s2);
|
return s1.equals(s2);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void acquireSyncReplicationPeerLock() throws InterruptedException {
|
public boolean tryAcquireSyncReplicationPeerLock() {
|
||||||
syncReplicationPeerLock.acquire();
|
return syncReplicationPeerLock.tryAcquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void releaseSyncReplicationPeerLock() {
|
public void releaseSyncReplicationPeerLock() {
|
||||||
|
|
Loading…
Reference in New Issue