HDFS-11451. Ozone: Add protobuf definitions for container reports. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2017-03-07 10:28:21 -08:00 committed by Owen O'Malley
parent 02fb9aed32
commit 3e1317de02
14 changed files with 331 additions and 68 deletions

View File

@ -73,7 +73,7 @@ public class SCMConnectionManager {
* *
* @return - Return RPC timeout. * @return - Return RPC timeout.
*/ */
public long getRpcTimeout() { public int getRpcTimeout() {
return rpcTimeout; return rpcTimeout;
} }
@ -128,7 +128,7 @@ public class SCMConnectionManager {
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy( StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
StorageContainerDatanodeProtocolPB.class, version, StorageContainerDatanodeProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf, address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout); NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);

View File

@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
@ -34,6 +34,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
/** /**
* Current Context of State Machine. * Current Context of State Machine.
*/ */
@ -45,6 +49,9 @@ public class StateContext {
private final Configuration conf; private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state; private DatanodeStateMachine.DatanodeStates state;
private SCMNodeReport nrState; private SCMNodeReport nrState;
private ReportState reportState;
private static final ReportState DEFAULT_REPORT_STATE =
ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
/** /**
* Constructs a StateContext. * Constructs a StateContext.
@ -174,6 +181,7 @@ public class StateContext {
if (isExiting(newState)) { if (isExiting(newState)) {
task.onExit(); task.onExit();
} }
this.clearReportState();
this.setState(newState); this.setState(newState);
} }
} }
@ -215,4 +223,32 @@ public class StateContext {
} }
/**
* Gets the ReportState.
* @return ReportState.
*/
public synchronized ReportState getContainerReportState() {
if (reportState == null) {
return DEFAULT_REPORT_STATE;
}
return reportState;
}
/**
* Sets the ReportState.
* @param rState - ReportState.
*/
public synchronized void setContainerReportState(ReportState rState) {
this.reportState = rState;
}
/**
* Clears report state after it has been communicated.
*/
public synchronized void clearReportState() {
if(reportState != null) {
setContainerReportState(null);
}
}
} }

View File

