HDFS-5848. Add rolling upgrade status to heartbeat response.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1563384 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-02-01 08:58:55 +00:00
parent f2972402b7
commit 4f92eb2f61
15 changed files with 145 additions and 36 deletions

View File

@ -18,3 +18,5 @@ HDFS-5535 subtasks:
HDFS-5754. Split LayoutVerion into NameNodeLayoutVersion and
DataNodeLayoutVersion. (Brandon Li via szetszwo)
HDFS-5848. Add rolling upgrade status to heartbeat response. (szetszwo)

View File

@ -27,17 +27,19 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RollingUpgradeInfo {
public static final RollingUpgradeInfo EMPTY_INFO = new RollingUpgradeInfo(0);
public class RollingUpgradeInfo extends RollingUpgradeStatus {
public static final RollingUpgradeInfo EMPTY_INFO = new RollingUpgradeInfo(
null, 0);
private long startTime;
private long finalizeTime;
public RollingUpgradeInfo(long startTime) {
this(startTime, 0L);
public RollingUpgradeInfo(String blockPoolId, long startTime) {
this(blockPoolId, startTime, 0L);
}
public RollingUpgradeInfo(long startTime, long finalizeTime) {
public RollingUpgradeInfo(String blockPoolId, long startTime, long finalizeTime) {
super(blockPoolId);
this.startTime = startTime;
this.finalizeTime = finalizeTime;
}
@ -62,7 +64,7 @@ public long getFinalizeTime() {
@Override
public int hashCode() {
//only use lower 32 bits
return (int)startTime ^ (int)finalizeTime;
return super.hashCode() ^ (int)startTime ^ (int)finalizeTime;
}
@Override
@ -73,13 +75,15 @@ public boolean equals(Object obj) {
return false;
}
final RollingUpgradeInfo that = (RollingUpgradeInfo)obj;
return this.startTime == that.startTime
return super.equals(that)
&& this.startTime == that.startTime
&& this.finalizeTime == that.finalizeTime;
}
@Override
public String toString() {
return " Start Time: " + (startTime == 0? "<NOT STARTED>": timestamp2String(startTime))
return super.toString()
+ "\n Start Time: " + (startTime == 0? "<NOT STARTED>": timestamp2String(startTime))
+ "\n Finalize Time: " + (finalizeTime == 0? "<NOT FINALIZED>": timestamp2String(finalizeTime));
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Rolling upgrade status
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RollingUpgradeStatus {
private String blockPoolId;
public RollingUpgradeStatus(String blockPoolId) {
this.blockPoolId = blockPoolId;
}
public String getBlockPoolId() {
return blockPoolId;
}
@Override
public int hashCode() {
return blockPoolId.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof RollingUpgradeStatus)) {
return false;
}
final RollingUpgradeStatus that = (RollingUpgradeStatus)obj;
return this.blockPoolId.equals(that.blockPoolId);
}
@Override
public String toString() {
return " Block Pool ID: " + blockPoolId;
}
}

View File

@ -120,7 +120,6 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
@ -646,8 +645,7 @@ public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOE
.setAction(PBHelper.convert(action)).build();
try {
final RollingUpgradeResponseProto proto = rpcProxy.rollingUpgrade(null, r);
final RollingUpgradeInfoProto info = proto.getRollingUpgradeInfo();
return new RollingUpgradeInfo(info.getStartTime(), info.getFinalizeTime());
return PBHelper.convert(proto.getRollingUpgradeInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -183,7 +183,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
cmds[index] = PBHelper.convert(p);
index++;
}
return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()));
return new HeartbeatResponse(cmds,
PBHelper.convert(resp.getHaStatus()),
PBHelper.convert(resp.getRollingUpgradeStatus()));
}
@Override

View File

@ -121,6 +121,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
}
}
builder.setHaStatus(PBHelper.convert(response.getNameNodeHaState()));
builder.setRollingUpgradeStatus(PBHelper.convertRollingUpgradeStatus(
response.getRollingUpdateStatus()));
return builder.build();
}

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
@ -124,6 +125,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto;
@ -1453,13 +1455,31 @@ public static RollingUpgradeAction convert(RollingUpgradeActionProto a) {
}
}
public static RollingUpgradeStatusProto convertRollingUpgradeStatus(
RollingUpgradeStatus status) {
return RollingUpgradeStatusProto.newBuilder()
.setBlockPoolId(status.getBlockPoolId())
.build();
}
public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) {
return new RollingUpgradeStatus(proto.getBlockPoolId());
}
public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {
return RollingUpgradeInfoProto.newBuilder()
.setStatus(convertRollingUpgradeStatus(info))
.setStartTime(info.getStartTime())
.setFinalizeTime(info.getFinalizeTime())
.build();
}
public static RollingUpgradeInfo convert(RollingUpgradeInfoProto proto) {
RollingUpgradeStatusProto status = proto.getStatus();
return new RollingUpgradeInfo(status.getBlockPoolId(),
proto.getStartTime(), proto.getFinalizeTime());
}
public static CorruptFileBlocks convert(CorruptFileBlocksProto c) {
if (c == null)
return null;

View File

@ -36,10 +36,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
@ -4193,23 +4193,24 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
throws IOException {
readLock();
try {
//get datanode commands
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes);
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
getFSImage().getLastAppliedOrWrittenTxId());
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
} finally {
readUnlock();
}
}
private NNHAStatusHeartbeat createHaStatusHeartbeat() {
HAState state = haContext.getState();
return new NNHAStatusHeartbeat(state.getServiceState(),
getFSImage().getLastAppliedOrWrittenTxId());
}
/**
* Returns whether or not there were available resources at the last check of
* resources.
@ -7102,12 +7103,12 @@ RollingUpgradeInfo startRollingUpgrade() throws IOException {
final String err = "Failed to start rolling upgrade";
checkNameNodeSafeMode(err);
if (rollingUpgradeInfo != null) {
if (isRollingUpgrade()) {
throw new RollingUpgradeException(err
+ " since a rolling upgrade is already in progress."
+ "\nExisting rolling upgrade info: " + rollingUpgradeInfo);
}
final CheckpointSignature cs = getFSImage().rollEditLog();
LOG.info("Successfully rolled edit log for preparing rolling upgrade."
+ " Checkpoint signature: " + cs);
@ -7125,7 +7126,12 @@ RollingUpgradeInfo startRollingUpgrade() throws IOException {
}
void setRollingUpgradeInfo(long startTime) {
rollingUpgradeInfo = new RollingUpgradeInfo(startTime);;
rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId, startTime);
}
/** Is rolling upgrade in progress? */
boolean isRollingUpgrade() {
return rollingUpgradeInfo != null;
}
RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
@ -7138,12 +7144,13 @@ RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
final String err = "Failed to finalize rolling upgrade";
checkNameNodeSafeMode(err);
if (rollingUpgradeInfo == null) {
if (!isRollingUpgrade()) {
throw new RollingUpgradeException(err
+ " since there is no rolling upgrade in progress.");
}
returnInfo = new RollingUpgradeInfo(rollingUpgradeInfo.getStartTime(), now());
returnInfo = new RollingUpgradeInfo(blockPoolId,
rollingUpgradeInfo.getStartTime(), now());
getFSImage().saveNamespace(this);
rollingUpgradeInfo = null;
} finally {

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -31,11 +32,14 @@ public class HeartbeatResponse {
/** Information about the current HA-related state of the NN */
private NNHAStatusHeartbeat haStatus;
private RollingUpgradeStatus rollingUpdateStatus;
public HeartbeatResponse(DatanodeCommand[] cmds,
NNHAStatusHeartbeat haStatus) {
NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus) {
commands = cmds;
this.haStatus = haStatus;
this.rollingUpdateStatus = rollingUpdateStatus;
}
public DatanodeCommand[] getCommands() {
@ -45,4 +49,8 @@ public DatanodeCommand[] getCommands() {
public NNHAStatusHeartbeat getNameNodeHaState() {
return haStatus;
}
public RollingUpgradeStatus getRollingUpdateStatus() {
return rollingUpdateStatus;
}
}

View File

@ -343,8 +343,9 @@ message RollingUpgradeRequestProto {
}
message RollingUpgradeInfoProto {
required uint64 startTime = 1;
required uint64 finalizeTime = 2;
required RollingUpgradeStatusProto status = 1;
required uint64 startTime = 2;
required uint64 finalizeTime = 3;
}
message RollingUpgradeResponseProto {

View File

@ -221,6 +221,7 @@ message NNHAStatusHeartbeatProto {
message HeartbeatResponseProto {
repeated DatanodeCommandProto cmds = 1; // Returned commands can be null
required NNHAStatusHeartbeatProto haStatus = 2;
optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
}
/**

View File

@ -459,4 +459,9 @@ message SnapshotInfoProto {
// TODO: do we need access time?
}
/**
* Rolling upgrade status
*/
message RollingUpgradeStatusProto {
required string blockPoolId = 1;
}

View File

@ -153,7 +153,7 @@ public HeartbeatAnswer(int nnIdx) {
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
heartbeatCounts[nnIdx]++;
return new HeartbeatResponse(new DatanodeCommand[0],
mockHaStatuses[nnIdx]);
mockHaStatuses[nnIdx], null);
}
}

View File

@ -163,7 +163,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyInt()))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null));
dn = new DataNode(conf, locations, null) {
@Override

View File

@ -151,10 +151,9 @@ public void tearDown() throws Exception {
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
throws IOException {
HeartbeatResponse response = new HeartbeatResponse(
cmds,
new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
fsImage.getLastAppliedOrWrittenTxId()));
NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
fsImage.getLastAppliedOrWrittenTxId());
HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null);
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),