From 347c9550135ea10fd84d5007124452bf5f2d6619 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Fri, 20 Jul 2018 14:37:13 -0700 Subject: [PATCH] HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar. --- .../apache/hadoop/hdds/HddsConfigKeys.java | 6 + .../src/main/resources/ozone-default.xml | 10 + .../common/statemachine/StateContext.java | 55 +++- .../endpoint/HeartbeatEndpointTask.java | 33 +- .../StorageContainerDatanodeProtocol.proto | 4 +- .../common/report/TestReportPublisher.java | 41 --- .../endpoint/TestHeartbeatEndpointTask.java | 300 ++++++++++++++++++ .../common/states/endpoint/package-info.java | 18 ++ 8 files changed, 414 insertions(+), 53 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 0283615a197..fd4bf08c691 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -48,4 +48,10 @@ private HddsConfigKeys() { "hdds.command.status.report.interval"; public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT = "60s"; + + public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT = + "hdds.container.action.max.limit"; + public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT = + 20; + } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 69a382a0137..84a3e0c230f 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1098,4 +1098,14 @@ + + hdds.container.action.max.limit + 20 + DATANODE + + Maximum number of Container Actions sent by the datanode to SCM in a + single heartbeat. + + + \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index faaff69b8d7..7862cc62368 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -20,14 +20,18 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .RunningDatanodeState; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus + .CommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; @@ -59,6 +64,7 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final Configuration conf; private final Queue reports; + private final Queue containerActions; private DatanodeStateMachine.DatanodeStates state; /** @@ -76,6 +82,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates commandQueue = new LinkedList<>(); cmdStatusMap = new ConcurrentHashMap<>(); reports = new LinkedList<>(); + containerActions = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); } @@ -187,15 +194,45 @@ public List getAllAvailableReports() { * @return List */ public List getReports(int maxLimit) { - List results = new ArrayList<>(); synchronized (reports) { - GeneratedMessage report = reports.poll(); - while(results.size() < maxLimit && report != null) { - results.add(report); - report = reports.poll(); - } + return reports.parallelStream().limit(maxLimit) + .collect(Collectors.toList()); + } + } + + + /** + * Adds the ContainerAction to ContainerAction queue. + * + * @param containerAction ContainerAction to be added + */ + public void addContainerAction(ContainerAction containerAction) { + synchronized (containerActions) { + containerActions.add(containerAction); + } + } + + /** + * Returns all the pending ContainerActions from the ContainerAction queue, + * or empty list if the queue is empty. + * + * @return List + */ + public List getAllPendingContainerActions() { + return getPendingContainerAction(Integer.MAX_VALUE); + } + + /** + * Returns pending ContainerActions from the ContainerAction queue with a + * max limit on list size, or empty list if the queue is empty. + * + * @return List + */ + public List getPendingContainerAction(int maxLimit) { + synchronized (containerActions) { + return containerActions.parallelStream().limit(maxLimit) + .collect(Collectors.toList()); } - return results; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 260a245ceb3..020fb714263 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -24,6 +24,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -46,8 +50,14 @@ import java.io.IOException; import java.time.ZonedDateTime; +import java.util.List; import java.util.concurrent.Callable; +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_CONTAINER_ACTION_MAX_LIMIT; +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT; + /** * Heartbeat class for SCMs. */ @@ -59,6 +69,7 @@ public class HeartbeatEndpointTask private final Configuration conf; private DatanodeDetailsProto datanodeDetailsProto; private StateContext context; + private int maxContainerActionsPerHB; /** * Constructs a SCM heart beat. @@ -70,6 +81,8 @@ public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, this.rpcEndpoint = rpcEndpoint; this.conf = conf; this.context = context; + this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT, + HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT); } /** @@ -107,7 +120,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception { SCMHeartbeatRequestProto.newBuilder() .setDatanodeDetails(datanodeDetailsProto); addReports(requestBuilder); - + addContainerActions(requestBuilder); SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() .sendHeartbeat(requestBuilder.build()); processResponse(reponse, datanodeDetailsProto); @@ -139,6 +152,24 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { } } + /** + * Adds all the pending ContainerActions to the heartbeat. + * + * @param requestBuilder builder to which the report has to be added. + */ + private void addContainerActions( + SCMHeartbeatRequestProto.Builder requestBuilder) { + List actions = context.getPendingContainerAction( + maxContainerActionsPerHB); + if (!actions.isEmpty()) { + ContainerActionsProto cap = ContainerActionsProto.newBuilder() + .addAllContainerActions(actions) + .build(); + requestBuilder.setContainerActions(cap); + } + } + + /** * Returns a builder class for HeartbeatEndpointTask task. * @return Builder. diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 4238389a20e..d89567b5d90 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; optional NodeReportProto nodeReport = 2; optional ContainerReportsProto containerReport = 3; - optional ContainerActionsProto containerActions = 4; - optional CommandStatusReportsProto commandStatusReport = 5; + optional CommandStatusReportsProto commandStatusReport = 4; + optional ContainerActionsProto containerActions = 5; } /* diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index a0db2e8aa6c..811599f01a8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.common.report; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Descriptors; import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -178,22 +171,6 @@ public void testCommandStatusPublisher() throws InterruptedException { executorService.shutdown(); } - @Test - public void testAddingReportToHeartbeat() { - GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance(); - GeneratedMessage containerReport = ContainerReportsProto - .getDefaultInstance(); - SCMHeartbeatRequestProto.Builder heartbeatBuilder = - SCMHeartbeatRequestProto.newBuilder(); - heartbeatBuilder.setDatanodeDetails( - getDatanodeDetails().getProtoBufMessage()); - addReport(heartbeatBuilder, nodeReport); - addReport(heartbeatBuilder, containerReport); - SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build(); - Assert.assertTrue(heartbeat.hasNodeReport()); - Assert.assertTrue(heartbeat.hasContainerReport()); - } - /** * Get a datanode details. * @@ -222,22 +199,4 @@ private static DatanodeDetails getDatanodeDetails() { return builder.build(); } - /** - * Adds the report to heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - * @param report the report to be added. - */ - private static void addReport(SCMHeartbeatRequestProto.Builder - requestBuilder, GeneratedMessage report) { - String reportName = report.getDescriptorForType().getFullName(); - for (Descriptors.FieldDescriptor descriptor : - SCMHeartbeatRequestProto.getDescriptor().getFields()) { - String heartbeatFieldName = descriptor.getMessageType().getFullName(); - if (heartbeatFieldName.equals(reportName)) { - requestBuilder.setField(descriptor, report); - } - } - } - } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java new file mode 100644 index 00000000000..b4d718d4ff6 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine.DatanodeStates; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.UUID; + +/** + * This class tests the functionality of HeartbeatEndpointTask. + */ +public class TestHeartbeatEndpointTask { + + + @Test + public void testheartbeatWithoutReports() throws Exception { + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertFalse(heartbeat.hasNodeReport()); + Assert.assertFalse(heartbeat.hasContainerReport()); + Assert.assertFalse(heartbeat.hasCommandStatusReport()); + Assert.assertFalse(heartbeat.hasContainerActions()); + } + + @Test + public void testheartbeatWithNodeReports() throws Exception { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); + context.addReport(NodeReportProto.getDefaultInstance()); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertTrue(heartbeat.hasNodeReport()); + Assert.assertFalse(heartbeat.hasContainerReport()); + Assert.assertFalse(heartbeat.hasCommandStatusReport()); + Assert.assertFalse(heartbeat.hasContainerActions()); + } + + @Test + public void testheartbeatWithContainerReports() throws Exception { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); + context.addReport(ContainerReportsProto.getDefaultInstance()); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertFalse(heartbeat.hasNodeReport()); + Assert.assertTrue(heartbeat.hasContainerReport()); + Assert.assertFalse(heartbeat.hasCommandStatusReport()); + Assert.assertFalse(heartbeat.hasContainerActions()); + } + + @Test + public void testheartbeatWithCommandStatusReports() throws Exception { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); + context.addReport(CommandStatusReportsProto.getDefaultInstance()); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertFalse(heartbeat.hasNodeReport()); + Assert.assertFalse(heartbeat.hasContainerReport()); + Assert.assertTrue(heartbeat.hasCommandStatusReport()); + Assert.assertFalse(heartbeat.hasContainerActions()); + } + + @Test + public void testheartbeatWithContainerActions() throws Exception { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); + context.addContainerAction(getContainerAction()); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertFalse(heartbeat.hasNodeReport()); + Assert.assertFalse(heartbeat.hasContainerReport()); + Assert.assertFalse(heartbeat.hasCommandStatusReport()); + Assert.assertTrue(heartbeat.hasContainerActions()); + } + + @Test + public void testheartbeatWithAllReports() throws Exception { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + + StorageContainerDatanodeProtocolClientSideTranslatorPB scm = + Mockito.mock( + StorageContainerDatanodeProtocolClientSideTranslatorPB.class); + ArgumentCaptor argument = ArgumentCaptor + .forClass(SCMHeartbeatRequestProto.class); + Mockito.when(scm.sendHeartbeat(argument.capture())) + .thenAnswer(invocation -> + SCMHeartbeatResponseProto.newBuilder() + .setDatanodeUUID( + ((SCMHeartbeatRequestProto)invocation.getArgument(0)) + .getDatanodeDetails().getUuid()) + .build()); + + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( + conf, context, scm); + context.addReport(NodeReportProto.getDefaultInstance()); + context.addReport(ContainerReportsProto.getDefaultInstance()); + context.addReport(CommandStatusReportsProto.getDefaultInstance()); + context.addContainerAction(getContainerAction()); + endpointTask.call(); + SCMHeartbeatRequestProto heartbeat = argument.getValue(); + Assert.assertTrue(heartbeat.hasDatanodeDetails()); + Assert.assertTrue(heartbeat.hasNodeReport()); + Assert.assertTrue(heartbeat.hasContainerReport()); + Assert.assertTrue(heartbeat.hasCommandStatusReport()); + Assert.assertTrue(heartbeat.hasContainerActions()); + } + + /** + * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy. + * + * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB + * + * @return HeartbeatEndpointTask + */ + private HeartbeatEndpointTask getHeartbeatEndpointTask( + StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { + Configuration conf = new OzoneConfiguration(); + StateContext context = new StateContext(conf, DatanodeStates.RUNNING, + Mockito.mock(DatanodeStateMachine.class)); + return getHeartbeatEndpointTask(conf, context, proxy); + + } + + /** + * Creates HeartbeatEndpointTask with the given conf, context and + * StorageContainerManager client side proxy. + * + * @param conf Configuration + * @param context StateContext + * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB + * + * @return HeartbeatEndpointTask + */ + private HeartbeatEndpointTask getHeartbeatEndpointTask( + Configuration conf, + StateContext context, + StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { + DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder() + .setUuid(UUID.randomUUID().toString()) + .setHostName("localhost") + .setIpAddress("127.0.0.1") + .build(); + EndpointStateMachine endpointStateMachine = Mockito + .mock(EndpointStateMachine.class); + Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy); + return HeartbeatEndpointTask.newBuilder() + .setConfig(conf) + .setDatanodeDetails(datanodeDetails) + .setContext(context) + .setEndpointStateMachine(endpointStateMachine) + .build(); + } + + private ContainerAction getContainerAction() { + ContainerAction.Builder builder = ContainerAction.newBuilder(); + ContainerInfo containerInfo = ContainerInfo.newBuilder() + .setContainerID(1L) + .build(); + builder.setContainer(containerInfo) + .setAction(ContainerAction.Action.CLOSE) + .setReason(ContainerAction.Reason.CONTAINER_FULL); + return builder.build(); + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java new file mode 100644 index 00000000000..d120a5cd4b7 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint;