diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 9e528535abb..bebbb782065 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -65,8 +65,8 @@ public final class OzoneConsts { public static final String CONTAINER_ROOT_PREFIX = "repository"; public static final String CONTAINER_DB = "container.db"; - - + public static final String FILE_HASH = "SHA-256"; + public final static String CHUNK_OVERWRITE = "OverWriteRequested"; /** * Supports Bucket Versioning. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java new file mode 100644 index 00000000000..7a3d4494d7d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +/** + * Java class that represents ChunkInfo ProtoBuf class. This helper class allows + * us to convert to and from protobuf to normal java. + */ +public class ChunkInfo { + private final String chunkName; + private final long offset; + private final long len; + private String checksum; + private final Map metadata; + + + /** + * Constructs a ChunkInfo. + * + * @param chunkName - File Name where chunk lives. + * @param offset - offset where Chunk Starts. + * @param len - Length of the Chunk. + */ + public ChunkInfo(String chunkName, long offset, long len) { + this.chunkName = chunkName; + this.offset = offset; + this.len = len; + this.metadata = new TreeMap<>(); + } + + /** + * Adds metadata. + * + * @param key - Key Name. + * @param value - Value. + * @throws IOException + */ + public void addMetadata(String key, String value) throws IOException { + synchronized (this.metadata) { + if (this.metadata.containsKey(key)) { + throw new IOException("This key already exists. Key " + key); + } + metadata.put(key, value); + } + } + + /** + * Gets a Chunkinfo class from the protobuf definitions. + * + * @param info - Protobuf class + * @return ChunkInfo + * @throws IOException + */ + public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info) + throws IOException { + Preconditions.checkNotNull(info); + + ChunkInfo chunkInfo = new ChunkInfo(info.getChunkName(), info.getOffset(), + info.getLen()); + + for (int x = 0; x < info.getMetadataCount(); x++) { + chunkInfo.addMetadata(info.getMetadata(x).getKey(), + info.getMetadata(x).getValue()); + } + + + if (info.hasChecksum()) { + chunkInfo.setChecksum(info.getChecksum()); + } + return chunkInfo; + } + + /** + * Returns a ProtoBuf Message from ChunkInfo. + * + * @return Protocol Buffer Message + */ + public ContainerProtos.ChunkInfo getProtoBufMessage() { + ContainerProtos.ChunkInfo.Builder builder = ContainerProtos + .ChunkInfo.newBuilder(); + + builder.setChunkName(this.getChunkName()); + builder.setOffset(this.getOffset()); + builder.setLen(this.getLen()); + if (this.getChecksum() != null && !this.getChecksum().isEmpty()) { + builder.setChecksum(this.getChecksum()); + } + + for (Map.Entry entry : metadata.entrySet()) { + ContainerProtos.KeyValue.Builder keyValBuilder = + ContainerProtos.KeyValue.newBuilder(); + builder.addMetadata(keyValBuilder.setKey(entry.getKey()) + .setValue(entry.getValue()).build()); + } + + return builder.build(); + } + + /** + * Returns the chunkName. + * + * @return - String + */ + public String getChunkName() { + return chunkName; + } + + /** + * Gets the start offset of the given chunk in physical file. + * + * @return - long + */ + public long getOffset() { + return offset; + } + + /** + * Returns the length of the Chunk. + * + * @return long + */ + public long getLen() { + return len; + } + + /** + * Returns the SHA256 value of this chunk. + * + * @return - Hash String + */ + public String getChecksum() { + return checksum; + } + + /** + * Sets the Hash value of this chunk. + * + * @param checksum - Hash String. + */ + public void setChecksum(String checksum) { + this.checksum = checksum; + } + + /** + * Returns Metadata associated with this Chunk. + * + * @return - Map of Key,values. + */ + public Map getMetadata() { + return metadata; + } + + @Override + public String toString() { + return "ChunkInfo{" + + "chunkName='" + chunkName + + ", offset=" + offset + + ", len=" + len + + '}'; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java new file mode 100644 index 00000000000..03370ac4b8e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.FileLock; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +/** + * Set of utility functions used by the chunk Manager. + */ +public final class ChunkUtils { + + /* Never constructed. */ + private ChunkUtils() { + } + + /** + * Checks if we are getting a request to overwrite an existing range of + * chunk. + * + * @param chunkFile - File + * @param chunkInfo - Buffer to write + * @return bool + */ + public static boolean isOverWriteRequested(File chunkFile, ChunkInfo + chunkInfo) { + + if (!chunkFile.exists()) { + return false; + } + + long offset = chunkInfo.getOffset(); + return offset < chunkFile.length(); + } + + /** + * Overwrite is permitted if an only if the user explicitly asks for it. We + * permit this iff the key/value pair contains a flag called + * [OverWriteRequested, true]. + * + * @param chunkInfo - Chunk info + * @return true if the user asks for it. + */ + public static boolean isOverWritePermitted(ChunkInfo chunkInfo) { + String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE); + return (overWrite != null) && + (!overWrite.isEmpty()) && + (Boolean.valueOf(overWrite)); + } + + /** + * Validates chunk data and returns a file object to Chunk File that we are + * expected to write data to. + * + * @param pipeline - pipeline. + * @param data - container data. + * @param info - chunk info. + * @return File + * @throws IOException + */ + public static File validateChunk(Pipeline pipeline, ContainerData data, + ChunkInfo info) throws IOException { + + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + + File chunkFile = getChunkFile(pipeline, data, info); + if (ChunkUtils.isOverWriteRequested(chunkFile, info)) { + if (!ChunkUtils.isOverWritePermitted(info)) { + log.error("Rejecting write chunk request. Chunk overwrite " + + "without explicit request. {}", info.toString()); + throw new IOException("Rejecting write chunk request. OverWrite " + + "flag required." + info.toString()); + } + } + return chunkFile; + } + + /** + * Validates that Path to chunk file exists. + * + * @param pipeline - Container Info. + * @param data - Container Data + * @param info - Chunk info + * @return - File. + * @throws IOException + */ + public static File getChunkFile(Pipeline pipeline, ContainerData data, + ChunkInfo info) throws IOException { + + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + if (data == null) { + log.error("Invalid container Name: {}", pipeline.getContainerName()); + throw new IOException("Unable to find the container Name: " + + pipeline.getContainerName()); + } + + File dataDir = ContainerUtils.getDataDirectory(data).toFile(); + if (!dataDir.exists()) { + log.error("Unable to find the data directory: {}", dataDir); + throw new IOException("Unable to find the data directory: " + dataDir); + } + + return dataDir.toPath().resolve(info.getChunkName()).toFile(); + + } + + /** + * Writes the data in chunk Info to the specified location in the chunkfile. + * + * @param chunkFile - File to write data to. + * @param chunkInfo - Data stream to write. + * @throws IOException + */ + public static void writeData(File chunkFile, ChunkInfo chunkInfo, + byte[] data) + throws IOException, ExecutionException, InterruptedException, + NoSuchAlgorithmException { + + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + if (data.length != chunkInfo.getLen()) { + String err = String.format("data array does not match the length " + + "specified. DataLen: %d Byte Array: %d", + chunkInfo.getLen(), data.length); + log.error(err); + throw new IOException(err); + } + + AsynchronousFileChannel file = null; + FileLock lock = null; + + try { + file = + AsynchronousFileChannel.open(chunkFile.toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.SPARSE, + StandardOpenOption.SYNC); + lock = file.lock().get(); + if (!chunkInfo.getChecksum().isEmpty()) { + verifyChecksum(chunkInfo, data, log); + } + int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); + if(size != data.length) { + log.error("Invalid write size found. Size:{} Expected: {} " , size, + data.length); + throw new IOException("Invalid write size found. Size: " + size + + " Expected: " + data.length); + } + } finally { + if (lock != null) { + lock.release(); + } + if (file != null) { + IOUtils.closeStream(file); + } + } + } + + /** + * Verifies the checksum of a chunk against the data buffer. + * + * @param chunkInfo - Chunk Info. + * @param data - data buffer + * @param log - log + * @throws NoSuchAlgorithmException + * @throws IOException + */ + private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger + log) throws NoSuchAlgorithmException, IOException { + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha.update(data); + if (!Hex.encodeHexString(sha.digest()).equals( + chunkInfo.getChecksum())) { + log.error("Checksum mismatch. Provided: {} , computed: {}", + chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest())); + throw new IOException("Checksum mismatch. Provided: " + + chunkInfo.getChecksum() + " , computed: " + + DigestUtils.sha256Hex(sha.digest())); + } + } + + /** + * Reads data from an existing chunk file. + * + * @param chunkFile - file where data lives. + * @param data - chunk defintion. + * @return ByteBuffer + * @throws IOException + * @throws ExecutionException + * @throws InterruptedException + */ + public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws + IOException, ExecutionException, InterruptedException, + NoSuchAlgorithmException { + Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); + + if (!chunkFile.exists()) { + log.error("Unable to find the chunk file. chunk info : {}", + data.toString()); + throw new IOException("Unable to find the chunk file. chunk info " + + data.toString()); + } + + AsynchronousFileChannel file = null; + FileLock lock = null; + try { + file = + AsynchronousFileChannel.open(chunkFile.toPath(), + StandardOpenOption.READ); + lock = file.lock(data.getOffset(), data.getLen(), true).get(); + + ByteBuffer buf = ByteBuffer.allocate((int) data.getLen()); + file.read(buf, data.getOffset()).get(); + + if (data.getChecksum() != null && !data.getChecksum().isEmpty()) { + verifyChecksum(data, buf.array(), log); + } + + return buf; + } finally { + if (lock != null) { + lock.release(); + } + if (file != null) { + IOUtils.closeStream(file); + } + } + } + + /** + * Returns a CreateContainer Response. This call is used by create and delete + * containers which have null success responses. + * + * @param msg Request + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) { + return ContainerUtils.getContainerResponse(msg); + } + + /** + * Gets a response to the read chunk calls. + * @param msg - Msg + * @param data - Data + * @param info - Info + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg, + byte[] data, ChunkInfo info) { + Preconditions.checkNotNull(msg); + + ContainerProtos.ReadChunkReponseProto.Builder response = + ContainerProtos.ReadChunkReponseProto.newBuilder(); + response.setChunkData(info.getProtoBufMessage()); + response.setData(ByteString.copyFrom(data)); + response.setPipeline(msg.getReadChunk().getPipeline()); + + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getContainerResponse(msg, ContainerProtos.Result + .SUCCESS, ""); + builder.setReadChunk(response); + return builder.build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index 79e9aeb6cd0..9300d06921c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -59,7 +59,8 @@ public final class ContainerUtils { /** * Returns a ReadContainer Response. * - * @param msg Request + * @param msg Request + * @param containerData - data * @return Response. */ public static ContainerProtos.ContainerCommandResponseProto @@ -81,7 +82,9 @@ public final class ContainerUtils { * We found a command type but no associated payload for the command. Hence * return malformed Command as response. * - * @param msg - Protobuf message. + * @param msg - Protobuf message. + * @param result - result + * @param message - Error message. * @return ContainerCommandResponseProto - MALFORMED_REQUEST. */ public static ContainerProtos.ContainerCommandResponseProto.Builder @@ -185,23 +188,96 @@ public final class ContainerUtils { * @throws IOException */ public static Path createMetadata(Path containerPath) throws IOException { + Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class); Preconditions.checkNotNull(containerPath); - containerPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH); - if (!containerPath.toFile().mkdirs()) { + Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH); + if (!metadataPath.toFile().mkdirs()) { + log.error("Unable to create directory for metadata storage. Path: {}", + metadataPath); throw new IOException("Unable to create directory for metadata storage." + - " Path {}" + containerPath); + " Path: " + metadataPath); } - containerPath = containerPath.resolve(OzoneConsts.CONTAINER_DB); - LevelDBStore store = new LevelDBStore(containerPath.toFile(), true); + LevelDBStore store = + new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB) + .toFile(), true); // we close since the SCM pre-creates containers. // we will open and put Db handle into a cache when keys are being created // in a container. store.close(); - return containerPath; + + Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH); + if (!dataPath.toFile().mkdirs()) { + + // If we failed to create data directory, we cleanup the + // metadata directory completely. That is, we will delete the + // whole directory including LevelDB file. + log.error("Unable to create directory for data storage. cleaning up the" + + " container path: {} dataPath: {}", + containerPath, dataPath); + FileUtils.deleteDirectory(containerPath.toFile()); + throw new IOException("Unable to create directory for data storage." + + " Path: " + dataPath); + } + return metadataPath; } + /** + * Returns Metadata location. + * + * @param containerData - Data + * @param location - Path + * @return Path + */ + public static File getMetadataFile(ContainerData containerData, + Path location) { + return location.resolve(containerData + .getContainerName().concat(CONTAINER_META)) + .toFile(); + } + + /** + * Returns container file location. + * @param containerData - Data + * @param location - Root path + * @return Path + */ + public static File getContainerFile(ContainerData containerData, + Path location) { + return location.resolve(containerData + .getContainerName().concat(CONTAINER_EXTENSION)) + .toFile(); + } + + /** + * Container metadata directory -- here is where the level DB lives. + * @param cData - cData. + * @return Path to the parent directory where the DB lives. + */ + public static Path getMetadataDirectory(ContainerData cData) { + Path dbPath = Paths.get(cData.getDBPath()); + Preconditions.checkNotNull(dbPath); + Preconditions.checkState(dbPath.toString().length() > 0); + return dbPath.getParent(); + } + + /** + * Returns the path where data or chunks live for a given container. + * @param cData - cData container + * @return - Path + */ + public static Path getDataDirectory(ContainerData cData) throws IOException { + Path path = getMetadataDirectory(cData); + Preconditions.checkNotNull(path); + path = path.getParent(); + if(path == null) { + throw new IOException("Unable to get Data directory. null path found"); + } + return path.resolve(OzoneConsts.CONTAINER_DATA_PATH); + } + + /** * remove Container if it is empty. *

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java index 140341ca0c3..c1ec48d41bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java @@ -58,9 +58,8 @@ public class Pipeline { for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) { newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID)); } - if (pipeline.hasContainerName()) { - newPipeline.containerName = newPipeline.getContainerName(); - } + + newPipeline.setContainerName(pipeline.getContainerName()); return newPipeline; } @@ -105,9 +104,7 @@ public class Pipeline { builder.addMembers(datanode.getProtoBufMessage()); } builder.setLeaderID(leaderID); - if (this.containerName != null) { - builder.setContainerName(this.containerName); - } + builder.setContainerName(this.containerName); return builder.build(); } @@ -128,5 +125,4 @@ public class Pipeline { public void setContainerName(String containerName) { this.containerName = containerName; } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java index fe7e37a2452..21f31e1bbe3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java @@ -17,5 +17,6 @@ */ package org.apache.hadoop.ozone.container.common.helpers; /** - Contains protocol buffer helper classes. + Contains protocol buffer helper classes and utilites used in + impl. **/ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java new file mode 100644 index 00000000000..090d659dd32 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +/** + * An implementation of ChunkManager that is used by default in ozone. + */ +public class ChunkManagerImpl implements ChunkManager { + static final Logger LOG = + LoggerFactory.getLogger(ChunkManagerImpl.class); + + private final ContainerManager containerManager; + + /** + * Constructs a ChunkManager. + * + * @param manager - ContainerManager. + */ + public ChunkManagerImpl(ContainerManager manager) { + this.containerManager = manager; + } + + /** + * writes a given chunk. + * + * @param pipeline - Name and the set of machines that make this container. + * @param keyName - Name of the Key. + * @param info - ChunkInfo. + * @throws IOException + */ + @Override + public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, + byte[] data) + throws IOException { + + // we don't want container manager to go away while we are writing chunks. + containerManager.readLock(); + + // TODO : Take keyManager Write lock here. + try { + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(pipeline.getContainerName()); + File chunkFile = ChunkUtils.validateChunk(pipeline, + containerManager.readContainer(pipeline.getContainerName()), info); + ChunkUtils.writeData(chunkFile, info, data); + + } catch (ExecutionException | + NoSuchAlgorithmException e) { + LOG.error("write data failed. error: {}", e); + throw new IOException("Internal error: ", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("write data failed. error: {}", e); + throw new IOException("Internal error: ", e); + } finally { + containerManager.readUnlock(); + } + } + + /** + * reads the data defined by a chunk. + * + * @param pipeline - container pipeline. + * @param keyName - Name of the Key + * @param info - ChunkInfo. + * @return byte array + * @throws IOException TODO: Right now we do not support partial reads and + * writes of chunks. TODO: Explore if we need to do that + * for ozone. + */ + @Override + public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) + throws IOException { + containerManager.readLock(); + try { + File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager + .readContainer(pipeline.getContainerName()), info); + return ChunkUtils.readData(chunkFile, info).array(); + } catch (ExecutionException | NoSuchAlgorithmException e) { + LOG.error("read data failed. error: {}", e); + throw new IOException("Internal error: ", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("read data failed. error: {}", e); + throw new IOException("Internal error: ", e); + } finally { + containerManager.readUnlock(); + } + } + + /** + * Deletes a given chunk. + * + * @param pipeline - Pipeline. + * @param keyName - Key Name + * @param info - Chunk Info + * @throws IOException + */ + @Override + public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) + throws IOException { + + containerManager.readLock(); + try { + File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager + .readContainer(pipeline.getContainerName()), info); + if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { + FileUtil.fullyDelete(chunkFile); + } else { + LOG.error("Not Supported Operation. Trying to delete a " + + "chunk that is in shared file. chunk info : " + info.toString()); + throw new IOException("Not Supported Operation. Trying to delete a " + + "chunk that is in shared file. chunk info : " + info.toString()); + } + } finally { + containerManager.readUnlock(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 599b2f28ead..1d6a6958d89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -26,9 +26,11 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.slf4j.Logger; @@ -69,6 +71,7 @@ public class ContainerManagerImpl implements ContainerManager { // for waiting threads. private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private ContainerLocationManager locationManager; + private ChunkManager chunkManager; /** * Init call that sets up a container Manager. @@ -141,7 +144,7 @@ public class ContainerManagerImpl implements ContainerManager { metaStream = new FileInputStream(metaFileName); - MessageDigest sha = MessageDigest.getInstance("SHA-256"); + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); dis = new DigestInputStream(containerStream, sha); @@ -238,13 +241,9 @@ public class ContainerManagerImpl implements ContainerManager { FileOutputStream metaStream = null; Path location = locationManager.getContainerPath(); - File containerFile = location.resolve(containerData - .getContainerName().concat(CONTAINER_EXTENSION)) - .toFile(); - - File metadataFile = location.resolve(containerData - .getContainerName().concat(CONTAINER_META)) - .toFile(); + File containerFile = ContainerUtils.getContainerFile(containerData, + location); + File metadataFile = ContainerUtils.getMetadataFile(containerData, location); try { ContainerUtils.verifyIsNewContainer(containerFile, metadataFile); @@ -255,10 +254,11 @@ public class ContainerManagerImpl implements ContainerManager { containerStream = new FileOutputStream(containerFile); metaStream = new FileOutputStream(metadataFile); - MessageDigest sha = MessageDigest.getInstance("SHA-256"); + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); dos = new DigestOutputStream(containerStream, sha); - containerData.setDBPath(metadataPath.toString()); + containerData.setDBPath(metadataPath.resolve(OzoneConsts.CONTAINER_DB) + .toString()); containerData.setContainerPath(containerFile.toString()); ContainerProtos.ContainerData protoData = containerData @@ -293,6 +293,8 @@ public class ContainerManagerImpl implements ContainerManager { } } + + /** * Deletes an existing container. * @@ -368,6 +370,10 @@ public class ContainerManagerImpl implements ContainerManager { */ @Override public ContainerData readContainer(String containerName) throws IOException { + if(!containerMap.containsKey(containerName)) { + throw new IOException("Unable to find the container. Name: " + + containerName); + } return containerMap.get(containerName).getContainer(); } @@ -446,6 +452,18 @@ public class ContainerManagerImpl implements ContainerManager { return this.lock.writeLock().isHeldByCurrentThread(); } + /** + * Sets the chunk Manager. + * @param chunkManager + */ + public void setChunkManager(ChunkManager chunkManager) { + this.chunkManager = chunkManager; + } + + public ChunkManager getChunkManager() { + return this.chunkManager; + } + /** * Filter out only container files from the container metadata dir. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index 7a45557684b..d39b7096010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; @@ -68,6 +70,12 @@ public class Dispatcher implements ContainerDispatcher { } + if ((cmdType == Type.WriteChunk) || + (cmdType == Type.ReadChunk) || + (cmdType == Type.DeleteChunk)) { + return chunkProcessHandler(msg); + } + return ContainerUtils.unsupportedRequest(msg); } @@ -81,28 +89,66 @@ public class Dispatcher implements ContainerDispatcher { private ContainerCommandResponseProto containerProcessHandler( ContainerCommandRequestProto msg) throws IOException { try { - ContainerData cData = ContainerData.getFromProtBuf( - msg.getCreateContainer().getContainerData()); - - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getCreateContainer().getPipeline()); - Preconditions.checkNotNull(pipeline); switch (msg.getCmdType()) { case CreateContainer: - return handleCreateContainer(msg, cData, pipeline); + return handleCreateContainer(msg); case DeleteContainer: - return handleDeleteContainer(msg, cData, pipeline); + return handleDeleteContainer(msg); case ListContainer: + // TODO : Support List Container. return ContainerUtils.unsupportedRequest(msg); case UpdateContainer: + // TODO : Support Update Container. return ContainerUtils.unsupportedRequest(msg); case ReadContainer: - return handleReadContainer(msg, cData); + return handleReadContainer(msg); + + default: + return ContainerUtils.unsupportedRequest(msg); + } + } catch (IOException ex) { + LOG.warn("Container operation failed. " + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString()); + + // TODO : Replace with finer error codes. + return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** + * Handles the all chunk related functionality. + * + * @param msg - command + * @return - response + * @throws IOException + */ + private ContainerCommandResponseProto chunkProcessHandler( + ContainerCommandRequestProto msg) throws IOException { + try { + + switch (msg.getCmdType()) { + case WriteChunk: + return handleWriteChunk(msg); + + case ReadChunk: + return handleReadChunk(msg); + + case DeleteChunk: + return handleDeleteChunk(msg); + + case ListChunk: + return ContainerUtils.unsupportedRequest(msg); default: return ContainerUtils.unsupportedRequest(msg); @@ -125,13 +171,12 @@ public class Dispatcher implements ContainerDispatcher { /** * Calls into container logic and returns appropriate response. * - * @param msg - Request - * @param cData - Container Data object + * @param msg - Request * @return ContainerCommandResponseProto * @throws IOException */ private ContainerCommandResponseProto handleReadContainer( - ContainerCommandRequestProto msg, ContainerData cData) + ContainerCommandRequestProto msg) throws IOException { if (!msg.hasReadContainer()) { @@ -139,51 +184,147 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - ContainerData container = this.containerManager.readContainer( - cData.getContainerName()); + + String name = msg.getReadContainer().getName(); + ContainerData container = this.containerManager.readContainer(name); return ContainerUtils.getReadContainerResponse(msg, container); } /** * Calls into container logic and returns appropriate response. * - * @param msg - Request - * @param cData - ContainerData - * @param pipeline - Pipeline is the machines where this container lives. + * @param msg - Request * @return Response. * @throws IOException */ private ContainerCommandResponseProto handleDeleteContainer( - ContainerCommandRequestProto msg, ContainerData cData, - Pipeline pipeline) throws IOException { + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasDeleteContainer()) { LOG.debug("Malformed delete container request. trace ID: {}", msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - this.containerManager.deleteContainer(pipeline, - cData.getContainerName()); + + String name = msg.getDeleteContainer().getName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getDeleteContainer().getPipeline()); + Preconditions.checkNotNull(pipeline); + + this.containerManager.deleteContainer(pipeline, name); return ContainerUtils.getContainerResponse(msg); } /** * Calls into container logic and returns appropriate response. * - * @param msg - Request - * @param cData - ContainerData - * @param pipeline - Pipeline is the machines where this container lives. + * @param msg - Request * @return Response. * @throws IOException */ private ContainerCommandResponseProto handleCreateContainer( - ContainerCommandRequestProto msg, ContainerData cData, - Pipeline pipeline) throws IOException { + ContainerCommandRequestProto msg) throws IOException { if (!msg.hasCreateContainer()) { LOG.debug("Malformed create container request. trace ID: {}", msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } + ContainerData cData = ContainerData.getFromProtBuf( + msg.getCreateContainer().getContainerData()); + Preconditions.checkNotNull(cData); + + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getCreateContainer().getPipeline()); + Preconditions.checkNotNull(pipeline); + this.containerManager.createContainer(pipeline, cData); return ContainerUtils.getContainerResponse(msg); } + + /** + * Calls into chunk manager to write a chunk. + * + * @param msg - Request. + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleWriteChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasWriteChunk()) { + LOG.debug("Malformed write chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String keyName = msg.getWriteChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getWriteChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + byte[] data = msg.getWriteChunk().getData().toByteArray(); + this.containerManager.getChunkManager().writeChunk(pipeline, keyName, + chunkInfo, data); + return ChunkUtils.getChunkResponse(msg); + } + + /** + * Calls into chunk manager to read a chunk. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleReadChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasReadChunk()) { + LOG.debug("Malformed read chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String keyName = msg.getReadChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getReadChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + byte[] data = this.containerManager.getChunkManager().readChunk(pipeline, + keyName, chunkInfo); + return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo); + } + + /** + * Calls into chunk manager to write a chunk. + * + * @param msg - Request. + * @return Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteChunk( + ContainerCommandRequestProto msg) throws IOException { + if (!msg.hasDeleteChunk()) { + LOG.debug("Malformed delete chunk request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + String keyName = msg.getDeleteChunk().getKeyName(); + Pipeline pipeline = Pipeline.getFromProtoBuf( + msg.getDeleteChunk().getPipeline()); + Preconditions.checkNotNull(pipeline); + + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk() + .getChunkData()); + Preconditions.checkNotNull(chunkInfo); + + this.containerManager.getChunkManager().deleteChunk(pipeline, keyName, + chunkInfo); + return ChunkUtils.getChunkResponse(msg); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java new file mode 100644 index 00000000000..19af45227a2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; + +import java.io.IOException; + +/** + * Chunk Manager allows read, write, delete and listing of chunks in + * a container. + */ +public interface ChunkManager { + + /** + * writes a given chunk. + * @param pipeline - Name and the set of machines that make this container. + * @param keyName - Name of the Key. + * @param info - ChunkInfo. + * @throws IOException + */ + void writeChunk(Pipeline pipeline, String keyName, + ChunkInfo info, byte[] data) throws IOException; + + /** + * reads the data defined by a chunk. + * @param pipeline - container pipeline. + * @param keyName - Name of the Key + * @param info - ChunkInfo. + * @return byte array + * @throws IOException + * + * TODO: Right now we do not support partial reads and writes of chunks. + * TODO: Explore if we need to do that for ozone. + */ + byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws + IOException; + + /** + * Deletes a given chunk. + * @param pipeline - Pipeline. + * @param keyName - Key Name + * @param info - Chunk Info + * @throws IOException + */ + void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws + IOException; + + // TODO : Support list operations. + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index a0ddcd9c250..eba5d9a335f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -97,4 +97,17 @@ public interface ContainerManager extends RwLock { */ void shutdown() throws IOException; + /** + * Sets the Chunk Manager. + * @param chunkManager - ChunkManager. + */ + void setChunkManager(ChunkManager chunkManager); + + /** + * Gets the Chunk Manager. + * @return ChunkManager. + */ + ChunkManager getChunkManager(); + + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index cc85a240fd5..b793f479e42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -21,8 +21,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; import org.apache.hadoop.ozone.container.common.impl.Dispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; @@ -48,6 +50,7 @@ public class OzoneContainer { private final ContainerDispatcher dispatcher; private final ContainerManager manager; private final XceiverServer server; + private final ChunkManager chunkManager; /** * Creates a network endpoint and enables Ozone container. @@ -74,6 +77,8 @@ public class OzoneContainer { manager = new ContainerManagerImpl(); manager.init(this.ozoneConfig, locations, this.dataSet); + this.chunkManager = new ChunkManagerImpl(manager); + manager.setChunkManager(this.chunkManager); this.dispatcher = new Dispatcher(manager); server = new XceiverServer(this.ozoneConfig, this.dispatcher); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto index 38a378fb0cd..4bb53e16ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto @@ -157,7 +157,7 @@ message ContainerCommandResponseProto { message Pipeline { required string leaderID = 1; repeated DatanodeIDProto members = 2; - optional string containerName = 3; + required string containerName = 3; } message KeyValue { @@ -244,7 +244,7 @@ message ReadKeyRequestProto { message ReadKeyResponeProto { repeated KeyValue metadata = 1; - repeated chunkInfo chunkData = 2; + repeated ChunkInfo chunkData = 2; } message UpdateKeyRequestProto { @@ -276,19 +276,19 @@ message ListKeyResponeProto { // Chunk Operations -message chunkInfo { - required uint64 offset = 1; - required uint64 len = 2; - optional uint64 checksum = 3; - repeated KeyValue metadata = 4; +message ChunkInfo { + required string chunkName = 1; + required uint64 offset = 2; + required uint64 len = 3; + optional string checksum = 4; + repeated KeyValue metadata = 5; } message WriteChunkRequestProto { required Pipeline pipeline = 1; - required string containerName = 2; - required string keyName = 3; - required chunkInfo chunkData = 4; - repeated bytes data = 5; + required string keyName = 2; + required ChunkInfo chunkData = 3; + required bytes data = 4; } message WriteChunkReponseProto { @@ -296,22 +296,20 @@ message WriteChunkReponseProto { message ReadChunkRequestProto { required Pipeline pipeline = 1; - required string containerName = 2; - required string keyName = 3; - required chunkInfo chunkData = 4; + required string keyName = 2; + required ChunkInfo chunkData = 3; } message ReadChunkReponseProto { required Pipeline pipeline = 1; - required chunkInfo chunkData = 2; - repeated bytes data = 3; + required ChunkInfo chunkData = 2; + required bytes data = 3; } message DeleteChunkRequestProto { required Pipeline pipeline = 1; - required string containerName = 2; - required string keyName = 3; - required chunkInfo chunkData = 4; + required string keyName = 2; + required ChunkInfo chunkData = 3; } message DeleteChunkResponseProto { @@ -319,13 +317,12 @@ message DeleteChunkResponseProto { message ListChunkRequestProto { required Pipeline pipeline = 1; - required string containerName = 2; - required string keyName = 3; - required uint64 startOffset = 4; - required uint32 count = 5; + required string keyName = 2; + required string prevChunkName = 3; + required uint32 count = 4; } message ListChunkResponseProto { - repeated chunkInfo chunkData = 1; + repeated ChunkInfo chunkData = 1; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 0d1b26932a7..bef290c7b46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -18,22 +18,32 @@ package org.apache.hadoop.ozone.container; +import com.google.protobuf.ByteString; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOExceptionWithCause; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import java.io.IOException; import java.net.ServerSocket; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Random; import java.util.UUID; /** * Helpers for container tests. */ public class ContainerTestHelper { + private static Random r = new Random(); /** * Create a pipeline with single node replica. @@ -41,7 +51,8 @@ public class ContainerTestHelper { * @return Pipeline with single node in it. * @throws IOException */ - public static Pipeline createSingleNodePipeline() throws IOException { + public static Pipeline createSingleNodePipeline(String containerName) throws + IOException { ServerSocket socket = new ServerSocket(0); int port = socket.getLocalPort(); DatanodeID datanodeID = new DatanodeID(socket.getInetAddress() @@ -50,26 +61,160 @@ public class ContainerTestHelper { datanodeID.setContainerPort(port); Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid()); pipeline.addMember(datanodeID); + pipeline.setContainerName(containerName); socket.close(); return pipeline; } + /** + * Creates a ChunkInfo for testing. + * + * @param keyName - Name of the key + * @param seqNo - Chunk number. + * @return ChunkInfo + * @throws IOException + */ + public static ChunkInfo getChunk(String keyName, int seqNo, long offset, + long len) throws IOException { + + ChunkInfo info = new ChunkInfo(String.format("%s.data.%d", keyName, + seqNo), offset, len); + return info; + } + + /** + * Generates some data of the requested len. + * + * @param len - Number of bytes. + * @return byte array with valid data. + */ + public static byte[] getData(int len) { + byte[] data = new byte[len]; + r.nextBytes(data); + return data; + } + + /** + * Computes the hash and sets the value correctly. + * + * @param info - chunk info. + * @param data - data array + * @throws NoSuchAlgorithmException + */ + public static void setDataChecksum(ChunkInfo info, byte[] data) + throws NoSuchAlgorithmException { + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha.update(data); + info.setChecksum(Hex.encodeHexString(sha.digest())); + } + + /** + * Returns a writeChunk Request. + * + * @param containerName - Name + * @param keyName - Name + * @param datalen - data len. + * @return Request. + * @throws IOException + * @throws NoSuchAlgorithmException + */ + public static ContainerCommandRequestProto getWriteChunkRequest( + Pipeline pipeline, String containerName, String keyName, int datalen) + throws + IOException, NoSuchAlgorithmException { + ContainerProtos.WriteChunkRequestProto.Builder writeRequest = + ContainerProtos.WriteChunkRequestProto + .newBuilder(); + + pipeline.setContainerName(containerName); + writeRequest.setPipeline(pipeline.getProtobufMessage()); + writeRequest.setKeyName(keyName); + + byte[] data = getData(datalen); + ChunkInfo info = getChunk(keyName, 0, 0, datalen); + setDataChecksum(info, data); + + writeRequest.setChunkData(info.getProtoBufMessage()); + writeRequest.setData(ByteString.copyFrom(data)); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.WriteChunk); + request.setWriteChunk(writeRequest); + return request.build(); + } + + /** + * Returns a read Request. + * + * @param request writeChunkRequest. + * @return Request. + * @throws IOException + * @throws NoSuchAlgorithmException + */ + public static ContainerCommandRequestProto getReadChunkRequest( + ContainerProtos.WriteChunkRequestProto request) + throws + IOException, NoSuchAlgorithmException { + ContainerProtos.ReadChunkRequestProto.Builder readRequest = + ContainerProtos.ReadChunkRequestProto.newBuilder(); + + readRequest.setPipeline(request.getPipeline()); + + readRequest.setKeyName(request.getKeyName()); + readRequest.setChunkData(request.getChunkData()); + + ContainerCommandRequestProto.Builder newRequest = + ContainerCommandRequestProto.newBuilder(); + newRequest.setCmdType(ContainerProtos.Type.ReadChunk); + newRequest.setReadChunk(readRequest); + return newRequest.build(); + } + + /** + * Returns a delete Request. + * + * @param writeRequest - write request + * @return request + * @throws IOException + * @throws NoSuchAlgorithmException + */ + public static ContainerCommandRequestProto getDeleteChunkRequest( + ContainerProtos.WriteChunkRequestProto writeRequest) + throws + IOException, NoSuchAlgorithmException { + ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest = + ContainerProtos.DeleteChunkRequestProto + .newBuilder(); + + deleteRequest.setPipeline(writeRequest.getPipeline()); + deleteRequest.setChunkData(writeRequest.getChunkData()); + deleteRequest.setKeyName(writeRequest.getKeyName()); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.DeleteChunk); + request.setDeleteChunk(deleteRequest); + return request.build(); + } + /** * Returns a create container command for test purposes. There are a bunch of * tests where we need to just send a request and get a reply. * * @return ContainerCommandRequestProto. */ - public static ContainerCommandRequestProto getCreateContainerRequest() throws - IOException { + public static ContainerCommandRequestProto getCreateContainerRequest( + String containerName) throws IOException { ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto .newBuilder(); ContainerProtos.ContainerData.Builder containerData = ContainerProtos .ContainerData.newBuilder(); - containerData.setName("testContainer"); + containerData.setName(containerName); createRequest.setPipeline( - ContainerTestHelper.createSingleNodePipeline().getProtobufMessage()); + ContainerTestHelper.createSingleNodePipeline(containerName) + .getProtobufMessage()); createRequest.setContainerData(containerData.build()); ContainerCommandRequestProto.Builder request = @@ -86,7 +231,7 @@ public class ContainerTestHelper { * @return ContainerCommandRequestProto. */ public static ContainerCommandResponseProto - getCreateContainerResponse(ContainerCommandRequestProto request) throws + getCreateContainerResponse(ContainerCommandRequestProto request) throws IOException { ContainerProtos.CreateContainerResponseProto.Builder createResponse = ContainerProtos.CreateContainerResponseProto.newBuilder(); @@ -99,6 +244,4 @@ public class ContainerTestHelper { response.setResult(ContainerProtos.Result.SUCCESS); return response.build(); } - - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 3b498e276a2..213019eccfc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -18,14 +18,17 @@ package org.apache.hadoop.ozone.container.common.impl; +import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.utils.LevelDBStore; import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.junit.After; @@ -33,13 +36,19 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -47,6 +56,10 @@ import java.util.Map; import static org.apache.hadoop.ozone.container.ContainerTestHelper .createSingleNodePipeline; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; +import static org.apache.hadoop.ozone.container.ContainerTestHelper + .setDataChecksum; import static org.junit.Assert.fail; /** @@ -56,11 +69,15 @@ public class TestContainerPersistence { static String path; static ContainerManagerImpl containerManager; + static ChunkManagerImpl chunkManager; static OzoneConfiguration conf; static FsDatasetSpi fsDataSet; static MiniDFSCluster cluster; static List pathLists = new LinkedList<>(); + @Rule + public ExpectedException exception = ExpectedException.none(); + @BeforeClass public static void init() throws IOException { conf = new OzoneConfiguration(); @@ -84,6 +101,9 @@ public class TestContainerPersistence { cluster.waitActive(); fsDataSet = cluster.getDataNodes().get(0).getFSDataset(); containerManager = new ContainerManagerImpl(); + chunkManager = new ChunkManagerImpl(containerManager); + containerManager.setChunkManager(chunkManager); + } @AfterClass @@ -115,7 +135,8 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName), + data); Assert.assertTrue(containerManager.getContainerMap() .containsKey(containerName)); ContainerManagerImpl.ContainerStatus status = containerManager @@ -132,14 +153,11 @@ public class TestContainerPersistence { String containerPathString = ContainerUtils.getContainerNameFromFile(new File(status.getContainer().getContainerPath())); - Path meta = Paths.get(containerPathString); - - String metadataFile = meta.toString() + OzoneConsts.CONTAINER_META; - Assert.assertTrue(new File(metadataFile).exists()); + Path meta = Paths.get(status.getContainer().getDBPath()).getParent(); + Assert.assertTrue(Files.exists(meta)); String dbPath = status.getContainer().getDBPath(); - LevelDBStore store = null; try { store = new LevelDBStore(new File(dbPath), false); @@ -158,9 +176,9 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName), data); try { - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName), data); fail("Expected Exception not thrown."); } catch (IOException ex) { Assert.assertNotNull(ex); @@ -176,12 +194,12 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName1); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName1), data); data = new ContainerData(containerName2); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName2), data); Assert.assertTrue(containerManager.getContainerMap() @@ -189,7 +207,7 @@ public class TestContainerPersistence { Assert.assertTrue(containerManager.getContainerMap() .containsKey(containerName2)); - containerManager.deleteContainer(createSingleNodePipeline(), + containerManager.deleteContainer(createSingleNodePipeline(containerName1), containerName1); Assert.assertFalse(containerManager.getContainerMap() .containsKey(containerName1)); @@ -200,7 +218,7 @@ public class TestContainerPersistence { data = new ContainerData(containerName1); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName1), data); // Assert we still have both containers. Assert.assertTrue(containerManager.getContainerMap() @@ -228,7 +246,7 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(), data); + containerManager.createContainer(createSingleNodePipeline(containerName), data); testMap.put(containerName, data); } @@ -251,6 +269,204 @@ public class TestContainerPersistence { // Assert that we listed all the keys that we had put into // container. Assert.assertTrue(testMap.isEmpty()); - } -} + + /** + * Writes a single chunk. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testWriteChunk() throws IOException, NoSuchAlgorithmException { + final int datalen = 1024; + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + + pipeline.setContainerName(containerName); + ContainerData cData = new ContainerData(containerName); + cData.addMetadata("VOLUME", "shire"); + cData.addMetadata("owner)", "bilbo"); + containerManager.createContainer(pipeline, cData); + ChunkInfo info = getChunk(keyName, 0, 0, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + } + + /** + * Writes many chunks of the same key into different chunk files and verifies + * that we have that data in many files. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testWritReadManyChunks() throws IOException, + NoSuchAlgorithmException { + final int datalen = 1024; + final int chunkCount = 1024; + + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + Map fileHashMap = new HashMap<>(); + + pipeline.setContainerName(containerName); + ContainerData cData = new ContainerData(containerName); + cData.addMetadata("VOLUME", "shire"); + cData.addMetadata("owner)", "bilbo"); + containerManager.createContainer(pipeline, cData); + for (int x = 0; x < chunkCount; x++) { + ChunkInfo info = getChunk(keyName, x, 0, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + String fileName = String.format("%s.data.%d", keyName, x); + fileHashMap.put(fileName, info); + } + + ContainerData cNewData = containerManager.readContainer(containerName); + Assert.assertNotNull(cNewData); + Path dataDir = ContainerUtils.getDataDirectory(cNewData); + + String globFormat = String.format("%s.data.*", keyName); + MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + + // Read chunk via file system and verify. + int count = 0; + try (DirectoryStream stream = + Files.newDirectoryStream(dataDir, globFormat)) { + for (Path fname : stream) { + sha.update(FileUtils.readFileToByteArray(fname.toFile())); + String val = Hex.encodeHexString(sha.digest()); + Assert.assertEquals(fileHashMap.get(fname.getFileName().toString()) + .getChecksum(), + val); + count++; + sha.reset(); + } + Assert.assertEquals(chunkCount, count); + + // Read chunk via ReadChunk call. + sha.reset(); + for (int x = 0; x < chunkCount; x++) { + String fileName = String.format("%s.data.%d", keyName, x); + ChunkInfo info = fileHashMap.get(fileName); + byte[] data = chunkManager.readChunk(pipeline, keyName, info); + sha.update(data); + Assert.assertEquals(Hex.encodeHexString(sha.digest()), + info.getChecksum()); + sha.reset(); + } + } + } + + /** + * Writes a single chunk and tries to overwrite that chunk without over write + * flag then re-tries with overwrite flag. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testOverWrite() throws IOException, + NoSuchAlgorithmException { + final int datalen = 1024; + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + + pipeline.setContainerName(containerName); + ContainerData cData = new ContainerData(containerName); + cData.addMetadata("VOLUME", "shire"); + cData.addMetadata("owner)", "bilbo"); + containerManager.createContainer(pipeline, cData); + ChunkInfo info = getChunk(keyName, 0, 0, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + try { + chunkManager.writeChunk(pipeline, keyName, info, data); + } catch(IOException ex) { + Assert.assertTrue(ex.getMessage().contains( + "Rejecting write chunk request. OverWrite flag required.")); + } + + // With the overwrite flag it should work now. + info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); + chunkManager.writeChunk(pipeline, keyName, info, data); + } + + /** + * This test writes data as many small writes and tries to read back the data + * in a single large read. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testMultipleWriteSingleRead() throws IOException, + NoSuchAlgorithmException { + final int datalen = 1024; + final int chunkCount = 1024; + + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + + pipeline.setContainerName(containerName); + ContainerData cData = new ContainerData(containerName); + cData.addMetadata("VOLUME", "shire"); + cData.addMetadata("owner)", "bilbo"); + containerManager.createContainer(pipeline, cData); + MessageDigest oldSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + for (int x = 0; x < chunkCount; x++) { + // we are writing to the same chunk file but at different offsets. + long offset = x * datalen; + ChunkInfo info = getChunk(keyName, 0, offset, datalen); + byte[] data = getData(datalen); + oldSha.update(data); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + } + + // Request to read the whole data in a single go. + ChunkInfo largeChunk = getChunk(keyName, 0, 0, datalen * chunkCount); + byte[] newdata = chunkManager.readChunk(pipeline, keyName, largeChunk); + MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + newSha.update(newdata); + Assert.assertEquals(Hex.encodeHexString(oldSha.digest()), + Hex.encodeHexString(newSha.digest())); + } + + /** + * Writes a chunk and deletes it, re-reads to make sure it is gone. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testDeleteChunk() throws IOException, + NoSuchAlgorithmException { + final int datalen = 1024; + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + + pipeline.setContainerName(containerName); + ContainerData cData = new ContainerData(containerName); + cData.addMetadata("VOLUME", "shire"); + cData.addMetadata("owner)", "bilbo"); + containerManager.createContainer(pipeline, cData); + ChunkInfo info = getChunk(keyName, 0, 0, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.deleteChunk(pipeline, keyName, info); + exception.expect(IOException.class); + exception.expectMessage("Unable to find the chunk file."); + chunkManager.readChunk(pipeline, keyName, info); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 5beb8b907cb..b3c7a771e92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -37,14 +38,13 @@ import java.io.File; import java.io.IOException; import java.net.URL; - public class TestOzoneContainer { @Test public void testCreateOzoneContainer() throws Exception { - + String containerName = OzoneUtils.getRequestID(); Configuration conf = new OzoneConfiguration(); URL p = conf.getClass().getResource(""); - String path = p.getPath().concat( + String path = p.getPath().concat( TestOzoneContainer.class.getSimpleName()); path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); @@ -59,7 +59,8 @@ public class TestOzoneContainer { cluster.waitActive(); - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); + Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline + (containerName); conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()); OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes @@ -69,7 +70,7 @@ public class TestOzoneContainer { XceiverClient client = new XceiverClient(pipeline, conf); client.connect(); ContainerProtos.ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest(); + ContainerTestHelper.getCreateContainerRequest(containerName); ContainerProtos.ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertNotNull(response); @@ -79,13 +80,13 @@ public class TestOzoneContainer { } - @Test public void testOzoneContainerViaDataNode() throws Exception { - + String keyName = OzoneUtils.getRequestID(); + String containerName = OzoneUtils.getRequestID(); Configuration conf = new OzoneConfiguration(); URL p = conf.getClass().getResource(""); - String path = p.getPath().concat( + String path = p.getPath().concat( TestOzoneContainer.class.getSimpleName()); path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); @@ -95,7 +96,8 @@ public class TestOzoneContainer { conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true); conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local"); - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); + Pipeline pipeline = + ContainerTestHelper.createSingleNodePipeline(containerName); conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()); @@ -105,12 +107,44 @@ public class TestOzoneContainer { // This client talks to ozone container via datanode. XceiverClient client = new XceiverClient(pipeline, conf); client.connect(); + + // Create container ContainerProtos.ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest(); + ContainerTestHelper.getCreateContainerRequest(containerName); ContainerProtos.ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertNotNull(response); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + + // Write Chunk + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper.getWriteChunkRequest(pipeline, containerName, + keyName, 1024); + + response = client.sendCommand(writeChunkRequest); + Assert.assertNotNull(response); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + + // Read Chunk + request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest + .getWriteChunk()); + + response = client.sendCommand(request); + Assert.assertNotNull(response); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + + //Delete Chunk + request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest + .getWriteChunk()); + + response = client.sendCommand(request); + Assert.assertNotNull(response); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + + client.close(); cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java index 6b134fa639c..f1d9c023e5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.junit.Assert; import org.junit.Test; @@ -45,11 +46,12 @@ public class TestContainerServer { @Test public void testPipeline() throws IOException { EmbeddedChannel channel = null; + String containerName = OzoneUtils.getRequestID(); try { channel = new EmbeddedChannel(new XceiverServerHandler( new TestContainerDispatcher())); ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest(); + ContainerTestHelper.getCreateContainerRequest(containerName); channel.writeInbound(request); Assert.assertTrue(channel.finish()); ContainerCommandResponseProto response = channel.readOutbound(); @@ -65,9 +67,10 @@ public class TestContainerServer { public void testClientServer() throws Exception { XceiverServer server = null; XceiverClient client = null; - + String containerName = OzoneUtils.getRequestID(); try { - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); + Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline + (containerName); OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()); @@ -79,7 +82,7 @@ public class TestContainerServer { client.connect(); ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest(); + ContainerTestHelper.getCreateContainerRequest(containerName); ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); } finally { @@ -96,9 +99,11 @@ public class TestContainerServer { public void testClientServerWithContainerDispatcher() throws Exception { XceiverServer server = null; XceiverClient client = null; + String containerName = OzoneUtils.getRequestID(); try { - Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); + Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline + (containerName); OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()); @@ -111,7 +116,7 @@ public class TestContainerServer { client.connect(); ContainerCommandRequestProto request = - ContainerTestHelper.getCreateContainerRequest(); + ContainerTestHelper.getCreateContainerRequest(containerName); ContainerCommandResponseProto response = client.sendCommand(request); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);