HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-07-20 13:03:25 -07:00
parent ba25d27ddb
commit 6837121a43
16 changed files with 668 additions and 146 deletions

View File

@ -236,6 +236,11 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
"ozone.scm.pipeline.creation.lease.timeout";
public static final String
OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";

View File

@ -213,6 +213,13 @@ public class Pipeline {
return lifeCycleState;
}
/**
* Update the State of the pipeline.
*/
public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
lifeCycleState = nextState;
}
/**
* Gets the pipeline Name.
*

View File

@ -1085,5 +1085,17 @@
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
<name>ozone.scm.pipeline.creation.lease.timeout</name>
<value>60s</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>
Pipeline creation timeout in milliseconds to be used by SCM. When
BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
CREATING state, SCM will now wait for the configured amount of time
to get COMPLETE_CREATE event if it doesn't receive it will move the
pipeline to DELETING.
</description>
</property>
</configuration>

View File

@ -20,14 +20,18 @@ import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus
.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,6 +47,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@ -59,6 +64,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
private final Queue<ContainerAction> containerActions;
private DatanodeStateMachine.DatanodeStates state;
/**
@ -76,6 +82,7 @@ public class StateContext {
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
containerActions = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
}
@ -198,6 +205,47 @@ public class StateContext {
return results;
}
/**
* Adds the ContainerAction to ContainerAction queue.
*
* @param containerAction ContainerAction to be added
*/
public void addContainerAction(ContainerAction containerAction) {
synchronized (containerActions) {
containerActions.add(containerAction);
}
}
/**
* Returns all the pending ContainerActions from the ContainerAction queue,
* or empty list if the queue is empty.
*
* @return List<ContainerAction>
*/
public List<ContainerAction> getAllPendingContainerActions() {
return getPendingContainerAction(Integer.MAX_VALUE);
}
/**
* Returns pending ContainerActions from the ContainerAction queue with a
* max limit on list size, or empty list if the queue is empty.
*
* @return List<ContainerAction>
*/
public List<ContainerAction> getPendingContainerAction(int maxLimit) {
List<ContainerAction> results = new ArrayList<>();
synchronized (containerActions) {
containerActions.parallelStream().limit(maxLimit).collect(Collectors.toList());
ContainerAction action = containerActions.poll();
while(results.size() < maxLimit && action != null) {
results.add(action);
action = containerActions.poll();
}
}
return results;
}
/**
* Returns the next task to get executed by the datanode state machine.
* @return A callable that will be executed by the

View File

@ -24,6 +24,10 @@ import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.Callable;
/**
@ -107,7 +112,7 @@ public class HeartbeatEndpointTask
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
addContainerActions(requestBuilder);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto);
@ -139,6 +144,23 @@ public class HeartbeatEndpointTask
}
}
/**
* Adds all the pending ContainerActions to the heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
*/
private void addContainerActions(
SCMHeartbeatRequestProto.Builder requestBuilder) {
List<ContainerAction> actions = context.getAllPendingContainerActions();
if (!actions.isEmpty()) {
ContainerActionsProto cap = ContainerActionsProto.newBuilder()
.addAllContainerActions(actions)
.build();
requestBuilder.setContainerActions(cap);
}
}
/**
* Returns a builder class for HeartbeatEndpointTask task.
* @return Builder.

View File

@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
optional ContainerActionsProto containerActions = 4;
optional CommandStatusReportsProto commandStatusReport = 5;
optional CommandStatusReportsProto commandStatusReport = 4;
optional ContainerActionsProto containerActions = 5;
}
/*

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@ -178,22 +171,6 @@ public class TestReportPublisher {
executorService.shutdown();
}
@Test
public void testAddingReportToHeartbeat() {
GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
GeneratedMessage containerReport = ContainerReportsProto
.getDefaultInstance();
SCMHeartbeatRequestProto.Builder heartbeatBuilder =
SCMHeartbeatRequestProto.newBuilder();
heartbeatBuilder.setDatanodeDetails(
getDatanodeDetails().getProtoBufMessage());
addReport(heartbeatBuilder, nodeReport);
addReport(heartbeatBuilder, containerReport);
SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
}
/**
* Get a datanode details.
*
@ -222,22 +199,4 @@ public class TestReportPublisher {
return builder.build();
}
/**
* Adds the report to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
* @param report the report to be added.
*/
private static void addReport(SCMHeartbeatRequestProto.Builder
requestBuilder, GeneratedMessage report) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
requestBuilder.setField(descriptor, report);
}
}
}
}

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.states.endpoint;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerDatanodeProtocolClientSideTranslatorPB;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
/**
* This class tests the functionality of HeartbeatEndpointTask.
*/
public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithoutReports() throws Exception {
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertFalse(heartbeat.hasContainerActions());
}
@Test
public void testheartbeatWithNodeReports() throws Exception {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addReport(NodeReportProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertFalse(heartbeat.hasContainerActions());
}
@Test
public void testheartbeatWithContainerReports() throws Exception {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addReport(ContainerReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertFalse(heartbeat.hasContainerActions());
}
@Test
public void testheartbeatWithCommandStatusReports() throws Exception {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addReport(CommandStatusReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertTrue(heartbeat.hasCommandStatusReport());
Assert.assertFalse(heartbeat.hasContainerActions());
}
@Test
public void testheartbeatWithContainerActions() throws Exception {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addContainerAction(getContainerAction());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.hasContainerActions());
}
@Test
public void testheartbeatWithAllReports() throws Exception {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
.forClass(SCMHeartbeatRequestProto.class);
Mockito.when(scm.sendHeartbeat(argument.capture()))
.thenAnswer(invocation ->
SCMHeartbeatResponseProto.newBuilder()
.setDatanodeUUID(
((SCMHeartbeatRequestProto)invocation.getArgument(0))
.getDatanodeDetails().getUuid())
.build());
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addReport(NodeReportProto.getDefaultInstance());
context.addReport(ContainerReportsProto.getDefaultInstance());
context.addReport(CommandStatusReportsProto.getDefaultInstance());
context.addContainerAction(getContainerAction());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
Assert.assertTrue(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.hasContainerActions());
}
/**
* Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
*
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
*
* @return HeartbeatEndpointTask
*/
private HeartbeatEndpointTask getHeartbeatEndpointTask(
StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
Configuration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
Mockito.mock(DatanodeStateMachine.class));
return getHeartbeatEndpointTask(conf, context, proxy);
}
/**
* Creates HeartbeatEndpointTask with the given conf, context and
* StorageContainerManager client side proxy.
*
* @param conf Configuration
* @param context StateContext
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
*
* @return HeartbeatEndpointTask
*/
private HeartbeatEndpointTask getHeartbeatEndpointTask(
Configuration conf,
StateContext context,
StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
.setUuid(UUID.randomUUID().toString())
.setHostName("localhost")
.setIpAddress("127.0.0.1")
.build();
EndpointStateMachine endpointStateMachine = Mockito
.mock(EndpointStateMachine.class);
Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
return HeartbeatEndpointTask.newBuilder()
.setConfig(conf)
.setDatanodeDetails(datanodeDetails)
.setContext(context)
.setEndpointStateMachine(endpointStateMachine)
.build();
}
private ContainerAction getContainerAction() {
ContainerAction.Builder builder = ContainerAction.newBuilder();
ContainerInfo containerInfo = ContainerInfo.newBuilder()
.setContainerID(1L)
.build();
builder.setContainer(containerInfo)
.setAction(ContainerAction.Action.CLOSE)
.setReason(ContainerAction.Reason.CONTAINER_FULL);
return builder.build();
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.states.endpoint;

View File

@ -658,6 +658,10 @@ public class ContainerMapping implements Mapping {
if (containerStore != null) {
containerStore.close();
}
if (pipelineSelector != null) {
pipelineSelector.shutdown();
}
}
/**

View File

@ -107,6 +107,7 @@ public class SCMException extends IOException {
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
FAILED_TO_CHANGE_PIPELINE_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,

View File

@ -59,41 +59,16 @@ public abstract class PipelineManager {
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(
ReplicationFactor replicationFactor, ReplicationType replicationType)
throws IOException {
/**
* In the Ozone world, we have a very simple policy.
*
* 1. Try to create a pipeline if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipeline quickly.
*
* 3. if there are not enough free nodes, return pipeline in a
* round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline = allocatePipeline(replicationFactor);
ReplicationFactor replicationFactor, ReplicationType replicationType) {
Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with " +
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
} else {
pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
}
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
"free nodes or operational pipeline.");
" operational pipeline.");
return null;
} else {
return pipeline;
@ -109,7 +84,7 @@ public abstract class PipelineManager {
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
// 1. Check if pipeline channel already exists
// 1. Check if pipeline already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@ -132,7 +107,13 @@ public abstract class PipelineManager {
}
public abstract Pipeline allocatePipeline(
ReplicationFactor replicationFactor) throws IOException;
ReplicationFactor replicationFactor);
/**
* Initialize the pipeline
* TODO: move the initialization to Ozone Client later
*/
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
@ -179,12 +160,23 @@ public abstract class PipelineManager {
}
/**
* Creates a pipeline from a specified set of Nodes.
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
* Creates a pipeline with a specified replication factor and type.
* @param replicationFactor - Replication Factor.
* @param replicationType - Replication Type.
*/
public abstract void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) throws IOException;
public Pipeline createPipeline(ReplicationFactor replicationFactor,
ReplicationType replicationType) throws IOException {
Pipeline pipeline = allocatePipeline(replicationFactor);
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
}
/**
* Close the pipeline with the given clusterId.

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@ -33,17 +34,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_PIPELINE_STATE;
/**
* Sends the request to the right pipeline manager.
*/
@ -57,6 +69,10 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
private final StateMachine<LifeCycleState,
HddsProtos.LifeCycleEvent> stateMachine;
/**
* Constructs a pipeline Selector.
*
@ -77,6 +93,74 @@ public class PipelineSelector {
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
LOG.trace("Starting Pipeline Lease Manager.");
pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
pipelineLeaseManager.start();
// These are the steady states of a container.
finalStates.add(HddsProtos.LifeCycleState.OPEN);
finalStates.add(HddsProtos.LifeCycleState.CLOSED);
this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
}
/**
* Event and State Transition Mapping:
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CREATING ---------------> CLOSED
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
* (CREATE) | (CREATED) (FINALIZE) |
* | |
* | |
* |(TIMEOUT) |(CLOSE)
* | |
* +--------> [CLOSED] <--------+
*/
private void initializeStateMachine() {
stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleEvent.CREATE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleEvent.CREATED);
stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleEvent.FINALIZE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.CLOSE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
@ -88,15 +172,14 @@ public class PipelineSelector {
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(
List<DatanodeDetails> nodes, LifeCycleState state,
ReplicationType replicationType, ReplicationFactor replicationFactor,
String name) {
List<DatanodeDetails> nodes, ReplicationType replicationType,
ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
Pipeline
pipeline = new Pipeline(leaderId, state, replicationType,
replicationFactor, name);
// A new pipeline always starts in allocated state
Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
replicationType, replicationFactor, name);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@ -175,8 +258,35 @@ public class PipelineSelector {
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" ReplicationFactor {}", replicationType.toString(),
replicationFactor.toString());
return manager.
getPipeline(replicationFactor, replicationType);
/**
* In the Ozone world, we have a very simple policy.
*
* 1. Try to create a pipeline if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipeline quickly.
*
* 3. if there are not enough free nodes, return already allocated pipeline
* in a round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline =
manager.createPipeline(replicationFactor, replicationType);
if (pipeline == null) {
// try to return a pipeline from already allocated pipelines
pipeline = manager.getPipeline(replicationFactor, replicationType);
} else {
// if a new pipeline is created, initialize its state machine
updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
//TODO: move the initialization of pipeline to Ozone Client
manager.initializePipeline(pipeline);
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
}
return pipeline;
}
/**
@ -194,19 +304,6 @@ public class PipelineSelector {
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
/**
* Creates a pipeline from a specified set of Nodes.
*/
public void createPipeline(ReplicationType replicationType, String
pipelineID, List<DatanodeDetails> datanodes) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
datanodes.stream().map(DatanodeDetails::toString)
.collect(Collectors.joining(",")));
manager.createPipeline(pipelineID, datanodes);
}
/**
* Close the pipeline with the given clusterId.
@ -251,12 +348,77 @@ public class PipelineSelector {
}
public void removePipeline(UUID dnId) {
Set<Pipeline> pipelineChannelSet =
Set<Pipeline> pipelineSet =
node2PipelineMap.getPipelines(dnId);
for (Pipeline pipelineChannel : pipelineChannelSet) {
getPipelineManager(pipelineChannel.getType())
.removePipeline(pipelineChannel);
for (Pipeline pipeline : pipelineSet) {
getPipelineManager(pipeline.getType())
.removePipeline(pipeline);
}
node2PipelineMap.removeDatanode(dnId);
}
/**
* Update the Pipeline State to the next state.
*
* @param pipeline - Pipeline
* @param event - LifeCycle Event
* @throws SCMException on Failure.
*/
public void updatePipelineState(Pipeline pipeline,
HddsProtos.LifeCycleEvent event) throws IOException {
HddsProtos.LifeCycleState newState;
try {
newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update pipeline state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
Preconditions.checkNotNull(pipeline);
try {
switch (event) {
case CREATE:
// Acquire lease on pipeline
Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
// Register callback to be executed in case of timeout
pipelineLease.registerCallBack(() -> {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
return null;
});
break;
case CREATED:
// Release the lease on pipeline
pipelineLeaseManager.release(pipeline);
break;
case FINALIZE:
//TODO: cleanup pipeline by closing all the containers on the pipeline
break;
case CLOSE:
case TIMEOUT:
// TODO: Release the nodes here when pipelines are destroyed
break;
default:
throw new SCMException("Unsupported pipeline LifeCycleEvent.",
FAILED_TO_CHANGE_PIPELINE_STATE);
}
pipeline.setLifeCycleState(newState);
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
}
}
public void shutdown() {
if (pipelineLeaseManager != null) {
pipelineLeaseManager.shutdown();
}
}
}

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -72,7 +71,7 @@ public class RatisManagerImpl extends PipelineManager {
* Allocates a new ratis Pipeline from the free nodes.
*
* @param factor - One or Three
* @return PipelineChannel.
* @return Pipeline.
*/
public Pipeline allocatePipeline(ReplicationFactor factor) {
List<DatanodeDetails> newNodesList = new LinkedList<>();
@ -89,35 +88,23 @@ public class RatisManagerImpl extends PipelineManager {
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
// Start all channel names with "Ratis", easy to grep the logs.
// Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
Pipeline pipeline=
PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), newNodesList);
} catch (IOException e) {
return null;
}
return pipeline;
return PipelineSelector.newPipelineFromNodes(newNodesList,
ReplicationType.RATIS, factor, pipelineName);
}
}
}
return null;
}
/**
* Creates a pipeline from a specified set of Nodes.
*
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
@Override
public void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) {
public void initializePipeline(Pipeline pipeline) throws IOException {
//TODO:move the initialization from SCM to client
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
}
}
/**

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -86,29 +85,19 @@ public class StandaloneManagerImpl extends PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
LOG.info("Allocating a new standalone pipeline channel of size: {}",
count);
String channelName =
LOG.info("Allocating a new standalone pipeline of size: {}", count);
String pipelineName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, channelName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
}
}
}
return null;
}
/**
* Creates a pipeline from a specified set of Nodes.
*
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
@Override
public void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) {
//return newPipelineFromNodes(datanodes, pipelineID);
public void initializePipeline(Pipeline pipeline) {
// Nothing to be done for standalone pipeline
}
/**

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
@ -66,6 +69,7 @@ public class TestNode2PipelineMap {
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
pipelineSelector = mapping.getPipelineSelector();
}
/**
@ -113,5 +117,15 @@ public class TestNode2PipelineMap {
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
try {
pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
HddsProtos.LifeCycleEvent.CLOSE);
Assert.fail("closing of pipeline without finalize should fail");
} catch (Exception e) {
Assert.assertTrue(e instanceof SCMException);
Assert.assertEquals(((SCMException)e).getResult(),
SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
}
}
}