HDFS-11444. Ozone: Fix datanode ID handling in MiniOzoneCluster. Contributed by Weiwei Yang.

This commit is contained in:
Anu Engineer 2017-03-20 13:18:26 -07:00
parent 65487b579e
commit 603f2c18ec
16 changed files with 646 additions and 204 deletions

View File

@ -318,9 +318,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
public HdfsProtos.DatanodeIDProto getProtoBufMessage() {
HdfsProtos.DatanodeIDProto.Builder builder =
HdfsProtos.DatanodeIDProto.newBuilder();
return builder.setDatanodeUuid(this.getDatanodeUuid())
.setIpAddr(this.getIpcAddr())
.setIpAddr(this.getIpAddr())
.setHostName(this.getHostName())
.setXferPort(this.getXferPort())
.setInfoPort(this.getInfoPort())

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.scm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
@ -87,6 +88,15 @@ public class XceiverClient implements XceiverClientSpi {
channelFuture = b.connect(leader.getHostName(), port).sync();
}
/**
* Returns if the exceiver client connects to a server.
* @return True if the connection is alive, false otherwise.
*/
@VisibleForTesting
public boolean isConnected() {
return channelFuture.channel().isActive();
}
@Override
public void close() {
if(group != null) {

View File

@ -400,6 +400,7 @@ public class DataNode extends ReconfigurableBase
private final DatasetVolumeChecker volumeChecker;
private final SocketFactory socketFactory;
private DatanodeStateMachine datanodeStateMachine;
private static Tracer createTracer(Configuration conf) {
@ -1543,7 +1544,15 @@ public class DataNode extends ReconfigurableBase
+ bpRegistration.getDatanodeUuid()
+ ". Expecting " + storage.getDatanodeUuid());
}
if (isOzoneEnabled()) {
if (datanodeStateMachine == null) {
datanodeStateMachine = new DatanodeStateMachine(
getDatanodeId(),
getConf());
datanodeStateMachine.startDaemon();
}
}
registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
}
@ -1643,14 +1652,6 @@ public class DataNode extends ReconfigurableBase
data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
initDirectoryScanner(getConf());
if(this.ozoneEnabled) {
try {
datanodeStateMachine = DatanodeStateMachine.initStateMachine(getConf());
LOG.info("Ozone container server started.");
} catch (IOException ex) {
LOG.error("Unable to start Ozone. ex: {}", ex);
}
}
initDiskBalancer(data, getConf());
}
@ -1661,11 +1662,11 @@ public class DataNode extends ReconfigurableBase
BPOfferService getBPOfferService(String bpid){
return blockPoolManager.get(bpid);
}
int getBpOsCount() {
return blockPoolManager.getAllNamenodeThreads().size();
}
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
@ -1998,125 +1999,119 @@ public class DataNode extends ReconfigurableBase
}
// Stop the object store handler
if (this.objectStoreHandler != null) {
this.objectStoreHandler.close();
if (isOzoneEnabled()) {
if (this.objectStoreHandler != null) {
this.objectStoreHandler.close();
}
}
if (this.ozoneEnabled) {
if (datanodeStateMachine != null) {
volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
}
if (pauseMonitor != null) {
pauseMonitor.stop();
}
// shouldRun is set to false here to prevent certain threads from exiting
// before the restart prep is done.
this.shouldRun = false;
// wait reconfiguration thread, if any, to exit
shutdownReconfigurationTask();
// wait for all data receiver threads to exit
if (this.threadGroup != null) {
int sleepMs = 2;
while (true) {
// When shutting down for restart, wait 2.5 seconds before forcing
// termination of receiver threads.
if (!this.shutdownForUpgrade || (this.shutdownForUpgrade && (
Time.monotonicNow() - timeNotified > 1000))) {
this.threadGroup.interrupt();
break;
}
LOG.info("Waiting for threadgroup to exit, active threads is "
+ this.threadGroup.activeCount());
if (this.threadGroup.activeCount() == 0) {
break;
}
try {
datanodeStateMachine.close();
} catch (Exception e) {
LOG.error("Error is ozone shutdown. ex {}", e.toString());
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
}
sleepMs = sleepMs * 3 / 2; // exponential backoff
if (sleepMs > 200) {
sleepMs = 200;
}
}
volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
this.threadGroup = null;
}
if (this.dataXceiverServer != null) {
// wait for dataXceiverServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
if (pauseMonitor != null) {
pauseMonitor.stop();
}
if (this.localDataXceiverServer != null) {
// wait for localDataXceiverServer to terminate
try {
this.localDataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0);
}
// shouldRun is set to false here to prevent certain threads from exiting
// before the restart prep is done.
// IPC server needs to be shutdown late in the process, otherwise
// shutdown command response won't get sent.
if (ipcServer != null) {
ipcServer.stop();
}
if (blockPoolManager != null) {
try {
this.blockPoolManager.shutDownAll(bposArray);
} catch (InterruptedException ie) {
LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
}
}
if (storage != null) {
try {
this.storage.unlockAll();
} catch (IOException ie) {
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
if (data != null) {
data.shutdown();
}
if (metrics != null) {
metrics.shutdown();
}
if (diskMetrics != null) {
diskMetrics.shutdownAndWait();
}
if (dataNodeInfoBeanName != null) {
MBeans.unregister(dataNodeInfoBeanName);
dataNodeInfoBeanName = null;
}
if (shortCircuitRegistry != null) {
shortCircuitRegistry.shutdown();
}
LOG.info("Shutdown complete.");
synchronized (this) {
// it is already false, but setting it again to avoid a findbug warning.
this.shouldRun = false;
// wait reconfiguration thread, if any, to exit
shutdownReconfigurationTask();
// wait for all data receiver threads to exit
if (this.threadGroup != null) {
int sleepMs = 2;
while (true) {
// When shutting down for restart, wait 2.5 seconds before forcing
// termination of receiver threads.
if (!this.shutdownForUpgrade ||
(this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
> 1000))) {
this.threadGroup.interrupt();
break;
}
LOG.info("Waiting for threadgroup to exit, active threads is " +
this.threadGroup.activeCount());
if (this.threadGroup.activeCount() == 0) {
break;
}
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
}
sleepMs = sleepMs * 3 / 2; // exponential backoff
if (sleepMs > 200) {
sleepMs = 200;
}
}
this.threadGroup = null;
}
if (this.dataXceiverServer != null) {
// wait for dataXceiverServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if (this.localDataXceiverServer != null) {
// wait for localDataXceiverServer to terminate
try {
this.localDataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0);
}
// IPC server needs to be shutdown late in the process, otherwise
// shutdown command response won't get sent.
if (ipcServer != null) {
ipcServer.stop();
}
if (blockPoolManager != null) {
try {
this.blockPoolManager.shutDownAll(bposArray);
} catch (InterruptedException ie) {
LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
}
}
if (storage != null) {
try {
this.storage.unlockAll();
} catch (IOException ie) {
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
if (data != null) {
data.shutdown();
}
if (metrics != null) {
metrics.shutdown();
}
if (diskMetrics != null) {
diskMetrics.shutdownAndWait();
}
if (dataNodeInfoBeanName != null) {
MBeans.unregister(dataNodeInfoBeanName);
dataNodeInfoBeanName = null;
}
if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
LOG.info("Shutdown complete.");
synchronized (this) {
// it is already false, but setting it again to avoid a findbug warning.
this.shouldRun = false;
// Notify the main thread.
notifyAll();
}
tracer.close();
// Notify the main thread.
notifyAll();
}
tracer.close();
}
/**
@ -3149,6 +3144,12 @@ public class DataNode extends ReconfigurableBase
} catch (InterruptedException ie) { }
}
shutdown();
if (isOzoneEnabled()) {
if(datanodeStateMachine != null) {
datanodeStateMachine.stopDaemon();
}
}
}
};

View File

@ -30,6 +30,20 @@ public final class OzoneConfigKeys {
public static final String DFS_CONTAINER_IPC_PORT =
"dfs.container.ipc";
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
/**
*
* When set to true, allocate a random free port for ozone container,
* so that a mini cluster is able to launch multiple containers on a node.
*
* When set to false (default), container port is fixed as specified by
* DFS_CONTAINER_IPC_PORT_DEFAULT.
*/
public static final String DFS_CONTAINER_IPC_RANDOM_PORT =
"dfs.container.ipc.random.port";
public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
false;
public static final String OZONE_LOCALSTORAGE_ROOT =
"ozone.localstorage.root";
public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";

View File

@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
import org.apache.hadoop.utils.LevelDBStore;
@ -30,9 +32,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import static org.apache.commons.io.FilenameUtils.removeExtension;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
@ -367,4 +373,83 @@ public final class ContainerUtils {
FileUtils.forceDelete(containerPath.toFile());
FileUtils.forceDelete(metaPath.toFile());
}
/**
* Write datanode ID protobuf messages to an ID file.
* The old ID file will be overwritten.
*
* @param ids A set of {@link DatanodeID}
* @param path Local ID file path
* @throws IOException When read/write error occurs
*/
private synchronized static void writeDatanodeIDs(List<DatanodeID> ids,
File path) throws IOException {
if (path.exists()) {
if (!path.delete() || !path.createNewFile()) {
throw new IOException("Unable to overwrite the datanode ID file.");
}
} else {
if(!path.getParentFile().exists() &&
!path.getParentFile().mkdirs()) {
throw new IOException("Unable to create datanode ID directories.");
}
}
try (FileOutputStream out = new FileOutputStream(path)) {
for (DatanodeID id : ids) {
HdfsProtos.DatanodeIDProto dnId = id.getProtoBufMessage();
dnId.writeDelimitedTo(out);
}
}
}
/**
* Persistent a {@link DatanodeID} to a local file.
* It reads the IDs first and append a new entry only if the ID is new.
* This is to avoid on some dirty environment, this file gets too big.
*
* @throws IOException when read/write error occurs
*/
public synchronized static void writeDatanodeIDTo(DatanodeID dnID,
File path) throws IOException {
List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(path);
// Only create or overwrite the file
// if the ID doesn't exist in the ID file
for (DatanodeID id : ids) {
if (id.getProtoBufMessage()
.equals(dnID.getProtoBufMessage())) {
return;
}
}
ids.add(dnID);
writeDatanodeIDs(ids, path);
}
/**
* Read {@link DatanodeID} from a local ID file and return a set of
* datanode IDs. If the ID file doesn't exist, an empty set is returned.
*
* @param path ID file local path
* @return A set of {@link DatanodeID}
* @throws IOException If the id file is malformed or other I/O exceptions
*/
public synchronized static List<DatanodeID> readDatanodeIDsFrom(File path)
throws IOException {
List<DatanodeID> ids = new ArrayList<DatanodeID>();
if (!path.exists()) {
return ids;
}
try(FileInputStream in = new FileInputStream(path)) {
while(in.available() > 0) {
try {
HdfsProtos.DatanodeIDProto id =
HdfsProtos.DatanodeIDProto.parseDelimitedFrom(in);
ids.add(DatanodeID.getFromProtoBuf(id));
} catch (IOException e) {
throw new IOException("Failed to parse Datanode ID from "
+ path.getAbsolutePath(), e);
}
}
}
return ids;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
@ -44,13 +45,16 @@ public class DatanodeStateMachine implements Closeable {
private final long heartbeatFrequency;
private StateContext context;
private final OzoneContainer container;
private DatanodeID datanodeID = null;
/**
* Constructs a a datanode state machine.
*
* @param datanodeID - DatanodeID used to identify a datanode
* @param conf - Configration.
*/
public DatanodeStateMachine(Configuration conf) throws IOException {
public DatanodeStateMachine(DatanodeID datanodeID,
Configuration conf) throws IOException {
this.conf = conf;
executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
@ -60,6 +64,26 @@ public class DatanodeStateMachine implements Closeable {
heartbeatFrequency = TimeUnit.SECONDS.toMillis(
OzoneClientUtils.getScmHeartbeatInterval(conf));
container = new OzoneContainer(conf);
this.datanodeID = datanodeID;
}
public DatanodeStateMachine(Configuration conf)
throws IOException {
this(null, conf);
}
public void setDatanodeID(DatanodeID datanodeID) {
this.datanodeID = datanodeID;
}
/**
*
* Return DatanodeID if set, return null otherwise.
*
* @return datanodeID
*/
public DatanodeID getDatanodeID() {
return this.datanodeID;
}
/**
@ -71,10 +95,14 @@ public class DatanodeStateMachine implements Closeable {
return connectionManager;
}
public OzoneContainer getContainer() {
return this.container;
}
/**
* Runs the state machine at a fixed frequency.
*/
public void start() throws IOException {
private void start() throws IOException {
long now = 0;
long nextHB = 0;
container.start();
@ -216,12 +244,14 @@ public class DatanodeStateMachine implements Closeable {
}
}
public static DatanodeStateMachine initStateMachine(Configuration conf)
throws IOException {
DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
/**
* Start datanode state machine as a single thread daemon.
*/
public void startDaemon() {
Runnable startStateMachineTask = () -> {
try {
stateMachine.start();
start();
LOG.info("Ozone container server started.");
} catch (Exception ex) {
LOG.error("Unable to start the DatanodeState Machine", ex);
}
@ -231,6 +261,32 @@ public class DatanodeStateMachine implements Closeable {
.setNameFormat("Datanode State Machine Thread - %d")
.build().newThread(startStateMachineTask);
thread.start();
return stateMachine;
}
/**
* Stop the daemon thread of the datanode state machine.
*/
public synchronized void stopDaemon() {
try {
context.setState(DatanodeStates.SHUTDOWN);
this.close();
LOG.info("Ozone container server stopped.");
} catch (IOException e) {
LOG.error("Stop ozone container server failed.", e);
}
}
/**
*
* Check if the datanode state machine daemon is stopped.
*
* @return True if datanode state machine daemon is stopped
* and false otherwise.
*/
@VisibleForTesting
public boolean isDaemonStopped() {
return this.executorService.isShutdown()
&& this.getContext().getExecutionCount() == 0
&& this.getContext().getState() == DatanodeStates.SHUTDOWN;
}
}

View File

@ -80,6 +80,15 @@ public class StateContext {
return parent;
}
/**
* Get the container server port.
* @return The container server port if available, return -1 if otherwise
*/
public int getContainerPort() {
return parent == null ?
-1 : parent.getContainer().getContainerServerPort();
}
/**
* Returns true if we are entering a new state.
*

View File

@ -18,14 +18,19 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.Callable;
@ -87,9 +92,29 @@ public class InitDatanodeState implements DatanodeState,
connectionManager.addSCMServer(addr);
}
}
// If datanode ID is set, persist it to the ID file.
persistContainerDatanodeID();
return this.context.getState().getNextState();
}
/**
* Update Ozone container port to the datanode ID,
* and persist the ID to a local file.
*/
private void persistContainerDatanodeID() throws IOException {
String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID);
File idPath = new File(dataNodeIDPath);
int containerPort = this.context.getContainerPort();
DatanodeID datanodeID = this.context.getParent().getDatanodeID();
if (datanodeID != null) {
datanodeID.setContainerPort(containerPort);
ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
}
}
/**
* Called before entering this state.
*/

View File

@ -19,8 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
@ -35,16 +34,11 @@ import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@ -83,63 +77,30 @@ public class RunningDatanodeState implements DatanodeState {
private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
readPersistedDatanodeID(Path idPath) throws IOException {
Preconditions.checkNotNull(idPath);
StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
containerIDProto;
try (FileInputStream stream = new FileInputStream(idPath.toFile())) {
containerIDProto = StorageContainerDatanodeProtocolProtos
.ContainerNodeIDProto.parseFrom(stream);
return containerIDProto;
DatanodeID datanodeID = null;
List<DatanodeID> datanodeIDs =
ContainerUtils.readDatanodeIDsFrom(idPath.toFile());
int containerPort = this.context.getContainerPort();
for(DatanodeID dnId : datanodeIDs) {
if(dnId.getContainerPort() == containerPort) {
datanodeID = dnId;
break;
}
}
}
/**
* Create a DatanodeID from the datanode information.
*
* @return DatanodeID
* @throws UnknownHostException
*/
private DatanodeID createDatanodeID() throws UnknownHostException {
DatanodeID temp = new DatanodeID(
//TODO : Replace this with proper network and kerberos
// support code.
InetAddress.getLocalHost().getHostAddress(),
DataNode.getHostName(conf),
UUID.randomUUID().toString(),
0, /** XferPort - SCM does not use this port */
0, /** Info port - SCM does not use this port */
0, /** Info Secure Port - SCM does not use this port */
0); /** IPC port - SCM does not use this port */
// TODO: make this dynamically discoverable. SCM can hand out this
// port number to calling applications. This makes it easy to run multiple
// container endpoints on the same machine.
temp.setContainerPort(OzoneClientUtils.getContainerPort(conf));
return temp;
}
/**
* Creates a new ContainerID that persists both DatanodeID and ClusterID.
*
* @param idPath Path to the id file.
* @return ContainerNodeIDProto
* @throws UnknownHostException
*/
private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
createNewContainerID(Path idPath)
throws IOException {
if(!idPath.getParent().toFile().exists() &&
!idPath.getParent().toFile().mkdirs()) {
LOG.error("Failed to create container ID locations. Path: {}",
idPath.getParent());
throw new IOException("Unable to create container ID directories.");
}
StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
containerIDProto = StorageContainerDatanodeProtocolProtos
.ContainerNodeIDProto.newBuilder()
.setDatanodeID(createDatanodeID().getProtoBufMessage()).build();
try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) {
stream.write(containerIDProto.toByteArray());
if (datanodeID == null) {
throw new IOException("No valid datanode ID found from "
+ idPath.toFile().getAbsolutePath()
+ " that matches container port "
+ containerPort);
} else {
StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
containerIDProto =
StorageContainerDatanodeProtocolProtos
.ContainerNodeIDProto
.newBuilder()
.setDatanodeID(datanodeID.getProtoBufMessage())
.build();
return containerIDProto;
}
}
@ -171,15 +132,7 @@ public class RunningDatanodeState implements DatanodeState {
} catch (IOException ex) {
LOG.trace("Not able to find container Node ID, creating it.", ex);
}
// Not found, let us create a new datanode ID, persist it and return that
// info to SCM.
try {
nodeID = createNewContainerID(Paths.get(dataNodeIDPath));
LOG.trace("Created Node ID :", nodeID.getDatanodeID().getDatanodeUuid());
return nodeID;
} catch (IOException ex) {
LOG.error("Creating new node ID failed.", ex);
}
this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
return null;
}

View File

@ -29,15 +29,20 @@ import io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
/**
* Creates a netty server endpoint that acts as the communication layer for
* Ozone containers.
*/
public final class XceiverServer implements XceiverServerSpi {
private final int port;
private static final Logger
LOG = LoggerFactory.getLogger(XceiverServer.class);
private int port;
private final ContainerDispatcher storageContainer;
private EventLoopGroup bossGroup;
@ -52,11 +57,30 @@ public final class XceiverServer implements XceiverServerSpi {
public XceiverServer(Configuration conf,
ContainerDispatcher dispatcher) {
Preconditions.checkNotNull(conf);
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
// Get an available port on current node and
// use that as the container port
if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
this.port = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", this.port);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", this.port, e);
}
}
this.storageContainer = dispatcher;
}
@Override
public int getIPCPort() {
return this.port;
}
@Override
public void start() throws IOException {
bossGroup = new NioEventLoopGroup();

View File

@ -28,4 +28,7 @@ public interface XceiverServerSpi {
/** Stops a running server. */
void stop();
/** Get server IPC port. */
int getIPCPort();
}

View File

@ -167,4 +167,13 @@ public class OzoneContainer {
public SCMNodeReport getNodeReport() throws IOException {
return this.manager.getNodeReport();
}
/**
* Returns the container server IPC port.
*
* @return Container server IPC port.
*/
public int getContainerServerPort() {
return server.getIPCPort();
}
}

View File

@ -173,6 +173,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
numDataNodes);
return false;
}
}, 1000, 5 * 60 * 1000); //wait for 5 mins.
@ -228,6 +229,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private int containerWorkerThreadInterval = 1;
private Boolean randomContainerPort = true;
/**
* Creates a new Builder.
@ -247,6 +249,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster
runID = UUID.randomUUID();
}
public Builder setRandomContainerPort(boolean randomPort) {
this.randomContainerPort = randomPort;
return this;
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
@ -319,6 +326,10 @@ public final class MiniOzoneCluster extends MiniDFSCluster
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
// Use random ports for ozone containers in mini cluster,
// in order to launch multiple container servers per node.
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
randomContainerPort);
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.TestGenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.*;
/**
* Test cases for mini ozone cluster.
*/
public class TestMiniOzoneCluster {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private final static File TEST_ROOT = TestGenericTestUtils.getTestDir();
private final static File WRITE_TMP = new File(TEST_ROOT, "write");
private final static File READ_TMP = new File(TEST_ROOT, "read");
@BeforeClass
public static void setup() {
conf = new OzoneConfiguration();
WRITE_TMP.mkdirs();
READ_TMP.mkdirs();
WRITE_TMP.deleteOnExit();
READ_TMP.deleteOnExit();
}
@AfterClass
public static void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster.close();
}
}
@Test(timeout = 30000)
public void testStartMultipleDatanodes() throws Exception {
final int numberOfNodes = 3;
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(numberOfNodes)
.setHandlerType("distributed").build();
// make sure datanode.id file is correct
File idPath = new File(
conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
assertTrue(idPath.exists());
List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idPath);
assertEquals(numberOfNodes, ids.size());
List<DataNode> datanodes = cluster.getDataNodes();
assertEquals(datanodes.size(), numberOfNodes);
for(DataNode dn : datanodes) {
// Each datanode ID should match an entry in the ID file
assertTrue("Datanode ID not found in ID file",
ids.contains(dn.getDatanodeId()));
// Create a single member pipe line
String containerName = OzoneUtils.getRequestID();
DatanodeID dnId = dn.getDatanodeId();
Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid());
pipeline.addMember(dnId);
pipeline.setContainerName(containerName);
// Verify client is able to connect to the container
try (XceiverClient client = new XceiverClient(pipeline, conf)){
client.connect();
assertTrue(client.isConnected());
}
}
}
@Test
public void testDatanodeIDPersistent() throws Exception {
// Generate IDs for testing
DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
DatanodeID id2 = DFSTestUtil.getLocalDatanodeID(2);
DatanodeID id3 = DFSTestUtil.getLocalDatanodeID(3);
id1.setContainerPort(1);
id2.setContainerPort(2);
id3.setContainerPort(3);
// Write a single ID to the file and read it out
File validIdsFile = new File(WRITE_TMP, "valid-values.id");
validIdsFile.delete();
ContainerUtils.writeDatanodeIDTo(id1, validIdsFile);
List<DatanodeID> validIds = ContainerUtils
.readDatanodeIDsFrom(validIdsFile);
assertEquals(1, validIds.size());
DatanodeID id11 = validIds.iterator().next();
assertEquals(id11, id1);
assertEquals(id11.getProtoBufMessage(), id1.getProtoBufMessage());
// Write should avoid duplicate entries
File noDupIDFile = new File(WRITE_TMP, "no-dup-values.id");
noDupIDFile.delete();
ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
ContainerUtils.writeDatanodeIDTo(id2, noDupIDFile);
ContainerUtils.writeDatanodeIDTo(id3, noDupIDFile);
List<DatanodeID> noDupIDs =ContainerUtils
.readDatanodeIDsFrom(noDupIDFile);
assertEquals(3, noDupIDs.size());
assertTrue(noDupIDs.contains(id1));
assertTrue(noDupIDs.contains(id2));
assertTrue(noDupIDs.contains(id3));
// Write should fail if unable to create file or directory
File invalidPath = new File(WRITE_TMP, "an/invalid/path");
try {
ContainerUtils.writeDatanodeIDTo(id1, invalidPath);
} catch (Exception e) {
e.printStackTrace();
assertTrue(e instanceof IOException);
}
// Read should return an empty value if file doesn't exist
File nonExistFile = new File(READ_TMP, "non_exist.id");
nonExistFile.delete();
List<DatanodeID> emptyIDs =
ContainerUtils.readDatanodeIDsFrom(nonExistFile);
assertTrue(emptyIDs.isEmpty());
// Read should fail if the file is malformed
File malformedFile = new File(READ_TMP, "malformed.id");
createMalformedIDFile(malformedFile);
try {
ContainerUtils.readDatanodeIDsFrom(malformedFile);
fail("Read a malformed ID file should fail");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
}
@Test
public void testContainerRandomPort() throws IOException {
Configuration ozoneConf = SCMTestUtils.getConf();
File testDir = PathUtils.getTestDir(TestOzoneContainer.class);
ozoneConf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
// Each instance of SM will create an ozone container
// that bounds to a random port.
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
) {
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
assertTrue(ports.add(sm2.getContainer().getContainerServerPort()));
assertTrue(ports.add(sm3.getContainer().getContainerServerPort()));
}
// Turn off the random port flag and test again
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
) {
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
assertFalse(ports.add(sm2.getContainer().getContainerServerPort()));
assertFalse(ports.add(sm3.getContainer().getContainerServerPort()));
assertEquals(ports.iterator().next().intValue(),
conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));
}
}
private void createMalformedIDFile(File malformedFile)
throws IOException{
malformedFile.delete();
DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
ContainerUtils.writeDatanodeIDTo(id1, malformedFile);
FileOutputStream out = new FileOutputStream(malformedFile);
out.write("malformed".getBytes());
out.close();
}
}

View File

@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.container.common;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@ -51,6 +54,7 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.junit.Assert.assertTrue;
/**
* Tests the datanode state machine class and its states.
@ -134,14 +138,18 @@ public class TestDatanodeStateMachine {
* @throws InterruptedException
*/
@Test
public void testDatanodeStateMachineStartThread() throws IOException,
public void testStartStopDatanodeStateMachine() throws IOException,
InterruptedException, TimeoutException {
try (DatanodeStateMachine stateMachine =
DatanodeStateMachine.initStateMachine(conf)) {
new DatanodeStateMachine(conf)) {
stateMachine.startDaemon();
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
1000, 30000);
stateMachine.stopDaemon();
assertTrue(stateMachine.isDaemonStopped());
}
}
@ -178,6 +186,15 @@ public class TestDatanodeStateMachine {
@Test
public void testDatanodeStateContext() throws IOException,
InterruptedException, ExecutionException, TimeoutException {
// There is no mini cluster started in this test,
// create a ID file so that state machine could load a fake datanode ID.
File idPath = new File(
conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
idPath.delete();
DatanodeID dnID = DFSTestUtil.getLocalDatanodeID();
dnID.setContainerPort(ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
ContainerUtils.writeDatanodeIDTo(dnID, idPath);
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();

View File

@ -57,6 +57,7 @@ public class TestOzoneContainer {
MiniOzoneCluster cluster = null;
try {
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
@ -108,6 +109,7 @@ public class TestOzoneContainer {
pipeline.getLeader().getContainerPort());
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
@ -208,6 +210,7 @@ public class TestOzoneContainer {
pipeline.getLeader().getContainerPort());
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
@ -273,6 +276,7 @@ public class TestOzoneContainer {
pipeline.getLeader().getContainerPort());
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
@ -364,5 +368,4 @@ public class TestOzoneContainer {
}
}
}
}