svn merge -c 1214033 from trunk for HDFS-2661.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230875 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
38fe4f7215
commit
0ec97841cc
|
@ -33,6 +33,8 @@ Release 0.23-PB - Unreleased
|
||||||
HDFS-2663. Optional protobuf parameters are not handled correctly.
|
HDFS-2663. Optional protobuf parameters are not handled correctly.
|
||||||
(suresh)
|
(suresh)
|
||||||
|
|
||||||
|
HDFS-2661. Enable protobuf RPC for DatanodeProtocol. (jitendra)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2018. Move all journal stream management code into one place.
|
HDFS-2018. Move all journal stream management code into one place.
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
|
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
|
||||||
.setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
|
.setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
|
||||||
.setCapacity(dfsUsed).setRemaining(remaining)
|
.setDfsUsed(dfsUsed).setRemaining(remaining)
|
||||||
.setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
|
.setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
|
||||||
.setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
|
.setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
|
||||||
HeartbeatResponseProto resp;
|
HeartbeatResponseProto resp;
|
||||||
|
@ -194,7 +194,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
|
||||||
.setBlockPoolId(poolId);
|
.setBlockPoolId(poolId);
|
||||||
if (blocks != null) {
|
if (blocks != null) {
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
builder.setBlocks(i, blocks[i]);
|
builder.addBlocks(blocks[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
BlockReportRequestProto req = builder.build();
|
BlockReportRequestProto req = builder.build();
|
||||||
|
@ -217,7 +217,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
|
||||||
.setBlockPoolId(poolId);
|
.setBlockPoolId(poolId);
|
||||||
if (receivedAndDeletedBlocks != null) {
|
if (receivedAndDeletedBlocks != null) {
|
||||||
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
|
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
|
||||||
builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i]));
|
builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
BlockReceivedAndDeletedRequestProto req = builder.build();
|
BlockReceivedAndDeletedRequestProto req = builder.build();
|
||||||
|
@ -290,7 +290,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
|
||||||
.setNewLength(newlength).setCloseFile(closeFile)
|
.setNewLength(newlength).setCloseFile(closeFile)
|
||||||
.setDeleteBlock(deleteblock);
|
.setDeleteBlock(deleteblock);
|
||||||
for (int i = 0; i < newtargets.length; i++) {
|
for (int i = 0; i < newtargets.length; i++) {
|
||||||
builder.setNewTaragets(i, PBHelper.convert(newtargets[i]));
|
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
|
||||||
}
|
}
|
||||||
CommitBlockSynchronizationRequestProto req = builder.build();
|
CommitBlockSynchronizationRequestProto req = builder.build();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
|
||||||
@Override
|
@Override
|
||||||
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
|
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
|
||||||
HeartbeatRequestProto request) throws ServiceException {
|
HeartbeatRequestProto request) throws ServiceException {
|
||||||
DatanodeCommand[] cmds;
|
DatanodeCommand[] cmds = null;
|
||||||
try {
|
try {
|
||||||
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
|
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
|
||||||
request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
|
request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
|
||||||
|
@ -120,7 +120,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
|
||||||
@Override
|
@Override
|
||||||
public BlockReportResponseProto blockReport(RpcController controller,
|
public BlockReportResponseProto blockReport(RpcController controller,
|
||||||
BlockReportRequestProto request) throws ServiceException {
|
BlockReportRequestProto request) throws ServiceException {
|
||||||
DatanodeCommand cmd;
|
DatanodeCommand cmd = null;
|
||||||
List<Long> blockIds = request.getBlocksList();
|
List<Long> blockIds = request.getBlocksList();
|
||||||
long[] blocks = new long[blockIds.size()];
|
long[] blocks = new long[blockIds.size()];
|
||||||
for (int i = 0; i < blockIds.size(); i++) {
|
for (int i = 0; i < blockIds.size(); i++) {
|
||||||
|
|
|
@ -665,6 +665,9 @@ public class PBHelper {
|
||||||
case DatanodeProtocol.DNA_INVALIDATE:
|
case DatanodeProtocol.DNA_INVALIDATE:
|
||||||
builder.setAction(BlockCommandProto.Action.INVALIDATE);
|
builder.setAction(BlockCommandProto.Action.INVALIDATE);
|
||||||
break;
|
break;
|
||||||
|
case DatanodeProtocol.DNA_SHUTDOWN:
|
||||||
|
builder.setAction(BlockCommandProto.Action.SHUTDOWN);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
Block[] blocks = cmd.getBlocks();
|
Block[] blocks = cmd.getBlocks();
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
|
@ -685,6 +688,10 @@ public class PBHelper {
|
||||||
|
|
||||||
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
|
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
|
||||||
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
|
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
|
||||||
|
if (datanodeCommand == null) {
|
||||||
|
return builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
switch (datanodeCommand.getAction()) {
|
switch (datanodeCommand.getAction()) {
|
||||||
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
|
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
|
||||||
builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
|
builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
|
||||||
|
@ -711,11 +718,18 @@ public class PBHelper {
|
||||||
break;
|
break;
|
||||||
case DatanodeProtocol.DNA_TRANSFER:
|
case DatanodeProtocol.DNA_TRANSFER:
|
||||||
case DatanodeProtocol.DNA_INVALIDATE:
|
case DatanodeProtocol.DNA_INVALIDATE:
|
||||||
|
case DatanodeProtocol.DNA_SHUTDOWN:
|
||||||
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
|
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
|
||||||
PBHelper.convert((BlockCommand) datanodeCommand));
|
PBHelper.convert((BlockCommand) datanodeCommand));
|
||||||
break;
|
break;
|
||||||
case DatanodeProtocol.DNA_SHUTDOWN: //Not expected
|
case DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS:
|
||||||
|
case DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE:
|
||||||
|
builder.setCmdType(DatanodeCommandProto.Type.UpgradeCommand)
|
||||||
|
.setUpgradeCmd(PBHelper.convert((UpgradeCommand) datanodeCommand));
|
||||||
|
break;
|
||||||
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
|
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
|
||||||
|
default:
|
||||||
|
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
@ -754,13 +768,15 @@ public class PBHelper {
|
||||||
|
|
||||||
public static BlockCommand convert(BlockCommandProto blkCmd) {
|
public static BlockCommand convert(BlockCommandProto blkCmd) {
|
||||||
List<BlockProto> blockProtoList = blkCmd.getBlocksList();
|
List<BlockProto> blockProtoList = blkCmd.getBlocksList();
|
||||||
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
|
|
||||||
DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][];
|
|
||||||
Block[] blocks = new Block[blockProtoList.size()];
|
Block[] blocks = new Block[blockProtoList.size()];
|
||||||
for (int i = 0; i < blockProtoList.size(); i++) {
|
for (int i = 0; i < blockProtoList.size(); i++) {
|
||||||
targets[i] = PBHelper.convert(targetList.get(i));
|
|
||||||
blocks[i] = PBHelper.convert(blockProtoList.get(i));
|
blocks[i] = PBHelper.convert(blockProtoList.get(i));
|
||||||
}
|
}
|
||||||
|
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
|
||||||
|
DatanodeInfo[][] targets = new DatanodeInfo[targetList.size()][];
|
||||||
|
for (int i = 0; i < targetList.size(); i++) {
|
||||||
|
targets[i] = PBHelper.convert(targetList.get(i));
|
||||||
|
}
|
||||||
int action = DatanodeProtocol.DNA_UNKNOWN;
|
int action = DatanodeProtocol.DNA_UNKNOWN;
|
||||||
switch (blkCmd.getAction()) {
|
switch (blkCmd.getAction()) {
|
||||||
case TRANSFER:
|
case TRANSFER:
|
||||||
|
@ -769,6 +785,9 @@ public class PBHelper {
|
||||||
case INVALIDATE:
|
case INVALIDATE:
|
||||||
action = DatanodeProtocol.DNA_INVALIDATE;
|
action = DatanodeProtocol.DNA_INVALIDATE;
|
||||||
break;
|
break;
|
||||||
|
case SHUTDOWN:
|
||||||
|
action = DatanodeProtocol.DNA_SHUTDOWN;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
|
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
|
||||||
}
|
}
|
||||||
|
@ -800,9 +819,13 @@ public class PBHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static UpgradeCommandProto convert(UpgradeCommand comm) {
|
public static UpgradeCommandProto convert(UpgradeCommand comm) {
|
||||||
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder()
|
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder();
|
||||||
.setVersion(comm.getVersion())
|
if (comm == null) {
|
||||||
.setUpgradeStatus(comm.getCurrentStatus());
|
return builder.setAction(UpgradeCommandProto.Action.UNKNOWN)
|
||||||
|
.setVersion(0).setUpgradeStatus(0).build();
|
||||||
|
}
|
||||||
|
builder.setVersion(comm.getVersion()).setUpgradeStatus(
|
||||||
|
comm.getCurrentStatus());
|
||||||
switch (comm.getAction()) {
|
switch (comm.getAction()) {
|
||||||
case UpgradeCommand.UC_ACTION_REPORT_STATUS:
|
case UpgradeCommand.UC_ACTION_REPORT_STATUS:
|
||||||
builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
|
builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
|
||||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
@ -93,7 +95,7 @@ class BPOfferService implements Runnable {
|
||||||
boolean resetBlockReportTime = true;
|
boolean resetBlockReportTime = true;
|
||||||
|
|
||||||
Thread bpThread;
|
Thread bpThread;
|
||||||
DatanodeProtocol bpNamenode;
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
||||||
private long lastHeartbeat = 0;
|
private long lastHeartbeat = 0;
|
||||||
private volatile boolean initialized = false;
|
private volatile boolean initialized = false;
|
||||||
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
|
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
|
||||||
|
@ -164,7 +166,7 @@ class BPOfferService implements Runnable {
|
||||||
* Used to inject a spy NN in the unit tests.
|
* Used to inject a spy NN in the unit tests.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setNameNode(DatanodeProtocol dnProtocol) {
|
void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
|
||||||
bpNamenode = dnProtocol;
|
bpNamenode = dnProtocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,8 +226,8 @@ class BPOfferService implements Runnable {
|
||||||
|
|
||||||
private void connectToNNAndHandshake() throws IOException {
|
private void connectToNNAndHandshake() throws IOException {
|
||||||
// get NN proxy
|
// get NN proxy
|
||||||
bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
|
bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
|
||||||
DatanodeProtocol.versionID, nnAddr, dn.getConf());
|
dn.getConf());
|
||||||
|
|
||||||
// First phase of the handshake with NN - get the namespace
|
// First phase of the handshake with NN - get the namespace
|
||||||
// info.
|
// info.
|
||||||
|
|
|
@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
|
||||||
|
@ -1281,7 +1282,7 @@ public class DataNode extends Configured
|
||||||
|
|
||||||
//inform NameNodes
|
//inform NameNodes
|
||||||
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
|
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
|
||||||
DatanodeProtocol nn = bpos.bpNamenode;
|
DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode;
|
||||||
try {
|
try {
|
||||||
nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
|
nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
|
@ -1314,7 +1315,8 @@ public class DataNode extends Configured
|
||||||
private void transferBlock( ExtendedBlock block,
|
private void transferBlock( ExtendedBlock block,
|
||||||
DatanodeInfo xferTargets[]
|
DatanodeInfo xferTargets[]
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
|
DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
|
||||||
|
.getBlockPoolId());
|
||||||
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
||||||
|
|
||||||
if (!data.isValidBlock(block)) {
|
if (!data.isValidBlock(block)) {
|
||||||
|
@ -1982,7 +1984,8 @@ public class DataNode extends Configured
|
||||||
* @return Namenode corresponding to the bpid
|
* @return Namenode corresponding to the bpid
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
|
public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
|
||||||
|
throws IOException {
|
||||||
BPOfferService bpos = blockPoolManager.get(bpid);
|
BPOfferService bpos = blockPoolManager.get(bpid);
|
||||||
if(bpos == null || bpos.bpNamenode == null) {
|
if(bpos == null || bpos.bpNamenode == null) {
|
||||||
throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
|
throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
|
||||||
|
@ -1994,7 +1997,8 @@ public class DataNode extends Configured
|
||||||
void syncBlock(RecoveringBlock rBlock,
|
void syncBlock(RecoveringBlock rBlock,
|
||||||
List<BlockRecord> syncList) throws IOException {
|
List<BlockRecord> syncList) throws IOException {
|
||||||
ExtendedBlock block = rBlock.getBlock();
|
ExtendedBlock block = rBlock.getBlock();
|
||||||
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
|
DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
|
||||||
|
.getBlockPoolId());
|
||||||
|
|
||||||
long recoveryId = rBlock.getNewGenerationStamp();
|
long recoveryId = rBlock.getNewGenerationStamp();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
|
|
@ -60,6 +60,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
||||||
|
@ -143,6 +146,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
clientProtocolServerTranslator =
|
clientProtocolServerTranslator =
|
||||||
new ClientNamenodeProtocolServerSideTranslatorR23(this);
|
new ClientNamenodeProtocolServerSideTranslatorR23(this);
|
||||||
|
|
||||||
|
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
|
||||||
|
new DatanodeProtocolServerSideTranslatorPB(this);
|
||||||
|
BlockingService dnProtoPbService = DatanodeProtocolService
|
||||||
|
.newReflectiveBlockingService(dnProtoPbTranslator);
|
||||||
|
|
||||||
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
|
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
|
||||||
new NamenodeProtocolServerSideTranslatorPB(this);
|
new NamenodeProtocolServerSideTranslatorPB(this);
|
||||||
BlockingService service = NamenodeProtocolService
|
BlockingService service = NamenodeProtocolService
|
||||||
|
@ -160,8 +168,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
||||||
serviceHandlerCount,
|
serviceHandlerCount,
|
||||||
false, conf, namesystem.getDelegationTokenSecretManager());
|
false, conf, namesystem.getDelegationTokenSecretManager());
|
||||||
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
|
||||||
DatanodeProtocol.class, this);
|
|
||||||
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
RefreshAuthorizationPolicyProtocol.class, this);
|
RefreshAuthorizationPolicyProtocol.class, this);
|
||||||
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
|
@ -170,6 +176,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
GetUserMappingsProtocol.class, this);
|
GetUserMappingsProtocol.class, this);
|
||||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
||||||
serviceRpcServer);
|
serviceRpcServer);
|
||||||
|
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
||||||
|
serviceRpcServer);
|
||||||
|
|
||||||
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
||||||
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
||||||
|
@ -184,8 +192,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
clientProtocolServerTranslator, socAddr.getHostName(),
|
clientProtocolServerTranslator, socAddr.getHostName(),
|
||||||
socAddr.getPort(), handlerCount, false, conf,
|
socAddr.getPort(), handlerCount, false, conf,
|
||||||
namesystem.getDelegationTokenSecretManager());
|
namesystem.getDelegationTokenSecretManager());
|
||||||
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
|
||||||
DatanodeProtocol.class, this);
|
|
||||||
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
RefreshAuthorizationPolicyProtocol.class, this);
|
RefreshAuthorizationPolicyProtocol.class, this);
|
||||||
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
|
||||||
|
@ -194,7 +200,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
GetUserMappingsProtocol.class, this);
|
GetUserMappingsProtocol.class, this);
|
||||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
|
||||||
clientRpcServer);
|
clientRpcServer);
|
||||||
|
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
|
||||||
|
clientRpcServer);
|
||||||
|
|
||||||
// set service-level authorization security policy
|
// set service-level authorization security policy
|
||||||
if (serviceAuthEnabled =
|
if (serviceAuthEnabled =
|
||||||
|
|
|
@ -76,6 +76,8 @@ public interface DatanodeProtocol extends VersionedProtocol {
|
||||||
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
|
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
|
||||||
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
|
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
|
||||||
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
|
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
|
||||||
|
final static int DNA_UC_ACTION_REPORT_STATUS = 100; // Report upgrade status
|
||||||
|
final static int DNA_UC_ACTION_START_UPGRADE = 101; // start upgrade
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register Datanode.
|
* Register Datanode.
|
||||||
|
|
|
@ -41,8 +41,10 @@ import org.apache.hadoop.io.WritableFactory;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class UpgradeCommand extends DatanodeCommand {
|
public class UpgradeCommand extends DatanodeCommand {
|
||||||
public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
|
public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
|
||||||
public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
|
public final static int UC_ACTION_REPORT_STATUS =
|
||||||
public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade
|
DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS;
|
||||||
|
public final static int UC_ACTION_START_UPGRADE =
|
||||||
|
DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE;
|
||||||
|
|
||||||
private int version;
|
private int version;
|
||||||
private short upgradeStatus;
|
private short upgradeStatus;
|
||||||
|
|
|
@ -47,6 +47,7 @@ message DatanodeCommandProto {
|
||||||
KeyUpdateCommand = 4;
|
KeyUpdateCommand = 4;
|
||||||
RegisterCommand = 5;
|
RegisterCommand = 5;
|
||||||
UpgradeCommand = 6;
|
UpgradeCommand = 6;
|
||||||
|
NullDatanodeCommand = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
required Type cmdType = 1; // Type of the command
|
required Type cmdType = 1; // Type of the command
|
||||||
|
@ -80,6 +81,7 @@ message BlockCommandProto {
|
||||||
enum Action {
|
enum Action {
|
||||||
TRANSFER = 1; // Transfer blocks to another datanode
|
TRANSFER = 1; // Transfer blocks to another datanode
|
||||||
INVALIDATE = 2; // Invalidate blocks
|
INVALIDATE = 2; // Invalidate blocks
|
||||||
|
SHUTDOWN = 3; // Shutdown the datanode
|
||||||
}
|
}
|
||||||
required Action action = 1;
|
required Action action = 1;
|
||||||
required string blockPoolId = 2;
|
required string blockPoolId = 2;
|
||||||
|
@ -190,7 +192,7 @@ message BlockReportRequestProto {
|
||||||
* cmd - Command from namenode to the datanode
|
* cmd - Command from namenode to the datanode
|
||||||
*/
|
*/
|
||||||
message BlockReportResponseProto {
|
message BlockReportResponseProto {
|
||||||
required DatanodeCommandProto cmd = 1;
|
optional DatanodeCommandProto cmd = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
|
@ -518,7 +519,7 @@ public class MiniDFSCluster {
|
||||||
setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine);
|
setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine);
|
||||||
setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine);
|
setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine);
|
||||||
setRpcEngine(conf, ClientProtocol.class, rpcEngine);
|
setRpcEngine(conf, ClientProtocol.class, rpcEngine);
|
||||||
setRpcEngine(conf, DatanodeProtocol.class, rpcEngine);
|
setRpcEngine(conf, DatanodeProtocolPB.class, rpcEngine);
|
||||||
setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine);
|
setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine);
|
||||||
setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine);
|
setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine);
|
||||||
setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine);
|
setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine);
|
||||||
|
|
|
@ -24,9 +24,8 @@ import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -49,7 +48,8 @@ public class TestDatanodeRegister {
|
||||||
|
|
||||||
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
|
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
|
||||||
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
|
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
|
||||||
DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class);
|
DatanodeProtocolClientSideTranslatorPB fakeDNProt =
|
||||||
|
mock(DatanodeProtocolClientSideTranslatorPB.class);
|
||||||
when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
|
when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
|
||||||
|
|
||||||
bpos.setNameNode( fakeDNProt );
|
bpos.setNameNode( fakeDNProt );
|
||||||
|
|
Loading…
Reference in New Issue