diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java index c58fb9dbbcf..9de84da390e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java @@ -70,4 +70,4 @@ public interface ChunkManager { */ void shutdown(); -} +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java new file mode 100644 index 00000000000..6ee0fd3f681 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; +import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.NO_SUCH_ALGORITHM; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST; + +/** + * This class is for performing chunk related operations. + */ +public class ChunkManagerImpl implements ChunkManager { + static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class); + + /** + * writes a given chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block + * @param info - ChunkInfo + * @param data - data of the chunk + * @param stage - Stage of the Chunk operation + * @throws StorageContainerException + */ + public void writeChunk(Container container, BlockID blockID, ChunkInfo info, + byte[] data, ContainerProtos.Stage stage) + throws StorageContainerException { + + try { + + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + + File chunkFile = ChunkUtils.validateChunk(containerData, info); + File tmpChunkFile = getTmpChunkFile(chunkFile, info); + + LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file", + info.getChunkName(), stage, chunkFile, tmpChunkFile); + + switch (stage) { + case WRITE_DATA: + // Initially writes to temporary chunk file. + ChunkUtils.writeData(tmpChunkFile, info, data); + break; + case COMMIT_DATA: + // commit the data, means move chunk data from temporary chunk file + // to actual chunk file. + long sizeDiff = tmpChunkFile.length() - chunkFile.length(); + commitChunk(tmpChunkFile, chunkFile); + containerData.incrBytesUsed(sizeDiff); + containerData.incrWriteCount(); + containerData.incrWriteBytes(sizeDiff); + break; + case COMBINED: + // directly write to the chunk file + ChunkUtils.writeData(chunkFile, info, data); + containerData.incrBytesUsed(info.getLen()); + containerData.incrWriteCount(); + containerData.incrWriteBytes(info.getLen()); + break; + default: + throw new IOException("Can not identify write operation."); + } + } catch (StorageContainerException ex) { + throw ex; + } catch (NoSuchAlgorithmException ex) { + LOG.error("write data failed. error: {}", ex); + throw new StorageContainerException("Internal error: ", ex, + NO_SUCH_ALGORITHM); + } catch (ExecutionException | IOException ex) { + LOG.error("write data failed. error: {}", ex); + throw new StorageContainerException("Internal error: ", ex, + CONTAINER_INTERNAL_ERROR); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("write data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", e, + CONTAINER_INTERNAL_ERROR); + } + } + + /** + * reads the data defined by a chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block. + * @param info - ChunkInfo. + * @return byte array + * @throws StorageContainerException + * TODO: Right now we do not support partial reads and writes of chunks. + * TODO: Explore if we need to do that for ozone. + */ + public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) + throws StorageContainerException { + try { + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + ByteBuffer data; + + // Checking here, which layout version the container is, and reading + // the chunk file in that format. + // In version1, we verify checksum if it is available and return data + // of the chunk file. + if (containerData.getLayOutVersion() == ChunkLayOutVersion + .getLatestVersion().getVersion()) { + File chunkFile = ChunkUtils.getChunkFile(containerData, info); + data = ChunkUtils.readData(chunkFile, info); + containerData.incrReadCount(); + containerData.incrReadBytes(chunkFile.length()); + return data.array(); + } + } catch(NoSuchAlgorithmException ex) { + LOG.error("read data failed. error: {}", ex); + throw new StorageContainerException("Internal error: ", + ex, NO_SUCH_ALGORITHM); + } catch (ExecutionException ex) { + LOG.error("read data failed. error: {}", ex); + throw new StorageContainerException("Internal error: ", + ex, CONTAINER_INTERNAL_ERROR); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("read data failed. error: {}", e); + throw new StorageContainerException("Internal error: ", + e, CONTAINER_INTERNAL_ERROR); + } + return null; + } + + /** + * Deletes a given chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block + * @param info - Chunk Info + * @throws StorageContainerException + */ + public void deleteChunk(Container container, BlockID blockID, ChunkInfo info) + throws StorageContainerException { + Preconditions.checkNotNull(blockID, "Block ID cannot be null."); + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + // Checking here, which layout version the container is, and performing + // deleting chunk operation. + // In version1, we have only chunk file. + if (containerData.getLayOutVersion() == ChunkLayOutVersion + .getLatestVersion().getVersion()) { + File chunkFile = ChunkUtils.getChunkFile(containerData, info); + if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { + FileUtil.fullyDelete(chunkFile); + containerData.decrBytesUsed(chunkFile.length()); + } else { + LOG.error("Not Supported Operation. Trying to delete a " + + "chunk that is in shared file. chunk info : " + info.toString()); + throw new StorageContainerException("Not Supported Operation. " + + "Trying to delete a chunk that is in shared file. chunk info : " + + info.toString(), UNSUPPORTED_REQUEST); + } + } + } + + /** + * Shutdown the chunkManager. + * + * In the chunkManager we haven't acquired any resources, so nothing to do + * here. + */ + + public void shutdown() { + //TODO: need to revisit this during integration of container IO. + } + + /** + * Returns the temporary chunkFile path. + * @param chunkFile + * @param info + * @return temporary chunkFile path + * @throws StorageContainerException + */ + private File getTmpChunkFile(File chunkFile, ChunkInfo info) + throws StorageContainerException { + return new File(chunkFile.getParent(), + chunkFile.getName() + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); + } + + /** + * Commit the chunk by renaming the temporary chunk file to chunk file. + * @param tmpChunkFile + * @param chunkFile + * @throws IOException + */ + private void commitChunk(File tmpChunkFile, File chunkFile) throws + IOException { + Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), + StandardCopyOption.REPLACE_EXISTING); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java new file mode 100644 index 00000000000..87565ceac7b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY; + +/** + * This class is for performing key related operations on the KeyValue + * Container. + */ +public class KeyManagerImpl implements KeyManager { + + static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class); + + private Configuration config; + + /** + * Constructs a key Manager. + * + * @param conf - Ozone configuration + */ + public KeyManagerImpl(Configuration conf) { + Preconditions.checkNotNull(conf, "Config cannot be null"); + this.config = conf; + } + + /** + * Puts or overwrites a key. + * + * @param container - Container for which key need to be added. + * @param data - Key Data. + * @throws IOException + */ + public void putKey(Container container, KeyData data) throws IOException { + Preconditions.checkNotNull(data, "KeyData cannot be null for put " + + "operation."); + Preconditions.checkState(data.getContainerID() >= 0, "Container Id " + + "cannot be negative"); + // We are not locking the key manager since LevelDb serializes all actions + // against a single DB. We rely on DB level locking to avoid conflicts. + MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container + .getContainerData(), config); + + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage() + .toByteArray()); + } + + /** + * Gets an existing key. + * + * @param container - Container from which key need to be get. + * @param data - Key Data. + * @return Key Data. + * @throws IOException + */ + public KeyData getKey(Container container, KeyData data) throws IOException { + Preconditions.checkNotNull(data, "Key data cannot be null"); + Preconditions.checkNotNull(data.getContainerID(), "Container name cannot" + + " be null"); + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + MetadataStore db = KeyUtils.getDB(containerData, config); + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + byte[] kData = db.get(Longs.toByteArray(data.getLocalID())); + if (kData == null) { + throw new StorageContainerException("Unable to find the key.", + NO_SUCH_KEY); + } + ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData); + return KeyData.getFromProtoBuf(keyData); + } + + /** + * Deletes an existing Key. + * + * @param container - Container from which key need to be deleted. + * @param blockID - ID of the block. + * @throws StorageContainerException + */ + public void deleteKey(Container container, BlockID blockID) throws + IOException { + Preconditions.checkNotNull(blockID, "block ID cannot be null."); + Preconditions.checkState(blockID.getContainerID() >= 0, + "Container ID cannot be negative."); + Preconditions.checkState(blockID.getLocalID() >= 0, + "Local ID cannot be negative."); + + KeyValueContainerData cData = (KeyValueContainerData) container + .getContainerData(); + MetadataStore db = KeyUtils.getDB(cData, config); + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + // Note : There is a race condition here, since get and delete + // are not atomic. Leaving it here since the impact is refusing + // to delete a key which might have just gotten inserted after + // the get check. + byte[] kKey = Longs.toByteArray(blockID.getLocalID()); + byte[] kData = db.get(kKey); + if (kData == null) { + throw new StorageContainerException("Unable to find the key.", + NO_SUCH_KEY); + } + db.delete(kKey); + } + + /** + * List keys in a container. + * + * @param container - Container from which keys need to be listed. + * @param startLocalID - Key to start from, 0 to begin. + * @param count - Number of keys to return. + * @return List of Keys that match the criteria. + */ + public List listKey(Container container, long startLocalID, int + count) throws IOException { + Preconditions.checkNotNull(container, "container cannot be null"); + Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " + + "negative"); + Preconditions.checkArgument(count > 0, + "Count must be a positive number."); + container.readLock(); + List result = null; + KeyValueContainerData cData = (KeyValueContainerData) container + .getContainerData(); + MetadataStore db = KeyUtils.getDB(cData, config); + result = new ArrayList<>(); + byte[] startKeyInBytes = Longs.toByteArray(startLocalID); + List> range = db.getSequentialRangeKVs( + startKeyInBytes, count, null); + for (Map.Entry entry : range) { + KeyData value = KeyUtils.getKeyData(entry.getValue()); + KeyData data = new KeyData(value.getBlockID()); + result.add(data); + } + return result; + } + + /** + * Shutdown KeyValueContainerManager. + */ + public void shutdown() { + KeyUtils.shutdownCache(ContainerCache.getInstance(config)); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java new file mode 100644 index 00000000000..c837cccd937 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.helpers; + +import com.google.common.base.Preconditions; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; +import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.FileLock; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; + +/** + * Utility methods for chunk operations for KeyValue container. + */ +public final class ChunkUtils { + + /** Never constructed. **/ + private ChunkUtils() { + + } + + /** + * Writes the data in chunk Info to the specified location in the chunkfile. + * + * @param chunkFile - File to write data to. + * @param chunkInfo - Data stream to write. + * @param data - The data buffer. + * @throws StorageContainerException + */ + public static void writeData(File chunkFile, ChunkInfo chunkInfo, + byte[] data) throws + StorageContainerException, ExecutionException, InterruptedException, + NoSuchAlgorithmException { + + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + if (data.length != chunkInfo.getLen()) { + String err = String.format("data array does not match the length " + + "specified. DataLen: %d Byte Array: %d", + chunkInfo.getLen(), data.length); + log.error(err); + throw new StorageContainerException(err, INVALID_WRITE_SIZE); + } + + AsynchronousFileChannel file = null; + FileLock lock = null; + + try { + file = + AsynchronousFileChannel.open(chunkFile.toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.SPARSE, + StandardOpenOption.SYNC); + lock = file.lock().get(); + if (chunkInfo.getChecksum() != null && + !chunkInfo.getChecksum().isEmpty()) { + verifyChecksum(chunkInfo, data, log); + } + int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); + if (size != data.length) { + log.error("Invalid write size found. Size:{} Expected: {} ", size, + data.length); + throw new StorageContainerException("Invalid write size found. " + + "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE); + } + } catch (StorageContainerException ex) { + throw ex; + } catch(IOException e) { + throw new StorageContainerException(e, IO_EXCEPTION); + + } finally { + if (lock != null) { + try { + lock.release(); + } catch (IOException e) { + log.error("Unable to release lock ??, Fatal Error."); + throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR); + + } + } + if (file != null) { + try { + file.close(); + } catch (IOException e) { + throw new StorageContainerException("Error closing chunk file", + e, CONTAINER_INTERNAL_ERROR); + } + } + } + } + + /** + * Reads data from an existing chunk file. + * + * @param chunkFile - file where data lives. + * @param data - chunk definition. + * @return ByteBuffer + * @throws StorageContainerException + * @throws ExecutionException + * @throws InterruptedException + */ + public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws + StorageContainerException, ExecutionException, InterruptedException, + NoSuchAlgorithmException { + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + + if (!chunkFile.exists()) { + log.error("Unable to find the chunk file. chunk info : {}", + data.toString()); + throw new StorageContainerException("Unable to find the chunk file. " + + "chunk info " + + data.toString(), UNABLE_TO_FIND_CHUNK); + } + + AsynchronousFileChannel file = null; + FileLock lock = null; + try { + file = + AsynchronousFileChannel.open(chunkFile.toPath(), + StandardOpenOption.READ); + lock = file.lock(data.getOffset(), data.getLen(), true).get(); + + ByteBuffer buf = ByteBuffer.allocate((int) data.getLen()); + file.read(buf, data.getOffset()).get(); + + if (data.getChecksum() != null && !data.getChecksum().isEmpty()) { + verifyChecksum(data, buf.array(), log); + } + + return buf; + } catch (IOException e) { + throw new StorageContainerException(e, IO_EXCEPTION); + } finally { + if (lock != null) { + try { + lock.release(); + } catch (IOException e) { + log.error("I/O error is lock release."); + } + } + if (file != null) { + IOUtils.closeStream(file); + } + } + } + + /** + * Verifies the checksum of a chunk against the data buffer. + * + * @param chunkInfo - Chunk Info. + * @param data - data buffer + * @param log - log + * @throws NoSuchAlgorithmException + * @throws StorageContainerException + */ + private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger + log) throws NoSuchAlgorithmException, StorageContainerException { + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha.update(data); + if (!Hex.encodeHexString(sha.digest()).equals( + chunkInfo.getChecksum())) { + log.error("Checksum mismatch. Provided: {} , computed: {}", + chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest())); + throw new StorageContainerException("Checksum mismatch. Provided: " + + chunkInfo.getChecksum() + " , computed: " + + DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH); + } + } + + /** + * Validates chunk data and returns a file object to Chunk File that we are + * expected to write data to. + * + * @param data - container data. + * @param info - chunk info. + * @return File + * @throws StorageContainerException + */ + public static File validateChunk(KeyValueContainerData data, ChunkInfo info) + throws StorageContainerException { + + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + + File chunkFile = getChunkFile(data, info); + if (isOverWriteRequested(chunkFile, info)) { + if (!isOverWritePermitted(info)) { + log.error("Rejecting write chunk request. Chunk overwrite " + + "without explicit request. {}", info.toString()); + throw new StorageContainerException("Rejecting write chunk request. " + + "OverWrite flag required." + info.toString(), + OVERWRITE_FLAG_REQUIRED); + } + } + return chunkFile; + } + + /** + * Validates that Path to chunk file exists. + * + * @param containerData - Container Data + * @param info - Chunk info + * @return - File. + * @throws StorageContainerException + */ + public static File getChunkFile(KeyValueContainerData containerData, + ChunkInfo info) throws + StorageContainerException { + + Preconditions.checkNotNull(containerData, "Container data can't be null"); + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + + String chunksPath = containerData.getChunksPath(); + if (chunksPath == null) { + log.error("Chunks path is null in the container data"); + throw new StorageContainerException("Unable to get Chunks directory.", + UNABLE_TO_FIND_DATA_DIR); + } + File chunksLoc = new File(chunksPath); + if (!chunksLoc.exists()) { + log.error("Chunks path does not exist"); + throw new StorageContainerException("Unable to get Chunks directory.", + UNABLE_TO_FIND_DATA_DIR); + } + + return chunksLoc.toPath().resolve(info.getChunkName()).toFile(); + } + + /** + * Checks if we are getting a request to overwrite an existing range of + * chunk. + * + * @param chunkFile - File + * @param chunkInfo - Buffer to write + * @return bool + */ + public static boolean isOverWriteRequested(File chunkFile, ChunkInfo + chunkInfo) { + + if (!chunkFile.exists()) { + return false; + } + + long offset = chunkInfo.getOffset(); + return offset < chunkFile.length(); + } + + /** + * Overwrite is permitted if an only if the user explicitly asks for it. We + * permit this iff the key/value pair contains a flag called + * [OverWriteRequested, true]. + * + * @param chunkInfo - Chunk info + * @return true if the user asks for it. + */ + public static boolean isOverWritePermitted(ChunkInfo chunkInfo) { + String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE); + return (overWrite != null) && + (!overWrite.isEmpty()) && + (Boolean.valueOf(overWrite)); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java index 7d9f0e690a2..d45f5986b81 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java @@ -20,14 +20,19 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.utils.MetadataStore; import java.io.IOException; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.NO_SUCH_KEY; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.UNABLE_TO_READ_METADATA_DB; /** * Utils functions to help key functions. @@ -79,4 +84,32 @@ public final class KeyUtils { cache.removeDB(container.getContainerId()); } -} + /** + * Shutdown all DB Handles. + * + * @param cache - Cache for DB Handles. + */ + @SuppressWarnings("unchecked") + public static void shutdownCache(ContainerCache cache) { + cache.shutdownCache(); + } + + /** + * Parses the {@link KeyData} from a bytes array. + * + * @param bytes key data in bytes. + * @return key data. + * @throws IOException if the bytes array is malformed or invalid. + */ + public static KeyData getKeyData(byte[] bytes) throws IOException { + try { + ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom( + bytes); + KeyData data = KeyData.getFromProtoBuf(keyData); + return data; + } catch (IOException e) { + throw new StorageContainerException("Failed to parse key data from the" + + " bytes array.", NO_SUCH_KEY); + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java new file mode 100644 index 00000000000..7134be1d34d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.ozone.container.keyvalue.interfaces; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.interfaces.Container; + +/** + * Chunk Manager allows read, write, delete and listing of chunks in + * a container. + */ + +public interface ChunkManager { + + /** + * writes a given chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block. + * @param info - ChunkInfo. + * @param stage - Chunk Stage write. + * @throws StorageContainerException + */ + void writeChunk(Container container, BlockID blockID, ChunkInfo info, + byte[] data, ContainerProtos.Stage stage) + throws StorageContainerException; + + /** + * reads the data defined by a chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block. + * @param info - ChunkInfo. + * @return byte array + * @throws StorageContainerException + * + * TODO: Right now we do not support partial reads and writes of chunks. + * TODO: Explore if we need to do that for ozone. + */ + byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws + StorageContainerException; + + /** + * Deletes a given chunk. + * + * @param container - Container for the chunk + * @param blockID - ID of the block. + * @param info - Chunk Info + * @throws StorageContainerException + */ + void deleteChunk(Container container, BlockID blockID, ChunkInfo info) throws + StorageContainerException; + + // TODO : Support list operations. + + /** + * Shutdown the chunkManager. + */ + void shutdown(); + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java new file mode 100644 index 00000000000..ebda97eab2e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.interfaces; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; + +import java.io.IOException; +import java.util.List; + +/** + * KeyManager is for performing key related operations on the container. + */ +public interface KeyManager { + + /** + * Puts or overwrites a key. + * + * @param container - Container for which key need to be added. + * @param data - Key Data. + * @throws IOException + */ + void putKey(Container container, KeyData data) throws IOException; + + /** + * Gets an existing key. + * + * @param container - Container from which key need to be get. + * @param data - Key Data. + * @return Key Data. + * @throws IOException + */ + KeyData getKey(Container container, KeyData data) throws IOException; + + /** + * Deletes an existing Key. + * + * @param container - Container from which key need to be deleted. + * @param blockID - ID of the block. + * @throws StorageContainerException + */ + void deleteKey(Container container, BlockID blockID) throws IOException; + + /** + * List keys in a container. + * + * @param container - Container from which keys need to be listed. + * @param startLocalID - Key to start from, 0 to begin. + * @param count - Number of keys to return. + * @return List of Keys that match the criteria. + */ + List listKey(Container container, long startLocalID, int count) throws + IOException; + + /** + * Shutdown ContainerManager. + */ + void shutdown(); +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java new file mode 100644 index 00000000000..ca936c7941b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.File; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; + +/** + * This class is used to test ChunkManager operations. + */ +public class TestChunkManagerImpl { + + private OzoneConfiguration config; + private String scmId = UUID.randomUUID().toString(); + private VolumeSet volumeSet; + private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; + private KeyValueContainerData keyValueContainerData; + private KeyValueContainer keyValueContainer; + private KeyData keyData; + private BlockID blockID; + private ChunkManagerImpl chunkManager; + private ChunkInfo chunkInfo; + private byte[] data; + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + config = new OzoneConfiguration(); + + HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot() + .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID() + .toString()).build(); + + volumeSet = mock(VolumeSet.class); + + volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class); + Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) + .thenReturn(hddsVolume); + + keyValueContainerData = new KeyValueContainerData( + ContainerProtos.ContainerType.KeyValueContainer, 1L); + + keyValueContainer = new KeyValueContainer( + keyValueContainerData, config); + + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + + data = "testing write chunks".getBytes(); + // Creating KeyData + blockID = new BlockID(1L, 1L); + keyData = new KeyData(blockID); + keyData.addMetadata("VOLUME", "ozone"); + keyData.addMetadata("OWNER", "hdfs"); + List chunkList = new LinkedList<>(); + chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, data.length); + chunkList.add(chunkInfo.getProtoBufMessage()); + keyData.setChunks(chunkList); + + // Create a ChunkManager object. + chunkManager = new ChunkManagerImpl(); + + } + + @Test + public void testWriteChunkStageWriteAndCommit() throws Exception { + //As in Setup, we try to create container, these paths should exist. + assertTrue(keyValueContainerData.getChunksPath() != null); + File chunksPath = new File(keyValueContainerData.getChunksPath()); + assertTrue(chunksPath.exists()); + // Initially chunks folder should be empty. + assertTrue(chunksPath.listFiles().length == 0); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.WRITE_DATA); + // Now a chunk file is being written with Stage WRITE_DATA, so it should + // create a temporary chunk file. + assertTrue(chunksPath.listFiles().length == 1); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMMIT_DATA); + // Old temp file should have been renamed to chunk file. + assertTrue(chunksPath.listFiles().length == 1); + + } + + @Test + public void testWriteChunkIncorrectLength() throws Exception { + try { + long randomLength = 200L; + chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, randomLength); + List chunkList = new LinkedList<>(); + chunkList.add(chunkInfo.getProtoBufMessage()); + keyData.setChunks(chunkList); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.WRITE_DATA); + fail("testWriteChunkIncorrectLength failed"); + } catch (StorageContainerException ex) { + GenericTestUtils.assertExceptionContains("data array does not match " + + "the length ", ex); + assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult()); + } + } + + @Test + public void testWriteChunkStageCombinedData() throws Exception { + //As in Setup, we try to create container, these paths should exist. + assertTrue(keyValueContainerData.getChunksPath() != null); + File chunksPath = new File(keyValueContainerData.getChunksPath()); + assertTrue(chunksPath.exists()); + // Initially chunks folder should be empty. + assertTrue(chunksPath.listFiles().length == 0); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMBINED); + // Now a chunk file is being written with Stage WRITE_DATA, so it should + // create a temporary chunk file. + assertTrue(chunksPath.listFiles().length == 1); + } + + @Test + public void testReadChunk() throws Exception { + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMBINED); + byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, + chunkInfo); + assertEquals(expectedData.length, data.length); + assertTrue(Arrays.equals(expectedData, data)); + } + + @Test + public void testDeleteChunk() throws Exception { + File chunksPath = new File(keyValueContainerData.getChunksPath()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMBINED); + assertTrue(chunksPath.listFiles().length == 1); + chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); + assertTrue(chunksPath.listFiles().length == 0); + } + + @Test + public void testDeleteChunkUnsupportedRequest() throws Exception { + try { + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMBINED); + long randomLength = 200L; + chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, randomLength); + List chunkList = new LinkedList<>(); + chunkList.add(chunkInfo.getProtoBufMessage()); + keyData.setChunks(chunkList); + chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); + fail("testDeleteChunkUnsupportedRequest"); + } catch (StorageContainerException ex) { + GenericTestUtils.assertExceptionContains("Not Supported Operation.", ex); + assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult()); + } + } + + @Test + public void testWriteChunkChecksumMismatch() throws Exception { + try { + chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, data.length); + //Setting checksum to some value. + chunkInfo.setChecksum("some garbage"); + List chunkList = new LinkedList<>(); + chunkList.add(chunkInfo.getProtoBufMessage()); + keyData.setChunks(chunkList); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + ContainerProtos.Stage.COMBINED); + fail("testWriteChunkChecksumMismatch failed"); + } catch (StorageContainerException ex) { + GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex); + assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult()); + } + } + + @Test + public void testReadChunkFileNotExists() throws Exception { + try { + // trying to read a chunk, where chunk file does not exist + byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, + chunkInfo); + fail("testReadChunkFileNotExists failed"); + } catch (StorageContainerException ex) { + GenericTestUtils.assertExceptionContains("Unable to find the chunk " + + "file.", ex); + assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult()); + } + } + + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java new file mode 100644 index 00000000000..a6f50c45b3e --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.volume + .RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; + +/** + * This class is used to test key related operations on the container. + */ +public class TestKeyManagerImpl { + + private OzoneConfiguration config; + private String scmId = UUID.randomUUID().toString(); + private VolumeSet volumeSet; + private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; + private KeyValueContainerData keyValueContainerData; + private KeyValueContainer keyValueContainer; + private KeyData keyData; + private KeyManagerImpl keyValueContainerManager; + private BlockID blockID; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + + @Before + public void setUp() throws Exception { + config = new OzoneConfiguration(); + + HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot() + .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID() + .toString()).build(); + + volumeSet = mock(VolumeSet.class); + + volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class); + Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) + .thenReturn(hddsVolume); + + keyValueContainerData = new KeyValueContainerData( + ContainerProtos.ContainerType.KeyValueContainer, 1L); + + keyValueContainer = new KeyValueContainer( + keyValueContainerData, config); + + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + + // Creating KeyData + blockID = new BlockID(1L, 1L); + keyData = new KeyData(blockID); + keyData.addMetadata("VOLUME", "ozone"); + keyData.addMetadata("OWNER", "hdfs"); + List chunkList = new LinkedList<>(); + ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, 1024); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + + // Create KeyValueContainerManager + keyValueContainerManager = new KeyManagerImpl(config); + + } + + @Test + public void testPutAndGetKey() throws Exception { + //Put Key + keyValueContainerManager.putKey(keyValueContainer, keyData); + + //Get Key + KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer, + keyData); + + assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID()); + assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID()); + assertEquals(keyData.getChunks().size(), fromGetKeyData.getChunks().size()); + assertEquals(keyData.getMetadata().size(), fromGetKeyData.getMetadata() + .size()); + + } + + + @Test + public void testDeleteKey() throws Exception { + try { + //Put Key + keyValueContainerManager.putKey(keyValueContainer, keyData); + //Delete Key + keyValueContainerManager.deleteKey(keyValueContainer, blockID); + } catch (IOException ex) { + fail("testDeleteKey failed"); + } + } + + @Test + public void testListKey() throws Exception { + try { + keyValueContainerManager.putKey(keyValueContainer, keyData); + List listKeyData = keyValueContainerManager.listKey( + keyValueContainer, 1, 10); + assertNotNull(listKeyData); + assertTrue(listKeyData.size() == 1); + + for (long i = 2; i <= 10; i++) { + blockID = new BlockID(1L, i); + keyData = new KeyData(blockID); + keyData.addMetadata("VOLUME", "ozone"); + keyData.addMetadata("OWNER", "hdfs"); + List chunkList = new LinkedList<>(); + ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID + .getLocalID(), 0), 0, 1024); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + keyValueContainerManager.putKey(keyValueContainer, keyData); + } + + listKeyData = keyValueContainerManager.listKey( + keyValueContainer, 1, 10); + assertNotNull(listKeyData); + assertTrue(listKeyData.size() == 10); + + } catch (IOException ex) { + fail("testListKey failed"); + } + } + + @Test + public void testGetNoSuchKey() throws Exception { + try { + keyData = new KeyData(new BlockID(1L, 2L)); + keyValueContainerManager.getKey(keyValueContainer, keyData); + fail("testGetNoSuchKey failed"); + } catch (StorageContainerException ex) { + GenericTestUtils.assertExceptionContains("Unable to find the key.", ex); + assertEquals(ContainerProtos.Result.NO_SUCH_KEY, ex.getResult()); + } + } +}