HDFS-11830. Ozone: Datanode needs to re-register to SCM if SCM is restarted. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2017-05-20 10:28:12 +08:00 committed by Owen O'Malley
parent 877e751c84
commit ae5242accb
7 changed files with 149 additions and 15 deletions

View File

@ -23,12 +23,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.common.statemachine import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine; .EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -125,10 +125,28 @@ public class HeartbeatEndpointTask
private void processResponse(SCMHeartbeatResponseProto response) { private void processResponse(SCMHeartbeatResponseProto response) {
for (SCMCommandResponseProto commandResponseProto : response for (SCMCommandResponseProto commandResponseProto : response
.getCommandsList()) { .getCommandsList()) {
if (commandResponseProto.getCmdType() == switch (commandResponseProto.getCmdType()) {
StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) { case sendContainerReport:
this.context.addCommand(SendContainerCommand.getFromProtobuf( this.context.addCommand(SendContainerCommand.getFromProtobuf(
commandResponseProto.getSendReport())); commandResponseProto.getSendReport()));
break;
case reregisterCommand:
if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM notification to register."
+ " Interrupt HEARTBEAT and transit to REGISTER state.");
}
rpcEndpoint.setState(EndPointStates.REGISTER);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Illegal state {} found, expecting {}.",
rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
}
}
break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCmdType().name());
} }
} }
} }

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type.reregisterCommand;
/**
* Informs a datanode to register itself with SCM again.
*/
public class ReregisterCommand extends
SCMCommand<SCMReregisterCmdResponseProto>{
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
public Type getType() {
return reregisterCommand;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
public byte[] getProtoBufMessage() {
return getProto().toByteArray();
}
public SCMReregisterCmdResponseProto getProto() {
return SCMReregisterCmdResponseProto
.newBuilder()
.build();
}
}

View File

@ -68,6 +68,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type; .StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos; .StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB import org.apache.hadoop.ozone.protocolPB
.StorageContainerDatanodeProtocolServerSideTranslatorPB; .StorageContainerDatanodeProtocolServerSideTranslatorPB;
@ -325,6 +327,11 @@ public class StorageContainerManager
return builder.setCmdType(Type.sendContainerReport) return builder.setCmdType(Type.sendContainerReport)
.setSendReport(SendContainerReportProto.getDefaultInstance()) .setSendReport(SendContainerReportProto.getDefaultInstance())
.build(); .build();
case reregisterCommand:
return builder.setCmdType(Type.reregisterCommand)
.setReregisterProto(SCMReregisterCmdResponseProto
.getDefaultInstance())
.build();
default: default:
throw new IllegalArgumentException("Not implemented"); throw new IllegalArgumentException("Not implemented");
} }

View File

@ -60,19 +60,12 @@ public class CommandQueue {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<SCMCommand> getCommand(final DatanodeID datanodeID) { List<SCMCommand> getCommand(final DatanodeID datanodeID) {
lock.lock(); lock.lock();
try { try {
if (commandMap.containsKey(datanodeID)) { List<SCMCommand> cmds = commandMap.remove(datanodeID);
List temp = commandMap.get(datanodeID); return cmds == null ? DEFAULT_LIST : cmds;
if (temp.size() > 0) {
commandMap.put(datanodeID, DEFAULT_LIST);
return temp;
}
}
} finally { } finally {
lock.unlock(); lock.unlock();
} }
return DEFAULT_LIST;
} }
/** /**

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -585,7 +586,10 @@ public class SCMNodeManager
updateNodeStat(datanodeID, nodeReport); updateNodeStat(datanodeID, nodeReport);
return; return;
} }
LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID); LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
this.commandQueue.addCommand(hbItem.getDatanodeID(),
new ReregisterCommand());
} }
private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) { private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
@ -704,7 +708,6 @@ public class SCMNodeManager
* @return SCMCommand * @return SCMCommand
*/ */
private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
if (datanodeID.getDatanodeUuid() != null && if (datanodeID.getDatanodeUuid() != null &&
nodes.containsKey(datanodeID.getDatanodeUuid())) { nodes.containsKey(datanodeID.getDatanodeUuid())) {
LOG.trace("Datanode is already registered. Datanode: {}", LOG.trace("Datanode is already registered. Datanode: {}",

View File

@ -171,6 +171,12 @@ message SCMRegisteredCmdResponseProto {
optional SCMNodeAddressList addressList = 5; optional SCMNodeAddressList addressList = 5;
} }
/**
* SCM informs a datanode to register itself again.
* With recieving this command, datanode will transit to REGISTER state.
*/
message SCMReregisterCmdResponseProto {}
/** /**
* Container ID maintains the container's Identity along with cluster ID * Container ID maintains the container's Identity along with cluster ID
* after the registration is done. * after the registration is done.
@ -195,6 +201,7 @@ enum Type {
versionCommand = 2; versionCommand = 2;
registeredCommand = 3; registeredCommand = 3;
sendContainerReport = 4; sendContainerReport = 4;
reregisterCommand = 5;
} }
/* /*
@ -205,7 +212,7 @@ message SCMCommandResponseProto {
optional SCMRegisteredCmdResponseProto registeredProto = 3; optional SCMRegisteredCmdResponseProto registeredProto = 3;
optional SCMVersionResponseProto versionProto = 4; optional SCMVersionResponseProto versionProto = 4;
optional SendContainerReportProto sendReport = 5; optional SendContainerReportProto sendReport = 5;
optional SCMReregisterCmdResponseProto reregisterProto = 6;
} }

View File

@ -17,11 +17,13 @@
*/ */
package org.apache.hadoop.ozone.scm.node; package org.apache.hadoop.ozone.scm.node;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -46,6 +48,8 @@ import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
@ -241,6 +245,49 @@ public class TestNodeManager {
nodeManager.getLastHBProcessedCount()); nodeManager.getLastHBProcessedCount());
} }
/**
* Asserts scm informs datanodes to re-register with the nodemanager
* on a restart.
*
* @throws Exception
*/
@Test
public void testScmHeartbeatAfterRestart() throws Exception {
OzoneConfiguration conf = getConf();
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
nodemanager.register(datanodeID);
List<SCMCommand> command = nodemanager.sendHeartbeat(datanodeID, null);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID));
Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty());
}
// Sends heartbeat without registering to SCM.
// This happens when SCM restarts.
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
Assert.assertFalse(nodemanager
.getAllNodes().contains(datanodeID));
try {
// SCM handles heartbeat asynchronously.
// It may need more than one heartbeat processing to
// send the notification.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
List<SCMCommand> command =
nodemanager.sendHeartbeat(datanodeID, null);
return command.size() == 1 && command.get(0).getType()
.equals(Type.reregisterCommand);
}
}, 100, 3 * 1000);
} catch (TimeoutException e) {
Assert.fail("Times out to verify that scm informs "
+ "datanode to re-register itself.");
}
}
}
/** /**
* Asserts that we detect as many healthy nodes as we have generated heartbeat * Asserts that we detect as many healthy nodes as we have generated heartbeat
* for. * for.