HDDS-163. Add Datanode heartbeat dispatcher in SCM.

Contributed by Nandakumar.
This commit is contained in:
Anu Engineer 2018-06-13 20:18:22 -07:00
parent 7547740e5c
commit ddd09d59f3
14 changed files with 875 additions and 77 deletions

View File

@ -50,7 +50,8 @@ private ReportManager(StateContext context,
List<ReportPublisher> publishers) {
this.context = context;
this.publishers = publishers;
this.executorService = HadoopExecutors.newScheduledThreadPool(1,
this.executorService = HadoopExecutors.newScheduledThreadPool(
publishers.size(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Datanode ReportManager Thread - %d").build());
}

View File

@ -69,7 +69,7 @@
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.server.report.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@ -114,6 +114,7 @@ public class SCMDatanodeProtocolServer implements
private final StorageContainerManager scm;
private final InetSocketAddress datanodeRpcAddress;
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
@ -148,16 +149,24 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
updateRPCListenAddress(
conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
datanodeRpcServer);
heartbeatDispatcher = SCMDatanodeHeartbeatDispatcher.newBuilder(conf, scm)
.addHandlerFor(NodeReportProto.class)
.addHandlerFor(ContainerReportsProto.class)
.build();
}
public void start() {
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
"RPC server for DataNodes", datanodeRpcAddress));
datanodeRpcServer.start();
}
public InetSocketAddress getDatanodeRpcAddress() {
return datanodeRpcAddress;
}
public RPC.Server getDatanodeRpcServer() {
return datanodeRpcServer;
}
@Override
public SCMVersionResponseProto getVersion(SCMVersionRequestProto
versionRequest)
@ -166,25 +175,6 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
.getProtobufMessage();
}
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
SCMHeartbeatRequestProto heartbeat)
throws IOException {
// TODO: Add a heartbeat dispatcher.
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(heartbeat.getDatanodeDetails());
NodeReportProto nodeReport = heartbeat.getNodeReport();
List<SCMCommand> commands =
scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
List<SCMCommandProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd));
}
return SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(datanodeDetails.getUuidString())
.addAllCommands(cmdResponses).build();
}
@Override
public SCMRegisteredResponseProto register(
HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
@ -216,36 +206,27 @@ public static SCMRegisteredResponseProto getRegisteredResponse(
.build();
}
public void processContainerReports(DatanodeDetails datanodeDetails,
ContainerReportsProto reports)
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
SCMHeartbeatRequestProto heartbeat)
throws IOException {
updateContainerReportMetrics(datanodeDetails, reports);
// should we process container reports async?
scm.getScmContainerManager()
.processContainerReports(datanodeDetails, reports);
}
heartbeatDispatcher.dispatch(heartbeat);
private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
ContainerReportsProto reports) {
ContainerStat newStat = new ContainerStat();
for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
.getReportsList()) {
newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
info.getReadCount(), info.getWriteCount()));
// TODO: Remove the below code after SCM refactoring.
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(heartbeat.getDatanodeDetails());
NodeReportProto nodeReport = heartbeat.getNodeReport();
List<SCMCommand> commands =
scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
List<SCMCommandProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd));
}
// update container metrics
StorageContainerManager.getMetrics().setLastContainerStat(newStat);
// Update container stat entry, this will trigger a removal operation if it
// exists in cache.
String datanodeUuid = datanodeDetails.getUuidString();
scm.getContainerReportCache().put(datanodeUuid, newStat);
// update global view container metrics
StorageContainerManager.getMetrics().incrContainerStat(newStat);
return SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(datanodeDetails.getUuidString())
.addAllCommands(cmdResponses).build();
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto acks) throws IOException {
@ -271,28 +252,6 @@ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
.getDefaultInstanceForType();
}
public void start() {
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
"RPC server for DataNodes", getDatanodeRpcAddress()));
getDatanodeRpcServer().start();
}
public void stop() {
try {
LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop();
} catch (Exception ex) {
LOG.error(" datanodeRpcServer stop failed.", ex);
}
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
public void join() throws InterruptedException {
LOG.trace("Join RPC server for DataNodes");
datanodeRpcServer.join();
}
/**
* Returns a SCMCommandRepose from the SCM Command.
*
@ -338,4 +297,22 @@ public SCMCommandProto getCommandResponse(SCMCommand cmd)
throw new IllegalArgumentException("Not implemented");
}
}
public void join() throws InterruptedException {
LOG.trace("Join RPC server for DataNodes");
datanodeRpcServer.join();
}
public void stop() {
try {
LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop();
heartbeatDispatcher.shutdown();
} catch (Exception ex) {
LOG.error(" datanodeRpcServer stop failed.", ex);
}
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handler for Datanode Container Report.
*/
public class SCMDatanodeContainerReportHandler extends
SCMDatanodeReportHandler<ContainerReportsProto> {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeContainerReportHandler.class);
@Override
public void processReport(DatanodeDetails datanodeDetails,
ContainerReportsProto report) throws IOException {
LOG.trace("Processing container report from {}.", datanodeDetails);
updateContainerReportMetrics(datanodeDetails, report);
getSCM().getScmContainerManager()
.processContainerReports(datanodeDetails, report);
}
/**
* Updates container report metrics in SCM.
*
* @param datanodeDetails Datanode Information
* @param reports Container Reports
*/
private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
ContainerReportsProto reports) {
ContainerStat newStat = new ContainerStat();
for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
.getReportsList()) {
newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
info.getReadCount(), info.getWriteCount()));
}
// update container metrics
StorageContainerManager.getMetrics().setLastContainerStat(newStat);
// Update container stat entry, this will trigger a removal operation if it
// exists in cache.
String datanodeUuid = datanodeDetails.getUuidString();
getSCM().getContainerReportCache().put(datanodeUuid, newStat);
// update global view container metrics
StorageContainerManager.getMetrics().incrContainerStat(newStat);
}
}

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate ReportHandlers at SCM.
* Only one handler per report is supported now, it's very easy to support
* multiple handlers for a report.
*/
public final class SCMDatanodeHeartbeatDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeHeartbeatDispatcher.class);
/**
* This stores Report to Handler mapping.
*/
private final Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers;
/**
* Executor service which will be used for processing reports.
*/
private final ExecutorService executorService;
/**
* Constructs SCMDatanodeHeartbeatDispatcher instance with the given
* handlers.
*
* @param handlers report to report handler mapping
*/
private SCMDatanodeHeartbeatDispatcher(Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers) {
this.handlers = handlers;
this.executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCMDatanode Heartbeat Dispatcher Thread - %d")
.build());
}
/**
* Dispatches heartbeat to registered handlers.
*
* @param heartbeat heartbeat to be dispatched.
*/
public void dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(heartbeat.getDatanodeDetails());
if (heartbeat.hasNodeReport()) {
processReport(datanodeDetails, heartbeat.getNodeReport());
}
if (heartbeat.hasContainerReport()) {
processReport(datanodeDetails, heartbeat.getContainerReport());
}
}
/**
* Invokes appropriate ReportHandler and submits the task to executor
* service for processing.
*
* @param datanodeDetails Datanode Information
* @param report Report to be processed
*/
@SuppressWarnings("unchecked")
private void processReport(DatanodeDetails datanodeDetails,
GeneratedMessage report) {
executorService.submit(() -> {
try {
SCMDatanodeReportHandler handler = handlers.get(report.getClass());
handler.processReport(datanodeDetails, report);
} catch (IOException ex) {
LOG.error("Exception wile processing report {}, from {}",
report.getClass(), datanodeDetails, ex);
}
});
}
/**
* Shuts down SCMDatanodeHeartbeatDispatcher.
*/
public void shutdown() {
executorService.shutdown();
}
/**
* Returns a new Builder to construct {@link SCMDatanodeHeartbeatDispatcher}.
*
* @param conf Configuration to be used by SCMDatanodeHeartbeatDispatcher
* @param scm {@link StorageContainerManager} instance to be used by report
* handlers
*
* @return {@link SCMDatanodeHeartbeatDispatcher.Builder} instance
*/
public static Builder newBuilder(Configuration conf,
StorageContainerManager scm) {
return new Builder(conf, scm);
}
/**
* Builder for SCMDatanodeHeartbeatDispatcher.
*/
public static class Builder {
private final SCMDatanodeReportHandlerFactory reportHandlerFactory;
private final Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> report2handler;
/**
* Constructs SCMDatanodeHeartbeatDispatcher.Builder instance.
*
* @param conf Configuration object to be used.
* @param scm StorageContainerManager instance to be used for report
* handler initialization.
*/
private Builder(Configuration conf, StorageContainerManager scm) {
this.report2handler = new HashMap<>();
this.reportHandlerFactory =
new SCMDatanodeReportHandlerFactory(conf, scm);
}
/**
* Adds new report handler for the given report.
*
* @param report Report for which handler has to be added
*
* @return Builder
*/
public Builder addHandlerFor(Class<? extends GeneratedMessage> report) {
report2handler.put(report, reportHandlerFactory.getHandlerFor(report));
return this;
}
/**
* Associates the given report handler for the given report.
*
* @param report Report to be associated with
* @param handler Handler to be used for the report
*
* @return Builder
*/
public Builder addHandler(Class<? extends GeneratedMessage> report,
SCMDatanodeReportHandler<? extends GeneratedMessage> handler) {
report2handler.put(report, handler);
return this;
}
/**
* Builds and returns {@link SCMDatanodeHeartbeatDispatcher} instance.
*
* @return SCMDatanodeHeartbeatDispatcher
*/
public SCMDatanodeHeartbeatDispatcher build() {
return new SCMDatanodeHeartbeatDispatcher(report2handler);
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handles Datanode Node Report.
*/
public class SCMDatanodeNodeReportHandler extends
SCMDatanodeReportHandler<NodeReportProto> {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeNodeReportHandler.class);
@Override
public void processReport(DatanodeDetails datanodeDetails,
NodeReportProto report) throws IOException {
LOG.debug("Processing node report from {}.", datanodeDetails);
//TODO: add logic to process node report.
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import java.io.IOException;
/**
* Datanode Report handlers should implement this interface in order to get
* call back whenever the report is received from datanode.
*
* @param <T> Type of report the handler is interested in.
*/
public abstract class SCMDatanodeReportHandler<T extends GeneratedMessage>
implements Configurable {
private Configuration config;
private StorageContainerManager scm;
/**
* Initializes SCMDatanodeReportHandler and associates it with the given
* StorageContainerManager instance.
*
* @param storageContainerManager StorageContainerManager instance to be
* associated with.
*/
public void init(StorageContainerManager storageContainerManager) {
this.scm = storageContainerManager;
}
/**
* Returns the associated StorageContainerManager instance. This will be
* used by the ReportHandler implementations.
*
* @return {@link StorageContainerManager}
*/
protected StorageContainerManager getSCM() {
return scm;
}
@Override
public void setConf(Configuration conf) {
this.config = conf;
}
@Override
public Configuration getConf() {
return config;
}
/**
* Processes the report received from datanode. Each ReportHandler
* implementation is responsible for providing the logic to process the
* report it's interested in.
*
* @param datanodeDetails Datanode Information
* @param report Report to be processed
*
* @throws IOException In case of any exception
*/
abstract void processReport(DatanodeDetails datanodeDetails, T report)
throws IOException;
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.HashMap;
import java.util.Map;
/**
* Factory class to construct {@link SCMDatanodeReportHandler} given a report.
*/
public class SCMDatanodeReportHandlerFactory {
private final Configuration conf;
private final StorageContainerManager scm;
private final Map<Class<? extends GeneratedMessage>,
Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>>
report2handler;
/**
* Constructs {@link SCMDatanodeReportHandler} instance.
*
* @param conf Configuration to be passed to the
* {@link SCMDatanodeReportHandler}
*/
public SCMDatanodeReportHandlerFactory(Configuration conf,
StorageContainerManager scm) {
this.conf = conf;
this.scm = scm;
this.report2handler = new HashMap<>();
report2handler.put(NodeReportProto.class,
SCMDatanodeNodeReportHandler.class);
report2handler.put(ContainerReportsProto.class,
SCMDatanodeContainerReportHandler.class);
}
/**
* Returns the SCMDatanodeReportHandler for the corresponding report.
*
* @param report report
*
* @return report handler
*/
public SCMDatanodeReportHandler<? extends GeneratedMessage> getHandlerFor(
Class<? extends GeneratedMessage> report) {
Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>
handlerClass = report2handler.get(report);
if (handlerClass == null) {
throw new RuntimeException("No handler found for report " + report);
}
SCMDatanodeReportHandler<? extends GeneratedMessage> instance =
ReflectionUtils.newInstance(handlerClass, conf);
instance.init(scm);
return instance;
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
/**
* Handling of all the datanode reports in SCM which are received through
* heartbeat is done here.
*
* SCM Datanode Report Processing State Diagram:
*
* SCMDatanode SCMDatanodeHeartbeat SCMDatanodeReport
* ProtocolServer Dispatcher Handler
* | | |
* | | |
* | construct | |
* |----------------------->| |
* | | |
* | | register |
* | |<-----------------------|
* | | |
* +------------+------------------------+------------------------+--------+
* | loop | | | |
* | | | | |
* | | | | |
* | heartbeat | | | |
* - +----------->| | | |
* | from | heartbeat | | |
* | Datanode |----------------------->| | |
* | | | report | |
* | | |----------------------->| |
* | | | | |
* | DN | | | |
* <-+------------| | | |
* | commands | | | |
* | | | | |
* +------------+------------------------+------------------------+--------+
* | | |
* | | |
* | shutdown | |
* |----------------------->| |
* | | |
* | | |
* - - -
*/

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify SCMDatanodeContainerReportHandler's behavior.
*/
public class TestSCMDatanodeContainerReportHandler {
//TODO: add test cases to verify SCMDatanodeContainerReportHandler.
@Test
public void dummyTest() {
Assert.assertTrue(true);
}
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
*/
public class TestSCMDatanodeHeartbeatDispatcher {
@Test
public void testSCMDatanodeHeartbeatDispatcherBuilder() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandlerFor(NodeReportProto.class)
.addHandlerFor(ContainerReportsProto.class)
.build();
Assert.assertNotNull(dispatcher);
}
@Test
public void testNodeReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeNodeReportHandler nodeReportHandler =
Mockito.mock(SCMDatanodeNodeReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(NodeReportProto.class, nodeReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setNodeReport(nodeReport)
.build();
dispatcher.dispatch(heartbeat);
verify(nodeReportHandler,
times(1))
.processReport(any(DatanodeDetails.class), eq(nodeReport));
}
@Test
public void testContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeContainerReportHandler containerReportHandler =
Mockito.mock(SCMDatanodeContainerReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(ContainerReportsProto.class, containerReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
.build();
dispatcher.dispatch(heartbeat);
verify(containerReportHandler,
times(1))
.processReport(any(DatanodeDetails.class),
any(ContainerReportsProto.class));
}
@Test
public void testNodeAndContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeNodeReportHandler nodeReportHandler =
Mockito.mock(SCMDatanodeNodeReportHandler.class);
SCMDatanodeContainerReportHandler containerReportHandler =
Mockito.mock(SCMDatanodeContainerReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(NodeReportProto.class, nodeReportHandler)
.addHandler(ContainerReportsProto.class, containerReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setNodeReport(nodeReport)
.setContainerReport(containerReport)
.build();
dispatcher.dispatch(heartbeat);
verify(nodeReportHandler,
times(1))
.processReport(any(DatanodeDetails.class), any(NodeReportProto.class));
verify(containerReportHandler,
times(1))
.processReport(any(DatanodeDetails.class),
any(ContainerReportsProto.class));
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify TestSCMDatanodeNodeReportHandler's behavior.
*/
public class TestSCMDatanodeNodeReportHandler {
//TODO: add test cases to verify SCMDatanodeNodeReportHandler.
@Test
public void dummyTest() {
Assert.assertTrue(true);
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify the functionality of SCMDatanodeReportHandlerFactory.
*/
public class TestSCMDatanodeReportHandlerFactory {
@Test
public void testNodeReportHandlerConstruction() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeReportHandlerFactory factory =
new SCMDatanodeReportHandlerFactory(conf, null);
Assert.assertTrue(factory.getHandlerFor(NodeReportProto.class)
instanceof SCMDatanodeNodeReportHandler);
}
@Test
public void testContainerReporttHandlerConstruction() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeReportHandlerFactory factory =
new SCMDatanodeReportHandlerFactory(conf, null);
Assert.assertTrue(factory.getHandlerFor(ContainerReportsProto.class)
instanceof SCMDatanodeContainerReportHandler);
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
/**
* Contains test-cases to test Datanode report handlers in SCM.
*/

View File

@ -28,6 +28,8 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.report
.SCMDatanodeContainerReportHandler;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
@ -80,7 +82,11 @@ public void testContainerMetrics() throws Exception {
DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails();
ContainerReportsProto request = createContainerReport(numReport, stat);
String fstDatanodeUuid = fstDatanodeDetails.getUuidString();
scmManager.getDatanodeProtocolServer().processContainerReports(
SCMDatanodeContainerReportHandler containerReportHandler =
new SCMDatanodeContainerReportHandler();
containerReportHandler.setConf(conf);
containerReportHandler.init(scmManager);
containerReportHandler.processReport(
fstDatanodeDetails, request);
// verify container stat metrics
@ -105,7 +111,7 @@ public void testContainerMetrics() throws Exception {
DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails();
request = createContainerReport(1, stat);
String sndDatanodeUuid = sndDatanodeDetails.getUuidString();
scmManager.getDatanodeProtocolServer().processContainerReports(
containerReportHandler.processReport(
sndDatanodeDetails, request);
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@ -128,11 +134,11 @@ public void testContainerMetrics() throws Exception {
// Re-send reports but with different value for validating
// the aggregation.
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
scmManager.getDatanodeProtocolServer().processContainerReports(
containerReportHandler.processReport(
fstDatanodeDetails, createContainerReport(1, stat));
stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
scmManager.getDatanodeProtocolServer().processContainerReports(
containerReportHandler.processReport(
sndDatanodeDetails, createContainerReport(1, stat));
// the global container metrics value should be updated
@ -176,8 +182,12 @@ public void testStaleNodeContainerReport() throws Exception {
DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
.getDatanodeDetails();
SCMDatanodeContainerReportHandler containerReportHandler =
new SCMDatanodeContainerReportHandler();
containerReportHandler.setConf(conf);
containerReportHandler.init(scmManager);
ContainerReportsProto request = createContainerReport(numReport, stat);
scmManager.getDatanodeProtocolServer().processContainerReports(
containerReportHandler.processReport(
datanodeDetails, request);
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);