HDFS-12739. Add Support for SCM --init command. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
709d56fdc9
commit
9734f505ea
|
@ -147,6 +147,15 @@ public final class OzoneConsts {
|
|||
|
||||
public static final int INVALID_PORT = -1;
|
||||
|
||||
/**
|
||||
* Type of the node.
|
||||
*/
|
||||
public enum NodeType {
|
||||
KSM,
|
||||
SCM,
|
||||
DATANODE
|
||||
}
|
||||
|
||||
private OzoneConsts() {
|
||||
// Never Constructed
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.common;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* The exception is thrown when file system state is inconsistent
|
||||
* and is not recoverable.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class InconsistentStorageStateException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InconsistentStorageStateException(String descr) {
|
||||
super(descr);
|
||||
}
|
||||
|
||||
public InconsistentStorageStateException(File dir, String descr) {
|
||||
super("Directory " + getFilePath(dir) + " is in an inconsistent state: "
|
||||
+ descr);
|
||||
}
|
||||
|
||||
private static String getFilePath(File dir) {
|
||||
try {
|
||||
return dir.getCanonicalPath();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
return dir.getPath();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,249 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.common;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts.NodeType;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
* Storage information file. This Class defines the methods to check
|
||||
* the consistency of the storage dir and the version file.
|
||||
* <p>
|
||||
* Local storage information is stored in a separate file VERSION.
|
||||
* It contains type of the node,
|
||||
* the storage layout version, the SCM id, and
|
||||
* the KSM/SCM state creation time.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class Storage {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Storage.class);
|
||||
|
||||
protected static final String STORAGE_DIR_CURRENT = "current";
|
||||
protected static final String STORAGE_FILE_VERSION = "VERSION";
|
||||
|
||||
private final NodeType nodeType;
|
||||
private final File root;
|
||||
private final File storageDir;
|
||||
|
||||
private StorageState state;
|
||||
private StorageInfo storageInfo;
|
||||
|
||||
|
||||
/**
|
||||
* Determines the state of the Version file.
|
||||
*/
|
||||
public enum StorageState {
|
||||
NON_EXISTENT, NOT_INITIALIZED, INITIALIZED
|
||||
}
|
||||
|
||||
public Storage(NodeType type, File root, String sdName)
|
||||
throws IOException {
|
||||
this.nodeType = type;
|
||||
this.root = root;
|
||||
this.storageDir = new File(root, sdName);
|
||||
this.state = getStorageState();
|
||||
if (state == StorageState.INITIALIZED) {
|
||||
this.storageInfo = new StorageInfo(type, getVersionFile());
|
||||
} else {
|
||||
this.storageInfo = new StorageInfo(
|
||||
nodeType, StorageInfo.newClusterID(), Time.now());
|
||||
setNodeProperties();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the path of the Storage dir.
|
||||
* @return Stoarge dir path
|
||||
*/
|
||||
public String getStorageDir() {
|
||||
return storageDir.getAbsoluteFile().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the state of the version file.
|
||||
* @return the state of the Version file
|
||||
*/
|
||||
public StorageState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public NodeType getNodeType() {
|
||||
return storageInfo.getNodeType();
|
||||
}
|
||||
|
||||
public String getClusterID() {
|
||||
return storageInfo.getClusterID();
|
||||
}
|
||||
|
||||
public long getCreationTime() {
|
||||
return storageInfo.getCreationTime();
|
||||
}
|
||||
|
||||
public void setClusterId(String clusterId) throws IOException {
|
||||
if (state == StorageState.INITIALIZED) {
|
||||
throw new IOException(
|
||||
"Storage directory " + storageDir + "already initialized.");
|
||||
} else {
|
||||
storageInfo.setClusterId(clusterId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retreives the storageInfo instance to read/write the common
|
||||
* version file properties.
|
||||
* @return the instance of the storageInfo class
|
||||
*/
|
||||
protected StorageInfo getStorageInfo() {
|
||||
return storageInfo;
|
||||
}
|
||||
|
||||
abstract protected Properties getNodeProperties();
|
||||
|
||||
/**
|
||||
* Sets the Node properties spaecific to KSM/SCM.
|
||||
*/
|
||||
private void setNodeProperties() {
|
||||
Properties nodeProperties = getNodeProperties();
|
||||
if (nodeProperties != null) {
|
||||
for (String key : nodeProperties.stringPropertyNames()) {
|
||||
storageInfo.setProperty(key, nodeProperties.getProperty(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Directory {@code current} contains latest files defining
|
||||
* the file system meta-data.
|
||||
*
|
||||
* @return the directory path
|
||||
*/
|
||||
private File getCurrentDir() {
|
||||
return new File(storageDir, STORAGE_DIR_CURRENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* File {@code VERSION} contains the following fields:
|
||||
* <ol>
|
||||
* <li>node type</li>
|
||||
* <li>KSM/SCM state creation time</li>
|
||||
* <li>other fields specific for this node type</li>
|
||||
* </ol>
|
||||
* The version file is always written last during storage directory updates.
|
||||
* The existence of the version file indicates that all other files have
|
||||
* been successfully written in the storage directory, the storage is valid
|
||||
* and does not need to be recovered.
|
||||
*
|
||||
* @return the version file path
|
||||
*/
|
||||
private File getVersionFile() {
|
||||
return new File(getCurrentDir(), STORAGE_FILE_VERSION);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check to see if current/ directory is empty. This method is used
|
||||
* before determining to format the directory.
|
||||
* @throws IOException if unable to list files under the directory.
|
||||
*/
|
||||
private void checkEmptyCurrent() throws IOException {
|
||||
File currentDir = getCurrentDir();
|
||||
if (!currentDir.exists()) {
|
||||
// if current/ does not exist, it's safe to format it.
|
||||
return;
|
||||
}
|
||||
try (DirectoryStream<Path> dirStream = Files
|
||||
.newDirectoryStream(currentDir.toPath())) {
|
||||
if (dirStream.iterator().hasNext()) {
|
||||
throw new InconsistentStorageStateException(getCurrentDir(),
|
||||
"Can't initialize the storage directory because the current "
|
||||
+ "it is not empty.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check consistency of the storage directory.
|
||||
*
|
||||
* @return state {@link StorageState} of the storage directory
|
||||
* @throws IOException
|
||||
*/
|
||||
private StorageState getStorageState() throws IOException {
|
||||
assert root != null : "root is null";
|
||||
String rootPath = root.getCanonicalPath();
|
||||
try { // check that storage exists
|
||||
if (!root.exists()) {
|
||||
// storage directory does not exist
|
||||
LOG.warn("Storage directory " + rootPath + " does not exist");
|
||||
return StorageState.NON_EXISTENT;
|
||||
}
|
||||
// or is inaccessible
|
||||
if (!root.isDirectory()) {
|
||||
LOG.warn(rootPath + "is not a directory");
|
||||
return StorageState.NON_EXISTENT;
|
||||
}
|
||||
if (!FileUtil.canWrite(root)) {
|
||||
LOG.warn("Cannot access storage directory " + rootPath);
|
||||
return StorageState.NON_EXISTENT;
|
||||
}
|
||||
} catch (SecurityException ex) {
|
||||
LOG.warn("Cannot access storage directory " + rootPath, ex);
|
||||
return StorageState.NON_EXISTENT;
|
||||
}
|
||||
|
||||
// check whether current directory is valid
|
||||
File versionFile = getVersionFile();
|
||||
boolean hasCurrent = versionFile.exists();
|
||||
|
||||
if (hasCurrent) {
|
||||
return StorageState.INITIALIZED;
|
||||
} else {
|
||||
checkEmptyCurrent();
|
||||
return StorageState.NOT_INITIALIZED;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the Version file if not present,
|
||||
* otherwise returns with IOException.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void initialize() throws IOException {
|
||||
if (state == StorageState.INITIALIZED) {
|
||||
throw new IOException("Storage directory already initialized.");
|
||||
}
|
||||
if (!getCurrentDir().mkdirs()) {
|
||||
throw new IOException("Cannot create directory " + getCurrentDir());
|
||||
}
|
||||
storageInfo.writeTo(getVersionFile());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ozone.OzoneConsts.NodeType;
|
||||
|
||||
/**
|
||||
* Common class for storage information. This class defines the common
|
||||
* properties and functions to set them , write them into the version file
|
||||
* and read them from the version file.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StorageInfo {
|
||||
|
||||
private Properties properties = new Properties();
|
||||
|
||||
/**
|
||||
* Property to hold node type.
|
||||
*/
|
||||
private static final String NODE_TYPE = "nodeType";
|
||||
/**
|
||||
* Property to hold ID of the cluster.
|
||||
*/
|
||||
private static final String CLUSTER_ID = "clusterID";
|
||||
/**
|
||||
* Property to hold creation time of the storage.
|
||||
*/
|
||||
private static final String CREATION_TIME = "cTime";
|
||||
|
||||
/**
|
||||
* Constructs StorageInfo instance.
|
||||
* @param type
|
||||
* Type of the node using the storage
|
||||
* @param cid
|
||||
* Cluster ID
|
||||
* @param cT
|
||||
* Cluster creation Time
|
||||
|
||||
* @throws IOException
|
||||
*/
|
||||
public StorageInfo(NodeType type, String cid, long cT)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(type);
|
||||
Preconditions.checkNotNull(cid);
|
||||
Preconditions.checkNotNull(cT);
|
||||
properties.setProperty(NODE_TYPE, type.name());
|
||||
properties.setProperty(CLUSTER_ID, cid);
|
||||
properties.setProperty(CREATION_TIME, String.valueOf(cT));
|
||||
}
|
||||
|
||||
public StorageInfo(NodeType type, File propertiesFile)
|
||||
throws IOException {
|
||||
this.properties = readFrom(propertiesFile);
|
||||
verifyNodeType(type);
|
||||
verifyClusterId();
|
||||
verifyCreationTime();
|
||||
}
|
||||
|
||||
public NodeType getNodeType() {
|
||||
return NodeType.valueOf(properties.getProperty(NODE_TYPE));
|
||||
}
|
||||
|
||||
public String getClusterID() {
|
||||
return properties.getProperty(CLUSTER_ID);
|
||||
}
|
||||
|
||||
public Long getCreationTime() {
|
||||
String creationTime = properties.getProperty(CREATION_TIME);
|
||||
if(creationTime != null) {
|
||||
return Long.parseLong(creationTime);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public String getProperty(String key) {
|
||||
return properties.getProperty(key);
|
||||
}
|
||||
|
||||
public void setProperty(String key, String value) {
|
||||
properties.setProperty(key, value);
|
||||
}
|
||||
|
||||
public void setClusterId(String clusterId) {
|
||||
properties.setProperty(CLUSTER_ID, clusterId);
|
||||
}
|
||||
|
||||
private void verifyNodeType(NodeType type)
|
||||
throws InconsistentStorageStateException {
|
||||
NodeType nodeType = getNodeType();
|
||||
Preconditions.checkNotNull(nodeType);
|
||||
if(type != nodeType) {
|
||||
throw new InconsistentStorageStateException("Expected NodeType: " + type +
|
||||
", but found: " + nodeType);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyClusterId()
|
||||
throws InconsistentStorageStateException {
|
||||
String clusterId = getClusterID();
|
||||
Preconditions.checkNotNull(clusterId);
|
||||
if(clusterId.isEmpty()) {
|
||||
throw new InconsistentStorageStateException("Cluster ID not found");
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyCreationTime() {
|
||||
Long creationTime = getCreationTime();
|
||||
Preconditions.checkNotNull(creationTime);
|
||||
}
|
||||
|
||||
|
||||
public void writeTo(File to)
|
||||
throws IOException {
|
||||
try (RandomAccessFile file = new RandomAccessFile(to, "rws");
|
||||
FileOutputStream out = new FileOutputStream(file.getFD())) {
|
||||
file.seek(0);
|
||||
/*
|
||||
* If server is interrupted before this line,
|
||||
* the version file will remain unchanged.
|
||||
*/
|
||||
properties.store(out, null);
|
||||
/*
|
||||
* Now the new fields are flushed to the head of the file, but file
|
||||
* length can still be larger then required and therefore the file can
|
||||
* contain whole or corrupted fields from its old contents in the end.
|
||||
* If server is interrupted here and restarted later these extra fields
|
||||
* either should not effect server behavior or should be handled
|
||||
* by the server correctly.
|
||||
*/
|
||||
file.setLength(out.getChannel().position());
|
||||
}
|
||||
}
|
||||
|
||||
private Properties readFrom(File from) throws IOException {
|
||||
try (RandomAccessFile file = new RandomAccessFile(from, "rws");
|
||||
FileInputStream in = new FileInputStream(file.getFD())) {
|
||||
Properties props = new Properties();
|
||||
file.seek(0);
|
||||
props.load(in);
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate new clusterID.
|
||||
*
|
||||
* clusterID is a persistent attribute of the cluster.
|
||||
* It is generated when the cluster is created and remains the same
|
||||
* during the life cycle of the cluster. When a new SCM node is initialized,
|
||||
* if this is a new cluster, a new clusterID is generated and stored.
|
||||
* @return new clusterID
|
||||
*/
|
||||
public static String newClusterID() {
|
||||
return "CID-" + UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.common;
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.scm;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.common.Storage;
|
||||
import org.apache.hadoop.ozone.OzoneConsts.NodeType;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* SCMStorage is responsible for management of the StorageDirectories used by
|
||||
* the SCM.
|
||||
*/
|
||||
public class SCMStorage extends Storage {
|
||||
|
||||
public static final String STORAGE_DIR = "scm";
|
||||
public static final String SCM_ID = "scmUuid";
|
||||
|
||||
/**
|
||||
* Construct SCMStorage.
|
||||
* @throws IOException if any directories are inaccessible.
|
||||
*/
|
||||
public SCMStorage(OzoneConfiguration conf) throws IOException {
|
||||
super(NodeType.SCM, OzoneUtils.getScmMetadirPath(conf), STORAGE_DIR);
|
||||
}
|
||||
|
||||
public void setScmUuid(String scmUuid) throws IOException {
|
||||
if (getState() == StorageState.INITIALIZED) {
|
||||
throw new IOException("SCM is already initialized.");
|
||||
} else {
|
||||
getStorageInfo().setProperty(SCM_ID, scmUuid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the SCM ID from the version file.
|
||||
* @return SCM_ID
|
||||
*/
|
||||
public String getscmUuid() {
|
||||
return getStorageInfo().getProperty(SCM_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Properties getNodeProperties() {
|
||||
String scmUuid = getscmUuid();
|
||||
if (scmUuid == null) {
|
||||
scmUuid = UUID.randomUUID().toString();
|
||||
}
|
||||
Properties scmProperties = new Properties();
|
||||
scmProperties.setProperty(SCM_ID, scmUuid);
|
||||
return scmProperties;
|
||||
}
|
||||
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|||
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.StorageInfo;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
|
@ -82,6 +83,8 @@ import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
|||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.ozone.common.Storage.StorageState;
|
||||
import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -89,6 +92,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -100,7 +104,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -138,12 +141,45 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StorageContainerManager.class);
|
||||
|
||||
/**
|
||||
* Startup options.
|
||||
*/
|
||||
public enum StartupOption {
|
||||
INIT("-init"),
|
||||
CLUSTERID("-clusterid"),
|
||||
GENCLUSTERID("-genclusterid"),
|
||||
REGULAR("-regular"),
|
||||
HELP("-help");
|
||||
|
||||
private final String name;
|
||||
private String clusterId = null;
|
||||
|
||||
public void setClusterId(String cid) {
|
||||
if(cid != null && !cid.isEmpty()) {
|
||||
clusterId = cid;
|
||||
}
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
StartupOption(String arg) {
|
||||
this.name = arg;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* NodeManager and container Managers for SCM.
|
||||
*/
|
||||
private final NodeManager scmNodeManager;
|
||||
private final Mapping scmContainerManager;
|
||||
private final BlockManager scmBlockManager;
|
||||
private final SCMStorage scmStorage;
|
||||
|
||||
/** The RPC server that listens to requests from DataNodes. */
|
||||
private final RPC.Server datanodeRpcServer;
|
||||
|
@ -169,13 +205,18 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
/** SCM metrics. */
|
||||
private static SCMMetrics metrics;
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ "
|
||||
+ StartupOption.CLUSTERID.getName() + " <cid> ] ]\n " + "hdfs scm [ "
|
||||
+ StartupOption.GENCLUSTERID.getName() + " ]\n " + "hdfs scm [ "
|
||||
+ StartupOption.HELP.getName() + " ]\n";
|
||||
/**
|
||||
* Creates a new StorageContainerManager. Configuration will be updated with
|
||||
* information on the actual listening addresses used for RPC servers.
|
||||
*
|
||||
* @param conf configuration
|
||||
*/
|
||||
public StorageContainerManager(OzoneConfiguration conf)
|
||||
private StorageContainerManager(OzoneConfiguration conf)
|
||||
throws IOException {
|
||||
|
||||
final int handlerCount = conf.getInt(
|
||||
|
@ -184,8 +225,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
StorageContainerManager.initMetrics();
|
||||
// TODO : Fix the ClusterID generation code.
|
||||
scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
|
||||
scmStorage = new SCMStorage(conf);
|
||||
String clusterId = scmStorage.getClusterID();
|
||||
if (clusterId == null) {
|
||||
throw new SCMException("clusterId not found",
|
||||
ResultCodes.SCM_NOT_INITIALIZED);
|
||||
}
|
||||
scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID());
|
||||
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
|
||||
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
|
||||
scmContainerManager, cacheSize);
|
||||
|
@ -321,22 +367,125 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
public static void main(String[] argv) throws IOException {
|
||||
StringUtils.startupShutdownMessage(StorageContainerManager.class,
|
||||
argv, LOG);
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
try {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
if (!DFSUtil.isOzoneEnabled(conf)) {
|
||||
System.out.println("SCM cannot be started in secure mode or when " +
|
||||
OZONE_ENABLED + " is set to false");
|
||||
System.exit(1);
|
||||
StorageContainerManager scm = createSCM(argv, conf);
|
||||
if (scm != null) {
|
||||
scm.start();
|
||||
scm.join();
|
||||
}
|
||||
StorageContainerManager scm = new StorageContainerManager(conf);
|
||||
scm.start();
|
||||
scm.join();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Failed to start the StorageContainerManager.", t);
|
||||
terminate(1, t);
|
||||
}
|
||||
}
|
||||
|
||||
private static void printUsage(PrintStream out) {
|
||||
out.println(USAGE + "\n");
|
||||
}
|
||||
|
||||
public static StorageContainerManager createSCM(String[] argv,
|
||||
OzoneConfiguration conf) throws IOException {
|
||||
if (!DFSUtil.isOzoneEnabled(conf)) {
|
||||
System.err.println("SCM cannot be started in secure mode or when " +
|
||||
OZONE_ENABLED + " is set to false");
|
||||
System.exit(1);
|
||||
}
|
||||
StartupOption startOpt = parseArguments(argv);
|
||||
if (startOpt == null) {
|
||||
printUsage(System.err);
|
||||
terminate(1);
|
||||
return null;
|
||||
}
|
||||
switch (startOpt) {
|
||||
case INIT:
|
||||
terminate(scmInit(conf) ? 0 : 1);
|
||||
return null;
|
||||
case GENCLUSTERID:
|
||||
System.out.println("Generating new cluster id:");
|
||||
System.out.println(StorageInfo.newClusterID());
|
||||
terminate(0);
|
||||
return null;
|
||||
case HELP:
|
||||
printUsage(System.err);
|
||||
terminate(0);
|
||||
return null;
|
||||
default:
|
||||
return new StorageContainerManager(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Routine to set up the Version info for StorageContainerManager.
|
||||
*
|
||||
* @param conf OzoneConfiguration
|
||||
* @return true if SCM initialization is successful, false otherwise.
|
||||
* @throws IOException if init fails due to I/O error
|
||||
*/
|
||||
public static boolean scmInit(OzoneConfiguration conf) throws IOException {
|
||||
SCMStorage scmStorage = new SCMStorage(conf);
|
||||
StorageState state = scmStorage.getState();
|
||||
if (state != StorageState.INITIALIZED) {
|
||||
try {
|
||||
String clusterId = StartupOption.INIT.getClusterId();
|
||||
if (clusterId != null && !clusterId.isEmpty()) {
|
||||
scmStorage.setClusterId(clusterId);
|
||||
}
|
||||
scmStorage.initialize();
|
||||
System.out.println("SCM initialization succeeded." +
|
||||
"Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
|
||||
+ scmStorage.getClusterID());
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not initialize SCM version file", ioe);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
System.out.println("SCM already initialized. Reusing existing" +
|
||||
" cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
|
||||
+ scmStorage.getClusterID());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static StartupOption parseArguments(String[] args) {
|
||||
int argsLen = (args == null) ? 0 : args.length;
|
||||
StartupOption startOpt = StartupOption.HELP;
|
||||
if (argsLen == 0) {
|
||||
startOpt = StartupOption.REGULAR;
|
||||
}
|
||||
for (int i = 0; i < argsLen; i++) {
|
||||
String cmd = args[i];
|
||||
if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.INIT;
|
||||
if (argsLen > 3) {
|
||||
return null;
|
||||
}
|
||||
for (i = i + 1; i < argsLen; i++) {
|
||||
if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
|
||||
i++;
|
||||
if (i < argsLen && !args[i].isEmpty()) {
|
||||
startOpt.setClusterId(args[i]);
|
||||
} else {
|
||||
// if no cluster id specified or is empty string, return null
|
||||
LOG.error("Must specify a valid cluster ID after the "
|
||||
+ StartupOption.CLUSTERID.getName() + " flag");
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
} else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
|
||||
if (argsLen > 1) {
|
||||
return null;
|
||||
}
|
||||
startOpt = StartupOption.GENCLUSTERID;
|
||||
}
|
||||
}
|
||||
return startOpt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a SCMCommandRepose from the SCM Command.
|
||||
* @param cmd - Cmd
|
||||
|
|
|
@ -113,6 +113,7 @@ public class SCMException extends IOException {
|
|||
BLOCK_EXISTS,
|
||||
FAILED_TO_FIND_BLOCK,
|
||||
IO_EXCEPTION,
|
||||
UNEXPECTED_CONTAINER_STATE
|
||||
UNEXPECTED_CONTAINER_STATE,
|
||||
SCM_NOT_INITIALIZED
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.ksm.KeySpaceManager;
|
|||
import org.apache.hadoop.ozone.ksm.protocolPB
|
||||
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||
import org.apache.hadoop.ozone.scm.SCMStorage;
|
||||
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.scm.protocolPB
|
||||
|
@ -458,6 +459,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
|
|||
configureTrace();
|
||||
configureSCMheartbeat();
|
||||
configScmMetadata();
|
||||
configVersionFile();
|
||||
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
|
||||
|
@ -475,7 +477,8 @@ public final class MiniOzoneCluster extends MiniDFSCluster
|
|||
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
|
||||
randomContainerPort);
|
||||
|
||||
StorageContainerManager scm = new StorageContainerManager(conf);
|
||||
StorageContainerManager scm =
|
||||
StorageContainerManager.createSCM(null, conf);
|
||||
scm.start();
|
||||
|
||||
KeySpaceManager ksm = new KeySpaceManager(conf);
|
||||
|
@ -528,6 +531,12 @@ public final class MiniOzoneCluster extends MiniDFSCluster
|
|||
scmPath.toString() + "/datanode.id");
|
||||
}
|
||||
|
||||
private void configVersionFile() throws IOException {
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
scmStore.setClusterId(runID.toString());
|
||||
scmStore.initialize();
|
||||
}
|
||||
|
||||
private void configureHandler() {
|
||||
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
|
||||
if (!ozoneHandlerType.isPresent()) {
|
||||
|
|
|
@ -29,7 +29,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.scm.SCMStorage;
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager.StartupOption;
|
||||
import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
|
@ -39,11 +41,15 @@ import org.junit.Rule;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -349,4 +355,45 @@ public class TestStorageContainerManager {
|
|||
|
||||
return containerBlocks;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMInitialization() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final String path = GenericTestUtils.getTempPath(
|
||||
UUID.randomUUID().toString());
|
||||
Path scmPath = Paths.get(path, "scm-meta");
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
|
||||
|
||||
StartupOption.INIT.setClusterId("testClusterId");
|
||||
// This will initialize SCM
|
||||
StorageContainerManager.scmInit(conf);
|
||||
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
|
||||
Assert.assertEquals("testClusterId", scmStore.getClusterID());
|
||||
StartupOption.INIT.setClusterId("testClusterIdNew");
|
||||
StorageContainerManager.scmInit(conf);
|
||||
Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
|
||||
Assert.assertEquals("testClusterId", scmStore.getClusterID());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMReinitialization() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final String path = GenericTestUtils.getTempPath(
|
||||
UUID.randomUUID().toString());
|
||||
Path scmPath = Paths.get(path, "scm-meta");
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
|
||||
//This will set the cluster id in the version file
|
||||
MiniOzoneCluster cluster =
|
||||
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||
StartupOption.INIT.setClusterId("testClusterId");
|
||||
// This will initialize SCM
|
||||
StorageContainerManager.scmInit(conf);
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
|
||||
Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue