HDFS-11108. Ozone: use containers with the state machine. Contributed by Anu Engineer

Anu Engineer 2016-11-17 13:01:09 -08:00
parent d10f39e751
commit 52925ef824
6 changed files with 92 additions and 84 deletions

DataNode.java

@@ -72,7 +72,6 @@ import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -116,6 +115,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -192,7 +193,6 @@ import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
@@ -396,7 +396,7 @@ public class DataNode extends ReconfigurableBase
private DiskBalancer diskBalancer;
private final SocketFactory socketFactory;
private OzoneContainer ozoneServer;
private DatanodeStateMachine datanodeStateMachine;
private static Tracer createTracer(Configuration conf) {
return new Tracer.Builder("DataNode").
@@ -1615,11 +1615,10 @@
initDirectoryScanner(getConf());
if(this.ozoneEnabled) {
try {
ozoneServer = new OzoneContainer(getConf(), this.getFSDataset());
ozoneServer.start();
datanodeStateMachine = DatanodeStateMachine.initStateMachine(getConf());
LOG.info("Ozone container server started.");
} catch (Exception ex) {
LOG.error("Unable to start Ozone. ex: {}", ex.toString());
} catch (IOException ex) {
LOG.error("Unable to start Ozone. ex: {}", ex);
}
}
initDiskBalancer(data, getConf());
@@ -1975,9 +1974,9 @@
}
if(this.ozoneEnabled) {
if(ozoneServer != null) {
if(datanodeStateMachine != null) {
try {
ozoneServer.stop();
datanodeStateMachine.close();
} catch (Exception e) {
LOG.error("Error is ozone shutdown. ex {}", e.toString());
}

DatanodeStateMachine.java

@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
@@ -43,27 +44,26 @@ public class DatanodeStateMachine implements Closeable {
private final ExecutorService executorService;
private final Configuration conf;
private final SCMConnectionManager connectionManager;
private final long taskWaitTime;
private final long heartbeatFrequency;
private StateContext context;
private final OzoneContainer container;
/**
* Constructs a container state machine.
* Constructs a datanode state machine.
*
* @param conf - Configuration.
*/
public DatanodeStateMachine(Configuration conf) {
public DatanodeStateMachine(Configuration conf) throws IOException {
this.conf = conf;
executorService = HadoopExecutors.newScheduledThreadPool(
this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Container State Machine Thread - %d").build());
.setNameFormat("Datanode State Machine Thread - %d").build());
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT,
OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT);
heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
container = new OzoneContainer(conf);
}
/**
@@ -81,10 +81,12 @@ public class DatanodeStateMachine implements Closeable {
public void start() throws IOException {
long now = 0;
long nextHB = 0;
container.start();
while (context.getState() != DatanodeStates.SHUTDOWN) {
try {
nextHB = Time.monotonicNow() + heartbeatFrequency;
context.execute(executorService, taskWaitTime, TimeUnit.SECONDS);
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
if (now < nextHB) {
Thread.sleep(nextHB - now);
@@ -144,6 +146,10 @@
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
endPoint.close();
}
if(container != null) {
container.stop();
}
}
/**
@@ -159,7 +165,7 @@
/**
* Constructs DatanodeStates.
*
* @param value
* @param value Enum Value
*/
DatanodeStates(int value) {
this.value = value;
@@ -210,4 +216,22 @@
return getLastState();
}
}
public static DatanodeStateMachine initStateMachine(Configuration conf)
throws IOException {
DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
Runnable startStateMachineTask = () -> {
try {
stateMachine.start();
} catch (Exception ex) {
LOG.error("Unable to start the DatanodeState Machine", ex);
}
};
Thread thread = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Datanode State Machine Thread - %d")
.build().newThread(startStateMachineTask);
thread.start();
return stateMachine;
}
}
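Note: taken together with the DataNode changes above, the intended lifecycle is that initStateMachine constructs the state machine (which now owns the OzoneContainer), starts it on a daemon thread, and returns the handle so the caller can close() it during shutdown. A minimal usage sketch, assuming an Ozone-enabled configuration (the wrapper class name here is hypothetical, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;

/** Sketch only: mirrors how DataNode.java above wires in the state machine. */
public class StateMachineLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new OzoneConfiguration();

    // Non-blocking: builds the state machine (and its embedded
    // OzoneContainer), then drives it from a daemon thread.
    DatanodeStateMachine stateMachine =
        DatanodeStateMachine.initStateMachine(conf);

    // ... datanode serves traffic ...

    // close() tears down the SCM endpoints and stops the container server.
    stateMachine.close();
  }
}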

XceiverServer.java

@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import java.io.IOException;
/**
* Creates a netty server endpoint that acts as the communication layer for
* Ozone containers.
@@ -58,9 +60,9 @@ public final class XceiverServer {
/**
* Starts running the server.
*
* @throws Exception
* @throws IOException
*/
public void start() throws Exception {
public void start() throws IOException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
channel = new ServerBootstrap()

OzoneContainer.java

@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
@@ -38,6 +36,8 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
/**
* Ozone main class sets up the network server and initializes the container
* layer.
@@ -57,12 +57,11 @@ public class OzoneContainer {
* Creates a network endpoint and enables Ozone container.
*
* @param ozoneConfig - Config
* @param dataSet - FsDataset.
* @throws IOException
*/
public OzoneContainer(
Configuration ozoneConfig,
FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception {
Configuration ozoneConfig) throws IOException {
this.ozoneConfig = ozoneConfig;
List<StorageLocation> locations = new LinkedList<>();
String[] paths = ozoneConfig.getStrings(
OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
@@ -71,11 +70,9 @@
locations.add(StorageLocation.parse(p));
}
} else {
getDataDir(dataSet, locations);
getDataDir(locations);
}
this.ozoneConfig = ozoneConfig;
manager = new ContainerManagerImpl();
manager.init(this.ozoneConfig, locations);
this.chunkManager = new ChunkManagerImpl(manager);
@@ -90,43 +87,44 @@
/**
* Starts serving requests to ozone container.
* @throws Exception
*
* @throws IOException
*/
public void start() throws Exception {
public void start() throws IOException {
server.start();
}
/**
* Stops the ozone container.
*
* Shutdown logic is not very obvious from the following code.
* if you need to modify the logic, please keep these comments in mind.
* Here is the shutdown sequence.
*
* <p>
* Shutdown logic is not very obvious from the following code. If you need to
* modify the logic, please keep these comments in mind. Here is the shutdown
* sequence.
* <p>
* 1. We shutdown the network ports.
*
* <p>
* 2. Now we need to wait for all requests in-flight to finish.
*
* 3. The container manager lock is a read-write lock with "Fairness" enabled.
*
* <p>
* 3. The container manager lock is a read-write lock with "Fairness"
* enabled.
* <p>
* 4. This means that the waiting threads are served in a "first-come-first
* -served" manner. Please note that this applies to waiting threads only.
*
* <p>
* 5. Since write locks are exclusive, if we are waiting to get a lock it
* implies that we are waiting for in-flight operations to complete.
*
* <p>
* 6. If there are other write operations waiting on the reader-writer lock,
* fairness guarantees that they will proceed before the shutdown lock
* request.
*
* <p>
* 7. Since all operations either take a reader or writer lock of container
* manager, we are guaranteed that we are the last operation since we have
* closed the network port, and we wait until close is successful.
*
* <p>
* 8. We take the writer lock and call shutdown on each of the managers in
* reverse order. That is, chunkManager, keyManager and containerManager are
* shut down.
*
*/
public void stop() {
LOG.info("Attempting to stop container services.");
@@ -144,26 +142,14 @@
/**
* Returns paths to data dirs.
* @param dataset - FSDataset.
*
* @param pathList - List of paths.
* @throws IOException
*/
private void getDataDir(
FsDatasetSpi<? extends FsVolumeSpi> dataset,
List<StorageLocation> pathList) throws IOException {
FsDatasetSpi.FsVolumeReferences references;
try {
synchronized (dataset) {
references = dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
pathList.add(StorageLocation.parse(vol.getBaseURI().getPath()));
}
references.close();
}
} catch (IOException ex) {
LOG.error("Unable to get volume paths.", ex);
throw new IOException("Internal error", ex);
private void getDataDir(List<StorageLocation> pathList) throws IOException {
for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
pathList.add(location);
}
}
}
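Note: the shutdown ordering documented in stop() above hinges on reader-writer lock fairness. A minimal sketch of that pattern, with hypothetical names (this is not the actual ContainerManagerImpl code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Sketch of the fair reader-writer lock pattern described in the stop()
 * javadoc above; class and method names here are illustrative only.
 */
public class FairShutdownSketch {
  // "true" enables fairness: waiting threads acquire the lock in arrival
  // order, which is what steps 4 through 7 of the sequence rely on.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

  public void handleRequest() {
    lock.readLock().lock(); // every in-flight operation holds the read lock
    try {
      // ... serve the request ...
    } finally {
      lock.readLock().unlock();
    }
  }

  public void shutdown() {
    // The network port is closed first, so no new requests can arrive. The
    // exclusive write lock is granted only after all queued and in-flight
    // operations have drained, making shutdown the last operation to run.
    lock.writeLock().lock();
    try {
      // ... stop managers in reverse order: chunk, key, container ...
    } finally {
      lock.writeLock().unlock();
    }
  }
}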

TestDatanodeStateMachine.java

@@ -20,19 +20,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
@@ -54,18 +47,20 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
/**
* Tests the datanode state machine class and its states.
*/
public class TestDatanodeStateMachine {
private static final Logger LOG =
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
private final int scmServerCount = 3;
private List<String> serverAddresses;
private List<RPC.Server> scmServers;
private List<ScmTestMock> mockServers;
private ExecutorService executorService;
private Configuration conf;
private static final Logger LOG =
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
@Before
public void setUp() throws Exception {
@@ -91,14 +86,15 @@
String path = p.getPath().concat(
TestDatanodeStateMachine.class.getSimpleName());
File f = new File(path);
if(!f.mkdirs()) {
if (!f.mkdirs()) {
LOG.info("Required directories already exist.");
}
conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
path = Paths.get(path.toString(),
TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
executorService = HadoopExecutors.newScheduledThreadPool(
conf.getInt(
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
@@ -122,7 +118,6 @@
/**
* Assert that starting the state machine executes the Init State.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
@@ -132,7 +127,7 @@
Runnable startStateMachineTask = () -> {
try {
stateMachine.start();
} catch (IOException ex) {
} catch (Exception ex) {
}
};
Thread thread1 = new Thread(startStateMachineTask);

TestOzoneContainer.java

@@ -32,6 +32,9 @@ import org.junit.rules.Timeout;
import java.net.URL;
/**
* Tests ozone containers.
*/
public class TestOzoneContainer {
/**
* Set the timeout for every test.
@@ -55,12 +58,11 @@
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
(containerName);
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
().get(0).getFSDataset());
OzoneContainer container = new OzoneContainer(conf);
container.start();
XceiverClient client = new XceiverClient(pipeline, conf);