HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1208987 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-12-01 08:03:41 +00:00
parent 1e346aa829
commit 8dbb523768
14 changed files with 203 additions and 44 deletions

View File

@ -31,3 +31,5 @@ HDFS-2591. MiniDFSCluster support to mix and match federation with HA (todd)
HDFS-1975. Support for sharing the namenode state from active to standby. (jitendra, atm, todd)
HDFS-1971. Send block report from datanode to both active and standby namenodes. (sanjay, todd via suresh)
HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh)

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.ipc.RPC;
@ -333,7 +334,7 @@ class BPServiceActor implements Runnable {
}
DatanodeCommand [] sendHeartBeat() throws IOException {
HeartbeatResponse sendHeartBeat() throws IOException {
LOG.info("heartbeat: " + this);
// TODO: saw an NPE here - maybe if the two BPOS register at
// same time, this one won't block on the other one?
@ -420,16 +421,17 @@ class BPServiceActor implements Runnable {
//
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
DatanodeCommand[] cmds = sendHeartBeat();
HeartbeatResponse resp = sendHeartBeat();
dn.getMetrics().addHeartbeat(now() - startTime);
long startProcessCommands = now();
if (!processCommand(cmds))
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands = now();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands) +
"ms to process " + cmds.length + " commands from NN");
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}

View File

