HDFS-6788. Improve synchronization in BPOfferService with read write lock. Contributed by Yongjun Zhang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1615191 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-08-01 18:56:25 +00:00
parent e86559fde3
commit 511234c828
2 changed files with 189 additions and 113 deletions

View File

@ -97,6 +97,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6802. Some tests in TestDFSClientFailover are missing @Test HDFS-6802. Some tests in TestDFSClientFailover are missing @Test
annotation. (Akira Ajisaka via wang) annotation. (Akira Ajisaka via wang)
HDFS-6788. Improve synchronization in BPOfferService with read write lock.
(Yongjun Zhang via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -21,6 +21,7 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -38,6 +39,8 @@
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* One instance per block-pool/namespace on the DN, which handles the * One instance per block-pool/namespace on the DN, which handles the
@ -91,6 +94,28 @@ class BPOfferService {
*/ */
private long lastActiveClaimTxId = -1; private long lastActiveClaimTxId = -1;
private final ReentrantReadWriteLock mReadWriteLock =
new ReentrantReadWriteLock();
private final Lock mReadLock = mReadWriteLock.readLock();
private final Lock mWriteLock = mReadWriteLock.writeLock();
// utility methods to acquire and release read lock and write lock
void readLock() {
mReadLock.lock();
}
void readUnlock() {
mReadLock.unlock();
}
void writeLock() {
mWriteLock.lock();
}
void writeUnlock() {
mWriteLock.unlock();
}
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) { BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(), Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN."); "Must pass at least one NN.");
@ -135,14 +160,19 @@ boolean isAlive() {
} }
return false; return false;
} }
synchronized String getBlockPoolId() { String getBlockPoolId() {
if (bpNSInfo != null) { readLock();
return bpNSInfo.getBlockPoolID(); try {
} else { if (bpNSInfo != null) {
LOG.warn("Block pool ID needed, but service not yet registered with NN", return bpNSInfo.getBlockPoolID();
new Exception("trace")); } else {
return null; LOG.warn("Block pool ID needed, but service not yet registered with NN",
new Exception("trace"));
return null;
}
} finally {
readUnlock();
} }
} }
@ -150,27 +180,37 @@ boolean hasBlockPoolId() {
return getNamespaceInfo() != null; return getNamespaceInfo() != null;
} }
synchronized NamespaceInfo getNamespaceInfo() { NamespaceInfo getNamespaceInfo() {
return bpNSInfo; readLock();
try {
return bpNSInfo;
} finally {
readUnlock();
}
} }
@Override @Override
public synchronized String toString() { public String toString() {
if (bpNSInfo == null) { readLock();
// If we haven't yet connected to our NN, we don't yet know our try {
// own block pool ID. if (bpNSInfo == null) {
// If _none_ of the block pools have connected yet, we don't even // If we haven't yet connected to our NN, we don't yet know our
// know the DatanodeID ID of this DN. // own block pool ID.
String datanodeUuid = dn.getDatanodeUuid(); // If _none_ of the block pools have connected yet, we don't even
// know the DatanodeID ID of this DN.
String datanodeUuid = dn.getDatanodeUuid();
if (datanodeUuid == null || datanodeUuid.isEmpty()) { if (datanodeUuid == null || datanodeUuid.isEmpty()) {
datanodeUuid = "unassigned"; datanodeUuid = "unassigned";
}
return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
} else {
return "Block pool " + getBlockPoolId() +
" (Datanode Uuid " + dn.getDatanodeUuid() +
")";
} }
return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")"; } finally {
} else { readUnlock();
return "Block pool " + getBlockPoolId() +
" (Datanode Uuid " + dn.getDatanodeUuid() +
")";
} }
} }
@ -266,32 +306,37 @@ DataNode getDataNode() {
* verifies that this namespace matches (eg to prevent a misconfiguration * verifies that this namespace matches (eg to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified) * where a StandbyNode from a different cluster is specified)
*/ */
synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
if (this.bpNSInfo == null) { writeLock();
this.bpNSInfo = nsInfo; try {
boolean success = false; if (this.bpNSInfo == null) {
this.bpNSInfo = nsInfo;
boolean success = false;
// Now that we know the namespace ID, etc, we can pass this to the DN. // Now that we know the namespace ID, etc, we can pass this to the DN.
// The DN can now initialize its local storage if we are the // The DN can now initialize its local storage if we are the
// first BP to handshake, etc. // first BP to handshake, etc.
try { try {
dn.initBlockPool(this); dn.initBlockPool(this);
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
// The datanode failed to initialize the BP. We need to reset // The datanode failed to initialize the BP. We need to reset
// the namespace info so that other BPService actors still have // the namespace info so that other BPService actors still have
// a chance to set it, and re-initialize the datanode. // a chance to set it, and re-initialize the datanode.
this.bpNSInfo = null; this.bpNSInfo = null;
}
} }
} else {
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
"Blockpool ID");
checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
"Namespace ID");
checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
"Cluster ID");
} }
} else { } finally {
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), writeUnlock();
"Blockpool ID");
checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
"Namespace ID");
checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
"Cluster ID");
} }
} }
@ -300,22 +345,27 @@ synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOExcep
* NN, it calls this function to verify that the NN it connected to * NN, it calls this function to verify that the NN it connected to
* is consistent with other NNs serving the block-pool. * is consistent with other NNs serving the block-pool.
*/ */
synchronized void registrationSucceeded(BPServiceActor bpServiceActor, void registrationSucceeded(BPServiceActor bpServiceActor,
DatanodeRegistration reg) throws IOException { DatanodeRegistration reg) throws IOException {
if (bpRegistration != null) { writeLock();
checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(), try {
reg.getStorageInfo().getNamespaceID(), "namespace ID"); if (bpRegistration != null) {
checkNSEquality(bpRegistration.getStorageInfo().getClusterID(), checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
reg.getStorageInfo().getClusterID(), "cluster ID"); reg.getStorageInfo().getNamespaceID(), "namespace ID");
} else { checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
bpRegistration = reg; reg.getStorageInfo().getClusterID(), "cluster ID");
} } else {
bpRegistration = reg;
dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); }
// Add the initial block token secret keys to the DN's secret manager.
if (dn.isBlockTokenEnabled) { dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), // Add the initial block token secret keys to the DN's secret manager.
reg.getExportedKeys()); if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
reg.getExportedKeys());
}
} finally {
writeUnlock();
} }
} }
@ -333,25 +383,35 @@ private static void checkNSEquality(
} }
} }
synchronized DatanodeRegistration createRegistration() { DatanodeRegistration createRegistration() {
Preconditions.checkState(bpNSInfo != null, writeLock();
"getRegistration() can only be called after initial handshake"); try {
return dn.createBPRegistration(bpNSInfo); Preconditions.checkState(bpNSInfo != null,
"getRegistration() can only be called after initial handshake");
return dn.createBPRegistration(bpNSInfo);
} finally {
writeUnlock();
}
} }
/** /**
* Called when an actor shuts down. If this is the last actor * Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN. * to shut down, shuts down the whole blockpool in the DN.
*/ */
synchronized void shutdownActor(BPServiceActor actor) { void shutdownActor(BPServiceActor actor) {
if (bpServiceToActive == actor) { writeLock();
bpServiceToActive = null; try {
} if (bpServiceToActive == actor) {
bpServiceToActive = null;
}
bpServices.remove(actor); bpServices.remove(actor);
if (bpServices.isEmpty()) { if (bpServices.isEmpty()) {
dn.shutdownBlockPool(this); dn.shutdownBlockPool(this);
}
} finally {
writeUnlock();
} }
} }
@ -393,11 +453,16 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
* @return a proxy to the active NN, or null if the BPOS has not * @return a proxy to the active NN, or null if the BPOS has not
* acknowledged any NN as active yet. * acknowledged any NN as active yet.
*/ */
synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() { DatanodeProtocolClientSideTranslatorPB getActiveNN() {
if (bpServiceToActive != null) { readLock();
return bpServiceToActive.bpNamenode; try {
} else { if (bpServiceToActive != null) {
return null; return bpServiceToActive.bpNamenode;
} else {
return null;
}
} finally {
readUnlock();
} }
} }
@ -425,45 +490,50 @@ void signalRollingUpgrade(boolean inProgress) {
* @param actor the actor which received the heartbeat * @param actor the actor which received the heartbeat
* @param nnHaState the HA-related heartbeat contents * @param nnHaState the HA-related heartbeat contents
*/ */
synchronized void updateActorStatesFromHeartbeat( void updateActorStatesFromHeartbeat(
BPServiceActor actor, BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) { NNHAStatusHeartbeat nnHaState) {
final long txid = nnHaState.getTxId(); writeLock();
try {
final boolean nnClaimsActive = final long txid = nnHaState.getTxId();
nnHaState.getState() == HAServiceState.ACTIVE;
final boolean bposThinksActive = bpServiceToActive == actor; final boolean nnClaimsActive =
final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; nnHaState.getState() == HAServiceState.ACTIVE;
final boolean bposThinksActive = bpServiceToActive == actor;
if (nnClaimsActive && !bposThinksActive) { final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
"txid=" + txid); if (nnClaimsActive && !bposThinksActive) {
if (!isMoreRecentClaim) { LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
// Split-brain scenario - an NN is trying to claim active "txid=" + txid);
// state when a different NN has already claimed it with a higher if (!isMoreRecentClaim) {
// txid. // Split-brain scenario - an NN is trying to claim active
LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" + // state when a different NN has already claimed it with a higher
txid + " but there was already a more recent claim at txid=" + // txid.
lastActiveClaimTxId); LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
return; txid + " but there was already a more recent claim at txid=" +
} else { lastActiveClaimTxId);
if (bpServiceToActive == null) { return;
LOG.info("Acknowledging ACTIVE Namenode " + actor);
} else { } else {
LOG.info("Namenode " + actor + " taking over ACTIVE state from " + if (bpServiceToActive == null) {
bpServiceToActive + " at higher txid=" + txid); LOG.info("Acknowledging ACTIVE Namenode " + actor);
} else {
LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
bpServiceToActive + " at higher txid=" + txid);
}
bpServiceToActive = actor;
} }
bpServiceToActive = actor; } else if (!nnClaimsActive && bposThinksActive) {
LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
"txid=" + nnHaState.getTxId());
bpServiceToActive = null;
} }
} else if (!nnClaimsActive && bposThinksActive) {
LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " + if (bpServiceToActive == actor) {
"txid=" + nnHaState.getTxId()); assert txid >= lastActiveClaimTxId;
bpServiceToActive = null; lastActiveClaimTxId = txid;
} }
} finally {
if (bpServiceToActive == actor) { writeUnlock();
assert txid >= lastActiveClaimTxId;
lastActiveClaimTxId = txid;
} }
} }
@ -534,11 +604,14 @@ boolean processCommandFromActor(DatanodeCommand cmd,
actor.reRegister(); actor.reRegister();
return true; return true;
} }
synchronized (this) { writeLock();
try {
if (actor == bpServiceToActive) { if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor); return processCommandFromActive(cmd, actor);
} else { } else {
return processCommandFromStandby(cmd, actor); return processCommandFromStandby(cmd, actor);
} finally {
writeUnlock();
} }
} }
} }