HDDS-155:Implement KeyValueContainer and adopt new disk layout for the containers. Contributed by Bharat Viswanadham

This commit is contained in:
Bharat Viswanadham 2018-06-14 20:54:54 -07:00
parent 9a5552bf76
commit 998e2850a3
14 changed files with 1286 additions and 88 deletions

View File

@ -101,6 +101,9 @@ public final class OzoneConsts {
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String KSM_DB_NAME = "ksm.db";
public static final String STORAGE_DIR_CHUNKS = "chunks";
public static final String CONTAINER_FILE_CHECKSUM_EXTENSION = ".chksm";
/**
* Supports Bucket Versioning.
*/

View File

@ -45,8 +45,9 @@
public abstract class Storage {
private static final Logger LOG = LoggerFactory.getLogger(Storage.class);
protected static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_CURRENT = "current";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String CONTAINER_DIR = "containerdir";
private final NodeType nodeType;
private final File root;

View File

@ -132,6 +132,11 @@ enum Result {
DELETE_ON_OPEN_CONTAINER = 26;
CLOSED_CONTAINER_RETRY = 27;
INVALID_CONTAINER_STATE = 28;
DISK_OUT_OF_SPACE = 29;
CONTAINER_ALREADY_EXISTS = 30;
CONTAINER_METADATA_ERROR = 31;
CONTAINER_FILES_CREATE_ERROR = 32;
CONTAINER_CHECKSUM_ERROR = 33;
}
/**

View File

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.NoSuchAlgorithmException;
/**
* Class to perform KeyValue Container operations.
*/
public class KeyValueContainer implements Container {
static final Logger LOG =
LoggerFactory.getLogger(Container.class);
private KeyValueContainerData containerData;
public KeyValueContainer(KeyValueContainerData containerData) {
Preconditions.checkNotNull(containerData, "KeyValueContainerData cannot " +
"be null");
this.containerData = containerData;
}
@Override
public void create(ContainerData cData) throws StorageContainerException {
}
@Override
public void delete(boolean forceDelete)
throws StorageContainerException {
}
@Override
public void update(boolean forceUpdate)
throws StorageContainerException {
}
@Override
public ContainerData getContainerData() {
return containerData;
}
@Override
public void close() throws StorageContainerException,
NoSuchAlgorithmException {
}
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import java.io.File;
import java.io.IOException;
/**
@ -42,6 +43,8 @@ public class KeyValueContainerData extends ContainerData {
//Number of pending deletion blocks in container.
private int numPendingDeletionBlocks;
private File dbFile = null;
/**
* Constructs KeyValueContainerData object.
* @param type - containerType
@ -63,6 +66,24 @@ public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
super(type, id, layOutVersion);
this.numPendingDeletionBlocks = 0;
}
/**
* Sets Container dbFile. This should be called only during creation of
* KeyValue container.
* @param containerDbFile
*/
public void setDbFile(File containerDbFile) {
dbFile = containerDbFile;
}
/**
* Returns container DB file.
* @return dbFile
*/
public File getDbFile() {
return dbFile;
}
/**
* Returns container metadata path.
*

View File

@ -21,21 +21,26 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import java.security.NoSuchAlgorithmException;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import java.util.Map;
/**
* Interface for Container Operations.
*/
public interface Container {
public interface Container extends RwLock {
/**
* Creates a container.
*
* @throws StorageContainerException
*/
void create(ContainerData containerData) throws StorageContainerException;
void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy,
String scmId) throws StorageContainerException;
/**
* Deletes the container.
@ -48,10 +53,11 @@ public interface Container {
/**
* Update the container.
*
* @param metaData
* @param forceUpdate if true, update container forcibly.
* @throws StorageContainerException
*/
void update(boolean forceUpdate)
void update(Map<String, String> metaData, boolean forceUpdate)
throws StorageContainerException;
/**
@ -68,8 +74,7 @@ void update(boolean forceUpdate)
*
* @throws StorageContainerException
*/
void close() throws StorageContainerException,
NoSuchAlgorithmException;
void close() throws StorageContainerException;
}

View File

@ -0,0 +1,544 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_CHECKSUM_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_METADATA_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_FILES_CREATE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.DISK_OUT_OF_SPACE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.ERROR_IN_COMPACT_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNSUPPORTED_REQUEST;
/**
* Class to perform KeyValue Container operations.
*/
public class KeyValueContainer implements Container {
private static final Logger LOG = LoggerFactory.getLogger(Container.class);
// Use a non-fair RW lock for better throughput, we may revisit this decision
// if this causes fairness issues.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final KeyValueContainerData containerData;
private long containerMaxSize;
private Configuration config;
public KeyValueContainer(KeyValueContainerData containerData, Configuration
ozoneConfig) {
Preconditions.checkNotNull(containerData, "KeyValueContainerData cannot " +
"be null");
Preconditions.checkNotNull(ozoneConfig, "Ozone configuration cannot " +
"be null");
this.config = ozoneConfig;
this.containerData = containerData;
this.containerMaxSize = (long) ozoneConfig.getInt(ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024L * 1024L * 1024L;
}
@Override
public void create(VolumeSet volumeSet, VolumeChoosingPolicy
volumeChoosingPolicy, String scmId) throws StorageContainerException {
Preconditions.checkNotNull(volumeChoosingPolicy, "VolumeChoosingPolicy " +
"cannot be null");
Preconditions.checkNotNull(volumeSet, "VolumeSet cannot be null");
Preconditions.checkNotNull(scmId, "scmId cannot be null");
File containerMetaDataPath = null;
try {
//acquiring volumeset lock and container lock
volumeSet.acquireLock();
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), containerMaxSize);
String containerBasePath = containerVolume.getHddsRootDir().toString();
long containerId = containerData.getContainerId();
String containerName = Long.toString(containerId);
containerMetaDataPath = KeyValueContainerLocationUtil
.getContainerMetaDataPath(containerBasePath, scmId, containerId);
File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
containerBasePath, scmId, containerId);
File containerFile = KeyValueContainerLocationUtil.getContainerFile(
containerMetaDataPath, containerName);
File containerCheckSumFile = KeyValueContainerLocationUtil
.getContainerCheckSumFile(containerMetaDataPath, containerName);
File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
containerMetaDataPath, containerName);
// Check if it is new Container.
KeyValueContainerUtil.verifyIsNewContainer(containerMetaDataPath);
//Create Metadata path chunks path and metadata db
KeyValueContainerUtil.createContainerMetaData(containerMetaDataPath,
chunksPath, dbFile, containerName, config);
String impl = config.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
//Set containerData for the KeyValueContainer.
containerData.setMetadataPath(containerMetaDataPath.getPath());
containerData.setChunksPath(chunksPath.getPath());
containerData.setContainerDBType(impl);
containerData.setDbFile(dbFile);
// Create .container file and .chksm file
createContainerFile(containerFile, containerCheckSumFile);
} catch (StorageContainerException ex) {
if (containerMetaDataPath != null && containerMetaDataPath.getParentFile()
.exists()) {
FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
}
throw ex;
} catch (DiskOutOfSpaceException ex) {
throw new StorageContainerException("Container creation failed, due to " +
"disk out of space", ex, DISK_OUT_OF_SPACE);
} catch (FileAlreadyExistsException ex) {
throw new StorageContainerException("Container creation failed because " +
"ContainerFile already exists", ex, CONTAINER_ALREADY_EXISTS);
} catch (IOException ex) {
if (containerMetaDataPath != null && containerMetaDataPath.getParentFile()
.exists()) {
FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
}
throw new StorageContainerException("Container creation failed.", ex,
CONTAINER_INTERNAL_ERROR);
} finally {
volumeSet.releaseLock();
}
}
/**
* Creates .container file and checksum file.
*
* @param containerFile
* @param containerCheckSumFile
* @throws StorageContainerException
*/
private void createContainerFile(File containerFile, File
containerCheckSumFile) throws StorageContainerException {
File tempContainerFile = null;
File tempCheckSumFile = null;
FileOutputStream containerCheckSumStream = null;
Writer writer = null;
long containerId = containerData.getContainerId();
try {
tempContainerFile = createTempFile(containerFile);
tempCheckSumFile = createTempFile(containerCheckSumFile);
KeyValueYaml.createContainerFile(tempContainerFile, containerData);
//Compute Checksum for container file
String checksum = computeCheckSum(tempContainerFile);
containerCheckSumStream = new FileOutputStream(tempCheckSumFile);
writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8");
writer.write(checksum);
writer.flush();
NativeIO.renameTo(tempContainerFile, containerFile);
NativeIO.renameTo(tempCheckSumFile, containerCheckSumFile);
} catch (IOException ex) {
throw new StorageContainerException("Error during creation of " +
"required files(.container, .chksm) for container. Container Name: "
+ containerId, ex, CONTAINER_FILES_CREATE_ERROR);
} finally {
IOUtils.closeStream(containerCheckSumStream);
if (tempContainerFile != null && tempContainerFile.exists()) {
if (!tempContainerFile.delete()) {
LOG.warn("Unable to delete container temporary file: {}.",
tempContainerFile.getAbsolutePath());
}
}
if (tempCheckSumFile != null && tempCheckSumFile.exists()) {
if (!tempCheckSumFile.delete()) {
LOG.warn("Unable to delete container temporary checksum file: {}.",
tempContainerFile.getAbsolutePath());
}
}
try {
if (writer != null) {
writer.close();
}
} catch (IOException ex) {
LOG.warn("Error occurred during closing the writer. Container " +
"Name:" + containerId);
}
}
}
private void updateContainerFile(File containerFile, File
containerCheckSumFile) throws StorageContainerException {
File containerBkpFile = null;
File checkSumBkpFile = null;
long containerId = containerData.getContainerId();
try {
if (containerFile.exists() && containerCheckSumFile.exists()) {
//Take backup of original files (.container and .chksm files)
containerBkpFile = new File(containerFile + ".bkp");
checkSumBkpFile = new File(containerCheckSumFile + ".bkp");
NativeIO.renameTo(containerFile, containerBkpFile);
NativeIO.renameTo(containerCheckSumFile, checkSumBkpFile);
createContainerFile(containerFile, containerCheckSumFile);
} else {
containerData.setState(ContainerProtos.ContainerLifeCycleState.INVALID);
throw new StorageContainerException("Container is an Inconsistent " +
"state, missing required files(.container, .chksm)",
INVALID_CONTAINER_STATE);
}
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {
// Restore from back up files.
try {
if (containerBkpFile != null && containerBkpFile
.exists() && containerFile.delete()) {
LOG.info("update failed for container Name: {}, restoring container" +
" file", containerId);
NativeIO.renameTo(containerBkpFile, containerFile);
}
if (checkSumBkpFile != null && checkSumBkpFile.exists() &&
containerCheckSumFile.delete()) {
LOG.info("update failed for container Name: {}, restoring checksum" +
" file", containerId);
NativeIO.renameTo(checkSumBkpFile, containerCheckSumFile);
}
throw new StorageContainerException("Error during updating of " +
"required files(.container, .chksm) for container. Container Name: "
+ containerId, ex, CONTAINER_FILES_CREATE_ERROR);
} catch (IOException e) {
containerData.setState(ContainerProtos.ContainerLifeCycleState.INVALID);
LOG.error("During restore failed for container Name: " +
containerId);
throw new StorageContainerException(
"Failed to restore container data from the backup. ID: "
+ containerId, CONTAINER_FILES_CREATE_ERROR);
}
} finally {
if (containerBkpFile != null && containerBkpFile
.exists()) {
if(!containerBkpFile.delete()) {
LOG.warn("Unable to delete container backup file: {}",
containerBkpFile);
}
}
if (checkSumBkpFile != null && checkSumBkpFile.exists()) {
if(!checkSumBkpFile.delete()) {
LOG.warn("Unable to delete container checksum backup file: {}",
checkSumBkpFile);
}
}
}
}
/**
* Compute checksum of the .container file.
* @param containerFile
* @throws StorageContainerException
*/
private String computeCheckSum(File containerFile) throws
StorageContainerException {
MessageDigest sha;
FileInputStream containerFileStream = null;
try {
sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
} catch (NoSuchAlgorithmException e) {
throw new StorageContainerException("Unable to create Message Digest,"
+ " usually this is a java configuration issue.",
NO_SUCH_ALGORITHM);
}
try {
containerFileStream = new FileInputStream(containerFile);
byte[] byteArray = new byte[1024];
int bytesCount = 0;
while ((bytesCount = containerFileStream.read(byteArray)) != -1) {
sha.update(byteArray, 0, bytesCount);
}
String checksum = DigestUtils.sha256Hex(sha.digest());
return checksum;
} catch (IOException ex) {
throw new StorageContainerException("Error during update of " +
"check sum file. Container Name: " + containerData.getContainerId(),
ex, CONTAINER_CHECKSUM_ERROR);
} finally {
IOUtils.closeStream(containerFileStream);
}
}
@Override
public void delete(boolean forceDelete)
throws StorageContainerException {
long containerId = containerData.getContainerId();
try {
KeyValueContainerUtil.removeContainer(containerData, config, forceDelete);
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {
// TODO : An I/O error during delete can leave partial artifacts on the
// disk. We will need the cleaner thread to cleanup this information.
String errMsg = String.format("Failed to cleanup container. ID: %d",
containerId);
LOG.error(errMsg, ex);
throw new StorageContainerException(errMsg, ex, CONTAINER_INTERNAL_ERROR);
}
}
@Override
public void close() throws StorageContainerException {
//TODO: writing .container file and compaction can be done
// asynchronously, otherwise rpc call for this will take a lot of time to
// complete this action
try {
writeLock();
long containerId = containerData.getContainerId();
if(!containerData.isValid()) {
LOG.debug("Invalid container data. Container Id: {}", containerId);
throw new StorageContainerException("Invalid container data. Name : " +
containerId, INVALID_CONTAINER_STATE);
}
containerData.closeContainer();
File containerFile = getContainerFile();
File containerCheckSumFile = getContainerCheckSumFile();
// update the new container data to .container File
updateContainerFile(containerFile, containerCheckSumFile);
} catch (StorageContainerException ex) {
throw ex;
} finally {
writeUnlock();
}
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
try {
MetadataStore db = KeyUtils.getDB(containerData, config);
db.compactDB();
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("Error in DB compaction while closing container", ex);
throw new StorageContainerException(ex, ERROR_IN_COMPACT_DB);
}
}
@Override
public ContainerData getContainerData() {
return containerData;
}
@Override
public void update(Map<String, String> metadata, boolean forceUpdate)
throws StorageContainerException {
// TODO: Now, when writing the updated data to .container file, we are
// holding lock and writing data to disk. We can have async implementation
// to flush the update container data to disk.
long containerId = containerData.getContainerId();
if(!containerData.isValid()) {
LOG.debug("Invalid container data. ID: {}", containerId);
throw new StorageContainerException("Invalid container data. " +
"Container Name : " + containerId, INVALID_CONTAINER_STATE);
}
if (!forceUpdate && !containerData.isOpen()) {
throw new StorageContainerException(
"Updating a closed container is not allowed. ID: " + containerId,
UNSUPPORTED_REQUEST);
}
try {
for (Map.Entry<String, String> entry : metadata.entrySet()) {
containerData.addMetadata(entry.getKey(), entry.getValue());
}
} catch (IOException ex) {
throw new StorageContainerException("Container Metadata update error" +
". Container Name:" + containerId, ex, CONTAINER_METADATA_ERROR);
}
try {
writeLock();
String containerName = String.valueOf(containerId);
File containerFile = getContainerFile();
File containerCheckSumFile = getContainerCheckSumFile();
// update the new container data to .container File
updateContainerFile(containerFile, containerCheckSumFile);
} catch (StorageContainerException ex) {
throw ex;
} finally {
writeUnlock();
}
}
/**
* Acquire read lock.
*/
public void readLock() {
this.lock.readLock().lock();
}
/**
* Release read lock.
*/
public void readUnlock() {
this.lock.readLock().unlock();
}
/**
* Check if the current thread holds read lock.
*/
public boolean hasReadLock() {
return this.lock.readLock().tryLock();
}
/**
* Acquire write lock.
*/
public void writeLock() {
this.lock.writeLock().lock();
}
/**
* Release write lock.
*/
public void writeUnlock() {
this.lock.writeLock().unlock();
}
/**
* Check if the current thread holds write lock.
*/
public boolean hasWriteLock() {
return this.lock.writeLock().isHeldByCurrentThread();
}
/**
* Acquire read lock, unless interrupted while waiting.
* @throws InterruptedException
*/
@Override
public void readLockInterruptibly() throws InterruptedException {
this.lock.readLock().lockInterruptibly();
}
/**
* Acquire write lock, unless interrupted while waiting.
* @throws InterruptedException
*/
@Override
public void writeLockInterruptibly() throws InterruptedException {
this.lock.writeLock().lockInterruptibly();
}
/**
* Returns containerFile.
* @return .container File name
*/
private File getContainerFile() {
return new File(containerData.getMetadataPath(), containerData
.getContainerId() + OzoneConsts.CONTAINER_EXTENSION);
}
/**
* Returns container checksum file.
* @return container checksum file
*/
private File getContainerCheckSumFile() {
return new File(containerData.getMetadataPath(), containerData
.getContainerId() + OzoneConsts.CONTAINER_FILE_CHECKSUM_EXTENSION);
}
/**
* Creates a temporary file.
* @param file
* @return
* @throws IOException
*/
private File createTempFile(File file) throws IOException{
return File.createTempFile("tmp_" + System.currentTimeMillis() + "_",
file.getName(), file.getParentFile());
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Storage;
import java.io.File;
/**
* Class which provides utility methods for container locations.
*/
public final class KeyValueContainerLocationUtil {
/* Never constructed. */
private KeyValueContainerLocationUtil() {
}
/**
* Returns Container Metadata Location.
* @param baseDir
* @param scmId
* @param containerId
* @return containerMetadata Path
*/
public static File getContainerMetaDataPath(String baseDir, String scmId,
long containerId) {
String containerMetaDataPath = getBaseContainerLocation(baseDir, scmId,
containerId);
containerMetaDataPath = containerMetaDataPath + File.separator +
OzoneConsts.CONTAINER_META_PATH;
return new File(containerMetaDataPath);
}
/**
* Returns Container Chunks Location.
* @param baseDir
* @param scmId
* @param containerId
* @return chunksPath
*/
public static File getChunksLocationPath(String baseDir, String scmId,
long containerId) {
String chunksPath = getBaseContainerLocation(baseDir, scmId, containerId)
+ File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
return new File(chunksPath);
}
/**
* Returns base directory for specified container.
* @param baseDir
* @param scmId
* @param containerId
* @return base directory for container.
*/
private static String getBaseContainerLocation(String baseDir, String scmId,
long containerId) {
Preconditions.checkNotNull(baseDir, "Base Directory cannot be null");
Preconditions.checkNotNull(scmId, "scmUuid cannot be null");
Preconditions.checkState(containerId >= 0,
"Container Id cannot be negative.");
String containerSubDirectory = getContainerSubDirectory(containerId);
String containerMetaDataPath = baseDir + File.separator + scmId +
File.separator + Storage.STORAGE_DIR_CURRENT + File.separator +
containerSubDirectory + File.separator + containerId;
return containerMetaDataPath;
}
/**
* Returns subdirectory, where this container needs to be placed.
* @param containerId
* @return container sub directory
*/
private static String getContainerSubDirectory(long containerId){
int directory = (int) ((containerId >> 9) & 0xFF);
return Storage.CONTAINER_DIR + directory;
}
/**
* Returns containerFile.
* @param containerMetaDataPath
* @param containerName
* @return .container File name
*/
public static File getContainerFile(File containerMetaDataPath, String
containerName) {
Preconditions.checkNotNull(containerMetaDataPath);
Preconditions.checkNotNull(containerName);
return new File(containerMetaDataPath, containerName +
OzoneConsts.CONTAINER_EXTENSION);
}
/**
* Return containerDB File.
* @param containerMetaDataPath
* @param containerName
* @return containerDB File name
*/
public static File getContainerDBFile(File containerMetaDataPath, String
containerName) {
Preconditions.checkNotNull(containerMetaDataPath);
Preconditions.checkNotNull(containerName);
return new File(containerMetaDataPath, containerName + OzoneConsts
.DN_CONTAINER_DB);
}
/**
* Returns container checksum file.
* @param containerMetaDataPath
* @param containerName
* @return container checksum file
*/
public static File getContainerCheckSumFile(File containerMetaDataPath,
String containerName) {
Preconditions.checkNotNull(containerMetaDataPath);
Preconditions.checkNotNull(containerName);
return new File(containerMetaDataPath, containerName + OzoneConsts
.CONTAINER_FILE_CHECKSUM_EXTENSION);
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
/**
* Class which defines utility methods for KeyValueContainer.
*/
public final class KeyValueContainerUtil {
/* Never constructed. */
private KeyValueContainerUtil() {
}
private static final Logger LOG = LoggerFactory.getLogger(
KeyValueContainerUtil.class);
public static void verifyIsNewContainer(File containerFile) throws
FileAlreadyExistsException {
Preconditions.checkNotNull(containerFile, "containerFile Should not be " +
"null");
if (containerFile.getParentFile().exists()) {
LOG.error("container already exists on disk. File: {}", containerFile
.toPath());
throw new FileAlreadyExistsException("container already exists on " +
"disk.");
}
}
/**
* creates metadata path, chunks path and metadata DB for the specified
* container.
*
* @param containerMetaDataPath
* @throws IOException
*/
public static void createContainerMetaData(File containerMetaDataPath, File
chunksPath, File dbFile, String containerName, Configuration conf) throws
IOException {
Preconditions.checkNotNull(containerMetaDataPath);
Preconditions.checkNotNull(containerName);
Preconditions.checkNotNull(conf);
if (!containerMetaDataPath.mkdirs()) {
LOG.error("Unable to create directory for metadata storage. Path: {}",
containerMetaDataPath);
throw new IOException("Unable to create directory for metadata storage." +
" Path: " + containerMetaDataPath);
}
MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
.setCreateIfMissing(true).setDbFile(dbFile).build();
// we close since the SCM pre-creates containers.
// we will open and put Db handle into a cache when keys are being created
// in a container.
store.close();
if (!chunksPath.mkdirs()) {
LOG.error("Unable to create chunks directory Container {}",
chunksPath);
//clean up container metadata path and metadata db
FileUtils.deleteDirectory(containerMetaDataPath);
FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
throw new IOException("Unable to create directory for data storage." +
" Path: " + chunksPath);
}
}
/**
* remove Container if it is empty.
* <p/>
* There are three things we need to delete.
* <p/>
* 1. Container file and metadata file. 2. The Level DB file 3. The path that
* we created on the data location.
*
* @param containerData - Data of the container to remove.
* @param conf - configuration of the cluster.
* @param forceDelete - whether this container should be deleted forcibly.
* @throws IOException
*/
public static void removeContainer(KeyValueContainerData containerData,
Configuration conf, boolean forceDelete)
throws IOException {
Preconditions.checkNotNull(containerData);
File containerMetaDataPath = new File(containerData
.getMetadataPath());
File chunksPath = new File(containerData.getChunksPath());
MetadataStore db = KeyUtils.getDB(containerData, conf);
// If the container is not empty and cannot be deleted forcibly,
// then throw a SCE to stop deleting.
if(!forceDelete && !db.isEmpty()) {
throw new StorageContainerException(
"Container cannot be deleted because it is not empty.",
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
}
// Close the DB connection and remove the DB handler from cache
KeyUtils.removeDB(containerData, conf);
// Delete the Container MetaData path.
FileUtils.deleteDirectory(containerMetaDataPath);
//Delete the Container Chunks Path.
FileUtils.deleteDirectory(chunksPath);
//Delete Container directory
FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB;
/**
* Utils functions to help key functions.
*/
public final class KeyUtils {
/** Never constructed. **/
private KeyUtils() {
}
/**
* Get a DB handler for a given container.
* If the handler doesn't exist in cache yet, first create one and
* add into cache. This function is called with containerManager
* ReadLock held.
*
* @param container container.
* @param conf configuration.
* @return MetadataStore handle.
* @throws StorageContainerException
*/
public static MetadataStore getDB(KeyValueContainerData container,
Configuration conf) throws
StorageContainerException {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
Preconditions.checkNotNull(container.getDbFile());
try {
return cache.getDB(container.getContainerId(), container
.getContainerDBType(), container.getDbFile().getAbsolutePath());
} catch (IOException ex) {
String message = String.format("Unable to open DB Path: " +
"%s. ex: %s", container.getDbFile(), ex.getMessage());
throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
}
}
/**
* Remove a DB handler from cache.
*
* @param container - Container data.
* @param conf - Configuration.
*/
public static void removeDB(KeyValueContainerData container, Configuration
conf) {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
cache.removeDB(container.getContainerId());
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.helpers;
/**
This package contains utility classes for KeyValue container type.
**/

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
/**
This package contains classes for KeyValue container type.
**/

View File

@ -17,12 +17,14 @@
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.junit.Test;
import java.io.IOException;
@ -51,7 +53,8 @@ public void testAddGetRemoveContainer() throws StorageContainerException {
KeyValueContainerData kvData = new KeyValueContainerData(
ContainerProtos.ContainerType.KeyValueContainer, containerId);
kvData.setState(state);
KeyValueContainer keyValueContainer = new KeyValueContainer(kvData);
KeyValueContainer keyValueContainer = new KeyValueContainer(kvData, new
OzoneConfiguration());
//addContainer
boolean result = containerSet.addContainer(keyValueContainer);
@ -160,7 +163,8 @@ private ContainerSet createContainerSet() throws StorageContainerException {
} else {
kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
}
KeyValueContainer kv = new KeyValueContainer(kvData);
KeyValueContainer kv = new KeyValueContainer(kvData, new
OzoneConfiguration());
containerSet.addContainer(kv);
}
return containerSet;

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.apache.ratis.util.Preconditions.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
/**
* Class to test KeyValue Container operations.
*/
public class TestKeyValueContainer {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private OzoneConfiguration conf;
private String scmId = UUID.randomUUID().toString();
private VolumeSet volumeSet;
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private long containerId = 1L;
private String containerName = String.valueOf(containerId);
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(conf).datanodeUuid(UUID.randomUUID()
.toString()).build();
volumeSet = mock(VolumeSet.class);
volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(
ContainerProtos.ContainerType.KeyValueContainer, 1L);
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
}
@Test
public void testCreateContainer() throws Exception {
// Create Container.
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainerData = (KeyValueContainerData) keyValueContainer
.getContainerData();
String containerMetaDataPath = keyValueContainerData
.getMetadataPath();
String chunksPath = keyValueContainerData.getChunksPath();
// Check whether containerMetaDataPath and chunksPath exists or not.
assertTrue(containerMetaDataPath != null);
assertTrue(chunksPath != null);
File containerMetaDataLoc = new File(containerMetaDataPath);
//Check whether container file, check sum file and container db file exists
// or not.
assertTrue(KeyValueContainerLocationUtil.getContainerFile(
containerMetaDataLoc, containerName).exists(), ".Container File does" +
" not exist");
assertTrue(KeyValueContainerLocationUtil.getContainerCheckSumFile(
containerMetaDataLoc, containerName).exists(), "Container check sum " +
"File does" + " not exist");
assertTrue(KeyValueContainerLocationUtil.getContainerDBFile(
containerMetaDataLoc, containerName).exists(), "Container DB does " +
"not exist");
}
@Test
public void testDuplicateContainer() throws Exception {
try {
// Create Container.
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
fail("testDuplicateContainer failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("ContainerFile already " +
"exists", ex);
assertEquals(ContainerProtos.Result.CONTAINER_ALREADY_EXISTS, ex
.getResult());
}
}
@Test
public void testDiskFullExceptionCreateContainer() throws Exception {
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenThrow(DiskChecker.DiskOutOfSpaceException.class);
try {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
fail("testDiskFullExceptionCreateContainer failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("disk out of space",
ex);
assertEquals(ContainerProtos.Result.DISK_OUT_OF_SPACE, ex.getResult());
}
}
@Test
public void testDeleteContainer() throws Exception {
keyValueContainerData.setState(ContainerProtos.ContainerLifeCycleState
.CLOSED);
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.delete(true);
String containerMetaDataPath = keyValueContainerData
.getMetadataPath();
File containerMetaDataLoc = new File(containerMetaDataPath);
assertFalse("Container directory still exists", containerMetaDataLoc
.getParentFile().exists());
assertFalse("Container File still exists",
KeyValueContainerLocationUtil.getContainerFile(containerMetaDataLoc,
containerName).exists());
assertFalse("Container DB file still exists",
KeyValueContainerLocationUtil.getContainerDBFile(containerMetaDataLoc,
containerName).exists());
}
@Test
public void testCloseContainer() throws Exception {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.close();
keyValueContainerData = (KeyValueContainerData) keyValueContainer
.getContainerData();
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
keyValueContainerData.getState());
//Check state in the .container file
String containerMetaDataPath = keyValueContainerData
.getMetadataPath();
File containerMetaDataLoc = new File(containerMetaDataPath);
File containerFile = KeyValueContainerLocationUtil.getContainerFile(
containerMetaDataLoc, containerName);
keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
keyValueContainerData.getState());
}
@Test
public void testCloseInvalidContainer() throws Exception {
try {
keyValueContainerData.setState(ContainerProtos.ContainerLifeCycleState
.INVALID);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.close();
fail("testCloseInvalidContainer failed");
} catch (StorageContainerException ex) {
assertEquals(ContainerProtos.Result.INVALID_CONTAINER_STATE,
ex.getResult());
GenericTestUtils.assertExceptionContains("Invalid container data", ex);
}
}
@Test
public void testUpdateContainer() throws IOException {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put("VOLUME", "ozone");
metadata.put("OWNER", "hdfs");
keyValueContainer.update(metadata, true);
keyValueContainerData = (KeyValueContainerData) keyValueContainer
.getContainerData();
assertEquals(2, keyValueContainerData.getMetadata().size());
//Check metadata in the .container file
String containerMetaDataPath = keyValueContainerData
.getMetadataPath();
File containerMetaDataLoc = new File(containerMetaDataPath);
File containerFile = KeyValueContainerLocationUtil.getContainerFile(
containerMetaDataLoc, containerName);
keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
assertEquals(2, keyValueContainerData.getMetadata().size());
}
@Test
public void testUpdateContainerInvalidMetadata() throws IOException {
try {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put("VOLUME", "ozone");
keyValueContainer.update(metadata, true);
//Trying to update again with same metadata
keyValueContainer.update(metadata, true);
fail("testUpdateContainerInvalidMetadata failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Container Metadata update " +
"error", ex);
assertEquals(ContainerProtos.Result.CONTAINER_METADATA_ERROR, ex
.getResult());
}
}
@Test
public void testUpdateContainerUnsupportedRequest() throws Exception {
try {
keyValueContainerData.setState(ContainerProtos.ContainerLifeCycleState
.CLOSED);
keyValueContainer = new KeyValueContainer(keyValueContainerData, conf);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
Map<String, String> metadata = new HashMap<>();
metadata.put("VOLUME", "ozone");
keyValueContainer.update(metadata, false);
fail("testUpdateContainerUnsupportedRequest failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Updating a closed container " +
"is not allowed", ex);
assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex
.getResult());
}
}
}