diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5574c9fc838..8063e234520 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
@@ -43,27 +44,26 @@ public class DatanodeStateMachine implements Closeable {
   private final ExecutorService executorService;
   private final Configuration conf;
   private final SCMConnectionManager connectionManager;
-  private final long taskWaitTime;
   private final long heartbeatFrequency;
   private StateContext context;
+  private final OzoneContainer container;
 
   /**
-   * Constructs a container state machine.
+   * Constructs a datanode state machine.
    *
    * @param conf - Configuration.
    */
-  public DatanodeStateMachine(Configuration conf) {
+  public DatanodeStateMachine(Configuration conf) throws IOException {
     this.conf = conf;
     executorService = HadoopExecutors.newScheduledThreadPool(
         this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
             OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Container State Machine Thread - %d").build());
+            .setNameFormat("Datanode State Machine Thread - %d").build());
     connectionManager = new SCMConnectionManager(conf);
     context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
-    taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT,
-        OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT);
     heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
+    container = new OzoneContainer(conf);
   }
 
   /**
@@ -81,10 +81,12 @@ public class DatanodeStateMachine implements Closeable {
   public void start() throws IOException {
     long now = 0;
     long nextHB = 0;
+    container.start();
     while (context.getState() != DatanodeStates.SHUTDOWN) {
       try {
         nextHB = Time.monotonicNow() + heartbeatFrequency;
-        context.execute(executorService, taskWaitTime, TimeUnit.SECONDS);
+        context.execute(executorService, heartbeatFrequency,
+            TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
         if (now < nextHB) {
           Thread.sleep(nextHB - now);
@@ -144,6 +146,10 @@ public class DatanodeStateMachine implements Closeable {
     for (EndpointStateMachine endPoint : connectionManager.getValues()) {
       endPoint.close();
     }
+
+    if (container != null) {
+      container.stop();
+    }
   }
 
   /**
@@ -159,7 +165,7 @@ public class DatanodeStateMachine implements Closeable {
     /**
      * Constructs ContainerStates.
      *
-     * @param value
+     * @param value Enum Value
      */
     DatanodeStates(int value) {
       this.value = value;
     }
@@ -210,4 +216,22 @@ public class DatanodeStateMachine implements Closeable {
       return getLastState();
     }
   }
+
+  public static DatanodeStateMachine initStateMachine(Configuration conf)
+      throws IOException {
+    DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+    Runnable startStateMachineTask = () -> {
+      try {
+        stateMachine.start();
+      } catch (Exception ex) {
+        LOG.error("Unable to start the DatanodeState Machine", ex);
+      }
+    };
+    Thread thread = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("Datanode State Machine Thread - %d")
+        .build().newThread(startStateMachineTask);
+    thread.start();
+    return stateMachine;
+  }
 }
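The start() loop above paces heartbeats by fixing the next deadline before doing any work, then sleeping only for whatever time is left. A minimal standalone sketch of the same pattern (HeartbeatLoop and sendHeartbeat are illustrative names, not part of this patch):

```java
import java.util.concurrent.TimeUnit;

public class HeartbeatLoop {
  private static final long HEARTBEAT_INTERVAL_MS = 1000;
  private volatile boolean running = true;

  public void run() throws InterruptedException {
    while (running) {
      // Fix the deadline before doing the work, so a slow iteration does
      // not push every later heartbeat back (no cumulative drift).
      long nextHB = now() + HEARTBEAT_INTERVAL_MS;
      sendHeartbeat();
      long now = now();
      if (now < nextHB) {
        TimeUnit.MILLISECONDS.sleep(nextHB - now);
      }
    }
  }

  // Monotonic clock in milliseconds, analogous to Hadoop's Time.monotonicNow().
  private static long now() {
    return System.nanoTime() / 1_000_000;
  }

  private void sendHeartbeat() {
    // Stand-in for the real per-cycle work, i.e. context.execute(...).
  }

  public void stop() {
    running = false;
  }
}
```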
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 19b849d93d2..6f765e2de47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
+import java.io.IOException;
+
 /**
  * Creates a netty server endpoint that acts as the communication layer for
  * Ozone containers.
@@ -58,9 +60,9 @@ public final class XceiverServer {
   /**
    * Starts running the server.
    *
-   * @throws Exception
+   * @throws IOException
    */
-  public void start() throws Exception {
+  public void start() throws IOException {
     bossGroup = new NioEventLoopGroup();
     workerGroup = new NioEventLoopGroup();
     channel = new ServerBootstrap()
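Narrowing start() from throws Exception to throws IOException gives callers a concrete failure type to handle. A small sketch of what that buys a caller (Server and ServerRunner are illustrative stand-ins, not the real XceiverServer API):

```java
import java.io.IOException;

/** Illustrative stand-ins; not the real XceiverServer API. */
interface Server {
  void start() throws IOException; // narrowed from "throws Exception"
}

public class ServerRunner {
  public static void main(String[] args) {
    Server server = () -> { /* bind network ports here */ };
    try {
      server.start();
    } catch (IOException e) {
      // Callers can now handle the one checked failure start() actually
      // declares, instead of being forced to catch a blanket Exception.
      System.err.println("Failed to start server: " + e.getMessage());
    }
  }
}
```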
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2565a04c695..d088b4e3748 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
@@ -38,6 +36,8 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
 /**
  * Ozone main class sets up the network server and initializes the container
  * layer.
@@ -57,12 +57,11 @@
    * Creates a network endpoint and enables Ozone container.
    *
    * @param ozoneConfig - Config
-   * @param dataSet - FsDataset.
    * @throws IOException
    */
   public OzoneContainer(
-      Configuration ozoneConfig,
-      FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception {
+      Configuration ozoneConfig) throws IOException {
+    this.ozoneConfig = ozoneConfig;
     List<StorageLocation> locations = new LinkedList<>();
     String[] paths = ozoneConfig.getStrings(
         OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
@@ -71,11 +70,9 @@ public class OzoneContainer {
         locations.add(StorageLocation.parse(p));
       }
     } else {
-      getDataDir(dataSet, locations);
+      getDataDir(locations);
     }
 
-    this.ozoneConfig = ozoneConfig;
-
     manager = new ContainerManagerImpl();
     manager.init(this.ozoneConfig, locations);
     this.chunkManager = new ChunkManagerImpl(manager);
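The constructor now resolves storage locations purely from configuration: OZONE_CONTAINER_METADATA_DIRS wins, otherwise getDataDir falls back to the regular datanode data directories. A standalone sketch of that fallback, using a plain Map in place of Hadoop's Configuration (the literal key strings are assumed to match the constants; all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DataDirResolver {
  static List<String> resolveDataDirs(Map<String, String> conf) {
    // Prefer the Ozone-specific metadata dirs...
    String dirs = conf.get("ozone.container.metadata.dirs");
    if (dirs == null) {
      // ...and fall back to the datanode data dirs, which is what the new
      // getDataDir(pathList) does via DFS_DATANODE_DATA_DIR_KEY.
      dirs = conf.get("dfs.datanode.data.dir");
    }
    List<String> locations = new ArrayList<>();
    if (dirs != null) {
      locations.addAll(Arrays.asList(dirs.split(",")));
    }
    return locations;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("dfs.datanode.data.dir", "/data/disk1,/data/disk2");
    System.out.println(resolveDataDirs(conf)); // [/data/disk1, /data/disk2]
  }
}
```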
@@ -90,43 +87,44 @@ public class OzoneContainer {
   /**
    * Starts serving requests to ozone container.
-   * @throws Exception
+   *
+   * @throws IOException
    */
-  public void start() throws Exception {
+  public void start() throws IOException {
     server.start();
   }
 
   /**
    * Stops the ozone container.
-   *
-   * Shutdown logic is not very obvious from the following code.
-   * if you need to modify the logic, please keep these comments in mind.
-   * Here is the shutdown sequence.
-   *
+   * <p>
+   * Shutdown logic is not very obvious from the following code. If you need to
+   * modify the logic, please keep these comments in mind. Here is the shutdown
+   * sequence.
+   * <p>
    * 1. We shutdown the network ports.
-   *
+   * <p>
    * 2. Now we need to wait for all requests in-flight to finish.
-   *
-   * 3. The container manager lock is a read-write lock with "Fairness" enabled.
-   *
+   * <p>
+   * 3. The container manager lock is a read-write lock with "Fairness"
+   * enabled.
+   * <p>
    * 4. This means that the waiting threads are served in a "first-come-first
    * -served" manner. Please note that this applies to waiting threads only.
-   *
+   * <p>
    * 5. Since write locks are exclusive, if we are waiting to get a lock it
    * implies that we are waiting for in-flight operations to complete.
-   *
+   * <p>
    * 6. If there are other write operations waiting on the reader-writer lock,
    * fairness guarantees that they will proceed before the shutdown lock
    * request.
-   *
+   * <p>
    * 7. Since all operations either take a reader or writer lock of container
    * manager, we are guaranteed that we are the last operation since we have
    * closed the network port, and we wait until close is successful.
-   *
+   * <p>
    * 8. We take the writer lock and call shutdown on each of the managers in
    * reverse order. That is chunkManager, keyManager and containerManager is
    * shutdown.
-   *
    */
   public void stop() {
     LOG.info("Attempting to stop container services.");
@@ -144,26 +142,14 @@
   /**
    * Returns paths to data dirs.
-   * @param dataset - FSDataset.
+   *
    * @param pathList - List of paths.
    * @throws IOException
    */
-  private void getDataDir(
-      FsDatasetSpi<? extends FsVolumeSpi> dataset,
-      List<StorageLocation> pathList) throws IOException {
-    FsDatasetSpi.FsVolumeReferences references;
-    try {
-      synchronized (dataset) {
-        references = dataset.getFsVolumeReferences();
-        for (int ndx = 0; ndx < references.size(); ndx++) {
-          FsVolumeSpi vol = references.get(ndx);
-          pathList.add(StorageLocation.parse(vol.getBaseURI().getPath()));
-        }
-        references.close();
-      }
-    } catch (IOException ex) {
-      LOG.error("Unable to get volume paths.", ex);
-      throw new IOException("Internal error", ex);
+  private void getDataDir(List<StorageLocation> pathList) throws IOException {
+    for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
+      StorageLocation location = StorageLocation.parse(dir);
+      pathList.add(location);
     }
   }
 }
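The stop() javadoc above hinges on a fair reader-writer lock. A minimal self-contained sketch of the same shutdown ordering with java.util.concurrent (ContainerService and its members are illustrative, not the patch's classes):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ContainerService {
  // "true" enables fairness: waiting threads acquire the lock in FIFO order.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

  public void handleRequest() {
    lock.readLock().lock();
    try {
      // Serve a request; many readers may hold the lock concurrently.
    } finally {
      lock.readLock().unlock();
    }
  }

  public void stop() {
    // 1. In the real code the network port is closed first, so no new
    //    requests can queue up behind the shutdown.
    // 2. Because the lock is fair and the write lock is exclusive, it is
    //    granted only after every in-flight reader and any earlier-queued
    //    writer has finished.
    lock.writeLock().lock();
    try {
      // 3. Shut managers down in reverse creation order:
      //    chunkManager, then keyManager, then containerManager.
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```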
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index b75a925e80b..28658bd15e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -20,19 +20,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine;
-
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .InitDatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .RunningDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -54,18 +47,20 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
 /**
  * Tests the datanode state machine class and its states.
  */
 public class TestDatanodeStateMachine {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDatanodeStateMachine.class);
   private final int scmServerCount = 3;
   private List<String> serverAddresses;
   private List<RPC.Server> scmServers;
   private List<ScmTestMock> mockServers;
   private ExecutorService executorService;
   private Configuration conf;
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestDatanodeStateMachine.class);
 
   @Before
   public void setUp() throws Exception {
@@ -91,14 +86,15 @@ public class TestDatanodeStateMachine {
     String path = p.getPath().concat(
         TestDatanodeStateMachine.class.getSimpleName());
     File f = new File(path);
-    if(!f.mkdirs()) {
+    if (!f.mkdirs()) {
       LOG.info("Required directories already exist.");
     }
-
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
     path = Paths.get(path.toString(),
         TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
     conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
+
     executorService = HadoopExecutors.newScheduledThreadPool(
         conf.getInt(
             OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
@@ -122,7 +118,6 @@
   /**
    * Assert that starting statemachine executes the Init State.
    *
-   * @throws IOException
    * @throws InterruptedException
    */
   @Test
@@ -132,7 +127,7 @@
     Runnable startStateMachineTask = () -> {
       try {
         stateMachine.start();
-      } catch (IOException ex) {
+      } catch (Exception ex) {
       }
     };
     Thread thread1 = new Thread(startStateMachineTask);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index e765ad6caad..79217821674 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -32,6 +32,9 @@ import org.junit.rules.Timeout;
 
 import java.net.URL;
 
+/**
+ * Tests ozone containers.
+ */
 public class TestOzoneContainer {
   /**
    * Set the timeout for every test.
@@ -55,12 +58,11 @@ public class TestOzoneContainer {
 
     // We don't start Ozone Container via data node, we will do it
     // independently in our test path.
-    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
-        (containerName);
+    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
+        containerName);
     conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         pipeline.getLeader().getContainerPort());
-    OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
-        ().get(0).getFSDataset());
+    OzoneContainer container = new OzoneContainer(conf);
    container.start();
 
     XceiverClient client = new XceiverClient(pipeline, conf);
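With the FsDatasetSpi parameter gone, a test can bring the container up from configuration alone, as the updated TestOzoneContainer does. A rough sketch of such a setup, assuming the Ozone test classpath is available; the data directory and port values here are invented:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;

public class OzoneContainerSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The container now reads its storage locations from configuration
    // instead of an FsDatasetSpi handle; this path is illustrative.
    conf.set("dfs.datanode.data.dir", "/tmp/ozone-smoke-test");
    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, 50011);

    OzoneContainer container = new OzoneContainer(conf);
    container.start();
    try {
      // Exercise the container over XceiverClient here, as the test does.
    } finally {
      container.stop();
    }
  }
}
```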