@ -154,6 +154,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -2688,7 +2689,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @return an array of datanode commands
* @throws IOException
*/
DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
@ -2699,16 +2700,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
if (cmds != null) {
return cmds;
if (cmds == null) {
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
if (cmd != null) {
cmds = new DatanodeCommand[] {cmd};
}
}
//check distributed upgrade
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
if (cmd != null) {
return new DatanodeCommand[] {cmd};
}
return null;
return new HeartbeatResponse(cmds);
} finally {
readUnlock();
}

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -857,7 +858,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException {

View File

@ -22,8 +22,8 @@ import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol;
@ -92,7 +92,7 @@ public interface DatanodeProtocol extends VersionedProtocol {
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
* It also gives the NameNode a chance to return
* an array of "DatanodeCommand" objects.
* an array of "DatanodeCommand" objects in HeartbeatResponse.
* A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
* @param registration datanode registration information
@ -106,7 +106,7 @@ public interface DatanodeProtocol extends VersionedProtocol {
* @throws IOException on error
*/
@Nullable
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
long capacity,
long dfsUsed, long remaining,
long blockPoolUsed,

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
@InterfaceStability.Evolving
/**
* Response to {@link DatanodeProtocol#sendHeartbeat}
*/
public class HeartbeatResponse implements Writable {
/** Commands returned from the namenode to the datanode */
private DatanodeCommand[] commands;
public HeartbeatResponse() {
// Empty constructor required for Writable
}
public HeartbeatResponse(DatanodeCommand[] cmds) {
commands = cmds;
}
public DatanodeCommand[] getCommands() {
return commands;
}
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
int length = commands == null ? 0 : commands.length;
out.writeInt(length);
for (int i = 0; i < length; i++) {
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
}
@Override
public void readFields(DataInput in) throws IOException {
int length = in.readInt();
commands = new DatanodeCommand[length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < length; i++) {
commands[i] = (DatanodeCommand) ObjectWritable.readObject(in,
objectWritable, null);
}
}
}

View File

@ -110,11 +110,11 @@ public class DatanodeProtocolServerSideTranslatorR23 implements
}
@Override
public DatanodeCommandWritable[] sendHeartbeat(
public HeartbeatResponseWritable sendHeartbeat(
DatanodeRegistrationWritable registration, long capacity, long dfsUsed,
long remaining, long blockPoolUsed, int xmitsInProgress,
int xceiverCount, int failedVolumes) throws IOException {
return DatanodeCommandWritable.convert(server.sendHeartbeat(
return HeartbeatResponseWritable.convert(server.sendHeartbeat(
registration.convert(), capacity, dfsUsed, remaining, blockPoolUsed,
xmitsInProgress, xceiverCount, failedVolumes));
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@ -130,14 +131,14 @@ public class DatanodeProtocolTranslatorR23 implements
}
@Override
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException {
return DatanodeCommandWritable.convert(rpcProxy.sendHeartbeat(
DatanodeRegistrationWritable.convert(registration), capacity,
dfsUsed, remaining, blockPoolUsed, xmitsInProgress, xceiverCount,
failedVolumes));
return rpcProxy.sendHeartbeat(
DatanodeRegistrationWritable.convert(registration), capacity, dfsUsed,
remaining, blockPoolUsed, xmitsInProgress, xceiverCount, failedVolumes)
.convert();
}
@Override

View File

@ -99,7 +99,7 @@ public interface DatanodeWireProtocol extends VersionedProtocol {
* @throws IOException on error
*/
@Nullable
public DatanodeCommandWritable[] sendHeartbeat(
public HeartbeatResponseWritable sendHeartbeat(
DatanodeRegistrationWritable registration, long capacity, long dfsUsed,
long remaining, long blockPoolUsed, int xmitsInProgress,
int xceiverCount, int failedVolumes) throws IOException;

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HeartbeatResponseWritable implements Writable {
private DatanodeCommandWritable[] commands;
public HeartbeatResponseWritable() {
// Empty constructor for Writable
}
public HeartbeatResponseWritable(DatanodeCommandWritable[] cmds) {
commands = cmds;
}
public HeartbeatResponse convert() {
return new HeartbeatResponse(DatanodeCommandWritable.convert(commands));
}
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
int length = commands == null ? 0 : commands.length;
out.writeInt(length);
for (int i = 0; i < length; i++) {
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
}
@Override
public void readFields(DataInput in) throws IOException {
int length = in.readInt();
commands = new DatanodeCommandWritable[length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < length; i++) {
commands[i] = (DatanodeCommandWritable) ObjectWritable.readObject(in,
objectWritable, null);
}
}
public static HeartbeatResponseWritable convert(
HeartbeatResponse resp) {
return new HeartbeatResponseWritable(DatanodeCommandWritable.convert(resp
.getCommands()));
}
}

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
public class TestHeartbeatHandling extends TestCase {
/**
* Test if
* {@link FSNamesystem#handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int)}
* {@link FSNamesystem#handleHeartbeat}
* can pick up replication and/or invalidate requests and observes the max
* limit
*/
@ -75,7 +75,8 @@ public class TestHeartbeatHandling extends TestCase {
dd.addBlockToBeReplicated(
new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
}
DatanodeCommand[]cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd,
namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
@ -85,26 +86,30 @@ public class TestHeartbeatHandling extends TestCase {
blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
}
dd.addBlocksToBeInvalidated(blockList);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(null, cmds);
}
} finally {

View File

@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@ -78,7 +76,7 @@ import org.apache.log4j.LogManager;
* <li>-logLevel L specifies the logging level when the benchmark runs.
* The default logging level is {@link Level#ERROR}.</li>
* <li>-UGCacheRefreshCount G will cause the benchmark to call
* {@link NameNode#refreshUserToGroupsMappings()} after
* {@link NameNodeRpcServer#refreshUserToGroupsMappings} after
* every G operations, which purges the name-node's user group cache.
* By default the refresh is never called.</li>
* <li>-keepResults do not clean up the name-space after execution.</li>
@ -104,7 +102,7 @@ public class NNThroughputBenchmark {
static NameNode nameNode;
static NamenodeProtocols nameNodeProto;
NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
NNThroughputBenchmark(Configuration conf) throws IOException {
config = conf;
// We do not need many handlers, since each thread simulates a handler
// by calling name-node methods directly
@ -125,7 +123,7 @@ public class NNThroughputBenchmark {
nameNodeProto = nameNode.getRpcServer();
}
void close() throws IOException {
void close() {
nameNode.stop();
}
@ -806,7 +804,8 @@ public class NNThroughputBenchmark {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0)
.getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -851,7 +850,8 @@ public class NNThroughputBenchmark {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0)
.getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@ -916,7 +916,7 @@ public class NNThroughputBenchmark {
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60);
parseArguments(args);
// adjust replication to the number of data-nodes
this.replication = (short)Math.min((int)replication, getNumDatanodes());
this.replication = (short)Math.min(replication, getNumDatanodes());
}
/**

View File

@ -26,8 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
@ -90,7 +90,7 @@ public class NameNodeAdapter {
return ns.getDelegationTokenSecretManager();
}
public static DatanodeCommand[] sendHeartBeat(DatanodeRegistration nodeReg,
public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);

View File

@ -128,7 +128,8 @@ public class TestDeadDatanode {
// Ensure heartbeat from dead datanode is rejected with a command
// that asks datanode to register again
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, 0, 0, 0, 0, 0, 0, 0);
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, 0, 0, 0, 0, 0, 0, 0)
.getCommands();
Assert.assertEquals(1, cmd.length);
Assert.assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());