HDFS-10238. Ozone : Add chunk persistance. Contributed by Anu Engineer.
This commit is contained in:
parent
0744d0a947
commit
c6fd5ea3f0
|
@ -65,8 +65,8 @@ public final class OzoneConsts {
|
||||||
public static final String CONTAINER_ROOT_PREFIX = "repository";
|
public static final String CONTAINER_ROOT_PREFIX = "repository";
|
||||||
|
|
||||||
public static final String CONTAINER_DB = "container.db";
|
public static final String CONTAINER_DB = "container.db";
|
||||||
|
public static final String FILE_HASH = "SHA-256";
|
||||||
|
public final static String CHUNK_OVERWRITE = "OverWriteRequested";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Supports Bucket Versioning.
|
* Supports Bucket Versioning.
|
||||||
|
|
|
@ -0,0 +1,184 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java class that represents ChunkInfo ProtoBuf class. This helper class allows
|
||||||
|
* us to convert to and from protobuf to normal java.
|
||||||
|
*/
|
||||||
|
public class ChunkInfo {
|
||||||
|
private final String chunkName;
|
||||||
|
private final long offset;
|
||||||
|
private final long len;
|
||||||
|
private String checksum;
|
||||||
|
private final Map<String, String> metadata;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a ChunkInfo.
|
||||||
|
*
|
||||||
|
* @param chunkName - File Name where chunk lives.
|
||||||
|
* @param offset - offset where Chunk Starts.
|
||||||
|
* @param len - Length of the Chunk.
|
||||||
|
*/
|
||||||
|
public ChunkInfo(String chunkName, long offset, long len) {
|
||||||
|
this.chunkName = chunkName;
|
||||||
|
this.offset = offset;
|
||||||
|
this.len = len;
|
||||||
|
this.metadata = new TreeMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds metadata.
|
||||||
|
*
|
||||||
|
* @param key - Key Name.
|
||||||
|
* @param value - Value.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void addMetadata(String key, String value) throws IOException {
|
||||||
|
synchronized (this.metadata) {
|
||||||
|
if (this.metadata.containsKey(key)) {
|
||||||
|
throw new IOException("This key already exists. Key " + key);
|
||||||
|
}
|
||||||
|
metadata.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a Chunkinfo class from the protobuf definitions.
|
||||||
|
*
|
||||||
|
* @param info - Protobuf class
|
||||||
|
* @return ChunkInfo
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(info);
|
||||||
|
|
||||||
|
ChunkInfo chunkInfo = new ChunkInfo(info.getChunkName(), info.getOffset(),
|
||||||
|
info.getLen());
|
||||||
|
|
||||||
|
for (int x = 0; x < info.getMetadataCount(); x++) {
|
||||||
|
chunkInfo.addMetadata(info.getMetadata(x).getKey(),
|
||||||
|
info.getMetadata(x).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (info.hasChecksum()) {
|
||||||
|
chunkInfo.setChecksum(info.getChecksum());
|
||||||
|
}
|
||||||
|
return chunkInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a ProtoBuf Message from ChunkInfo.
|
||||||
|
*
|
||||||
|
* @return Protocol Buffer Message
|
||||||
|
*/
|
||||||
|
public ContainerProtos.ChunkInfo getProtoBufMessage() {
|
||||||
|
ContainerProtos.ChunkInfo.Builder builder = ContainerProtos
|
||||||
|
.ChunkInfo.newBuilder();
|
||||||
|
|
||||||
|
builder.setChunkName(this.getChunkName());
|
||||||
|
builder.setOffset(this.getOffset());
|
||||||
|
builder.setLen(this.getLen());
|
||||||
|
if (this.getChecksum() != null && !this.getChecksum().isEmpty()) {
|
||||||
|
builder.setChecksum(this.getChecksum());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||||
|
ContainerProtos.KeyValue.Builder keyValBuilder =
|
||||||
|
ContainerProtos.KeyValue.newBuilder();
|
||||||
|
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
||||||
|
.setValue(entry.getValue()).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the chunkName.
|
||||||
|
*
|
||||||
|
* @return - String
|
||||||
|
*/
|
||||||
|
public String getChunkName() {
|
||||||
|
return chunkName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the start offset of the given chunk in physical file.
|
||||||
|
*
|
||||||
|
* @return - long
|
||||||
|
*/
|
||||||
|
public long getOffset() {
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the length of the Chunk.
|
||||||
|
*
|
||||||
|
* @return long
|
||||||
|
*/
|
||||||
|
public long getLen() {
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the SHA256 value of this chunk.
|
||||||
|
*
|
||||||
|
* @return - Hash String
|
||||||
|
*/
|
||||||
|
public String getChecksum() {
|
||||||
|
return checksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the Hash value of this chunk.
|
||||||
|
*
|
||||||
|
* @param checksum - Hash String.
|
||||||
|
*/
|
||||||
|
public void setChecksum(String checksum) {
|
||||||
|
this.checksum = checksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns Metadata associated with this Chunk.
|
||||||
|
*
|
||||||
|
* @return - Map of Key,values.
|
||||||
|
*/
|
||||||
|
public Map<String, String> getMetadata() {
|
||||||
|
return metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ChunkInfo{" +
|
||||||
|
"chunkName='" + chunkName +
|
||||||
|
", offset=" + offset +
|
||||||
|
", len=" + len +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,299 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import org.apache.commons.codec.binary.Hex;
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.AsynchronousFileChannel;
|
||||||
|
import java.nio.channels.FileLock;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set of utility functions used by the chunk Manager.
|
||||||
|
*/
|
||||||
|
public final class ChunkUtils {
|
||||||
|
|
||||||
|
/* Never constructed. */
|
||||||
|
private ChunkUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if we are getting a request to overwrite an existing range of
|
||||||
|
* chunk.
|
||||||
|
*
|
||||||
|
* @param chunkFile - File
|
||||||
|
* @param chunkInfo - Buffer to write
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
|
||||||
|
chunkInfo) {
|
||||||
|
|
||||||
|
if (!chunkFile.exists()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
long offset = chunkInfo.getOffset();
|
||||||
|
return offset < chunkFile.length();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overwrite is permitted if an only if the user explicitly asks for it. We
|
||||||
|
* permit this iff the key/value pair contains a flag called
|
||||||
|
* [OverWriteRequested, true].
|
||||||
|
*
|
||||||
|
* @param chunkInfo - Chunk info
|
||||||
|
* @return true if the user asks for it.
|
||||||
|
*/
|
||||||
|
public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
|
||||||
|
String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
|
||||||
|
return (overWrite != null) &&
|
||||||
|
(!overWrite.isEmpty()) &&
|
||||||
|
(Boolean.valueOf(overWrite));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates chunk data and returns a file object to Chunk File that we are
|
||||||
|
* expected to write data to.
|
||||||
|
*
|
||||||
|
* @param pipeline - pipeline.
|
||||||
|
* @param data - container data.
|
||||||
|
* @param info - chunk info.
|
||||||
|
* @return File
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static File validateChunk(Pipeline pipeline, ContainerData data,
|
||||||
|
ChunkInfo info) throws IOException {
|
||||||
|
|
||||||
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
|
||||||
|
File chunkFile = getChunkFile(pipeline, data, info);
|
||||||
|
if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
|
||||||
|
if (!ChunkUtils.isOverWritePermitted(info)) {
|
||||||
|
log.error("Rejecting write chunk request. Chunk overwrite " +
|
||||||
|
"without explicit request. {}", info.toString());
|
||||||
|
throw new IOException("Rejecting write chunk request. OverWrite " +
|
||||||
|
"flag required." + info.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return chunkFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates that Path to chunk file exists.
|
||||||
|
*
|
||||||
|
* @param pipeline - Container Info.
|
||||||
|
* @param data - Container Data
|
||||||
|
* @param info - Chunk info
|
||||||
|
* @return - File.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static File getChunkFile(Pipeline pipeline, ContainerData data,
|
||||||
|
ChunkInfo info) throws IOException {
|
||||||
|
|
||||||
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
if (data == null) {
|
||||||
|
log.error("Invalid container Name: {}", pipeline.getContainerName());
|
||||||
|
throw new IOException("Unable to find the container Name: " +
|
||||||
|
pipeline.getContainerName());
|
||||||
|
}
|
||||||
|
|
||||||
|
File dataDir = ContainerUtils.getDataDirectory(data).toFile();
|
||||||
|
if (!dataDir.exists()) {
|
||||||
|
log.error("Unable to find the data directory: {}", dataDir);
|
||||||
|
throw new IOException("Unable to find the data directory: " + dataDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dataDir.toPath().resolve(info.getChunkName()).toFile();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the data in chunk Info to the specified location in the chunkfile.
|
||||||
|
*
|
||||||
|
* @param chunkFile - File to write data to.
|
||||||
|
* @param chunkInfo - Data stream to write.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
|
||||||
|
byte[] data)
|
||||||
|
throws IOException, ExecutionException, InterruptedException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
|
||||||
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
if (data.length != chunkInfo.getLen()) {
|
||||||
|
String err = String.format("data array does not match the length " +
|
||||||
|
"specified. DataLen: %d Byte Array: %d",
|
||||||
|
chunkInfo.getLen(), data.length);
|
||||||
|
log.error(err);
|
||||||
|
throw new IOException(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
AsynchronousFileChannel file = null;
|
||||||
|
FileLock lock = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
file =
|
||||||
|
AsynchronousFileChannel.open(chunkFile.toPath(),
|
||||||
|
StandardOpenOption.CREATE,
|
||||||
|
StandardOpenOption.WRITE,
|
||||||
|
StandardOpenOption.SPARSE,
|
||||||
|
StandardOpenOption.SYNC);
|
||||||
|
lock = file.lock().get();
|
||||||
|
if (!chunkInfo.getChecksum().isEmpty()) {
|
||||||
|
verifyChecksum(chunkInfo, data, log);
|
||||||
|
}
|
||||||
|
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
|
||||||
|
if(size != data.length) {
|
||||||
|
log.error("Invalid write size found. Size:{} Expected: {} " , size,
|
||||||
|
data.length);
|
||||||
|
throw new IOException("Invalid write size found. Size: " + size
|
||||||
|
+ " Expected: " + data.length);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (lock != null) {
|
||||||
|
lock.release();
|
||||||
|
}
|
||||||
|
if (file != null) {
|
||||||
|
IOUtils.closeStream(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the checksum of a chunk against the data buffer.
|
||||||
|
*
|
||||||
|
* @param chunkInfo - Chunk Info.
|
||||||
|
* @param data - data buffer
|
||||||
|
* @param log - log
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
|
||||||
|
log) throws NoSuchAlgorithmException, IOException {
|
||||||
|
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
sha.update(data);
|
||||||
|
if (!Hex.encodeHexString(sha.digest()).equals(
|
||||||
|
chunkInfo.getChecksum())) {
|
||||||
|
log.error("Checksum mismatch. Provided: {} , computed: {}",
|
||||||
|
chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
|
||||||
|
throw new IOException("Checksum mismatch. Provided: " +
|
||||||
|
chunkInfo.getChecksum() + " , computed: " +
|
||||||
|
DigestUtils.sha256Hex(sha.digest()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads data from an existing chunk file.
|
||||||
|
*
|
||||||
|
* @param chunkFile - file where data lives.
|
||||||
|
* @param data - chunk defintion.
|
||||||
|
* @return ByteBuffer
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ExecutionException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
|
||||||
|
IOException, ExecutionException, InterruptedException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
|
||||||
|
if (!chunkFile.exists()) {
|
||||||
|
log.error("Unable to find the chunk file. chunk info : {}",
|
||||||
|
data.toString());
|
||||||
|
throw new IOException("Unable to find the chunk file. chunk info " +
|
||||||
|
data.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
AsynchronousFileChannel file = null;
|
||||||
|
FileLock lock = null;
|
||||||
|
try {
|
||||||
|
file =
|
||||||
|
AsynchronousFileChannel.open(chunkFile.toPath(),
|
||||||
|
StandardOpenOption.READ);
|
||||||
|
lock = file.lock(data.getOffset(), data.getLen(), true).get();
|
||||||
|
|
||||||
|
ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
|
||||||
|
file.read(buf, data.getOffset()).get();
|
||||||
|
|
||||||
|
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
|
||||||
|
verifyChecksum(data, buf.array(), log);
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
} finally {
|
||||||
|
if (lock != null) {
|
||||||
|
lock.release();
|
||||||
|
}
|
||||||
|
if (file != null) {
|
||||||
|
IOUtils.closeStream(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a CreateContainer Response. This call is used by create and delete
|
||||||
|
* containers which have null success responses.
|
||||||
|
*
|
||||||
|
* @param msg Request
|
||||||
|
* @return Response.
|
||||||
|
*/
|
||||||
|
public static ContainerProtos.ContainerCommandResponseProto
|
||||||
|
getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
|
||||||
|
return ContainerUtils.getContainerResponse(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a response to the read chunk calls.
|
||||||
|
* @param msg - Msg
|
||||||
|
* @param data - Data
|
||||||
|
* @param info - Info
|
||||||
|
* @return Response.
|
||||||
|
*/
|
||||||
|
public static ContainerProtos.ContainerCommandResponseProto
|
||||||
|
getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
|
||||||
|
byte[] data, ChunkInfo info) {
|
||||||
|
Preconditions.checkNotNull(msg);
|
||||||
|
|
||||||
|
ContainerProtos.ReadChunkReponseProto.Builder response =
|
||||||
|
ContainerProtos.ReadChunkReponseProto.newBuilder();
|
||||||
|
response.setChunkData(info.getProtoBufMessage());
|
||||||
|
response.setData(ByteString.copyFrom(data));
|
||||||
|
response.setPipeline(msg.getReadChunk().getPipeline());
|
||||||
|
|
||||||
|
ContainerProtos.ContainerCommandResponseProto.Builder builder =
|
||||||
|
ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
|
||||||
|
.SUCCESS, "");
|
||||||
|
builder.setReadChunk(response);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -59,7 +59,8 @@ public final class ContainerUtils {
|
||||||
/**
|
/**
|
||||||
* Returns a ReadContainer Response.
|
* Returns a ReadContainer Response.
|
||||||
*
|
*
|
||||||
* @param msg Request
|
* @param msg Request
|
||||||
|
* @param containerData - data
|
||||||
* @return Response.
|
* @return Response.
|
||||||
*/
|
*/
|
||||||
public static ContainerProtos.ContainerCommandResponseProto
|
public static ContainerProtos.ContainerCommandResponseProto
|
||||||
|
@ -81,7 +82,9 @@ public final class ContainerUtils {
|
||||||
* We found a command type but no associated payload for the command. Hence
|
* We found a command type but no associated payload for the command. Hence
|
||||||
* return malformed Command as response.
|
* return malformed Command as response.
|
||||||
*
|
*
|
||||||
* @param msg - Protobuf message.
|
* @param msg - Protobuf message.
|
||||||
|
* @param result - result
|
||||||
|
* @param message - Error message.
|
||||||
* @return ContainerCommandResponseProto - MALFORMED_REQUEST.
|
* @return ContainerCommandResponseProto - MALFORMED_REQUEST.
|
||||||
*/
|
*/
|
||||||
public static ContainerProtos.ContainerCommandResponseProto.Builder
|
public static ContainerProtos.ContainerCommandResponseProto.Builder
|
||||||
|
@ -185,23 +188,96 @@ public final class ContainerUtils {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Path createMetadata(Path containerPath) throws IOException {
|
public static Path createMetadata(Path containerPath) throws IOException {
|
||||||
|
Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
|
||||||
Preconditions.checkNotNull(containerPath);
|
Preconditions.checkNotNull(containerPath);
|
||||||
containerPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
|
Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
|
||||||
if (!containerPath.toFile().mkdirs()) {
|
if (!metadataPath.toFile().mkdirs()) {
|
||||||
|
log.error("Unable to create directory for metadata storage. Path: {}",
|
||||||
|
metadataPath);
|
||||||
throw new IOException("Unable to create directory for metadata storage." +
|
throw new IOException("Unable to create directory for metadata storage." +
|
||||||
" Path {}" + containerPath);
|
" Path: " + metadataPath);
|
||||||
}
|
}
|
||||||
containerPath = containerPath.resolve(OzoneConsts.CONTAINER_DB);
|
LevelDBStore store =
|
||||||
LevelDBStore store = new LevelDBStore(containerPath.toFile(), true);
|
new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
|
||||||
|
.toFile(), true);
|
||||||
|
|
||||||
// we close since the SCM pre-creates containers.
|
// we close since the SCM pre-creates containers.
|
||||||
// we will open and put Db handle into a cache when keys are being created
|
// we will open and put Db handle into a cache when keys are being created
|
||||||
// in a container.
|
// in a container.
|
||||||
|
|
||||||
store.close();
|
store.close();
|
||||||
return containerPath;
|
|
||||||
|
Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
|
||||||
|
if (!dataPath.toFile().mkdirs()) {
|
||||||
|
|
||||||
|
// If we failed to create data directory, we cleanup the
|
||||||
|
// metadata directory completely. That is, we will delete the
|
||||||
|
// whole directory including LevelDB file.
|
||||||
|
log.error("Unable to create directory for data storage. cleaning up the" +
|
||||||
|
" container path: {} dataPath: {}",
|
||||||
|
containerPath, dataPath);
|
||||||
|
FileUtils.deleteDirectory(containerPath.toFile());
|
||||||
|
throw new IOException("Unable to create directory for data storage." +
|
||||||
|
" Path: " + dataPath);
|
||||||
|
}
|
||||||
|
return metadataPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns Metadata location.
|
||||||
|
*
|
||||||
|
* @param containerData - Data
|
||||||
|
* @param location - Path
|
||||||
|
* @return Path
|
||||||
|
*/
|
||||||
|
public static File getMetadataFile(ContainerData containerData,
|
||||||
|
Path location) {
|
||||||
|
return location.resolve(containerData
|
||||||
|
.getContainerName().concat(CONTAINER_META))
|
||||||
|
.toFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns container file location.
|
||||||
|
* @param containerData - Data
|
||||||
|
* @param location - Root path
|
||||||
|
* @return Path
|
||||||
|
*/
|
||||||
|
public static File getContainerFile(ContainerData containerData,
|
||||||
|
Path location) {
|
||||||
|
return location.resolve(containerData
|
||||||
|
.getContainerName().concat(CONTAINER_EXTENSION))
|
||||||
|
.toFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Container metadata directory -- here is where the level DB lives.
|
||||||
|
* @param cData - cData.
|
||||||
|
* @return Path to the parent directory where the DB lives.
|
||||||
|
*/
|
||||||
|
public static Path getMetadataDirectory(ContainerData cData) {
|
||||||
|
Path dbPath = Paths.get(cData.getDBPath());
|
||||||
|
Preconditions.checkNotNull(dbPath);
|
||||||
|
Preconditions.checkState(dbPath.toString().length() > 0);
|
||||||
|
return dbPath.getParent();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the path where data or chunks live for a given container.
|
||||||
|
* @param cData - cData container
|
||||||
|
* @return - Path
|
||||||
|
*/
|
||||||
|
public static Path getDataDirectory(ContainerData cData) throws IOException {
|
||||||
|
Path path = getMetadataDirectory(cData);
|
||||||
|
Preconditions.checkNotNull(path);
|
||||||
|
path = path.getParent();
|
||||||
|
if(path == null) {
|
||||||
|
throw new IOException("Unable to get Data directory. null path found");
|
||||||
|
}
|
||||||
|
return path.resolve(OzoneConsts.CONTAINER_DATA_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove Container if it is empty.
|
* remove Container if it is empty.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
|
@ -58,9 +58,8 @@ public class Pipeline {
|
||||||
for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
|
for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
|
||||||
newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
|
newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
|
||||||
}
|
}
|
||||||
if (pipeline.hasContainerName()) {
|
|
||||||
newPipeline.containerName = newPipeline.getContainerName();
|
newPipeline.setContainerName(pipeline.getContainerName());
|
||||||
}
|
|
||||||
return newPipeline;
|
return newPipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,9 +104,7 @@ public class Pipeline {
|
||||||
builder.addMembers(datanode.getProtoBufMessage());
|
builder.addMembers(datanode.getProtoBufMessage());
|
||||||
}
|
}
|
||||||
builder.setLeaderID(leaderID);
|
builder.setLeaderID(leaderID);
|
||||||
if (this.containerName != null) {
|
builder.setContainerName(this.containerName);
|
||||||
builder.setContainerName(this.containerName);
|
|
||||||
}
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,5 +125,4 @@ public class Pipeline {
|
||||||
public void setContainerName(String containerName) {
|
public void setContainerName(String containerName) {
|
||||||
this.containerName = containerName;
|
this.containerName = containerName;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,5 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common.helpers;
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
/**
|
/**
|
||||||
Contains protocol buffer helper classes.
|
Contains protocol buffer helper classes and utilites used in
|
||||||
|
impl.
|
||||||
**/
|
**/
|
|
@ -0,0 +1,149 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.impl;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An implementation of ChunkManager that is used by default in ozone.
|
||||||
|
*/
|
||||||
|
public class ChunkManagerImpl implements ChunkManager {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
|
||||||
|
private final ContainerManager containerManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a ChunkManager.
|
||||||
|
*
|
||||||
|
* @param manager - ContainerManager.
|
||||||
|
*/
|
||||||
|
public ChunkManagerImpl(ContainerManager manager) {
|
||||||
|
this.containerManager = manager;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* writes a given chunk.
|
||||||
|
*
|
||||||
|
* @param pipeline - Name and the set of machines that make this container.
|
||||||
|
* @param keyName - Name of the Key.
|
||||||
|
* @param info - ChunkInfo.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
|
||||||
|
byte[] data)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
// we don't want container manager to go away while we are writing chunks.
|
||||||
|
containerManager.readLock();
|
||||||
|
|
||||||
|
// TODO : Take keyManager Write lock here.
|
||||||
|
try {
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
Preconditions.checkNotNull(pipeline.getContainerName());
|
||||||
|
File chunkFile = ChunkUtils.validateChunk(pipeline,
|
||||||
|
containerManager.readContainer(pipeline.getContainerName()), info);
|
||||||
|
ChunkUtils.writeData(chunkFile, info, data);
|
||||||
|
|
||||||
|
} catch (ExecutionException |
|
||||||
|
NoSuchAlgorithmException e) {
|
||||||
|
LOG.error("write data failed. error: {}", e);
|
||||||
|
throw new IOException("Internal error: ", e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.error("write data failed. error: {}", e);
|
||||||
|
throw new IOException("Internal error: ", e);
|
||||||
|
} finally {
|
||||||
|
containerManager.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reads the data defined by a chunk.
|
||||||
|
*
|
||||||
|
* @param pipeline - container pipeline.
|
||||||
|
* @param keyName - Name of the Key
|
||||||
|
* @param info - ChunkInfo.
|
||||||
|
* @return byte array
|
||||||
|
* @throws IOException TODO: Right now we do not support partial reads and
|
||||||
|
* writes of chunks. TODO: Explore if we need to do that
|
||||||
|
* for ozone.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
|
||||||
|
throws IOException {
|
||||||
|
containerManager.readLock();
|
||||||
|
try {
|
||||||
|
File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
|
||||||
|
.readContainer(pipeline.getContainerName()), info);
|
||||||
|
return ChunkUtils.readData(chunkFile, info).array();
|
||||||
|
} catch (ExecutionException | NoSuchAlgorithmException e) {
|
||||||
|
LOG.error("read data failed. error: {}", e);
|
||||||
|
throw new IOException("Internal error: ", e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.error("read data failed. error: {}", e);
|
||||||
|
throw new IOException("Internal error: ", e);
|
||||||
|
} finally {
|
||||||
|
containerManager.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes a given chunk.
|
||||||
|
*
|
||||||
|
* @param pipeline - Pipeline.
|
||||||
|
* @param keyName - Key Name
|
||||||
|
* @param info - Chunk Info
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
containerManager.readLock();
|
||||||
|
try {
|
||||||
|
File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
|
||||||
|
.readContainer(pipeline.getContainerName()), info);
|
||||||
|
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
|
||||||
|
FileUtil.fullyDelete(chunkFile);
|
||||||
|
} else {
|
||||||
|
LOG.error("Not Supported Operation. Trying to delete a " +
|
||||||
|
"chunk that is in shared file. chunk info : " + info.toString());
|
||||||
|
throw new IOException("Not Supported Operation. Trying to delete a " +
|
||||||
|
"chunk that is in shared file. chunk info : " + info.toString());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
containerManager.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,9 +26,11 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -69,6 +71,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// for waiting threads.
|
// for waiting threads.
|
||||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||||
private ContainerLocationManager locationManager;
|
private ContainerLocationManager locationManager;
|
||||||
|
private ChunkManager chunkManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Init call that sets up a container Manager.
|
* Init call that sets up a container Manager.
|
||||||
|
@ -141,7 +144,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
metaStream = new FileInputStream(metaFileName);
|
metaStream = new FileInputStream(metaFileName);
|
||||||
|
|
||||||
MessageDigest sha = MessageDigest.getInstance("SHA-256");
|
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
|
||||||
dis = new DigestInputStream(containerStream, sha);
|
dis = new DigestInputStream(containerStream, sha);
|
||||||
|
|
||||||
|
@ -238,13 +241,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
FileOutputStream metaStream = null;
|
FileOutputStream metaStream = null;
|
||||||
Path location = locationManager.getContainerPath();
|
Path location = locationManager.getContainerPath();
|
||||||
|
|
||||||
File containerFile = location.resolve(containerData
|
File containerFile = ContainerUtils.getContainerFile(containerData,
|
||||||
.getContainerName().concat(CONTAINER_EXTENSION))
|
location);
|
||||||
.toFile();
|
File metadataFile = ContainerUtils.getMetadataFile(containerData, location);
|
||||||
|
|
||||||
File metadataFile = location.resolve(containerData
|
|
||||||
.getContainerName().concat(CONTAINER_META))
|
|
||||||
.toFile();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
|
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
|
||||||
|
@ -255,10 +254,11 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
containerStream = new FileOutputStream(containerFile);
|
containerStream = new FileOutputStream(containerFile);
|
||||||
metaStream = new FileOutputStream(metadataFile);
|
metaStream = new FileOutputStream(metadataFile);
|
||||||
MessageDigest sha = MessageDigest.getInstance("SHA-256");
|
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
|
||||||
dos = new DigestOutputStream(containerStream, sha);
|
dos = new DigestOutputStream(containerStream, sha);
|
||||||
containerData.setDBPath(metadataPath.toString());
|
containerData.setDBPath(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
|
||||||
|
.toString());
|
||||||
containerData.setContainerPath(containerFile.toString());
|
containerData.setContainerPath(containerFile.toString());
|
||||||
|
|
||||||
ContainerProtos.ContainerData protoData = containerData
|
ContainerProtos.ContainerData protoData = containerData
|
||||||
|
@ -293,6 +293,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes an existing container.
|
* Deletes an existing container.
|
||||||
*
|
*
|
||||||
|
@ -368,6 +370,10 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ContainerData readContainer(String containerName) throws IOException {
|
public ContainerData readContainer(String containerName) throws IOException {
|
||||||
|
if(!containerMap.containsKey(containerName)) {
|
||||||
|
throw new IOException("Unable to find the container. Name: "
|
||||||
|
+ containerName);
|
||||||
|
}
|
||||||
return containerMap.get(containerName).getContainer();
|
return containerMap.get(containerName).getContainer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,6 +452,18 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
return this.lock.writeLock().isHeldByCurrentThread();
|
return this.lock.writeLock().isHeldByCurrentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the chunk Manager.
|
||||||
|
* @param chunkManager
|
||||||
|
*/
|
||||||
|
public void setChunkManager(ChunkManager chunkManager) {
|
||||||
|
this.chunkManager = chunkManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ChunkManager getChunkManager() {
|
||||||
|
return this.chunkManager;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter out only container files from the container metadata dir.
|
* Filter out only container files from the container metadata dir.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
|
@ -68,6 +70,12 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if ((cmdType == Type.WriteChunk) ||
|
||||||
|
(cmdType == Type.ReadChunk) ||
|
||||||
|
(cmdType == Type.DeleteChunk)) {
|
||||||
|
return chunkProcessHandler(msg);
|
||||||
|
}
|
||||||
|
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,28 +89,66 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
private ContainerCommandResponseProto containerProcessHandler(
|
private ContainerCommandResponseProto containerProcessHandler(
|
||||||
ContainerCommandRequestProto msg) throws IOException {
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
try {
|
try {
|
||||||
ContainerData cData = ContainerData.getFromProtBuf(
|
|
||||||
msg.getCreateContainer().getContainerData());
|
|
||||||
|
|
||||||
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
|
||||||
msg.getCreateContainer().getPipeline());
|
|
||||||
Preconditions.checkNotNull(pipeline);
|
|
||||||
|
|
||||||
switch (msg.getCmdType()) {
|
switch (msg.getCmdType()) {
|
||||||
case CreateContainer:
|
case CreateContainer:
|
||||||
return handleCreateContainer(msg, cData, pipeline);
|
return handleCreateContainer(msg);
|
||||||
|
|
||||||
case DeleteContainer:
|
case DeleteContainer:
|
||||||
return handleDeleteContainer(msg, cData, pipeline);
|
return handleDeleteContainer(msg);
|
||||||
|
|
||||||
case ListContainer:
|
case ListContainer:
|
||||||
|
// TODO : Support List Container.
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
|
||||||
case UpdateContainer:
|
case UpdateContainer:
|
||||||
|
// TODO : Support Update Container.
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
|
||||||
case ReadContainer:
|
case ReadContainer:
|
||||||
return handleReadContainer(msg, cData);
|
return handleReadContainer(msg);
|
||||||
|
|
||||||
|
default:
|
||||||
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.warn("Container operation failed. " +
|
||||||
|
"Container: {} Operation: {} trace ID: {} Error: {}",
|
||||||
|
msg.getCreateContainer().getContainerData().getName(),
|
||||||
|
msg.getCmdType().name(),
|
||||||
|
msg.getTraceID(),
|
||||||
|
ex.toString());
|
||||||
|
|
||||||
|
// TODO : Replace with finer error codes.
|
||||||
|
return ContainerUtils.getContainerResponse(msg,
|
||||||
|
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
|
||||||
|
ex.toString()).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles the all chunk related functionality.
|
||||||
|
*
|
||||||
|
* @param msg - command
|
||||||
|
* @return - response
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ContainerCommandResponseProto chunkProcessHandler(
|
||||||
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
|
try {
|
||||||
|
|
||||||
|
switch (msg.getCmdType()) {
|
||||||
|
case WriteChunk:
|
||||||
|
return handleWriteChunk(msg);
|
||||||
|
|
||||||
|
case ReadChunk:
|
||||||
|
return handleReadChunk(msg);
|
||||||
|
|
||||||
|
case DeleteChunk:
|
||||||
|
return handleDeleteChunk(msg);
|
||||||
|
|
||||||
|
case ListChunk:
|
||||||
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
@ -125,13 +171,12 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
/**
|
/**
|
||||||
* Calls into container logic and returns appropriate response.
|
* Calls into container logic and returns appropriate response.
|
||||||
*
|
*
|
||||||
* @param msg - Request
|
* @param msg - Request
|
||||||
* @param cData - Container Data object
|
|
||||||
* @return ContainerCommandResponseProto
|
* @return ContainerCommandResponseProto
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private ContainerCommandResponseProto handleReadContainer(
|
private ContainerCommandResponseProto handleReadContainer(
|
||||||
ContainerCommandRequestProto msg, ContainerData cData)
|
ContainerCommandRequestProto msg)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
if (!msg.hasReadContainer()) {
|
if (!msg.hasReadContainer()) {
|
||||||
|
@ -139,51 +184,147 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
msg.getTraceID());
|
msg.getTraceID());
|
||||||
return ContainerUtils.malformedRequest(msg);
|
return ContainerUtils.malformedRequest(msg);
|
||||||
}
|
}
|
||||||
ContainerData container = this.containerManager.readContainer(
|
|
||||||
cData.getContainerName());
|
String name = msg.getReadContainer().getName();
|
||||||
|
ContainerData container = this.containerManager.readContainer(name);
|
||||||
return ContainerUtils.getReadContainerResponse(msg, container);
|
return ContainerUtils.getReadContainerResponse(msg, container);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls into container logic and returns appropriate response.
|
* Calls into container logic and returns appropriate response.
|
||||||
*
|
*
|
||||||
* @param msg - Request
|
* @param msg - Request
|
||||||
* @param cData - ContainerData
|
|
||||||
* @param pipeline - Pipeline is the machines where this container lives.
|
|
||||||
* @return Response.
|
* @return Response.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private ContainerCommandResponseProto handleDeleteContainer(
|
private ContainerCommandResponseProto handleDeleteContainer(
|
||||||
ContainerCommandRequestProto msg, ContainerData cData,
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
Pipeline pipeline) throws IOException {
|
|
||||||
if (!msg.hasDeleteContainer()) {
|
if (!msg.hasDeleteContainer()) {
|
||||||
LOG.debug("Malformed delete container request. trace ID: {}",
|
LOG.debug("Malformed delete container request. trace ID: {}",
|
||||||
msg.getTraceID());
|
msg.getTraceID());
|
||||||
return ContainerUtils.malformedRequest(msg);
|
return ContainerUtils.malformedRequest(msg);
|
||||||
}
|
}
|
||||||
this.containerManager.deleteContainer(pipeline,
|
|
||||||
cData.getContainerName());
|
String name = msg.getDeleteContainer().getName();
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getDeleteContainer().getPipeline());
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
|
||||||
|
this.containerManager.deleteContainer(pipeline, name);
|
||||||
return ContainerUtils.getContainerResponse(msg);
|
return ContainerUtils.getContainerResponse(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls into container logic and returns appropriate response.
|
* Calls into container logic and returns appropriate response.
|
||||||
*
|
*
|
||||||
* @param msg - Request
|
* @param msg - Request
|
||||||
* @param cData - ContainerData
|
|
||||||
* @param pipeline - Pipeline is the machines where this container lives.
|
|
||||||
* @return Response.
|
* @return Response.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private ContainerCommandResponseProto handleCreateContainer(
|
private ContainerCommandResponseProto handleCreateContainer(
|
||||||
ContainerCommandRequestProto msg, ContainerData cData,
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
Pipeline pipeline) throws IOException {
|
|
||||||
if (!msg.hasCreateContainer()) {
|
if (!msg.hasCreateContainer()) {
|
||||||
LOG.debug("Malformed create container request. trace ID: {}",
|
LOG.debug("Malformed create container request. trace ID: {}",
|
||||||
msg.getTraceID());
|
msg.getTraceID());
|
||||||
return ContainerUtils.malformedRequest(msg);
|
return ContainerUtils.malformedRequest(msg);
|
||||||
}
|
}
|
||||||
|
ContainerData cData = ContainerData.getFromProtBuf(
|
||||||
|
msg.getCreateContainer().getContainerData());
|
||||||
|
Preconditions.checkNotNull(cData);
|
||||||
|
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getCreateContainer().getPipeline());
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
|
||||||
this.containerManager.createContainer(pipeline, cData);
|
this.containerManager.createContainer(pipeline, cData);
|
||||||
return ContainerUtils.getContainerResponse(msg);
|
return ContainerUtils.getContainerResponse(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls into chunk manager to write a chunk.
|
||||||
|
*
|
||||||
|
* @param msg - Request.
|
||||||
|
* @return Response.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ContainerCommandResponseProto handleWriteChunk(
|
||||||
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
|
if (!msg.hasWriteChunk()) {
|
||||||
|
LOG.debug("Malformed write chunk request. trace ID: {}",
|
||||||
|
msg.getTraceID());
|
||||||
|
return ContainerUtils.malformedRequest(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
String keyName = msg.getWriteChunk().getKeyName();
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getWriteChunk().getPipeline());
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
|
||||||
|
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
|
||||||
|
.getChunkData());
|
||||||
|
Preconditions.checkNotNull(chunkInfo);
|
||||||
|
byte[] data = msg.getWriteChunk().getData().toByteArray();
|
||||||
|
this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
|
||||||
|
chunkInfo, data);
|
||||||
|
return ChunkUtils.getChunkResponse(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls into chunk manager to read a chunk.
|
||||||
|
*
|
||||||
|
* @param msg - Request.
|
||||||
|
* @return - Response.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ContainerCommandResponseProto handleReadChunk(
|
||||||
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
|
if (!msg.hasReadChunk()) {
|
||||||
|
LOG.debug("Malformed read chunk request. trace ID: {}",
|
||||||
|
msg.getTraceID());
|
||||||
|
return ContainerUtils.malformedRequest(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
String keyName = msg.getReadChunk().getKeyName();
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getReadChunk().getPipeline());
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
|
||||||
|
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
|
||||||
|
.getChunkData());
|
||||||
|
Preconditions.checkNotNull(chunkInfo);
|
||||||
|
byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
|
||||||
|
keyName, chunkInfo);
|
||||||
|
return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls into chunk manager to write a chunk.
|
||||||
|
*
|
||||||
|
* @param msg - Request.
|
||||||
|
* @return Response.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ContainerCommandResponseProto handleDeleteChunk(
|
||||||
|
ContainerCommandRequestProto msg) throws IOException {
|
||||||
|
if (!msg.hasDeleteChunk()) {
|
||||||
|
LOG.debug("Malformed delete chunk request. trace ID: {}",
|
||||||
|
msg.getTraceID());
|
||||||
|
return ContainerUtils.malformedRequest(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
String keyName = msg.getDeleteChunk().getKeyName();
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getDeleteChunk().getPipeline());
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
|
||||||
|
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
|
||||||
|
.getChunkData());
|
||||||
|
Preconditions.checkNotNull(chunkInfo);
|
||||||
|
|
||||||
|
this.containerManager.getChunkManager().deleteChunk(pipeline, keyName,
|
||||||
|
chunkInfo);
|
||||||
|
return ChunkUtils.getChunkResponse(msg);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chunk Manager allows read, write, delete and listing of chunks in
|
||||||
|
* a container.
|
||||||
|
*/
|
||||||
|
public interface ChunkManager {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* writes a given chunk.
|
||||||
|
* @param pipeline - Name and the set of machines that make this container.
|
||||||
|
* @param keyName - Name of the Key.
|
||||||
|
* @param info - ChunkInfo.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void writeChunk(Pipeline pipeline, String keyName,
|
||||||
|
ChunkInfo info, byte[] data) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reads the data defined by a chunk.
|
||||||
|
* @param pipeline - container pipeline.
|
||||||
|
* @param keyName - Name of the Key
|
||||||
|
* @param info - ChunkInfo.
|
||||||
|
* @return byte array
|
||||||
|
* @throws IOException
|
||||||
|
*
|
||||||
|
* TODO: Right now we do not support partial reads and writes of chunks.
|
||||||
|
* TODO: Explore if we need to do that for ozone.
|
||||||
|
*/
|
||||||
|
byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
|
||||||
|
IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes a given chunk.
|
||||||
|
* @param pipeline - Pipeline.
|
||||||
|
* @param keyName - Key Name
|
||||||
|
* @param info - Chunk Info
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
|
||||||
|
IOException;
|
||||||
|
|
||||||
|
// TODO : Support list operations.
|
||||||
|
|
||||||
|
}
|
|
@ -97,4 +97,17 @@ public interface ContainerManager extends RwLock {
|
||||||
*/
|
*/
|
||||||
void shutdown() throws IOException;
|
void shutdown() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the Chunk Manager.
|
||||||
|
* @param chunkManager - ChunkManager.
|
||||||
|
*/
|
||||||
|
void setChunkManager(ChunkManager chunkManager);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the Chunk Manager.
|
||||||
|
* @return ChunkManager.
|
||||||
|
*/
|
||||||
|
ChunkManager getChunkManager();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
||||||
|
@ -48,6 +50,7 @@ public class OzoneContainer {
|
||||||
private final ContainerDispatcher dispatcher;
|
private final ContainerDispatcher dispatcher;
|
||||||
private final ContainerManager manager;
|
private final ContainerManager manager;
|
||||||
private final XceiverServer server;
|
private final XceiverServer server;
|
||||||
|
private final ChunkManager chunkManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a network endpoint and enables Ozone container.
|
* Creates a network endpoint and enables Ozone container.
|
||||||
|
@ -74,6 +77,8 @@ public class OzoneContainer {
|
||||||
|
|
||||||
manager = new ContainerManagerImpl();
|
manager = new ContainerManagerImpl();
|
||||||
manager.init(this.ozoneConfig, locations, this.dataSet);
|
manager.init(this.ozoneConfig, locations, this.dataSet);
|
||||||
|
this.chunkManager = new ChunkManagerImpl(manager);
|
||||||
|
manager.setChunkManager(this.chunkManager);
|
||||||
|
|
||||||
this.dispatcher = new Dispatcher(manager);
|
this.dispatcher = new Dispatcher(manager);
|
||||||
server = new XceiverServer(this.ozoneConfig, this.dispatcher);
|
server = new XceiverServer(this.ozoneConfig, this.dispatcher);
|
||||||
|
|
|
@ -157,7 +157,7 @@ message ContainerCommandResponseProto {
|
||||||
message Pipeline {
|
message Pipeline {
|
||||||
required string leaderID = 1;
|
required string leaderID = 1;
|
||||||
repeated DatanodeIDProto members = 2;
|
repeated DatanodeIDProto members = 2;
|
||||||
optional string containerName = 3;
|
required string containerName = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message KeyValue {
|
message KeyValue {
|
||||||
|
@ -244,7 +244,7 @@ message ReadKeyRequestProto {
|
||||||
|
|
||||||
message ReadKeyResponeProto {
|
message ReadKeyResponeProto {
|
||||||
repeated KeyValue metadata = 1;
|
repeated KeyValue metadata = 1;
|
||||||
repeated chunkInfo chunkData = 2;
|
repeated ChunkInfo chunkData = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message UpdateKeyRequestProto {
|
message UpdateKeyRequestProto {
|
||||||
|
@ -276,19 +276,19 @@ message ListKeyResponeProto {
|
||||||
|
|
||||||
// Chunk Operations
|
// Chunk Operations
|
||||||
|
|
||||||
message chunkInfo {
|
message ChunkInfo {
|
||||||
required uint64 offset = 1;
|
required string chunkName = 1;
|
||||||
required uint64 len = 2;
|
required uint64 offset = 2;
|
||||||
optional uint64 checksum = 3;
|
required uint64 len = 3;
|
||||||
repeated KeyValue metadata = 4;
|
optional string checksum = 4;
|
||||||
|
repeated KeyValue metadata = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message WriteChunkRequestProto {
|
message WriteChunkRequestProto {
|
||||||
required Pipeline pipeline = 1;
|
required Pipeline pipeline = 1;
|
||||||
required string containerName = 2;
|
required string keyName = 2;
|
||||||
required string keyName = 3;
|
required ChunkInfo chunkData = 3;
|
||||||
required chunkInfo chunkData = 4;
|
required bytes data = 4;
|
||||||
repeated bytes data = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message WriteChunkReponseProto {
|
message WriteChunkReponseProto {
|
||||||
|
@ -296,22 +296,20 @@ message WriteChunkReponseProto {
|
||||||
|
|
||||||
message ReadChunkRequestProto {
|
message ReadChunkRequestProto {
|
||||||
required Pipeline pipeline = 1;
|
required Pipeline pipeline = 1;
|
||||||
required string containerName = 2;
|
required string keyName = 2;
|
||||||
required string keyName = 3;
|
required ChunkInfo chunkData = 3;
|
||||||
required chunkInfo chunkData = 4;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ReadChunkReponseProto {
|
message ReadChunkReponseProto {
|
||||||
required Pipeline pipeline = 1;
|
required Pipeline pipeline = 1;
|
||||||
required chunkInfo chunkData = 2;
|
required ChunkInfo chunkData = 2;
|
||||||
repeated bytes data = 3;
|
required bytes data = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteChunkRequestProto {
|
message DeleteChunkRequestProto {
|
||||||
required Pipeline pipeline = 1;
|
required Pipeline pipeline = 1;
|
||||||
required string containerName = 2;
|
required string keyName = 2;
|
||||||
required string keyName = 3;
|
required ChunkInfo chunkData = 3;
|
||||||
required chunkInfo chunkData = 4;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteChunkResponseProto {
|
message DeleteChunkResponseProto {
|
||||||
|
@ -319,13 +317,12 @@ message DeleteChunkResponseProto {
|
||||||
|
|
||||||
message ListChunkRequestProto {
|
message ListChunkRequestProto {
|
||||||
required Pipeline pipeline = 1;
|
required Pipeline pipeline = 1;
|
||||||
required string containerName = 2;
|
required string keyName = 2;
|
||||||
required string keyName = 3;
|
required string prevChunkName = 3;
|
||||||
required uint64 startOffset = 4;
|
required uint32 count = 4;
|
||||||
required uint32 count = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListChunkResponseProto {
|
message ListChunkResponseProto {
|
||||||
repeated chunkInfo chunkData = 1;
|
repeated ChunkInfo chunkData = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,22 +18,32 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.container;
|
package org.apache.hadoop.ozone.container;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import org.apache.commons.codec.binary.Hex;
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.commons.io.IOExceptionWithCause;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.ContainerCommandRequestProto;
|
.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.ContainerCommandResponseProto;
|
.ContainerCommandResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helpers for container tests.
|
* Helpers for container tests.
|
||||||
*/
|
*/
|
||||||
public class ContainerTestHelper {
|
public class ContainerTestHelper {
|
||||||
|
private static Random r = new Random();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a pipeline with single node replica.
|
* Create a pipeline with single node replica.
|
||||||
|
@ -41,7 +51,8 @@ public class ContainerTestHelper {
|
||||||
* @return Pipeline with single node in it.
|
* @return Pipeline with single node in it.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Pipeline createSingleNodePipeline() throws IOException {
|
public static Pipeline createSingleNodePipeline(String containerName) throws
|
||||||
|
IOException {
|
||||||
ServerSocket socket = new ServerSocket(0);
|
ServerSocket socket = new ServerSocket(0);
|
||||||
int port = socket.getLocalPort();
|
int port = socket.getLocalPort();
|
||||||
DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
|
DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
|
||||||
|
@ -50,26 +61,160 @@ public class ContainerTestHelper {
|
||||||
datanodeID.setContainerPort(port);
|
datanodeID.setContainerPort(port);
|
||||||
Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
|
Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
|
||||||
pipeline.addMember(datanodeID);
|
pipeline.addMember(datanodeID);
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
socket.close();
|
socket.close();
|
||||||
return pipeline;
|
return pipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a ChunkInfo for testing.
|
||||||
|
*
|
||||||
|
* @param keyName - Name of the key
|
||||||
|
* @param seqNo - Chunk number.
|
||||||
|
* @return ChunkInfo
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static ChunkInfo getChunk(String keyName, int seqNo, long offset,
|
||||||
|
long len) throws IOException {
|
||||||
|
|
||||||
|
ChunkInfo info = new ChunkInfo(String.format("%s.data.%d", keyName,
|
||||||
|
seqNo), offset, len);
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates some data of the requested len.
|
||||||
|
*
|
||||||
|
* @param len - Number of bytes.
|
||||||
|
* @return byte array with valid data.
|
||||||
|
*/
|
||||||
|
public static byte[] getData(int len) {
|
||||||
|
byte[] data = new byte[len];
|
||||||
|
r.nextBytes(data);
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes the hash and sets the value correctly.
|
||||||
|
*
|
||||||
|
* @param info - chunk info.
|
||||||
|
* @param data - data array
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
public static void setDataChecksum(ChunkInfo info, byte[] data)
|
||||||
|
throws NoSuchAlgorithmException {
|
||||||
|
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
sha.update(data);
|
||||||
|
info.setChecksum(Hex.encodeHexString(sha.digest()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a writeChunk Request.
|
||||||
|
*
|
||||||
|
* @param containerName - Name
|
||||||
|
* @param keyName - Name
|
||||||
|
* @param datalen - data len.
|
||||||
|
* @return Request.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
public static ContainerCommandRequestProto getWriteChunkRequest(
|
||||||
|
Pipeline pipeline, String containerName, String keyName, int datalen)
|
||||||
|
throws
|
||||||
|
IOException, NoSuchAlgorithmException {
|
||||||
|
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
|
||||||
|
ContainerProtos.WriteChunkRequestProto
|
||||||
|
.newBuilder();
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
writeRequest.setPipeline(pipeline.getProtobufMessage());
|
||||||
|
writeRequest.setKeyName(keyName);
|
||||||
|
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
|
||||||
|
writeRequest.setChunkData(info.getProtoBufMessage());
|
||||||
|
writeRequest.setData(ByteString.copyFrom(data));
|
||||||
|
|
||||||
|
ContainerCommandRequestProto.Builder request =
|
||||||
|
ContainerCommandRequestProto.newBuilder();
|
||||||
|
request.setCmdType(ContainerProtos.Type.WriteChunk);
|
||||||
|
request.setWriteChunk(writeRequest);
|
||||||
|
return request.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a read Request.
|
||||||
|
*
|
||||||
|
* @param request writeChunkRequest.
|
||||||
|
* @return Request.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
public static ContainerCommandRequestProto getReadChunkRequest(
|
||||||
|
ContainerProtos.WriteChunkRequestProto request)
|
||||||
|
throws
|
||||||
|
IOException, NoSuchAlgorithmException {
|
||||||
|
ContainerProtos.ReadChunkRequestProto.Builder readRequest =
|
||||||
|
ContainerProtos.ReadChunkRequestProto.newBuilder();
|
||||||
|
|
||||||
|
readRequest.setPipeline(request.getPipeline());
|
||||||
|
|
||||||
|
readRequest.setKeyName(request.getKeyName());
|
||||||
|
readRequest.setChunkData(request.getChunkData());
|
||||||
|
|
||||||
|
ContainerCommandRequestProto.Builder newRequest =
|
||||||
|
ContainerCommandRequestProto.newBuilder();
|
||||||
|
newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
|
||||||
|
newRequest.setReadChunk(readRequest);
|
||||||
|
return newRequest.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a delete Request.
|
||||||
|
*
|
||||||
|
* @param writeRequest - write request
|
||||||
|
* @return request
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
public static ContainerCommandRequestProto getDeleteChunkRequest(
|
||||||
|
ContainerProtos.WriteChunkRequestProto writeRequest)
|
||||||
|
throws
|
||||||
|
IOException, NoSuchAlgorithmException {
|
||||||
|
ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest =
|
||||||
|
ContainerProtos.DeleteChunkRequestProto
|
||||||
|
.newBuilder();
|
||||||
|
|
||||||
|
deleteRequest.setPipeline(writeRequest.getPipeline());
|
||||||
|
deleteRequest.setChunkData(writeRequest.getChunkData());
|
||||||
|
deleteRequest.setKeyName(writeRequest.getKeyName());
|
||||||
|
|
||||||
|
ContainerCommandRequestProto.Builder request =
|
||||||
|
ContainerCommandRequestProto.newBuilder();
|
||||||
|
request.setCmdType(ContainerProtos.Type.DeleteChunk);
|
||||||
|
request.setDeleteChunk(deleteRequest);
|
||||||
|
return request.build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a create container command for test purposes. There are a bunch of
|
* Returns a create container command for test purposes. There are a bunch of
|
||||||
* tests where we need to just send a request and get a reply.
|
* tests where we need to just send a request and get a reply.
|
||||||
*
|
*
|
||||||
* @return ContainerCommandRequestProto.
|
* @return ContainerCommandRequestProto.
|
||||||
*/
|
*/
|
||||||
public static ContainerCommandRequestProto getCreateContainerRequest() throws
|
public static ContainerCommandRequestProto getCreateContainerRequest(
|
||||||
IOException {
|
String containerName) throws IOException {
|
||||||
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
|
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
|
||||||
ContainerProtos.CreateContainerRequestProto
|
ContainerProtos.CreateContainerRequestProto
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
|
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
|
||||||
.ContainerData.newBuilder();
|
.ContainerData.newBuilder();
|
||||||
containerData.setName("testContainer");
|
containerData.setName(containerName);
|
||||||
createRequest.setPipeline(
|
createRequest.setPipeline(
|
||||||
ContainerTestHelper.createSingleNodePipeline().getProtobufMessage());
|
ContainerTestHelper.createSingleNodePipeline(containerName)
|
||||||
|
.getProtobufMessage());
|
||||||
createRequest.setContainerData(containerData.build());
|
createRequest.setContainerData(containerData.build());
|
||||||
|
|
||||||
ContainerCommandRequestProto.Builder request =
|
ContainerCommandRequestProto.Builder request =
|
||||||
|
@ -86,7 +231,7 @@ public class ContainerTestHelper {
|
||||||
* @return ContainerCommandRequestProto.
|
* @return ContainerCommandRequestProto.
|
||||||
*/
|
*/
|
||||||
public static ContainerCommandResponseProto
|
public static ContainerCommandResponseProto
|
||||||
getCreateContainerResponse(ContainerCommandRequestProto request) throws
|
getCreateContainerResponse(ContainerCommandRequestProto request) throws
|
||||||
IOException {
|
IOException {
|
||||||
ContainerProtos.CreateContainerResponseProto.Builder createResponse =
|
ContainerProtos.CreateContainerResponseProto.Builder createResponse =
|
||||||
ContainerProtos.CreateContainerResponseProto.newBuilder();
|
ContainerProtos.CreateContainerResponseProto.newBuilder();
|
||||||
|
@ -99,6 +244,4 @@ public class ContainerTestHelper {
|
||||||
response.setResult(ContainerProtos.Result.SUCCESS);
|
response.setResult(ContainerProtos.Result.SUCCESS);
|
||||||
return response.build();
|
return response.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,17 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.container.common.impl;
|
package org.apache.hadoop.ozone.container.common.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Hex;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -33,13 +36,19 @@ import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.nio.file.DirectoryStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -47,6 +56,10 @@ import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||||
.createSingleNodePipeline;
|
.createSingleNodePipeline;
|
||||||
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
|
||||||
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
|
||||||
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||||
|
.setDataChecksum;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -56,11 +69,15 @@ public class TestContainerPersistence {
|
||||||
|
|
||||||
static String path;
|
static String path;
|
||||||
static ContainerManagerImpl containerManager;
|
static ContainerManagerImpl containerManager;
|
||||||
|
static ChunkManagerImpl chunkManager;
|
||||||
static OzoneConfiguration conf;
|
static OzoneConfiguration conf;
|
||||||
static FsDatasetSpi fsDataSet;
|
static FsDatasetSpi fsDataSet;
|
||||||
static MiniDFSCluster cluster;
|
static MiniDFSCluster cluster;
|
||||||
static List<Path> pathLists = new LinkedList<>();
|
static List<Path> pathLists = new LinkedList<>();
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() throws IOException {
|
public static void init() throws IOException {
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
|
@ -84,6 +101,9 @@ public class TestContainerPersistence {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fsDataSet = cluster.getDataNodes().get(0).getFSDataset();
|
fsDataSet = cluster.getDataNodes().get(0).getFSDataset();
|
||||||
containerManager = new ContainerManagerImpl();
|
containerManager = new ContainerManagerImpl();
|
||||||
|
chunkManager = new ChunkManagerImpl(containerManager);
|
||||||
|
containerManager.setChunkManager(chunkManager);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -115,7 +135,8 @@ public class TestContainerPersistence {
|
||||||
ContainerData data = new ContainerData(containerName);
|
ContainerData data = new ContainerData(containerName);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName),
|
||||||
|
data);
|
||||||
Assert.assertTrue(containerManager.getContainerMap()
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
.containsKey(containerName));
|
.containsKey(containerName));
|
||||||
ContainerManagerImpl.ContainerStatus status = containerManager
|
ContainerManagerImpl.ContainerStatus status = containerManager
|
||||||
|
@ -132,14 +153,11 @@ public class TestContainerPersistence {
|
||||||
String containerPathString = ContainerUtils.getContainerNameFromFile(new
|
String containerPathString = ContainerUtils.getContainerNameFromFile(new
|
||||||
File(status.getContainer().getContainerPath()));
|
File(status.getContainer().getContainerPath()));
|
||||||
|
|
||||||
Path meta = Paths.get(containerPathString);
|
Path meta = Paths.get(status.getContainer().getDBPath()).getParent();
|
||||||
|
Assert.assertTrue(Files.exists(meta));
|
||||||
String metadataFile = meta.toString() + OzoneConsts.CONTAINER_META;
|
|
||||||
Assert.assertTrue(new File(metadataFile).exists());
|
|
||||||
|
|
||||||
|
|
||||||
String dbPath = status.getContainer().getDBPath();
|
String dbPath = status.getContainer().getDBPath();
|
||||||
|
|
||||||
LevelDBStore store = null;
|
LevelDBStore store = null;
|
||||||
try {
|
try {
|
||||||
store = new LevelDBStore(new File(dbPath), false);
|
store = new LevelDBStore(new File(dbPath), false);
|
||||||
|
@ -158,9 +176,9 @@ public class TestContainerPersistence {
|
||||||
ContainerData data = new ContainerData(containerName);
|
ContainerData data = new ContainerData(containerName);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||||
try {
|
try {
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||||
fail("Expected Exception not thrown.");
|
fail("Expected Exception not thrown.");
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
Assert.assertNotNull(ex);
|
Assert.assertNotNull(ex);
|
||||||
|
@ -176,12 +194,12 @@ public class TestContainerPersistence {
|
||||||
ContainerData data = new ContainerData(containerName1);
|
ContainerData data = new ContainerData(containerName1);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName1), data);
|
||||||
|
|
||||||
data = new ContainerData(containerName2);
|
data = new ContainerData(containerName2);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName2), data);
|
||||||
|
|
||||||
|
|
||||||
Assert.assertTrue(containerManager.getContainerMap()
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
|
@ -189,7 +207,7 @@ public class TestContainerPersistence {
|
||||||
Assert.assertTrue(containerManager.getContainerMap()
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
.containsKey(containerName2));
|
.containsKey(containerName2));
|
||||||
|
|
||||||
containerManager.deleteContainer(createSingleNodePipeline(),
|
containerManager.deleteContainer(createSingleNodePipeline(containerName1),
|
||||||
containerName1);
|
containerName1);
|
||||||
Assert.assertFalse(containerManager.getContainerMap()
|
Assert.assertFalse(containerManager.getContainerMap()
|
||||||
.containsKey(containerName1));
|
.containsKey(containerName1));
|
||||||
|
@ -200,7 +218,7 @@ public class TestContainerPersistence {
|
||||||
data = new ContainerData(containerName1);
|
data = new ContainerData(containerName1);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName1), data);
|
||||||
|
|
||||||
// Assert we still have both containers.
|
// Assert we still have both containers.
|
||||||
Assert.assertTrue(containerManager.getContainerMap()
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
|
@ -228,7 +246,7 @@ public class TestContainerPersistence {
|
||||||
ContainerData data = new ContainerData(containerName);
|
ContainerData data = new ContainerData(containerName);
|
||||||
data.addMetadata("VOLUME", "shire");
|
data.addMetadata("VOLUME", "shire");
|
||||||
data.addMetadata("owner)", "bilbo");
|
data.addMetadata("owner)", "bilbo");
|
||||||
containerManager.createContainer(createSingleNodePipeline(), data);
|
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||||
testMap.put(containerName, data);
|
testMap.put(containerName, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,6 +269,204 @@ public class TestContainerPersistence {
|
||||||
// Assert that we listed all the keys that we had put into
|
// Assert that we listed all the keys that we had put into
|
||||||
// container.
|
// container.
|
||||||
Assert.assertTrue(testMap.isEmpty());
|
Assert.assertTrue(testMap.isEmpty());
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/**
|
||||||
|
* Writes a single chunk.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWriteChunk() throws IOException, NoSuchAlgorithmException {
|
||||||
|
final int datalen = 1024;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
ContainerData cData = new ContainerData(containerName);
|
||||||
|
cData.addMetadata("VOLUME", "shire");
|
||||||
|
cData.addMetadata("owner)", "bilbo");
|
||||||
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes many chunks of the same key into different chunk files and verifies
|
||||||
|
* that we have that data in many files.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWritReadManyChunks() throws IOException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
final int datalen = 1024;
|
||||||
|
final int chunkCount = 1024;
|
||||||
|
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
|
Map<String, ChunkInfo> fileHashMap = new HashMap<>();
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
ContainerData cData = new ContainerData(containerName);
|
||||||
|
cData.addMetadata("VOLUME", "shire");
|
||||||
|
cData.addMetadata("owner)", "bilbo");
|
||||||
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
|
ChunkInfo info = getChunk(keyName, x, 0, datalen);
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
String fileName = String.format("%s.data.%d", keyName, x);
|
||||||
|
fileHashMap.put(fileName, info);
|
||||||
|
}
|
||||||
|
|
||||||
|
ContainerData cNewData = containerManager.readContainer(containerName);
|
||||||
|
Assert.assertNotNull(cNewData);
|
||||||
|
Path dataDir = ContainerUtils.getDataDirectory(cNewData);
|
||||||
|
|
||||||
|
String globFormat = String.format("%s.data.*", keyName);
|
||||||
|
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
|
||||||
|
// Read chunk via file system and verify.
|
||||||
|
int count = 0;
|
||||||
|
try (DirectoryStream<Path> stream =
|
||||||
|
Files.newDirectoryStream(dataDir, globFormat)) {
|
||||||
|
for (Path fname : stream) {
|
||||||
|
sha.update(FileUtils.readFileToByteArray(fname.toFile()));
|
||||||
|
String val = Hex.encodeHexString(sha.digest());
|
||||||
|
Assert.assertEquals(fileHashMap.get(fname.getFileName().toString())
|
||||||
|
.getChecksum(),
|
||||||
|
val);
|
||||||
|
count++;
|
||||||
|
sha.reset();
|
||||||
|
}
|
||||||
|
Assert.assertEquals(chunkCount, count);
|
||||||
|
|
||||||
|
// Read chunk via ReadChunk call.
|
||||||
|
sha.reset();
|
||||||
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
|
String fileName = String.format("%s.data.%d", keyName, x);
|
||||||
|
ChunkInfo info = fileHashMap.get(fileName);
|
||||||
|
byte[] data = chunkManager.readChunk(pipeline, keyName, info);
|
||||||
|
sha.update(data);
|
||||||
|
Assert.assertEquals(Hex.encodeHexString(sha.digest()),
|
||||||
|
info.getChecksum());
|
||||||
|
sha.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a single chunk and tries to overwrite that chunk without over write
|
||||||
|
* flag then re-tries with overwrite flag.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOverWrite() throws IOException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
final int datalen = 1024;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
ContainerData cData = new ContainerData(containerName);
|
||||||
|
cData.addMetadata("VOLUME", "shire");
|
||||||
|
cData.addMetadata("owner)", "bilbo");
|
||||||
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
try {
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
} catch(IOException ex) {
|
||||||
|
Assert.assertTrue(ex.getMessage().contains(
|
||||||
|
"Rejecting write chunk request. OverWrite flag required."));
|
||||||
|
}
|
||||||
|
|
||||||
|
// With the overwrite flag it should work now.
|
||||||
|
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test writes data as many small writes and tries to read back the data
|
||||||
|
* in a single large read.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultipleWriteSingleRead() throws IOException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
final int datalen = 1024;
|
||||||
|
final int chunkCount = 1024;
|
||||||
|
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
ContainerData cData = new ContainerData(containerName);
|
||||||
|
cData.addMetadata("VOLUME", "shire");
|
||||||
|
cData.addMetadata("owner)", "bilbo");
|
||||||
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
MessageDigest oldSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
|
// we are writing to the same chunk file but at different offsets.
|
||||||
|
long offset = x * datalen;
|
||||||
|
ChunkInfo info = getChunk(keyName, 0, offset, datalen);
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
oldSha.update(data);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request to read the whole data in a single go.
|
||||||
|
ChunkInfo largeChunk = getChunk(keyName, 0, 0, datalen * chunkCount);
|
||||||
|
byte[] newdata = chunkManager.readChunk(pipeline, keyName, largeChunk);
|
||||||
|
MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
newSha.update(newdata);
|
||||||
|
Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
|
||||||
|
Hex.encodeHexString(newSha.digest()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a chunk and deletes it, re-reads to make sure it is gone.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws NoSuchAlgorithmException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteChunk() throws IOException,
|
||||||
|
NoSuchAlgorithmException {
|
||||||
|
final int datalen = 1024;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
|
|
||||||
|
pipeline.setContainerName(containerName);
|
||||||
|
ContainerData cData = new ContainerData(containerName);
|
||||||
|
cData.addMetadata("VOLUME", "shire");
|
||||||
|
cData.addMetadata("owner)", "bilbo");
|
||||||
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
|
||||||
|
byte[] data = getData(datalen);
|
||||||
|
setDataChecksum(info, data);
|
||||||
|
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||||
|
chunkManager.deleteChunk(pipeline, keyName, info);
|
||||||
|
exception.expect(IOException.class);
|
||||||
|
exception.expectMessage("Unable to find the chunk file.");
|
||||||
|
chunkManager.readChunk(pipeline, keyName, info);
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
|
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -37,14 +38,13 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
|
|
||||||
public class TestOzoneContainer {
|
public class TestOzoneContainer {
|
||||||
@Test
|
@Test
|
||||||
public void testCreateOzoneContainer() throws Exception {
|
public void testCreateOzoneContainer() throws Exception {
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
Configuration conf = new OzoneConfiguration();
|
Configuration conf = new OzoneConfiguration();
|
||||||
URL p = conf.getClass().getResource("");
|
URL p = conf.getClass().getResource("");
|
||||||
String path = p.getPath().concat(
|
String path = p.getPath().concat(
|
||||||
TestOzoneContainer.class.getSimpleName());
|
TestOzoneContainer.class.getSimpleName());
|
||||||
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
||||||
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
||||||
|
@ -59,7 +59,8 @@ public class TestOzoneContainer {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
|
||||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
|
||||||
|
(containerName);
|
||||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||||
pipeline.getLeader().getContainerPort());
|
pipeline.getLeader().getContainerPort());
|
||||||
OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
|
OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes
|
||||||
|
@ -69,7 +70,7 @@ public class TestOzoneContainer {
|
||||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||||
client.connect();
|
client.connect();
|
||||||
ContainerProtos.ContainerCommandRequestProto request =
|
ContainerProtos.ContainerCommandRequestProto request =
|
||||||
ContainerTestHelper.getCreateContainerRequest();
|
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||||
ContainerProtos.ContainerCommandResponseProto response =
|
ContainerProtos.ContainerCommandResponseProto response =
|
||||||
client.sendCommand(request);
|
client.sendCommand(request);
|
||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
|
@ -79,13 +80,13 @@ public class TestOzoneContainer {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOzoneContainerViaDataNode() throws Exception {
|
public void testOzoneContainerViaDataNode() throws Exception {
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
Configuration conf = new OzoneConfiguration();
|
Configuration conf = new OzoneConfiguration();
|
||||||
URL p = conf.getClass().getResource("");
|
URL p = conf.getClass().getResource("");
|
||||||
String path = p.getPath().concat(
|
String path = p.getPath().concat(
|
||||||
TestOzoneContainer.class.getSimpleName());
|
TestOzoneContainer.class.getSimpleName());
|
||||||
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
|
||||||
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
|
||||||
|
@ -95,7 +96,8 @@ public class TestOzoneContainer {
|
||||||
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
|
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
|
||||||
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
|
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
|
||||||
|
|
||||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
Pipeline pipeline =
|
||||||
|
ContainerTestHelper.createSingleNodePipeline(containerName);
|
||||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||||
pipeline.getLeader().getContainerPort());
|
pipeline.getLeader().getContainerPort());
|
||||||
|
|
||||||
|
@ -105,12 +107,44 @@ public class TestOzoneContainer {
|
||||||
// This client talks to ozone container via datanode.
|
// This client talks to ozone container via datanode.
|
||||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||||
client.connect();
|
client.connect();
|
||||||
|
|
||||||
|
// Create container
|
||||||
ContainerProtos.ContainerCommandRequestProto request =
|
ContainerProtos.ContainerCommandRequestProto request =
|
||||||
ContainerTestHelper.getCreateContainerRequest();
|
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||||
ContainerProtos.ContainerCommandResponseProto response =
|
ContainerProtos.ContainerCommandResponseProto response =
|
||||||
client.sendCommand(request);
|
client.sendCommand(request);
|
||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
|
// Write Chunk
|
||||||
|
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||||
|
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
|
||||||
|
keyName, 1024);
|
||||||
|
|
||||||
|
response = client.sendCommand(writeChunkRequest);
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||||
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
|
// Read Chunk
|
||||||
|
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
|
||||||
|
.getWriteChunk());
|
||||||
|
|
||||||
|
response = client.sendCommand(request);
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||||
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
|
//Delete Chunk
|
||||||
|
request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
|
||||||
|
.getWriteChunk());
|
||||||
|
|
||||||
|
response = client.sendCommand(request);
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||||
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
|
client.close();
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
|
||||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
||||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
|
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -45,11 +46,12 @@ public class TestContainerServer {
|
||||||
@Test
|
@Test
|
||||||
public void testPipeline() throws IOException {
|
public void testPipeline() throws IOException {
|
||||||
EmbeddedChannel channel = null;
|
EmbeddedChannel channel = null;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
try {
|
try {
|
||||||
channel = new EmbeddedChannel(new XceiverServerHandler(
|
channel = new EmbeddedChannel(new XceiverServerHandler(
|
||||||
new TestContainerDispatcher()));
|
new TestContainerDispatcher()));
|
||||||
ContainerCommandRequestProto request =
|
ContainerCommandRequestProto request =
|
||||||
ContainerTestHelper.getCreateContainerRequest();
|
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||||
channel.writeInbound(request);
|
channel.writeInbound(request);
|
||||||
Assert.assertTrue(channel.finish());
|
Assert.assertTrue(channel.finish());
|
||||||
ContainerCommandResponseProto response = channel.readOutbound();
|
ContainerCommandResponseProto response = channel.readOutbound();
|
||||||
|
@ -65,9 +67,10 @@ public class TestContainerServer {
|
||||||
public void testClientServer() throws Exception {
|
public void testClientServer() throws Exception {
|
||||||
XceiverServer server = null;
|
XceiverServer server = null;
|
||||||
XceiverClient client = null;
|
XceiverClient client = null;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
try {
|
try {
|
||||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
|
||||||
|
(containerName);
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||||
pipeline.getLeader().getContainerPort());
|
pipeline.getLeader().getContainerPort());
|
||||||
|
@ -79,7 +82,7 @@ public class TestContainerServer {
|
||||||
client.connect();
|
client.connect();
|
||||||
|
|
||||||
ContainerCommandRequestProto request =
|
ContainerCommandRequestProto request =
|
||||||
ContainerTestHelper.getCreateContainerRequest();
|
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||||
ContainerCommandResponseProto response = client.sendCommand(request);
|
ContainerCommandResponseProto response = client.sendCommand(request);
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -96,9 +99,11 @@ public class TestContainerServer {
|
||||||
public void testClientServerWithContainerDispatcher() throws Exception {
|
public void testClientServerWithContainerDispatcher() throws Exception {
|
||||||
XceiverServer server = null;
|
XceiverServer server = null;
|
||||||
XceiverClient client = null;
|
XceiverClient client = null;
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
|
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline
|
||||||
|
(containerName);
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||||
pipeline.getLeader().getContainerPort());
|
pipeline.getLeader().getContainerPort());
|
||||||
|
@ -111,7 +116,7 @@ public class TestContainerServer {
|
||||||
client.connect();
|
client.connect();
|
||||||
|
|
||||||
ContainerCommandRequestProto request =
|
ContainerCommandRequestProto request =
|
||||||
ContainerTestHelper.getCreateContainerRequest();
|
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||||
ContainerCommandResponseProto response = client.sendCommand(request);
|
ContainerCommandResponseProto response = client.sendCommand(request);
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
|
Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
|
||||||
|
|
Loading…
Reference in New Issue