HDFS-10195. Ozone: Add container persistence. Contributed by Anu Engineer.
This commit is contained in:
parent
2dc48b7f1e
commit
643c5e5bdc
|
@ -188,6 +188,8 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -365,6 +367,7 @@ public class DataNode extends ReconfigurableBase
|
|||
private final String confVersion;
|
||||
private final long maxNumberOfBlocksToLog;
|
||||
private final boolean pipelineSupportECN;
|
||||
private final boolean ozoneEnabled;
|
||||
|
||||
private final List<String> usersWithLocalPathAccess;
|
||||
private final boolean connectToDnViaHostname;
|
||||
|
@ -387,6 +390,7 @@ public class DataNode extends ReconfigurableBase
|
|||
private static final int NUM_CORES = Runtime.getRuntime()
|
||||
.availableProcessors();
|
||||
private static final double CONGESTION_RATIO = 1.5;
|
||||
private OzoneContainer ozoneServer;
|
||||
|
||||
private static Tracer createTracer(Configuration conf) {
|
||||
return new Tracer.Builder("DataNode").
|
||||
|
@ -417,6 +421,7 @@ public class DataNode extends ReconfigurableBase
|
|||
this.connectToDnViaHostname = false;
|
||||
this.blockScanner = new BlockScanner(this, conf);
|
||||
this.pipelineSupportECN = false;
|
||||
this.ozoneEnabled = false;
|
||||
this.checkDiskErrorInterval =
|
||||
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
|
||||
initOOBTimeout();
|
||||
|
@ -451,6 +456,9 @@ public class DataNode extends ReconfigurableBase
|
|||
this.pipelineSupportECN = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
|
||||
DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
|
||||
this.ozoneEnabled = conf.getBoolean(OzoneConfigKeys
|
||||
.DFS_OBJECTSTORE_ENABLED_KEY, OzoneConfigKeys
|
||||
.DFS_OBJECTSTORE_ENABLED_DEFAULT);
|
||||
|
||||
confVersion = "core-" +
|
||||
conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
|
||||
|
@ -1540,6 +1548,15 @@ public class DataNode extends ReconfigurableBase
|
|||
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
|
||||
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
|
||||
initDirectoryScanner(conf);
|
||||
if(this.ozoneEnabled) {
|
||||
try {
|
||||
ozoneServer = new OzoneContainer(conf, this.getFSDataset());
|
||||
ozoneServer.start();
|
||||
LOG.info("Ozone container server started.");
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Unable to start Ozone. ex: {}", ex.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<BPOfferService> getAllBpOs() {
|
||||
|
@ -1828,6 +1845,17 @@ public class DataNode extends ReconfigurableBase
|
|||
*/
|
||||
public void shutdown() {
|
||||
stopMetricsLogger();
|
||||
|
||||
if(this.ozoneEnabled) {
|
||||
if(ozoneServer != null) {
|
||||
try {
|
||||
ozoneServer.stop();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error is ozone shutdown. ex {}", e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (plugins != null) {
|
||||
for (ServicePlugin p : plugins) {
|
||||
try {
|
||||
|
|
|
@ -41,6 +41,9 @@ public final class OzoneConfigKeys {
|
|||
"dfs.objectstore.trace.enabled";
|
||||
public static final boolean DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT = false;
|
||||
|
||||
public static final String DFS_OZONE_METADATA_DIRS =
|
||||
"dfs.ozone.metadata.dirs";
|
||||
|
||||
|
||||
/**
|
||||
* There is no need to instantiate this class.
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.web.utils;
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
|
@ -53,6 +53,20 @@ public final class OzoneConsts {
|
|||
public static final String OZONE_USER = "user";
|
||||
public static final String OZONE_REQUEST = "request";
|
||||
|
||||
public static final String CONTAINER_EXTENSION = ".container";
|
||||
public static final String CONTAINER_META = ".meta";
|
||||
|
||||
// container storage is in the following format.
|
||||
// Data Volume basePath/containers/<containerName>/metadata and
|
||||
// Data Volume basePath/containers/<containerName>/data/...
|
||||
public static final String CONTAINER_PREFIX = "containers";
|
||||
public static final String CONTAINER_META_PATH = "metadata";
|
||||
public static final String CONTAINER_DATA_PATH = "data";
|
||||
public static final String CONTAINER_ROOT_PREFIX = "repository";
|
||||
|
||||
public static final String CONTAINER_DB = "container.db";
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Supports Bucket Versioning.
|
|
@ -35,8 +35,9 @@ public class ContainerData {
|
|||
|
||||
private final String containerName;
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
private String path;
|
||||
private String dbPath; // Path to Level DB Store.
|
||||
// Path to Physical file system where container and checksum are stored.
|
||||
private String containerFilePath;
|
||||
|
||||
/**
|
||||
* Constructs a ContainerData Object.
|
||||
|
@ -63,8 +64,13 @@ public class ContainerData {
|
|||
}
|
||||
|
||||
if (protoData.hasContainerPath()) {
|
||||
data.setPath(protoData.getContainerPath());
|
||||
data.setContainerPath(protoData.getContainerPath());
|
||||
}
|
||||
|
||||
if (protoData.hasDbPath()) {
|
||||
data.setDBPath(protoData.getDbPath());
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -77,9 +83,15 @@ public class ContainerData {
|
|||
ContainerProtos.ContainerData.Builder builder = ContainerProtos
|
||||
.ContainerData.newBuilder();
|
||||
builder.setName(this.getContainerName());
|
||||
if (this.getPath() != null) {
|
||||
builder.setContainerPath(this.getPath());
|
||||
|
||||
if (this.getDBPath() != null) {
|
||||
builder.setDbPath(this.getDBPath());
|
||||
}
|
||||
|
||||
if (this.getContainerPath() != null) {
|
||||
builder.setContainerPath(this.getContainerPath());
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||
ContainerProtos.KeyValue.Builder keyValBuilder =
|
||||
ContainerProtos.KeyValue.newBuilder();
|
||||
|
@ -144,8 +156,8 @@ public class ContainerData {
|
|||
*
|
||||
* @return - path
|
||||
*/
|
||||
public String getPath() {
|
||||
return path;
|
||||
public String getDBPath() {
|
||||
return dbPath;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -153,8 +165,8 @@ public class ContainerData {
|
|||
*
|
||||
* @param path - String.
|
||||
*/
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
public void setDBPath(String path) {
|
||||
this.dbPath = path;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,4 +179,21 @@ public class ContainerData {
|
|||
public String getName() {
|
||||
return getContainerName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get container file path.
|
||||
* @return - Physical path where container file and checksum is stored.
|
||||
*/
|
||||
public String getContainerPath() {
|
||||
return containerFilePath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set container Path.
|
||||
* @param containerFilePath - File path.
|
||||
*/
|
||||
public void setContainerPath(String containerFilePath) {
|
||||
this.containerFilePath = containerFilePath;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,23 @@
|
|||
package org.apache.hadoop.ozone.container.common.helpers;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import static org.apache.commons.io.FilenameUtils.removeExtension;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
|
||||
|
||||
/**
|
||||
* A set of helper functions to create proper responses.
|
||||
|
@ -104,6 +120,134 @@ public final class ContainerUtils {
|
|||
"Server does not support this command yet.").build();
|
||||
}
|
||||
|
||||
/**
|
||||
* get containerName from a container file.
|
||||
*
|
||||
* @param containerFile - File
|
||||
* @return Name of the container.
|
||||
*/
|
||||
public static String getContainerNameFromFile(File containerFile) {
|
||||
Preconditions.checkNotNull(containerFile);
|
||||
return Paths.get(containerFile.getParent()).resolve(
|
||||
removeExtension(containerFile.getName())).toString();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that this in indeed a new container.
|
||||
*
|
||||
* @param containerFile - Container File to verify
|
||||
* @param metadataFile - metadata File to verify
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void verifyIsNewContainer(File containerFile, File metadataFile)
|
||||
throws IOException {
|
||||
Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
|
||||
if (containerFile.exists()) {
|
||||
log.error("container already exists on disk. File: {}",
|
||||
containerFile.toPath());
|
||||
throw new FileAlreadyExistsException("container already exists on " +
|
||||
"disk.");
|
||||
}
|
||||
|
||||
if (metadataFile.exists()) {
|
||||
log.error("metadata found on disk, but missing container. Refusing to" +
|
||||
" write this container. File: {} ", metadataFile.toPath());
|
||||
throw new FileAlreadyExistsException(("metadata found on disk, but " +
|
||||
"missing container. Refusing to write this container."));
|
||||
}
|
||||
|
||||
File parentPath = new File(containerFile.getParent());
|
||||
|
||||
if (!parentPath.exists() && !parentPath.mkdirs()) {
|
||||
log.error("Unable to create parent path. Path: {}",
|
||||
parentPath.toString());
|
||||
throw new IOException("Unable to create container directory.");
|
||||
}
|
||||
|
||||
if (!containerFile.createNewFile()) {
|
||||
log.error("creation of a new container file failed. File: {}",
|
||||
containerFile.toPath());
|
||||
throw new IOException("creation of a new container file failed.");
|
||||
}
|
||||
|
||||
if (!metadataFile.createNewFile()) {
|
||||
log.error("creation of the metadata file failed. File: {}",
|
||||
metadataFile.toPath());
|
||||
throw new IOException("creation of a new container file failed.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a Metadata DB for the specified container.
|
||||
*
|
||||
* @param containerPath - Container Path.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Path createMetadata(Path containerPath) throws IOException {
|
||||
Preconditions.checkNotNull(containerPath);
|
||||
containerPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
|
||||
if (!containerPath.toFile().mkdirs()) {
|
||||
throw new IOException("Unable to create directory for metadata storage." +
|
||||
" Path {}" + containerPath);
|
||||
}
|
||||
containerPath = containerPath.resolve(OzoneConsts.CONTAINER_DB);
|
||||
LevelDBStore store = new LevelDBStore(containerPath.toFile(), true);
|
||||
|
||||
// we close since the SCM pre-creates containers.
|
||||
// we will open and put Db handle into a cache when keys are being created
|
||||
// in a container.
|
||||
|
||||
store.close();
|
||||
return containerPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* remove Container if it is empty.
|
||||
* <p/>
|
||||
* There are three things we need to delete.
|
||||
* <p/>
|
||||
* 1. Container file and metadata file. 2. The Level DB file 3. The path that
|
||||
* we created on the data location.
|
||||
*
|
||||
* @param containerData - Data of the container to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void removeContainer(ContainerData containerData) throws
|
||||
IOException {
|
||||
Preconditions.checkNotNull(containerData);
|
||||
|
||||
// TODO : Check if there are any keys. This needs to be done
|
||||
// by calling into key layer code, hence this is a TODO for now.
|
||||
|
||||
Path dbPath = Paths.get(containerData.getDBPath());
|
||||
|
||||
// Delete the DB File.
|
||||
FileUtils.forceDelete(dbPath.toFile());
|
||||
dbPath = dbPath.getParent();
|
||||
|
||||
// Delete all Metadata in the Data directories for this containers.
|
||||
if (dbPath != null) {
|
||||
FileUtils.deleteDirectory(dbPath.toFile());
|
||||
dbPath = dbPath.getParent();
|
||||
}
|
||||
|
||||
// now delete the container directory, this means that all key data dirs
|
||||
// will be removed too.
|
||||
if (dbPath != null) {
|
||||
FileUtils.deleteDirectory(dbPath.toFile());
|
||||
}
|
||||
|
||||
// Delete the container metadata from the metadata locations.
|
||||
String rootPath = getContainerNameFromFile(new File(containerData
|
||||
.getContainerPath()));
|
||||
Path containerPath = Paths.get(rootPath.concat(CONTAINER_EXTENSION));
|
||||
Path metaPath = Paths.get(rootPath.concat(CONTAINER_META));
|
||||
|
||||
FileUtils.forceDelete(containerPath.toFile());
|
||||
FileUtils.forceDelete(metaPath.toFile());
|
||||
}
|
||||
|
||||
private ContainerUtils() {
|
||||
//never constructed.
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
|
||||
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A class that tells the ContainerManager where to place the containers.
|
||||
* Please note : There is *no* one-to-one correlation between metadata
|
||||
* locations and data locations.
|
||||
*
|
||||
* For example : A user could map all container files to a
|
||||
* SSD but leave data/metadata on bunch of other disks.
|
||||
*/
|
||||
public class ContainerLocationManagerImpl implements ContainerLocationManager {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerLocationManagerImpl.class);
|
||||
|
||||
|
||||
private final Configuration conf;
|
||||
private final FsDatasetSpi dataset;
|
||||
private final Path[] volumePaths;
|
||||
private int currentIndex;
|
||||
private final List<Path> locations;
|
||||
|
||||
|
||||
/**
|
||||
* Constructs a Location Manager.
|
||||
* @param conf - Configuration.
|
||||
*/
|
||||
public ContainerLocationManagerImpl(Configuration conf, List<Path> locations,
|
||||
FsDatasetSpi dataset) throws IOException {
|
||||
this.conf = conf;
|
||||
this.dataset = dataset;
|
||||
List<Path> pathList = new LinkedList<>();
|
||||
FsDatasetSpi.FsVolumeReferences references;
|
||||
try {
|
||||
synchronized (this.dataset) {
|
||||
references = this.dataset.getFsVolumeReferences();
|
||||
for (int ndx = 0; ndx < references.size(); ndx++) {
|
||||
FsVolumeSpi vol = references.get(ndx);
|
||||
pathList.add(Paths.get(vol.getBasePath()));
|
||||
}
|
||||
references.close();
|
||||
volumePaths = pathList.toArray(new Path[pathList.size()]);
|
||||
this.locations = locations;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Unable to get volume paths.", ex);
|
||||
throw new IOException("Internal error", ex);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the path where the container should be placed from a set of
|
||||
* locations.
|
||||
*
|
||||
* @return A path where we should place this container and metadata.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Path getContainerPath()
|
||||
throws IOException {
|
||||
Preconditions.checkState(locations.size() > 0);
|
||||
int index = currentIndex % locations.size();
|
||||
return locations.get(index).resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the path where the container Data file are stored.
|
||||
*
|
||||
* @return a path where we place the LevelDB and data files of a container.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Path getDataPath(String containerName) throws IOException {
|
||||
Path currentPath = volumePaths[currentIndex++ % volumePaths.length];
|
||||
currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX);
|
||||
return currentPath.resolve(containerName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,514 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.DigestInputStream;
|
||||
import java.security.DigestOutputStream;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
|
||||
|
||||
/**
|
||||
* A Generic ContainerManagerImpl that will be called from Ozone
|
||||
* ContainerManagerImpl. This allows us to support delta changes to ozone
|
||||
* version without having to rewrite the containerManager.
|
||||
*/
|
||||
public class ContainerManagerImpl implements ContainerManager {
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerManagerImpl.class);
|
||||
|
||||
private final ConcurrentSkipListMap<String, ContainerStatus>
|
||||
containerMap = new ConcurrentSkipListMap<>();
|
||||
|
||||
// This lock follows fair locking policy of first-come first-serve
|
||||
// for waiting threads.
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
private ContainerLocationManager locationManager;
|
||||
|
||||
/**
|
||||
* Init call that sets up a container Manager.
|
||||
*
|
||||
* @param config - Configuration.
|
||||
* @param containerDirs - List of Metadata Container locations.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void init(Configuration config, List<Path> containerDirs,
|
||||
FsDatasetSpi dataset)
|
||||
throws IOException {
|
||||
|
||||
Preconditions.checkNotNull(config);
|
||||
Preconditions.checkNotNull(containerDirs);
|
||||
Preconditions.checkState(containerDirs.size() > 0);
|
||||
|
||||
readLock();
|
||||
try {
|
||||
for (Path path : containerDirs) {
|
||||
File directory = path.toFile();
|
||||
if (!directory.isDirectory()) {
|
||||
LOG.error("Invalid path to container metadata directory. path: {}",
|
||||
path.toString());
|
||||
throw new IOException("Invalid path to container metadata directory" +
|
||||
". " + path);
|
||||
}
|
||||
File[] files = directory.listFiles(new ContainerFilter());
|
||||
if (files != null) {
|
||||
for (File containerFile : files) {
|
||||
String containerPath =
|
||||
ContainerUtils.getContainerNameFromFile(containerFile);
|
||||
Preconditions.checkNotNull(containerPath);
|
||||
readContainerInfo(containerPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.locationManager = new ContainerLocationManagerImpl(config,
|
||||
containerDirs, dataset);
|
||||
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the Container Info from a file and verifies that checksum match. If
|
||||
* the checksums match, then that file is added to containerMap.
|
||||
*
|
||||
* @param containerName - Name which points to the persisted container.
|
||||
*/
|
||||
private void readContainerInfo(String containerName)
|
||||
throws IOException {
|
||||
Preconditions.checkState(containerName.length() > 0);
|
||||
FileInputStream containerStream = null;
|
||||
DigestInputStream dis = null;
|
||||
FileInputStream metaStream = null;
|
||||
Path cPath = Paths.get(containerName).getFileName();
|
||||
String keyName = null;
|
||||
if (cPath != null) {
|
||||
keyName = cPath.toString();
|
||||
}
|
||||
Preconditions.checkNotNull(keyName);
|
||||
|
||||
try {
|
||||
String containerFileName = containerName.concat(CONTAINER_EXTENSION);
|
||||
String metaFileName = containerName.concat(CONTAINER_META);
|
||||
|
||||
containerStream = new FileInputStream(containerFileName);
|
||||
|
||||
metaStream = new FileInputStream(metaFileName);
|
||||
|
||||
MessageDigest sha = MessageDigest.getInstance("SHA-256");
|
||||
|
||||
dis = new DigestInputStream(containerStream, sha);
|
||||
|
||||
ContainerData containerData = ContainerData.getFromProtBuf(
|
||||
ContainerProtos.ContainerData.parseDelimitedFrom(dis));
|
||||
|
||||
|
||||
ContainerProtos.ContainerMeta meta = ContainerProtos.ContainerMeta
|
||||
.parseDelimitedFrom(metaStream);
|
||||
|
||||
if (meta != null && !DigestUtils.sha256Hex(sha.digest()).equals(meta
|
||||
.getHash())) {
|
||||
throw new IOException("Invalid SHA found for file.");
|
||||
}
|
||||
|
||||
containerMap.put(keyName, new ContainerStatus(containerData, true));
|
||||
|
||||
} catch (IOException | NoSuchAlgorithmException ex) {
|
||||
LOG.error("read failed for file: {} ex: {}",
|
||||
containerName, ex.getMessage());
|
||||
|
||||
// TODO : Add this file to a recovery Queue.
|
||||
|
||||
// Remember that this container is busted and we cannot use it.
|
||||
containerMap.put(keyName, new ContainerStatus(null, false));
|
||||
} finally {
|
||||
IOUtils.closeStream(dis);
|
||||
IOUtils.closeStream(containerStream);
|
||||
IOUtils.closeStream(metaStream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a container with the given name.
|
||||
*
|
||||
* @param pipeline -- Nodes which make up this container.
|
||||
* @param containerData - Container Name and metadata.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void createContainer(Pipeline pipeline, ContainerData containerData)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(containerData);
|
||||
|
||||
writeLock();
|
||||
try {
|
||||
if (containerMap.containsKey(containerData.getName())) {
|
||||
throw new FileAlreadyExistsException("container already exists.");
|
||||
}
|
||||
|
||||
// This is by design. We first write and close the
|
||||
// container Info and metadata to a directory.
|
||||
// Then read back and put that info into the containerMap.
|
||||
// This allows us to make sure that our write is consistent.
|
||||
|
||||
writeContainerInfo(containerData);
|
||||
File cFile = new File(containerData.getContainerPath());
|
||||
readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
|
||||
} catch (NoSuchAlgorithmException ex) {
|
||||
throw new IOException("failed to create container", ex);
|
||||
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a container to a chosen location and updates the container Map.
|
||||
*
|
||||
* The file formats of ContainerData and Container Meta is the following.
|
||||
*
|
||||
* message ContainerData {
|
||||
* required string name = 1;
|
||||
* repeated KeyValue metadata = 2;
|
||||
* optional string dbPath = 3;
|
||||
* optional string containerPath = 4;
|
||||
* }
|
||||
*
|
||||
* message ContainerMeta {
|
||||
* required string fileName = 1;
|
||||
* required string hash = 2;
|
||||
* }
|
||||
*
|
||||
* @param containerData - container Data
|
||||
*/
|
||||
private void writeContainerInfo(ContainerData containerData)
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
|
||||
Preconditions.checkNotNull(this.locationManager);
|
||||
|
||||
FileOutputStream containerStream = null;
|
||||
DigestOutputStream dos = null;
|
||||
FileOutputStream metaStream = null;
|
||||
Path location = locationManager.getContainerPath();
|
||||
|
||||
File containerFile = location.resolve(containerData
|
||||
.getContainerName().concat(CONTAINER_EXTENSION))
|
||||
.toFile();
|
||||
|
||||
File metadataFile = location.resolve(containerData
|
||||
.getContainerName().concat(CONTAINER_META))
|
||||
.toFile();
|
||||
|
||||
try {
|
||||
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
|
||||
|
||||
Path metadataPath = this.locationManager.getDataPath(
|
||||
containerData.getContainerName());
|
||||
metadataPath = ContainerUtils.createMetadata(metadataPath);
|
||||
|
||||
containerStream = new FileOutputStream(containerFile);
|
||||
metaStream = new FileOutputStream(metadataFile);
|
||||
MessageDigest sha = MessageDigest.getInstance("SHA-256");
|
||||
|
||||
dos = new DigestOutputStream(containerStream, sha);
|
||||
containerData.setDBPath(metadataPath.toString());
|
||||
containerData.setContainerPath(containerFile.toString());
|
||||
|
||||
ContainerProtos.ContainerData protoData = containerData
|
||||
.getProtoBufMessage();
|
||||
protoData.writeDelimitedTo(dos);
|
||||
|
||||
ContainerProtos.ContainerMeta protoMeta = ContainerProtos
|
||||
.ContainerMeta.newBuilder()
|
||||
.setFileName(containerFile.toString())
|
||||
.setHash(DigestUtils.sha256Hex(sha.digest()))
|
||||
.build();
|
||||
protoMeta.writeDelimitedTo(metaStream);
|
||||
|
||||
} catch (IOException ex) {
|
||||
|
||||
// TODO : we need to clean up partially constructed files
|
||||
// The proper way to do would be for a thread
|
||||
// to read all these 3 artifacts and make sure they are
|
||||
// sane. That info needs to come from the replication
|
||||
// pipeline, and if not consistent delete these file.
|
||||
|
||||
// In case of ozone this is *not* a deal breaker since
|
||||
// SCM is guaranteed to generate unique container names.
|
||||
|
||||
LOG.error("creation of container failed. Name: {} "
|
||||
, containerData.getContainerName());
|
||||
throw ex;
|
||||
} finally {
|
||||
IOUtils.closeStream(dos);
|
||||
IOUtils.closeStream(containerStream);
|
||||
IOUtils.closeStream(metaStream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes an existing container.
|
||||
*
|
||||
* @param pipeline - nodes that make this container.
|
||||
* @param containerName - name of the container.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void deleteContainer(Pipeline pipeline, String containerName) throws
|
||||
IOException {
|
||||
Preconditions.checkState(containerName.length() > 0);
|
||||
writeLock();
|
||||
try {
|
||||
ContainerStatus status = containerMap.get(containerName);
|
||||
if (status == null) {
|
||||
LOG.info("No such container. Name: {}", containerName);
|
||||
throw new IOException("No such container. Name : " + containerName);
|
||||
}
|
||||
ContainerUtils.removeContainer(status.containerData);
|
||||
containerMap.remove(containerName);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple interface for container Iterations.
|
||||
* <p/>
|
||||
* This call make no guarantees about consistency of the data between
|
||||
* different list calls. It just returns the best known data at that point of
|
||||
* time. It is possible that using this iteration you can miss certain
|
||||
* container from the listing.
|
||||
*
|
||||
* @param prevKey - Previous Key Value or empty String.
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void listContainer(String prevKey, long count,
|
||||
List<ContainerData> data) throws IOException {
|
||||
Preconditions.checkNotNull(data);
|
||||
readLock();
|
||||
try {
|
||||
ConcurrentNavigableMap<String, ContainerStatus> map = null;
|
||||
if (prevKey.length() == 0) {
|
||||
map = containerMap.tailMap(containerMap.firstKey(), true);
|
||||
} else {
|
||||
map = containerMap.tailMap(prevKey, false);
|
||||
}
|
||||
|
||||
int currentCount = 0;
|
||||
for (ContainerStatus entry : map.values()) {
|
||||
if (currentCount < count) {
|
||||
data.add(entry.getContainer());
|
||||
currentCount++;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metadata about a specific container.
|
||||
*
|
||||
* @param containerName - Name of the container
|
||||
* @return ContainerData - Container Data.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ContainerData readContainer(String containerName) throws IOException {
|
||||
return containerMap.get(containerName).getContainer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports clean shutdown of container.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
|
||||
return containerMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire read lock.
|
||||
*/
|
||||
@Override
|
||||
public void readLock() {
|
||||
this.lock.readLock().lock();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Release read lock.
|
||||
*/
|
||||
@Override
|
||||
public void readUnlock() {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the current thread holds read lock.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasReadLock() {
|
||||
return this.lock.readLock().tryLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire write lock.
|
||||
*/
|
||||
@Override
|
||||
public void writeLock() {
|
||||
this.lock.writeLock().lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire write lock, unless interrupted while waiting.
|
||||
*/
|
||||
@Override
|
||||
public void writeLockInterruptibly() throws InterruptedException {
|
||||
this.lock.writeLock().lockInterruptibly();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Release write lock.
|
||||
*/
|
||||
@Override
|
||||
public void writeUnlock() {
|
||||
this.lock.writeLock().unlock();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the current thread holds write lock.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasWriteLock() {
|
||||
return this.lock.writeLock().isHeldByCurrentThread();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter out only container files from the container metadata dir.
|
||||
*/
|
||||
private static class ContainerFilter implements FilenameFilter {
|
||||
/**
|
||||
* Tests if a specified file should be included in a file list.
|
||||
*
|
||||
* @param dir the directory in which the file was found.
|
||||
* @param name the name of the file.
|
||||
* @return <code>true</code> if and only if the name should be included in
|
||||
* the file list; <code>false</code> otherwise.
|
||||
*/
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.endsWith(CONTAINER_EXTENSION);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is an immutable class that represents the state of a container. if the
|
||||
* container reading encountered an error when we boot up we will post that
|
||||
* info to a recovery queue and keep the info in the containerMap.
|
||||
* <p/>
|
||||
* if and when the issue is fixed, the expectation is that this entry will be
|
||||
* deleted by the recovery thread from the containerMap and will insert entry
|
||||
* instead of modifying this class.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class ContainerStatus {
|
||||
private final ContainerData containerData;
|
||||
private final boolean active;
|
||||
|
||||
/**
|
||||
* Creates a Container Status class.
|
||||
*
|
||||
* @param containerData - ContainerData.
|
||||
* @param active - Active or not active.
|
||||
*/
|
||||
public ContainerStatus(ContainerData containerData, boolean active) {
|
||||
this.containerData = containerData;
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container if it is active. It is not active if we have had an
|
||||
* error and we are waiting for the background threads to fix the issue.
|
||||
*
|
||||
* @return ContainerData.
|
||||
*/
|
||||
public ContainerData getContainer() {
|
||||
if (active) {
|
||||
return containerData;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates if a container is Active.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
/**
|
||||
* Returns physical path locations, where the containers will be created.
|
||||
*/
|
||||
public interface ContainerLocationManager {
|
||||
/**
|
||||
* Returns the path where the container should be placed from a set of
|
||||
* locations.
|
||||
*
|
||||
* @return A path where we should place this container and metadata.
|
||||
* @throws IOException
|
||||
*/
|
||||
Path getContainerPath() throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the path where the container Data file are stored.
|
||||
*
|
||||
* @return a path where we place the LevelDB and data files of a container.
|
||||
* @throws IOException
|
||||
*/
|
||||
Path getDataPath(String containerName) throws IOException;
|
||||
|
||||
}
|
|
@ -20,18 +20,35 @@ package org.apache.hadoop.ozone.container.common.interfaces;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for container operations.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface ContainerManager {
|
||||
public interface ContainerManager extends RwLock {
|
||||
|
||||
/**
|
||||
* Init call that sets up a container Manager.
|
||||
*
|
||||
* @param config - Configuration.
|
||||
* @param containerDirs - List of Metadata Container locations.
|
||||
* @param dataset - FSDataset.
|
||||
* @throws IOException
|
||||
*/
|
||||
void init(Configuration config, List<Path> containerDirs,
|
||||
FsDatasetSpi dataset)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a container with the given name.
|
||||
|
@ -56,20 +73,28 @@ public interface ContainerManager {
|
|||
/**
|
||||
* As simple interface for container Iterations.
|
||||
*
|
||||
* @param start - Starting index
|
||||
* @param prevKey - Starting KeyValue
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @throws IOException
|
||||
*/
|
||||
void listContainer(long start, long count, List<ContainerData> data)
|
||||
void listContainer(String prevKey, long count, List<ContainerData> data)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Get metadata about a specific container.
|
||||
*
|
||||
* @param containerName - Name of the container
|
||||
* @return ContainerData
|
||||
* @return ContainerData - Container Data.
|
||||
* @throws IOException
|
||||
*/
|
||||
ContainerData readContainer(String containerName) throws IOException;
|
||||
|
||||
/**
|
||||
* Supports clean shutdown of container.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
void shutdown() throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -26,10 +26,10 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
|||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -42,7 +42,7 @@ import java.io.IOException;
|
|||
public class XceiverClient {
|
||||
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
|
||||
private final Pipeline pipeline;
|
||||
private final OzoneConfiguration config;
|
||||
private final Configuration config;
|
||||
private ChannelFuture channelFuture;
|
||||
private Bootstrap b;
|
||||
private EventLoopGroup group;
|
||||
|
@ -53,7 +53,7 @@ public class XceiverClient {
|
|||
* @param pipeline - Pipeline that defines the machines.
|
||||
* @param config -- Ozone Config
|
||||
*/
|
||||
public XceiverClient(Pipeline pipeline, OzoneConfiguration config) {
|
||||
public XceiverClient(Pipeline pipeline, Configuration config) {
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
Preconditions.checkNotNull(config);
|
||||
this.pipeline = pipeline;
|
||||
|
|
|
@ -26,8 +26,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
|||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
|
||||
/**
|
||||
|
@ -47,7 +47,7 @@ public final class XceiverServer {
|
|||
*
|
||||
* @param conf - Configuration
|
||||
*/
|
||||
public XceiverServer(OzoneConfiguration conf,
|
||||
public XceiverServer(Configuration conf,
|
||||
ContainerDispatcher dispatcher) {
|
||||
Preconditions.checkNotNull(conf);
|
||||
this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||
|
|
|
@ -16,43 +16,45 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.web.localstorage;
|
||||
package org.apache.hadoop.ozone.container.common.utils;
|
||||
|
||||
import org.fusesource.leveldbjni.JniDBFactory;
|
||||
import org.iq80.leveldb.DB;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
import org.iq80.leveldb.Options;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* OzoneLevelDBStore is used by the local
|
||||
* OzoneStore which is used in testing.
|
||||
* LevelDB interface.
|
||||
*/
|
||||
class OzoneLevelDBStore {
|
||||
public class LevelDBStore {
|
||||
private DB db;
|
||||
private final File dbFile;
|
||||
|
||||
/**
|
||||
* Opens a DB file.
|
||||
*
|
||||
* @param dbPath - DB File path
|
||||
* @param dbPath - DB File path
|
||||
* @param createIfMissing - Create if missing
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
OzoneLevelDBStore(File dbPath, boolean createIfMissing) throws IOException {
|
||||
public LevelDBStore(File dbPath, boolean createIfMissing) throws
|
||||
IOException {
|
||||
Options options = new Options();
|
||||
options.createIfMissing(createIfMissing);
|
||||
db = JniDBFactory.factory.open(dbPath, options);
|
||||
if (db == null) {
|
||||
throw new IOException("Db is null");
|
||||
}
|
||||
this.dbFile = dbPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Puts a Key into file.
|
||||
*
|
||||
* @param key - key
|
||||
* @param key - key
|
||||
* @param value - value
|
||||
*/
|
||||
public void put(byte[] key, byte[] value) {
|
||||
|
@ -63,7 +65,6 @@ class OzoneLevelDBStore {
|
|||
* Get Key.
|
||||
*
|
||||
* @param key key
|
||||
*
|
||||
* @return value
|
||||
*/
|
||||
public byte[] get(byte[] key) {
|
||||
|
@ -87,4 +88,37 @@ class OzoneLevelDBStore {
|
|||
public void close() throws IOException {
|
||||
db.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the DB is empty.
|
||||
*
|
||||
* @return boolean
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean isEmpty() throws IOException {
|
||||
DBIterator iter = db.iterator();
|
||||
try {
|
||||
iter.seekToFirst();
|
||||
return iter.hasNext();
|
||||
} finally {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns Java File Object that points to the DB.
|
||||
* @return File
|
||||
*/
|
||||
public File getDbFile() {
|
||||
return dbFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the actual levelDB object.
|
||||
* @return DB handle.
|
||||
*/
|
||||
public DB getDB() {
|
||||
return db;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.nio.file.Path;
|
||||
|
||||
/**
|
||||
* Ozone main class sets up the network server and initializes the container
|
||||
* layer.
|
||||
*/
|
||||
public class OzoneContainer {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneContainer.class);
|
||||
|
||||
private final Configuration ozoneConfig;
|
||||
private final FsDatasetSpi dataSet;
|
||||
private final ContainerDispatcher dispatcher;
|
||||
private final ContainerManager manager;
|
||||
private final XceiverServer server;
|
||||
|
||||
/**
|
||||
* Creates a network endpoint and enables Ozone container.
|
||||
*
|
||||
* @param ozoneConfig - Config
|
||||
* @param dataSet - FsDataset.
|
||||
* @throws IOException
|
||||
*/
|
||||
public OzoneContainer(Configuration ozoneConfig, FsDatasetSpi dataSet) throws
|
||||
Exception {
|
||||
List<Path> locations = new LinkedList<>();
|
||||
String[] paths = ozoneConfig.getStrings(OzoneConfigKeys
|
||||
.DFS_OZONE_METADATA_DIRS);
|
||||
if (paths != null && paths.length > 0) {
|
||||
for (String p : paths) {
|
||||
locations.add(Paths.get(p));
|
||||
}
|
||||
} else {
|
||||
getDataDir(dataSet, locations);
|
||||
}
|
||||
|
||||
this.ozoneConfig = ozoneConfig;
|
||||
this.dataSet = dataSet;
|
||||
|
||||
manager = new ContainerManagerImpl();
|
||||
manager.init(this.ozoneConfig, locations, this.dataSet);
|
||||
|
||||
this.dispatcher = new Dispatcher(manager);
|
||||
server = new XceiverServer(this.ozoneConfig, this.dispatcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts serving requests to ozone container.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void start() throws Exception {
|
||||
server.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the ozone container.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void stop() throws Exception {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a paths to data dirs.
|
||||
* @param dataset - FSDataset.
|
||||
* @param pathList - List of paths.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void getDataDir(FsDatasetSpi dataset, List<Path> pathList) throws
|
||||
IOException {
|
||||
FsDatasetSpi.FsVolumeReferences references;
|
||||
try {
|
||||
synchronized (dataset) {
|
||||
references = dataset.getFsVolumeReferences();
|
||||
for (int ndx = 0; ndx < references.size(); ndx++) {
|
||||
FsVolumeSpi vol = references.get(ndx);
|
||||
pathList.add(Paths.get(vol.getBasePath()));
|
||||
}
|
||||
references.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Unable to get volume paths.", ex);
|
||||
throw new IOException("Internal error", ex);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
/**
|
||||
Ozone main that calls into the container layer
|
||||
**/
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
|||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.response.KeyInfo;
|
||||
import org.apache.hadoop.ozone.web.response.ListKeys;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpRequest;
|
||||
import org.apache.http.HttpRequestInterceptor;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
|||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.response.ListBuckets;
|
||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpResponse;
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.web.handlers;
|
|||
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
|||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.interfaces.Bucket;
|
||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
|
@ -37,7 +37,7 @@ import java.io.IOException;
|
|||
|
||||
import static java.net.HttpURLConnection.HTTP_CREATED;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_FUNCTION;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_FUNCTION;
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
|||
import org.apache.hadoop.ozone.web.interfaces.UserAuth;
|
||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.response.ListKeys;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
@ -47,10 +47,11 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_USER;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_USER;
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -44,10 +44,11 @@ import static org.apache.hadoop.ozone.web.exceptions.ErrorTable.INVALID_BUCKET_N
|
|||
import static org.apache.hadoop.ozone.web.exceptions.ErrorTable.INVALID_REQUEST;
|
||||
import static org.apache.hadoop.ozone.web.exceptions.ErrorTable.SERVER_ERROR;
|
||||
import static org.apache.hadoop.ozone.web.exceptions.ErrorTable.newError;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_USER;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_USER;
|
||||
|
||||
|
||||
/**
|
||||
* This class abstracts way the repetitive tasks in Key handling code.
|
||||
|
|
|
@ -38,7 +38,7 @@ import java.io.IOException;
|
|||
|
||||
import static java.net.HttpURLConnection.HTTP_CREATED;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_FUNCTION;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_FUNCTION;
|
||||
|
||||
/**
|
||||
* VolumeHandler handles volume specific HTTP calls.
|
||||
|
|
|
@ -41,10 +41,10 @@ import java.nio.file.FileAlreadyExistsException;
|
|||
import java.nio.file.NoSuchFileException;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.web.utils.OzoneConsts.OZONE_USER;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_COMPONENT;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_RESOURCE;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_REQUEST;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_USER;
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.slf4j.Logger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||
|
@ -37,7 +38,7 @@ import org.apache.hadoop.ozone.web.response.ListBuckets;
|
|||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.web.response.VolumeOwner;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
|
||||
|
@ -125,8 +126,8 @@ public final class OzoneMetadataManager {
|
|||
private static final String USER_DB = "/user.db";
|
||||
private static final String META_DB = "/metadata.db";
|
||||
private static OzoneMetadataManager bm = null;
|
||||
private OzoneLevelDBStore userDB;
|
||||
private OzoneLevelDBStore metadataDB;
|
||||
private LevelDBStore userDB;
|
||||
private LevelDBStore metadataDB;
|
||||
private ReadWriteLock lock;
|
||||
private Charset encoding = Charset.forName("UTF-8");
|
||||
private String storageRoot;
|
||||
|
@ -154,8 +155,8 @@ public final class OzoneMetadataManager {
|
|||
}
|
||||
|
||||
try {
|
||||
userDB = new OzoneLevelDBStore(new File(storageRoot + USER_DB), true);
|
||||
metadataDB = new OzoneLevelDBStore(new File(storageRoot + META_DB), true);
|
||||
userDB = new LevelDBStore(new File(storageRoot + USER_DB), true);
|
||||
metadataDB = new LevelDBStore(new File(storageRoot + META_DB), true);
|
||||
inProgressObjects = new ConcurrentHashMap<>();
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Cannot open db :" + ex.getMessage());
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.apache.hadoop.ozone.web.request;
|
||||
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.web.response;
|
|||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.codehaus.jackson.annotate.JsonAutoDetect;
|
||||
import org.codehaus.jackson.annotate.JsonMethod;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
|||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.interfaces.UserAuth;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import java.util.List;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.ozone.web.utils;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
|
|
|
@ -168,7 +168,13 @@ message KeyValue {
|
|||
message ContainerData {
|
||||
required string name = 1;
|
||||
repeated KeyValue metadata = 2;
|
||||
optional string containerPath = 3;
|
||||
optional string dbPath = 3;
|
||||
optional string containerPath = 4;
|
||||
}
|
||||
|
||||
message ContainerMeta {
|
||||
required string fileName = 1;
|
||||
required string hash = 2;
|
||||
}
|
||||
|
||||
// Container Messages.
|
||||
|
|
|
@ -0,0 +1,256 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||
.createSingleNodePipeline;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Simple tests to verify that container persistence works as expected.
|
||||
*/
|
||||
public class TestContainerPersistence {
|
||||
|
||||
static String path;
|
||||
static ContainerManagerImpl containerManager;
|
||||
static OzoneConfiguration conf;
|
||||
static FsDatasetSpi fsDataSet;
|
||||
static MiniDFSCluster cluster;
|
||||
static List<Path> pathLists = new LinkedList<>();
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws IOException {
|
||||
conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
path = p.getPath().concat(
|
||||
TestContainerPersistence.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
||||
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
|
||||
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
|
||||
|
||||
File containerDir = new File(path);
|
||||
if (containerDir.exists()) {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
|
||||
Assert.assertTrue(containerDir.mkdirs());
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
fsDataSet = cluster.getDataNodes().get(0).getFSDataset();
|
||||
containerManager = new ContainerManagerImpl();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
cluster.shutdown();
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupPaths() throws IOException {
|
||||
if (!new File(path).exists()) {
|
||||
new File(path).mkdirs();
|
||||
}
|
||||
pathLists.clear();
|
||||
containerManager.getContainerMap().clear();
|
||||
pathLists.add(Paths.get(path));
|
||||
containerManager.init(conf, pathLists, fsDataSet);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupDir() throws IOException {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateContainer() throws Exception {
|
||||
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName));
|
||||
ContainerManagerImpl.ContainerStatus status = containerManager
|
||||
.getContainerMap().get(containerName);
|
||||
|
||||
Assert.assertTrue(status.isActive());
|
||||
Assert.assertNotNull(status.getContainer().getContainerPath());
|
||||
Assert.assertNotNull(status.getContainer().getDBPath());
|
||||
|
||||
|
||||
Assert.assertTrue(new File(status.getContainer().getContainerPath())
|
||||
.exists());
|
||||
|
||||
String containerPathString = ContainerUtils.getContainerNameFromFile(new
|
||||
File(status.getContainer().getContainerPath()));
|
||||
|
||||
Path meta = Paths.get(containerPathString);
|
||||
|
||||
String metadataFile = meta.toString() + OzoneConsts.CONTAINER_META;
|
||||
Assert.assertTrue(new File(metadataFile).exists());
|
||||
|
||||
|
||||
String dbPath = status.getContainer().getDBPath();
|
||||
|
||||
LevelDBStore store = null;
|
||||
try {
|
||||
store = new LevelDBStore(new File(dbPath), false);
|
||||
Assert.assertNotNull(store.getDB());
|
||||
} finally {
|
||||
if (store != null) {
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDuplicateContainer() throws Exception {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
try {
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
fail("Expected Exception not thrown.");
|
||||
} catch (IOException ex) {
|
||||
Assert.assertNotNull(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteContainer() throws Exception {
|
||||
String containerName1 = OzoneUtils.getRequestID();
|
||||
String containerName2 = OzoneUtils.getRequestID();
|
||||
|
||||
|
||||
ContainerData data = new ContainerData(containerName1);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
|
||||
data = new ContainerData(containerName2);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
|
||||
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName1));
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName2));
|
||||
|
||||
containerManager.deleteContainer(createSingleNodePipeline(),
|
||||
containerName1);
|
||||
Assert.assertFalse(containerManager.getContainerMap()
|
||||
.containsKey(containerName1));
|
||||
|
||||
// Let us make sure that we are able to re-use a container name after
|
||||
// delete.
|
||||
|
||||
data = new ContainerData(containerName1);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
|
||||
// Assert we still have both containers.
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName1));
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName2));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This test creates 1000 containers and reads them back 5 containers at a
|
||||
* time and verifies that we did get back all containers.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testListContainer() throws IOException {
|
||||
final int count = 1000;
|
||||
final int step = 5;
|
||||
|
||||
Map<String, ContainerData> testMap = new HashMap<>();
|
||||
for (int x = 0; x < count; x++) {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
||||
testMap.put(containerName, data);
|
||||
}
|
||||
|
||||
int counter = 0;
|
||||
String prevKey = "";
|
||||
List<ContainerData> results = new LinkedList<>();
|
||||
while (counter < count) {
|
||||
containerManager.listContainer(prevKey, step, results);
|
||||
for (int y = 0; y < results.size(); y++) {
|
||||
testMap.remove(results.get(y).getContainerName());
|
||||
}
|
||||
counter += step;
|
||||
String nextKey = results.get(results.size() - 1).getContainerName();
|
||||
|
||||
//Assert that container is returning results in a sorted fashion.
|
||||
Assert.assertTrue(prevKey.compareTo(nextKey) < 0);
|
||||
prevKey = nextKey;
|
||||
results.clear();
|
||||
}
|
||||
// Assert that we listed all the keys that we had put into
|
||||
// container.
|
||||
Assert.assertTrue(testMap.isEmpty());
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
|
||||
|
||||
public class TestOzoneContainer {
|
||||
@Test
|
||||
public void testCreateOzoneContainer() throws Exception {
|
||||
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
||||
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
|
||||
|
||||
// We don't start Ozone Container via data node, we will do it
|
||||
// independently in our test path.
|
||||
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, false);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
|
||||
|
||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
|
||||
().get(0).getFSDataset());
|
||||
container.start();
|
||||
|
||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||
client.connect();
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest();
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
container.stop();
|
||||
cluster.shutdown();
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testOzoneContainerViaDataNode() throws Exception {
|
||||
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
||||
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
|
||||
|
||||
// Start ozone container Via Datanode create.
|
||||
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
|
||||
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
|
||||
|
||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
|
||||
// This client talks to ozone container via datanode.
|
||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||
client.connect();
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest();
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
cluster.shutdown();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -22,7 +22,7 @@ package org.apache.hadoop.ozone.web;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.http.HttpResponse;
|
||||
|
|
|
@ -29,13 +29,12 @@ import java.util.Date;
|
|||
import java.util.Locale;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import org.apache.http.HttpResponse;
|
||||
|
|
|
@ -24,12 +24,11 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
Loading…
Reference in New Issue