@ -26,6 +26,13 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -89,12 +96,13 @@ public class HeartbeatEndpointTask
DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
.containerNodeIDProto.getDatanodeID()); .containerNodeIDProto.getDatanodeID());
rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID, SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
this.context.getNodeReport()); .sendHeartbeat(datanodeID, this.context.getNodeReport(),
this.context.getContainerReportState());
processResponse(reponse);
rpcEndpoint.zeroMissedCount(); rpcEndpoint.zeroMissedCount();
} catch (IOException ex) { } catch (IOException ex) {
rpcEndpoint.logIfNeeded(ex rpcEndpoint.logIfNeeded(ex);
);
} finally { } finally {
rpcEndpoint.unlock(); rpcEndpoint.unlock();
} }
@ -109,6 +117,27 @@ public class HeartbeatEndpointTask
return new Builder(); return new Builder();
} }
/**
* Add this command to command processing Queue.
*
* @param response - SCMHeartbeat response.
*/
private void processResponse(SCMHeartbeatResponseProto response) {
for (SCMCommandResponseProto commandResponseProto : response
.getCommandsList()) {
if (commandResponseProto.getCmdType() ==
StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
//this.context.addCommand(NullCommand.newBuilder().build());
LOG.debug("Discarding a null command from SCM.");
}
if (commandResponseProto.getCmdType() ==
StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
this.context.addCommand(SendContainerCommand.getFromProtobuf(
commandResponseProto.getSendReport()));
}
}
}
/** /**
* Builder class for HeartbeatEndpointTask. * Builder class for HeartbeatEndpointTask.
*/ */

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
@ -43,11 +44,12 @@ public interface StorageContainerDatanodeProtocol {
* Used by data node to send a Heartbeat. * Used by data node to send a Heartbeat.
* @param datanodeID - Datanode ID. * @param datanodeID - Datanode ID.
* @param nodeReport - node report state * @param nodeReport - node report state
* @param reportState - container report state.
* @return - SCMHeartbeatResponseProto * @return - SCMHeartbeatResponseProto
* @throws IOException * @throws IOException
*/ */
SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException; SCMNodeReport nodeReport, ReportState reportState) throws IOException;
/** /**
* Register Datanode. * Register Datanode.

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
/**
* Allows a Datanode to send in the container report.
*/
public class SendContainerCommand extends SCMCommand<SendContainerReportProto> {
/**
* Returns a NullCommand class from NullCommandResponse Proto.
* @param unused - unused
* @return NullCommand
*/
public static SendContainerCommand getFromProtobuf(
final SendContainerReportProto unused) {
return new SendContainerCommand();
}
/**
* returns a new builder.
* @return Builder
*/
public static SendContainerCommand.Builder newBuilder() {
return new SendContainerCommand.Builder();
}
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
public Type getType() {
return Type.sendContainerReport;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
public byte[] getProtoBufMessage() {
return SendContainerReportProto.newBuilder().build().toByteArray();
}
/**
* A Builder class this is the standard pattern we are using for all commands.
*/
public static class Builder {
/**
* Return a null command.
* @return - NullCommand.
*/
public SendContainerCommand build() {
return new SendContainerCommand();
}
}
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -121,11 +123,12 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
@Override @Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException { SCMNodeReport nodeReport, ReportState reportState) throws IOException {
SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
.newBuilder(); .newBuilder();
req.setDatanodeID(datanodeID.getProtoBufMessage()); req.setDatanodeID(datanodeID.getProtoBufMessage());
req.setNodeReport(nodeReport); req.setNodeReport(nodeReport);
req.setContainerReportState(reportState);
final SCMHeartbeatResponseProto resp; final SCMHeartbeatResponseProto resp;
try { try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());

View File

@ -78,7 +78,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
SCMHeartbeatRequestProto request) throws ServiceException { SCMHeartbeatRequestProto request) throws ServiceException {
try { try {
return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
.getDatanodeID()), request.getNodeReport()); .getDatanodeID()), request.getNodeReport(),
request.getContainerReportState());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos; .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto; .StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -391,12 +393,14 @@ public class StorageContainerManager
* Used by data node to send a Heartbeat. * Used by data node to send a Heartbeat.
* *
* @param datanodeID - Datanode ID. * @param datanodeID - Datanode ID.
* @param nodeReport - Node Report
* @param reportState - Container report ready info.
* @return - SCMHeartbeatResponseProto * @return - SCMHeartbeatResponseProto
* @throws IOException * @throws IOException
*/ */
@Override @Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) throws IOException { SCMNodeReport nodeReport, ReportState reportState) throws IOException {
List<SCMCommand> commands = List<SCMCommand> commands =
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport); getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
List<SCMCommandResponseProto> cmdReponses = new LinkedList<>(); List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();

View File

@ -49,7 +49,7 @@ public class ContainerMapping implements Mapping {
private final long cacheSize; private final long cacheSize;
private final Lock lock; private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8"); private final Charset encoding = Charset.forName("UTF-8");
private final LevelDBStore store; private final LevelDBStore containerStore;
private final Random rand; private final Random rand;
/** /**
@ -75,11 +75,14 @@ public class ContainerMapping implements Mapping {
throw throw
new IllegalArgumentException("SCM metadata directory is not valid."); new IllegalArgumentException("SCM metadata directory is not valid.");
} }
File dbPath = new File(scmMetaDataDir, "SCM.db");
Options options = new Options(); Options options = new Options();
options.cacheSize(this.cacheSize * (1024L * 1024L)); options.cacheSize(this.cacheSize * (1024L * 1024L));
options.createIfMissing(); options.createIfMissing();
store = new LevelDBStore(dbPath, options);
// Write the container name to pipeline mapping.
File containerDBPath = new File(scmMetaDataDir, "container.db");
containerStore = new LevelDBStore(containerDBPath, options);
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
rand = new Random(); rand = new Random();
} }
@ -103,6 +106,8 @@ public class ContainerMapping implements Mapping {
return pipeline; return pipeline;
} }
/** /**
* Returns the Pipeline from the container name. * Returns the Pipeline from the container name.
* *
@ -114,7 +119,8 @@ public class ContainerMapping implements Mapping {
Pipeline pipeline = null; Pipeline pipeline = null;
lock.lock(); lock.lock();
try { try {
byte[] pipelineBytes = store.get(containerName.getBytes(encoding)); byte[] pipelineBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes == null) { if (pipelineBytes == null) {
throw new IOException("Specified key does not exist. key : " + throw new IOException("Specified key does not exist. key : " +
containerName); containerName);
@ -145,7 +151,8 @@ public class ContainerMapping implements Mapping {
lock.lock(); lock.lock();
try { try {
byte[] pipelineBytes = store.get(containerName.getBytes(encoding)); byte[] pipelineBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes != null) { if (pipelineBytes != null) {
throw new IOException("Specified container already exists. key : " + throw new IOException("Specified container already exists. key : " +
containerName); containerName);
@ -153,7 +160,7 @@ public class ContainerMapping implements Mapping {
DatanodeID id = getDatanodeID(); DatanodeID id = getDatanodeID();
if (id != null) { if (id != null) {
pipeline = newPipelineFromNodes(id, containerName); pipeline = newPipelineFromNodes(id, containerName);
store.put(containerName.getBytes(encoding), containerStore.put(containerName.getBytes(encoding),
pipeline.getProtobufMessage().toByteArray()); pipeline.getProtobufMessage().toByteArray());
} }
} finally { } finally {
@ -193,8 +200,8 @@ public class ContainerMapping implements Mapping {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (store != null) { if (containerStore != null) {
store.close(); containerStore.close();
} }
} }
} }

View File

@ -39,7 +39,8 @@ public class CommandQueue {
private final Map<DatanodeID, List<SCMCommand>> commandMap; private final Map<DatanodeID, List<SCMCommand>> commandMap;
private final Lock lock; private final Lock lock;
private final List<SCMCommand> nullList; // This map is used as default return value containing one null command.
private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
/** /**
* Constructs a Command Queue. * Constructs a Command Queue.
@ -47,8 +48,7 @@ public class CommandQueue {
public CommandQueue() { public CommandQueue() {
commandMap = new HashMap<>(); commandMap = new HashMap<>();
lock = new ReentrantLock(); lock = new ReentrantLock();
nullList = new LinkedList<>(); DEFAULT_LIST.add(NullCommand.newBuilder().build());
nullList.add(NullCommand.newBuilder().build());
} }
/** /**
@ -75,7 +75,7 @@ public class CommandQueue {
} finally { } finally {
lock.unlock(); lock.unlock();
} }
return nullList; return DEFAULT_LIST;
} }
/** /**

View File

@ -47,15 +47,80 @@ import "DatanodeContainerProtocol.proto";
*/ */
message SCMHeartbeatRequestProto { message SCMHeartbeatRequestProto {
required DatanodeIDProto datanodeID = 1; required DatanodeIDProto datanodeID = 1;
optional SCMNodeReport nodeReport= 2; optional SCMNodeReport nodeReport = 2;
optional ReportState containerReportState = 3;
} }
enum ContainerState {
closed = 0;
open = 1;
}
/**
NodeState contains messages from datanode to SCM saying that it has
some information that SCM might be interested in.*/
message ReportState {
enum states {
noContainerReports = 0;
completeContinerReport = 1;
deltaContainerReport = 2;
}
required states state = 1;
required int64 count = 2 [default = 0];
}
/**
This message is used to persist the information about a container in the
SCM database, This information allows SCM to startup faster and avoid having
all container info in memory all the time.
*/
message ContainerPersistanceProto {
required ContainerState state = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
required ContainerInfo info = 3;
}
/**
This message is used to do a quick look up of which containers are effected
if a node goes down
*/
message NodeContianerMapping {
repeated string contianerName = 1;
}
/**
A container report contains the following information.
*/
message ContainerInfo {
required string containerName = 1;
repeated bytes finalhash = 2;
optional int64 size = 3;
optional int64 keycount = 4;
}
/**
A set of container reports, max count is generally set to
8192 since that keeps the size of the reports under 1 MB.
*/
message ContainerReports {
enum reportType {
fullReport = 0;
deltaReport = 1;
}
repeated ContainerInfo reports = 1;
required reportType type = 2;
}
/** /**
* This message is send along with the heart beat to report datanode * This message is send along with the heart beat to report datanode
* storage utilization by SCM. * storage utilization by SCM.
*/ */
message SCMNodeReport { message SCMNodeReport {
repeated SCMStorageReport storageReport= 1; repeated SCMStorageReport storageReport = 1;
} }
message SCMStorageReport { message SCMStorageReport {
@ -123,6 +188,12 @@ message NullCmdResponseProto {
} }
/**
This command tells the data node to send in the container report when possible
*/
message SendContainerReportProto {
}
/** /**
Type of commands supported by SCM to datanode protocol. Type of commands supported by SCM to datanode protocol.
*/ */
@ -130,6 +201,7 @@ enum Type {
nullCmd = 1; nullCmd = 1;
versionCommand = 2; versionCommand = 2;
registeredCommand = 3; registeredCommand = 3;
sendContainerReport = 4;
} }
/* /*
@ -138,6 +210,10 @@ enum Type {
message SCMCommandResponseProto { message SCMCommandResponseProto {
required Type cmdType = 2; // Type of the command required Type cmdType = 2; // Type of the command
optional NullCmdResponseProto nullCommand = 3; optional NullCmdResponseProto nullCommand = 3;
optional SCMRegisteredCmdResponseProto registeredProto = 4;
optional SCMVersionResponseProto versionProto = 5;
optional SendContainerReportProto sendReport = 6;
} }
@ -157,9 +233,9 @@ message SCMHeartbeatResponseProto {
* Here is a simple state diagram that shows how a datanode would boot up and * Here is a simple state diagram that shows how a datanode would boot up and
* communicate with SCM. * communicate with SCM.
* *
* +-----------------------+ * -----------------------
* | Start | * | Start |
* +----------+------------+ * ---------- ------------
* | * |
* | * |
* | * |
@ -167,19 +243,19 @@ message SCMHeartbeatResponseProto {
* | * |
* | * |
* | * |
* +----------v-------------+ * ----------v-------------
* | Searching for SCM +------------+ * | Searching for SCM ------------
* +----------+-------------+ | * ---------- ------------- |
* | | * | |
* | | * | |
* | +----------v-------------+ * | ----------v-------------
* | | Register if needed | * | | Register if needed |
* | +-----------+------------+ * | ----------- ------------
* | | * | |
* v | * v |
* +-----------+----------------+ | * ----------- ---------------- |
* +---------+ Heartbeat state <--------+ * --------- Heartbeat state <--------
* | +--------^-------------------+ * | --------^-------------------
* | | * | |
* | | * | |
* | | * | |
@ -187,7 +263,7 @@ message SCMHeartbeatResponseProto {
* | | * | |
* | | * | |
* | | * | |
* +------------------+ * ------------------
* *
* *
* *

View File

@ -20,10 +20,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.NullCommand; import org.apache.hadoop.ozone.protocol.commands.NullCommand;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException; import java.io.IOException;
@ -37,6 +36,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private int rpcResponseDelay; private int rpcResponseDelay;
private AtomicInteger heartbeatCount = new AtomicInteger(0); private AtomicInteger heartbeatCount = new AtomicInteger(0);
private AtomicInteger rpcCount = new AtomicInteger(0); private AtomicInteger rpcCount = new AtomicInteger(0);
private ReportState reportState;
/** /**
* Returns the number of heartbeats made to this class. * Returns the number of heartbeats made to this class.
@ -112,10 +112,11 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
*/ */
@Override @Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport) sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport,
throws IOException { ReportState reportState) throws IOException {
rpcCount.incrementAndGet(); rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet(); heartbeatCount.incrementAndGet();
this.reportState = reportState;
sleepIfNeeded(); sleepIfNeeded();
StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
cmdResponse = StorageContainerDatanodeProtocolProtos cmdResponse = StorageContainerDatanodeProtocolProtos
@ -153,4 +154,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
StorageContainerDatanodeProtocolProtos StorageContainerDatanodeProtocolProtos
.SCMRegisteredCmdResponseProto.ErrorCode.success).build(); .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
} }
public ReportState getReportState() {
return this.reportState;
}
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManage
import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After; import org.junit.After;
@ -48,6 +49,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
/** /**
* Tests the datanode state machine class and its states. * Tests the datanode state machine class and its states.
@ -65,6 +68,7 @@ public class TestDatanodeStateMachine {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
conf = SCMTestUtils.getConf(); conf = SCMTestUtils.getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, 500);
serverAddresses = new LinkedList<>(); serverAddresses = new LinkedList<>();
scmServers = new LinkedList<>(); scmServers = new LinkedList<>();
mockServers = new LinkedList<>(); mockServers = new LinkedList<>();
@ -194,9 +198,9 @@ public class TestDatanodeStateMachine {
// This execute will invoke getVersion calls against all SCM endpoints // This execute will invoke getVersion calls against all SCM endpoints
// that we know of. // that we know of.
task.execute(executorService);
newState = task.await(2, TimeUnit.SECONDS);
task.execute(executorService);
newState = task.await(10, TimeUnit.SECONDS);
// If we are in running state, we should be in running. // If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState); newState);
@ -246,8 +250,14 @@ public class TestDatanodeStateMachine {
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState); newState);
for (ScmTestMock mock : mockServers) { for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount()); Assert.assertEquals(1, mock.getHeartbeatCount());
// Assert that heartbeat did indeed carry that State that we said
// have in the datanode.
Assert.assertEquals(mock.getReportState().getState().getNumber(),
StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports.getNumber());
} }
} }

View File

@ -20,7 +20,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine; .EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@ -30,16 +31,18 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
.RegisterEndpointTask; .RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask; .VersionEndpointTask;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
@ -58,6 +61,9 @@ import java.net.InetSocketAddress;
import java.util.UUID; import java.util.UUID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
/** /**
* Tests the endpoints. * Tests the endpoints.
@ -67,6 +73,27 @@ public class TestEndPoint {
private static RPC.Server scmServer; private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl; private static ScmTestMock scmServerImpl;
private static File testDir; private static File testDir;
private static StorageContainerDatanodeProtocolProtos.ReportState
defaultReportState;
@AfterClass
public static void tearDown() throws Exception {
if (scmServer != null) {
scmServer.stop();
}
FileUtil.fullyDelete(testDir);
}
@BeforeClass
public static void setUp() throws Exception {
serverAddress = SCMTestUtils.getReuseableAddress();
scmServerImpl = new ScmTestMock();
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState.
newBuilder().setState(noContainerReports).build();
}
@Test @Test
/** /**
@ -255,7 +282,7 @@ public class TestEndPoint {
srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build(); srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
nrb.addStorageReport(srb); nrb.addStorageReport(srb);
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(dataNode, nrb.build()); .sendHeartbeat(dataNode, nrb.build(), defaultReportState);
Assert.assertNotNull(responseProto); Assert.assertNotNull(responseProto);
Assert.assertEquals(1, responseProto.getCommandsCount()); Assert.assertEquals(1, responseProto.getCommandsCount());
Assert.assertNotNull(responseProto.getCommandsList().get(0)); Assert.assertNotNull(responseProto.getCommandsList().get(0));
@ -322,21 +349,4 @@ public class TestEndPoint {
scmServerImpl.setRpcResponseDelay(0); scmServerImpl.setRpcResponseDelay(0);
Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance)); Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
} }
@AfterClass
public static void tearDown() throws Exception {
if (scmServer != null) {
scmServer.stop();
}
FileUtil.fullyDelete(testDir);
}
@BeforeClass
public static void setUp() throws Exception {
serverAddress = SCMTestUtils.getReuseableAddress();
scmServerImpl = new ScmTestMock();
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
}
} }