Merge HDFS-5014. Process register commands with out holding BPOfferService lock. Contributed by Vinay.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543862 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b0bd967090
commit
6e1234cbf4
|
@ -303,6 +303,9 @@ Release 2.2.1 - UNRELEASED
|
|||
HDFS-4516. Client crash after block allocation and NN switch before lease recovery for
|
||||
the same file can cause readers to fail forever (VinaayKumar B via umamahesh)
|
||||
|
||||
HDFS-5014. Process register commands with out holding BPOfferService lock.
|
||||
(Vinaykumar B via umamahesh)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -72,7 +72,7 @@ class BPOfferService {
|
|||
* This is assigned after the second phase of the
|
||||
* handshake.
|
||||
*/
|
||||
DatanodeRegistration bpRegistration;
|
||||
volatile DatanodeRegistration bpRegistration;
|
||||
|
||||
private final DataNode dn;
|
||||
|
||||
|
@ -294,7 +294,7 @@ class BPOfferService {
|
|||
* NN, it calls this function to verify that the NN it connected to
|
||||
* is consistent with other NNs serving the block-pool.
|
||||
*/
|
||||
void registrationSucceeded(BPServiceActor bpServiceActor,
|
||||
synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
|
||||
DatanodeRegistration reg) throws IOException {
|
||||
if (bpRegistration != null) {
|
||||
checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
|
||||
|
@ -497,17 +497,37 @@ class BPOfferService {
|
|||
}
|
||||
}
|
||||
|
||||
synchronized boolean processCommandFromActor(DatanodeCommand cmd,
|
||||
boolean processCommandFromActor(DatanodeCommand cmd,
|
||||
BPServiceActor actor) throws IOException {
|
||||
assert bpServices.contains(actor);
|
||||
if (cmd == null) {
|
||||
return true;
|
||||
}
|
||||
/*
|
||||
* Datanode Registration can be done asynchronously here. No need to hold
|
||||
* the lock. for more info refer HDFS-5014
|
||||
*/
|
||||
if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {
|
||||
// namenode requested a registration - at start or if NN lost contact
|
||||
// Just logging the claiming state is OK here instead of checking the
|
||||
// actor state by obtaining the lock
|
||||
LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
|
||||
+ " with " + actor.state + " state");
|
||||
actor.reRegister();
|
||||
return true;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (actor == bpServiceToActive) {
|
||||
return processCommandFromActive(cmd, actor);
|
||||
} else {
|
||||
return processCommandFromStandby(cmd, actor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should handle all commands from Active namenode except
|
||||
* DNA_REGISTER which should be handled earlier itself.
|
||||
*
|
||||
* @param cmd
|
||||
* @return true if further processing may be required or false otherwise.
|
||||
|
@ -515,8 +535,6 @@ class BPOfferService {
|
|||
*/
|
||||
private boolean processCommandFromActive(DatanodeCommand cmd,
|
||||
BPServiceActor actor) throws IOException {
|
||||
if (cmd == null)
|
||||
return true;
|
||||
final BlockCommand bcmd =
|
||||
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
|
||||
|
||||
|
@ -548,11 +566,6 @@ class BPOfferService {
|
|||
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
|
||||
// See HDFS-2987.
|
||||
throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
|
||||
case DatanodeProtocol.DNA_REGISTER:
|
||||
// namenode requested a registration - at start or if NN lost contact
|
||||
LOG.info("DatanodeCommand action: DNA_REGISTER");
|
||||
actor.reRegister();
|
||||
break;
|
||||
case DatanodeProtocol.DNA_FINALIZE:
|
||||
String bp = ((FinalizeCommand) cmd).getBlockPoolId();
|
||||
assert getBlockPoolId().equals(bp) :
|
||||
|
@ -592,16 +605,13 @@ class BPOfferService {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should handle commands from Standby namenode except
|
||||
* DNA_REGISTER which should be handled earlier itself.
|
||||
*/
|
||||
private boolean processCommandFromStandby(DatanodeCommand cmd,
|
||||
BPServiceActor actor) throws IOException {
|
||||
if (cmd == null)
|
||||
return true;
|
||||
switch(cmd.getAction()) {
|
||||
case DatanodeProtocol.DNA_REGISTER:
|
||||
// namenode requested a registration - at start or if NN lost contact
|
||||
LOG.info("DatanodeCommand action from standby: DNA_REGISTER");
|
||||
actor.reRegister();
|
||||
break;
|
||||
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
||||
LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
|
||||
if (dn.isBlockTokenEnabled) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
@ -72,6 +73,7 @@ class BPServiceActor implements Runnable {
|
|||
|
||||
static final Log LOG = DataNode.LOG;
|
||||
final InetSocketAddress nnAddr;
|
||||
HAServiceState state;
|
||||
|
||||
BPOfferService bpos;
|
||||
|
||||
|
@ -534,6 +536,7 @@ class BPServiceActor implements Runnable {
|
|||
// that we should actually process.
|
||||
bpos.updateActorStatesFromHeartbeat(
|
||||
this, resp.getNameNodeHaState());
|
||||
state = resp.getNameNodeHaState().getState();
|
||||
|
||||
long startProcessCommands = now();
|
||||
if (!processCommand(resp.getCommands()))
|
||||
|
|
Loading…
Reference in New Issue