From 6837121a43231f854b0b22ad20330012439313ce Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Fri, 20 Jul 2018 13:03:25 -0700
Subject: [PATCH] HDDS-239. Add PipelineStateManager to track pipeline state
 transition. Contributed by Mukul Kumar Singh.

---
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java |   5 +
 .../container/common/helpers/Pipeline.java    |   7 +
 .../src/main/resources/ozone-default.xml      |  12 +
 .../common/statemachine/StateContext.java     |  51 ++-
 .../endpoint/HeartbeatEndpointTask.java       |  24 +-
 .../StorageContainerDatanodeProtocol.proto    |   4 +-
 .../common/report/TestReportPublisher.java    |  41 ---
 .../endpoint/TestHeartbeatEndpointTask.java   | 302 ++++++++++++++++++
 .../common/states/endpoint/package-info.java  |  18 ++
 .../hdds/scm/container/ContainerMapping.java  |   4 +
 .../hdds/scm/exceptions/SCMException.java     |   1 +
 .../hdds/scm/pipelines/PipelineManager.java   |  64 ++--
 .../hdds/scm/pipelines/PipelineSelector.java  | 212 ++++++++++--
 .../scm/pipelines/ratis/RatisManagerImpl.java |  33 +-
 .../standalone/StandaloneManagerImpl.java     |  21 +-
 .../scm/pipeline/TestNode2PipelineMap.java    |  14 +
 16 files changed, 667 insertions(+), 146 deletions(-)
 create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 71184cf8904..6e940adbc3e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,6 +236,11 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
+  public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
+      "ozone.scm.pipeline.creation.lease.timeout";
+
+  public static final String
+      OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c5794f4c036..534c9fd5419 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -213,6 +213,13 @@ public class Pipeline {
     return lifeCycleState;
   }
 
+  /**
+   * Updates the state of the pipeline.
+   */
+  public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
+    lifeCycleState = nextState;
+  }
+
   /**
    * Gets the pipeline Name.
    *

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5a1d26a8dde..69a382a0137 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,5 +1085,17 @@
       executed since last report. Unit could be defined with
       postfix (ns,ms,s,m,h,d)
     </description>
   </property>
+  <property>
+    <name>ozone.scm.pipeline.creation.lease.timeout</name>
+    <value>60s</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      Pipeline creation timeout to be used by SCM. When
+      a CREATE event happens, the pipeline moves from the ALLOCATED to
+      the CREATING state, and SCM waits up to the configured amount of
+      time for a CREATED event; if that event does not arrive in time,
+      the pipeline is timed out and moved to the CLOSED state.
+    </description>
+  </property>
 </configuration>
\ No newline at end of file

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index faaff69b8d7..4951f2ab7ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,14 +20,18 @@ import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus
+    .CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +63,7 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
+  private final Queue<ContainerAction> containerActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -76,6 +81,7 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
+    containerActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -198,6 +204,47 @@ public class StateContext {
     return results;
   }
 
+
+  /**
+   * Adds the ContainerAction to the ContainerAction queue.
+   *
+   * @param containerAction ContainerAction to be added
+   */
+  public void addContainerAction(ContainerAction containerAction) {
+    synchronized (containerActions) {
+      containerActions.add(containerAction);
+    }
+  }
+
+  /**
+   * Returns all the pending ContainerActions from the ContainerAction queue,
+   * or an empty list if the queue is empty.
+   *
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getAllPendingContainerActions() {
+    return getPendingContainerAction(Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns pending ContainerActions from the ContainerAction queue with a
+   * max limit on list size, or an empty list if the queue is empty.
+   *
+   * @param maxLimit maximum number of ContainerActions to return
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+    List<ContainerAction> results = new ArrayList<>();
+    synchronized (containerActions) {
+      ContainerAction action = containerActions.poll();
+      while (results.size() < maxLimit && action != null) {
+        results.add(action);
+        action = containerActions.poll();
+      }
+    }
+    return results;
+  }
+
   /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 260a245ceb3..214e1cd64eb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -24,6 +24,10 @@ import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.List;
 import java.util.concurrent.Callable;
 
 /**
@@ -107,7 +112,7 @@ public class HeartbeatEndpointTask
           SCMHeartbeatRequestProto.newBuilder()
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
-
+      addContainerActions(requestBuilder);
       SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
           .sendHeartbeat(requestBuilder.build());
       processResponse(response, datanodeDetailsProto);
@@ -139,6 +144,23 @@ public class HeartbeatEndpointTask
     }
   }
 
+  /**
+   * Adds all the pending ContainerActions to the heartbeat.
+   *
+   * @param requestBuilder builder to which the actions have to be added.
+   */
+  private void addContainerActions(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<ContainerAction> actions = context.getAllPendingContainerActions();
+    if (!actions.isEmpty()) {
+      ContainerActionsProto cap = ContainerActionsProto.newBuilder()
+          .addAllContainerActions(actions)
+          .build();
+      requestBuilder.setContainerActions(cap);
+    }
+  }
+
+
   /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return Builder.
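For orientation, here is a minimal sketch of the datanode-side flow the two hunks above enable. The names `context` and `task` are hypothetical stand-ins for the live StateContext and HeartbeatEndpointTask instances and are not part of the patch; the builder calls mirror the ones used in the new test below.

    // A component that notices a full container queues a CLOSE action.
    ContainerAction action = ContainerAction.newBuilder()
        .setContainer(ContainerInfo.newBuilder().setContainerID(1L).build())
        .setAction(ContainerAction.Action.CLOSE)
        .setReason(ContainerAction.Reason.CONTAINER_FULL)
        .build();
    context.addContainerAction(action);

    // The next heartbeat drains the queue: HeartbeatEndpointTask.call()
    // invokes addContainerActions(requestBuilder), which wraps any pending
    // actions in a ContainerActionsProto on the SCMHeartbeatRequestProto.
    task.call();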
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 4238389a20e..d89567b5d90 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; optional NodeReportProto nodeReport = 2; optional ContainerReportsProto containerReport = 3; - optional ContainerActionsProto containerActions = 4; - optional CommandStatusReportsProto commandStatusReport = 5; + optional CommandStatusReportsProto commandStatusReport = 4; + optional ContainerActionsProto containerActions = 5; } /* diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index a0db2e8aa6c..811599f01a8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.common.report; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Descriptors; import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -178,22 +171,6 @@ public class TestReportPublisher { executorService.shutdown(); } - @Test - public void testAddingReportToHeartbeat() { - GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance(); - GeneratedMessage containerReport = ContainerReportsProto - .getDefaultInstance(); - SCMHeartbeatRequestProto.Builder heartbeatBuilder = - SCMHeartbeatRequestProto.newBuilder(); - heartbeatBuilder.setDatanodeDetails( - getDatanodeDetails().getProtoBufMessage()); - addReport(heartbeatBuilder, nodeReport); - addReport(heartbeatBuilder, containerReport); - SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build(); - Assert.assertTrue(heartbeat.hasNodeReport()); - Assert.assertTrue(heartbeat.hasContainerReport()); - } - /** * Get a datanode details. * @@ -222,22 +199,4 @@ public class TestReportPublisher { return builder.build(); } - /** - * Adds the report to heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - * @param report the report to be added. 
- */ - private static void addReport(SCMHeartbeatRequestProto.Builder - requestBuilder, GeneratedMessage report) { - String reportName = report.getDescriptorForType().getFullName(); - for (Descriptors.FieldDescriptor descriptor : - SCMHeartbeatRequestProto.getDescriptor().getFields()) { - String heartbeatFieldName = descriptor.getMessageType().getFullName(); - if (heartbeatFieldName.equals(reportName)) { - requestBuilder.setField(descriptor, report); - } - } - } - } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java new file mode 100644 index 00000000000..87bd811991d --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine.DatanodeStates; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; + +/** + * This class tests the functionality of HeartbeatEndpointTask. 
+ */
+public class TestHeartbeatEndpointTask {
+
+
+  @Test
+  public void testHeartbeatWithoutReports() throws Exception {
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithNodeReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithContainerReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithCommandStatusReports()
+      throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithContainerActions() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithAllReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
+   *
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+    return getHeartbeatEndpointTask(conf, context, proxy);
+
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask with the given conf, context and
+   * StorageContainerManager client side proxy.
+   *
+   * @param conf Configuration
+   * @param context StateContext
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      Configuration conf,
+      StateContext context,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID().toString())
+        .setHostName("localhost")
+        .setIpAddress("127.0.0.1")
+        .build();
+    EndpointStateMachine endpointStateMachine = Mockito
+        .mock(EndpointStateMachine.class);
+    Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
+    return HeartbeatEndpointTask.newBuilder()
+        .setConfig(conf)
+        .setDatanodeDetails(datanodeDetails)
+        .setContext(context)
+        .setEndpointStateMachine(endpointStateMachine)
+        .build();
+  }
+
+  private ContainerAction getContainerAction() {
+    ContainerAction.Builder builder = ContainerAction.newBuilder();
+    ContainerInfo containerInfo = ContainerInfo.newBuilder()
+        .setContainerID(1L)
+        .build();
+    builder.setContainer(containerInfo)
+        .setAction(ContainerAction.Action.CLOSE)
+        .setReason(ContainerAction.Reason.CONTAINER_FULL);
+    return builder.build();
+  }
+}
\ No newline at end of file

diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 00000000000..d120a5cd4b7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.ozone.container.common.states.endpoint; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 26f4d868f7d..f07d22baafd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -658,6 +658,10 @@ public class ContainerMapping implements Mapping { if (containerStore != null) { containerStore.close(); } + + if (pipelineSelector != null) { + pipelineSelector.shutdown(); + } } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index d7d70ef98ce..00855426eaa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -107,6 +107,7 @@ public class SCMException extends IOException { FAILED_TO_LOAD_OPEN_CONTAINER, FAILED_TO_ALLOCATE_CONTAINER, FAILED_TO_CHANGE_CONTAINER_STATE, + FAILED_TO_CHANGE_PIPELINE_STATE, CONTAINER_EXISTS, FAILED_TO_FIND_CONTAINER, FAILED_TO_FIND_CONTAINER_WITH_SPACE, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index a041973f93b..77d8211699b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -59,41 +59,16 @@ public abstract class PipelineManager { * @return a Pipeline. */ public synchronized final Pipeline getPipeline( - ReplicationFactor replicationFactor, ReplicationType replicationType) - throws IOException { - /** - * In the Ozone world, we have a very simple policy. - * - * 1. Try to create a pipeline if there are enough free nodes. - * - * 2. This allows all nodes to part of a pipeline quickly. - * - * 3. if there are not enough free nodes, return pipeline in a - * round-robin fashion. - * - * TODO: Might have to come up with a better algorithm than this. - * Create a new placement policy that returns pipelines in round robin - * fashion. - */ - Pipeline pipeline = allocatePipeline(replicationFactor); + ReplicationFactor replicationFactor, ReplicationType replicationType) { + Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor); if (pipeline != null) { - LOG.debug("created new pipeline:{} for container with " + + LOG.debug("re-used pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", pipeline.getPipelineName(), replicationType, replicationFactor); - activePipelines.add(pipeline); - activePipelineMap.put(pipeline.getPipelineName(), pipeline); - node2PipelineMap.addPipeline(pipeline); - } else { - pipeline = findOpenPipeline(replicationType, replicationFactor); - if (pipeline != null) { - LOG.debug("re-used pipeline:{} for container with " + - "replicationType:{} replicationFactor:{}", - pipeline.getPipelineName(), replicationType, replicationFactor); - } } if (pipeline == null) { LOG.error("Get pipeline call failed. 
We are not able to find" +
-          "free nodes or operational pipeline.");
+          " operational pipeline.");
       return null;
     } else {
       return pipeline;
     }
@@ -109,7 +84,7 @@ public abstract class PipelineManager {
 
   public synchronized final Pipeline getPipeline(String pipelineName) {
     Pipeline pipeline = null;
 
-    // 1. Check if pipeline channel already exists
+    // 1. Check if pipeline already exists
     if (activePipelineMap.containsKey(pipelineName)) {
       pipeline = activePipelineMap.get(pipelineName);
       LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -132,7 +107,13 @@ public abstract class PipelineManager {
   }
 
   public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor) throws IOException;
+      ReplicationFactor replicationFactor);
+
+  /**
+   * Initializes the pipeline.
+   * TODO: move the initialization to Ozone Client later
+   */
+  public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
   public void removePipeline(Pipeline pipeline) {
     activePipelines.remove(pipeline);
@@ -179,12 +160,23 @@ public abstract class PipelineManager {
   }
 
   /**
-   * Creates a pipeline from a specified set of Nodes.
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
+   * Creates a pipeline with a specified replication factor and type.
+   * @param replicationFactor - Replication Factor.
+   * @param replicationType - Replication Type.
    */
-  public abstract void createPipeline(String pipelineID,
-      List<DatanodeDetails> datanodes) throws IOException;
+  public Pipeline createPipeline(ReplicationFactor replicationFactor,
+      ReplicationType replicationType) throws IOException {
+    Pipeline pipeline = allocatePipeline(replicationFactor);
+    if (pipeline != null) {
+      LOG.debug("created new pipeline:{} for container with " +
+          "replicationType:{} replicationFactor:{}",
+          pipeline.getPipelineName(), replicationType, replicationFactor);
+      activePipelines.add(pipeline);
+      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      node2PipelineMap.addPipeline(pipeline);
+    }
+    return pipeline;
+  }
 
   /**
    * Close the pipeline with the given clusterId.
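The contract change above splits allocation from lookup: createPipeline builds a fresh pipeline while free nodes remain, and getPipeline hands back an existing open one. A hedged caller sketch follows; `manager` is a stand-in for any concrete PipelineManager and mirrors the PipelineSelector logic in the next file:

    // Prefer a fresh pipeline while free nodes remain...
    Pipeline pipeline =
        manager.createPipeline(ReplicationFactor.THREE, ReplicationType.RATIS);
    if (pipeline == null) {
      // ...otherwise reuse an already-open pipeline, round-robin.
      pipeline = manager.getPipeline(ReplicationFactor.THREE,
          ReplicationType.RATIS);
    }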
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 2955af58fbe..08710e7937a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -33,17 +34,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_PIPELINE_STATE;
+
 /**
  * Sends the request to the right pipeline manager.
  */
@@ -57,6 +69,10 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
+  private final LeaseManager<Pipeline> pipelineLeaseManager;
+  private final StateMachine<LifeCycleState,
+      HddsProtos.LifeCycleEvent> stateMachine;
+
   /**
    * Constructs a pipeline Selector.
    *
@@ -77,6 +93,74 @@ public class PipelineSelector {
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
             conf, node2PipelineMap);
+    // Initialize the pipeline state machine.
+    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
+    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.trace("Starting Pipeline Lease Manager.");
+    pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
+    pipelineLeaseManager.start();
+
+    // These are the steady states of a pipeline.
+    finalStates.add(HddsProtos.LifeCycleState.OPEN);
+    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+  }
+
+  /**
+   * Event and State Transition Mapping:
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CREATING  ---------------> CLOSED
+   * Event:               TIMEOUT
+   *
+   *
+   * Pipeline State Flow:
+   *
+   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+   *            (CREATE)     |    (CREATED)       (FINALIZE)  |
+   *                         |                                |
+   *                         |                                |
+   *                         |(TIMEOUT)                       |(CLOSE)
+   *                         |                                |
+   *                         +--------> [CLOSED] <------------+
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+        HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.TIMEOUT);
+  }
 
   /**
@@ -88,15 +172,14 @@ public class PipelineSelector {
    * @return pipeline corresponding to nodes
    */
   public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, LifeCycleState state,
-      ReplicationType replicationType, ReplicationFactor replicationFactor,
-      String name) {
+      List<DatanodeDetails> nodes, ReplicationType replicationType,
+      ReplicationFactor replicationFactor, String name) {
     Preconditions.checkNotNull(nodes);
     Preconditions.checkArgument(nodes.size() > 0);
     String leaderId = nodes.get(0).getUuidString();
-    Pipeline
-        pipeline = new Pipeline(leaderId, state, replicationType,
-        replicationFactor, name);
+    // A new pipeline always starts in the ALLOCATED state
+    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
+        replicationType, replicationFactor, name);
     for (DatanodeDetails node : nodes) {
       pipeline.addMember(node);
     }
@@ -175,8 +258,35 @@ public class PipelineSelector {
     LOG.debug("Getting replication pipeline for ReplicationType {} :" +
         " ReplicationFactor {}", replicationType.toString(),
         replicationFactor.toString());
-    return manager.
-        getPipeline(replicationFactor, replicationType);
+
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipeline if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipeline quickly.
+     *
+     * 3. If there are not enough free nodes, return an already allocated
+     *    pipeline in a round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelines in round robin
+     * fashion.
+     */
+    Pipeline pipeline =
+        manager.createPipeline(replicationFactor, replicationType);
+    if (pipeline == null) {
+      // try to return a pipeline from already allocated pipelines
+      pipeline = manager.getPipeline(replicationFactor, replicationType);
+    } else {
+      // if a new pipeline is created, initialize its state machine
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
+
+      //TODO: move the initialization of pipeline to Ozone Client
+      manager.initializePipeline(pipeline);
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
+    }
+    return pipeline;
   }
 
   /**
@@ -194,19 +304,6 @@ public class PipelineSelector {
         " pipelineName:{}", replicationType, pipelineName);
     return manager.getPipeline(pipelineName);
   }
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   */
-
-  public void createPipeline(ReplicationType replicationType, String
-      pipelineID, List<DatanodeDetails> datanodes) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
-        datanodes.stream().map(DatanodeDetails::toString)
-            .collect(Collectors.joining(",")));
-    manager.createPipeline(pipelineID, datanodes);
-  }
 
   /**
    * Close the pipeline with the given clusterId.
@@ -251,12 +348,77 @@ public class PipelineSelector {
   }
 
   public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineChannelSet =
+    Set<Pipeline> pipelineSet =
         node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipelineChannel : pipelineChannelSet) {
-      getPipelineManager(pipelineChannel.getType())
-          .removePipeline(pipelineChannel);
+    for (Pipeline pipeline : pipelineSet) {
+      getPipelineManager(pipeline.getType())
+          .removePipeline(pipeline);
     }
     node2PipelineMap.removeDatanode(dnId);
   }
+
+  /**
+   * Updates the Pipeline State to the next state.
+   *
+   * @param pipeline - Pipeline
+   * @param event - LifeCycle Event
+   * @throws SCMException on Failure.
+   */
+  public void updatePipelineState(Pipeline pipeline,
+      HddsProtos.LifeCycleEvent event) throws IOException {
+    HddsProtos.LifeCycleState newState;
+    try {
+      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update pipeline state %s, " +
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
+          pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    Preconditions.checkNotNull(pipeline);
+    try {
+      switch (event) {
+      case CREATE:
+        // Acquire lease on pipeline
+        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
+        // Register callback to be executed in case of timeout
+        pipelineLease.registerCallBack(() -> {
+          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
+          return null;
+        });
+        break;
+      case CREATED:
+        // Release the lease on pipeline
+        pipelineLeaseManager.release(pipeline);
+        break;
+
+      case FINALIZE:
+        //TODO: cleanup pipeline by closing all the containers on the pipeline
+        break;
+
+      case CLOSE:
+      case TIMEOUT:
+        // TODO: Release the nodes here when pipelines are destroyed
+        break;
+      default:
+        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
+            FAILED_TO_CHANGE_PIPELINE_STATE);
+      }
+
+      pipeline.setLifeCycleState(newState);
+    } catch (LeaseException e) {
+      throw new IOException("Lease Exception.", e);
+    }
+  }
+
+  public void shutdown() {
+    if (pipelineLeaseManager != null) {
+      pipelineLeaseManager.shutdown();
+    }
+  }
 }

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index a8f8b206df7..c726ef6fb40 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -72,7 +71,7 @@ public class RatisManagerImpl extends PipelineManager {
    * Allocates a new ratis Pipeline from the free nodes.
    *
    * @param factor - One or Three
-   * @return PipelineChannel.
+   * @return Pipeline.
    */
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -89,35 +88,23 @@ public class RatisManagerImpl extends PipelineManager {
           // further allocations
           ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new ratis pipeline of size: {}", count);
-          // Start all channel names with "Ratis", easy to grep the logs.
+          // Start all pipeline names with "Ratis", easy to grep the logs.
           String pipelineName = PREFIX +
               UUID.randomUUID().toString().substring(PREFIX.length());
-          Pipeline pipeline =
-              PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
-          try (XceiverClientRatis client =
-              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-            client.createPipeline(pipeline.getPipelineName(), newNodesList);
-          } catch (IOException e) {
-            return null;
-          }
-          return pipeline;
+          return PipelineSelector.newPipelineFromNodes(newNodesList,
+              ReplicationType.RATIS, factor, pipelineName);
         }
       }
     }
     return null;
   }
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-      List<DatanodeDetails> datanodes) {
-
+  public void initializePipeline(Pipeline pipeline) throws IOException {
+    // TODO: move the initialization from SCM to client
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+    }
   }
 
   /**

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index cf691bfb20f..bb4951f22c3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -86,29 +85,19 @@ public class StandaloneManagerImpl extends PipelineManager {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          LOG.info("Allocating a new standalone pipeline channel of size: {}",
-              count);
-          String channelName =
+          LOG.info("Allocating a new standalone pipeline of size: {}", count);
+          String pipelineName =
               "SA-" + UUID.randomUUID().toString().substring(3);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
-              ReplicationFactor.ONE, channelName);
+              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
         }
       }
     }
     return null;
   }
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-      List<DatanodeDetails> datanodes) {
-    //return newPipelineFromNodes(datanodes, pipelineID);
+  public void initializePipeline(Pipeline pipeline) {
+    // Nothing to be done for standalone pipeline
   }
 
   /**

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index bc3505fa7ed..ffac6d52881 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
   private static ContainerWithPipeline ratisContainer;
   private static ContainerStateMap stateMap;
   private static ContainerMapping mapping;
+  private static PipelineSelector pipelineSelector;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -66,6 +69,7 @@ public class TestNode2PipelineMap {
     mapping = (ContainerMapping)scm.getScmContainerManager();
     stateMap = mapping.getStateManager().getContainerStateMap();
     ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    pipelineSelector = mapping.getPipelineSelector();
   }
 
   /**
@@ -113,5 +117,15 @@ public class TestNode2PipelineMap {
     NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getPipelineName());
     Assert.assertEquals(0, set2.size());
+
+    try {
+      pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
+          HddsProtos.LifeCycleEvent.CLOSE);
+      Assert.fail("closing of pipeline without finalize should fail");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof SCMException);
+      Assert.assertEquals(SCMException.ResultCodes
+          .FAILED_TO_CHANGE_PIPELINE_STATE, ((SCMException) e).getResult());
+    }
   }
 }
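Taken together, the patch makes the pipeline lifecycle explicit. Below is a hypothetical walk over the new PipelineSelector API; `selector` and `pipeline` are stand-ins, and in practice the TIMEOUT transition is fired by the lease callback rather than by a direct call:

    selector.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);   // ALLOCATED -> CREATING, lease acquired
    selector.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);  // CREATING -> OPEN, lease released
    selector.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); // OPEN -> CLOSING
    selector.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);    // CLOSING -> CLOSED

    // Any out-of-order event fails: for example, firing CLOSE straight from
    // OPEN throws SCMException(FAILED_TO_CHANGE_PIPELINE_STATE), which is
    // exactly what the new TestNode2PipelineMap assertion exercises.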