Revert HDFS-6788, bad merge.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1615239 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-08-01 21:45:38 +00:00
parent 511234c828
commit 2777f777e9
2 changed files with 113 additions and 189 deletions

View File

@ -97,9 +97,6 @@ Release 2.6.0 - UNRELEASED
HDFS-6802. Some tests in TestDFSClientFailover are missing @Test HDFS-6802. Some tests in TestDFSClientFailover are missing @Test
annotation. (Akira Ajisaka via wang) annotation. (Akira Ajisaka via wang)
HDFS-6788. Improve synchronization in BPOfferService with read write lock.
(Yongjun Zhang via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -39,8 +38,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* One instance per block-pool/namespace on the DN, which handles the * One instance per block-pool/namespace on the DN, which handles the
@ -94,28 +91,6 @@ class BPOfferService {
*/ */
private long lastActiveClaimTxId = -1; private long lastActiveClaimTxId = -1;
private final ReentrantReadWriteLock mReadWriteLock =
new ReentrantReadWriteLock();
private final Lock mReadLock = mReadWriteLock.readLock();
private final Lock mWriteLock = mReadWriteLock.writeLock();
// utility methods to acquire and release read lock and write lock
void readLock() {
mReadLock.lock();
}
void readUnlock() {
mReadLock.unlock();
}
void writeLock() {
mWriteLock.lock();
}
void writeUnlock() {
mWriteLock.unlock();
}
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) { BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(), Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN."); "Must pass at least one NN.");
@ -161,9 +136,7 @@ class BPOfferService {
return false; return false;
} }
String getBlockPoolId() { synchronized String getBlockPoolId() {
readLock();
try {
if (bpNSInfo != null) { if (bpNSInfo != null) {
return bpNSInfo.getBlockPoolID(); return bpNSInfo.getBlockPoolID();
} else { } else {
@ -171,28 +144,18 @@ class BPOfferService {
new Exception("trace")); new Exception("trace"));
return null; return null;
} }
} finally {
readUnlock();
}
} }
boolean hasBlockPoolId() { boolean hasBlockPoolId() {
return getNamespaceInfo() != null; return getNamespaceInfo() != null;
} }
NamespaceInfo getNamespaceInfo() { synchronized NamespaceInfo getNamespaceInfo() {
readLock();
try {
return bpNSInfo; return bpNSInfo;
} finally {
readUnlock();
}
} }
@Override @Override
public String toString() { public synchronized String toString() {
readLock();
try {
if (bpNSInfo == null) { if (bpNSInfo == null) {
// If we haven't yet connected to our NN, we don't yet know our // If we haven't yet connected to our NN, we don't yet know our
// own block pool ID. // own block pool ID.
@ -209,9 +172,6 @@ class BPOfferService {
" (Datanode Uuid " + dn.getDatanodeUuid() + " (Datanode Uuid " + dn.getDatanodeUuid() +
")"; ")";
} }
} finally {
readUnlock();
}
} }
void reportBadBlocks(ExtendedBlock block, void reportBadBlocks(ExtendedBlock block,
@ -306,9 +266,7 @@ class BPOfferService {
* verifies that this namespace matches (eg to prevent a misconfiguration * verifies that this namespace matches (eg to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified) * where a StandbyNode from a different cluster is specified)
*/ */
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
writeLock();
try {
if (this.bpNSInfo == null) { if (this.bpNSInfo == null) {
this.bpNSInfo = nsInfo; this.bpNSInfo = nsInfo;
boolean success = false; boolean success = false;
@ -335,9 +293,6 @@ class BPOfferService {
checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
"Cluster ID"); "Cluster ID");
} }
} finally {
writeUnlock();
}
} }
/** /**
@ -345,10 +300,8 @@ class BPOfferService {
* NN, it calls this function to verify that the NN it connected to * NN, it calls this function to verify that the NN it connected to
* is consistent with other NNs serving the block-pool. * is consistent with other NNs serving the block-pool.
*/ */
void registrationSucceeded(BPServiceActor bpServiceActor, synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
DatanodeRegistration reg) throws IOException { DatanodeRegistration reg) throws IOException {
writeLock();
try {
if (bpRegistration != null) { if (bpRegistration != null) {
checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(), checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
reg.getStorageInfo().getNamespaceID(), "namespace ID"); reg.getStorageInfo().getNamespaceID(), "namespace ID");
@ -364,9 +317,6 @@ class BPOfferService {
dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
reg.getExportedKeys()); reg.getExportedKeys());
} }
} finally {
writeUnlock();
}
} }
/** /**
@ -383,24 +333,17 @@ class BPOfferService {
} }
} }
DatanodeRegistration createRegistration() { synchronized DatanodeRegistration createRegistration() {
writeLock();
try {
Preconditions.checkState(bpNSInfo != null, Preconditions.checkState(bpNSInfo != null,
"getRegistration() can only be called after initial handshake"); "getRegistration() can only be called after initial handshake");
return dn.createBPRegistration(bpNSInfo); return dn.createBPRegistration(bpNSInfo);
} finally {
writeUnlock();
}
} }
/** /**
* Called when an actor shuts down. If this is the last actor * Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN. * to shut down, shuts down the whole blockpool in the DN.
*/ */
void shutdownActor(BPServiceActor actor) { synchronized void shutdownActor(BPServiceActor actor) {
writeLock();
try {
if (bpServiceToActive == actor) { if (bpServiceToActive == actor) {
bpServiceToActive = null; bpServiceToActive = null;
} }
@ -410,9 +353,6 @@ class BPOfferService {
if (bpServices.isEmpty()) { if (bpServices.isEmpty()) {
dn.shutdownBlockPool(this); dn.shutdownBlockPool(this);
} }
} finally {
writeUnlock();
}
} }
@ -453,17 +393,12 @@ class BPOfferService {
* @return a proxy to the active NN, or null if the BPOS has not * @return a proxy to the active NN, or null if the BPOS has not
* acknowledged any NN as active yet. * acknowledged any NN as active yet.
*/ */
DatanodeProtocolClientSideTranslatorPB getActiveNN() { synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
readLock();
try {
if (bpServiceToActive != null) { if (bpServiceToActive != null) {
return bpServiceToActive.bpNamenode; return bpServiceToActive.bpNamenode;
} else { } else {
return null; return null;
} }
} finally {
readUnlock();
}
} }
@VisibleForTesting @VisibleForTesting
@ -490,11 +425,9 @@ class BPOfferService {
* @param actor the actor which received the heartbeat * @param actor the actor which received the heartbeat
* @param nnHaState the HA-related heartbeat contents * @param nnHaState the HA-related heartbeat contents
*/ */
void updateActorStatesFromHeartbeat( synchronized void updateActorStatesFromHeartbeat(
BPServiceActor actor, BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) { NNHAStatusHeartbeat nnHaState) {
writeLock();
try {
final long txid = nnHaState.getTxId(); final long txid = nnHaState.getTxId();
final boolean nnClaimsActive = final boolean nnClaimsActive =
@ -532,9 +465,6 @@ class BPOfferService {
assert txid >= lastActiveClaimTxId; assert txid >= lastActiveClaimTxId;
lastActiveClaimTxId = txid; lastActiveClaimTxId = txid;
} }
} finally {
writeUnlock();
}
} }
/** /**
@ -604,14 +534,11 @@ class BPOfferService {
actor.reRegister(); actor.reRegister();
return true; return true;
} }
writeLock(); synchronized (this) {
try {
if (actor == bpServiceToActive) { if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor); return processCommandFromActive(cmd, actor);
} else { } else {
return processCommandFromStandby(cmd, actor); return processCommandFromStandby(cmd, actor);
} finally {
writeUnlock();
} }
} }
} }