HDDS-394. Rename *Key Apis in DatanodeContainerProtocol to *Block apis.

Contributed Dinesh Chitlangia.
This commit is contained in:
Anu Engineer 2018-09-20 11:51:49 -07:00
parent aa4bd493c3
commit 096a716080
44 changed files with 797 additions and 716 deletions

View File

@ -23,7 +23,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.client.BlockID;
@ -32,7 +32,8 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putKey;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.putBlock;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.writeChunk;
@ -57,7 +58,7 @@ public class ChunkOutputStream extends OutputStream {
private final BlockID blockID;
private final String key;
private final String traceID;
private final KeyData.Builder containerKeyData;
private final BlockData.Builder containerBlockData;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private ByteBuffer buffer;
@ -84,7 +85,7 @@ public class ChunkOutputStream extends OutputStream {
this.chunkSize = chunkSize;
KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder()
this.containerBlockData = BlockData.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
@ -154,7 +155,7 @@ public class ChunkOutputStream extends OutputStream {
writeChunkToContainer();
}
try {
putKey(xceiverClient, containerKeyData.build(), traceID);
putBlock(xceiverClient, containerBlockData.build(), traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
@ -230,6 +231,6 @@ public class ChunkOutputStream extends OutputStream {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
}
containerKeyData.addChunks(chunk);
containerBlockData.addChunks(chunk);
}
}

View File

@ -326,8 +326,8 @@ public final class HddsUtils {
switch (proto.getCmdType()) {
case ReadContainer:
case ReadChunk:
case ListKey:
case GetKey:
case ListBlock:
case GetBlock:
case GetSmallFile:
case ListContainer:
case ListChunk:
@ -340,8 +340,8 @@ public final class HddsUtils {
case CreateContainer:
case DeleteChunk:
case DeleteContainer:
case DeleteKey:
case PutKey:
case DeleteBlock:
case PutBlock:
case PutSmallFile:
default:
return false;

View File

@ -35,16 +35,16 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyRequestProto;
.GetBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyResponseProto;
.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutKeyRequestProto;
.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -76,33 +76,33 @@ public final class ContainerProtocolCalls {
}
/**
* Calls the container protocol to get a container key.
* Calls the container protocol to get a container block.
*
* @param xceiverClient client to perform call
* @param datanodeBlockID blockID to identify container
* @param traceID container protocol call args
* @return container protocol get key response
* @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call
*/
public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetKey)
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetKey(readKeyRequest)
.setGetBlock(readBlockRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
return response.getGetKey();
return response.getGetBlock();
}
/**
@ -136,26 +136,26 @@ public final class ContainerProtocolCalls {
}
/**
* Calls the container protocol to put a container key.
* Calls the container protocol to put a container block.
*
* @param xceiverClient client to perform call
* @param containerKeyData key data to identify container
* @param containerBlockData block data to identify container
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
public static void putKey(XceiverClientSpi xceiverClient,
KeyData containerKeyData, String traceID) throws IOException {
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
public static void putBlock(XceiverClientSpi xceiverClient,
BlockData containerBlockData, String traceID) throws IOException {
PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto
.newBuilder()
.setKeyData(containerKeyData);
.setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.PutKey)
.setContainerID(containerKeyData.getBlockID().getContainerID())
.setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setPutKey(createKeyRequest)
.setPutBlock(createBlockRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response);
@ -224,9 +224,9 @@ public final class ContainerProtocolCalls {
/**
* Allows writing a small file using single RPC. This takes the container
* name, key name and data to write sends all that data to the container using
* a single RPC. This API is designed to be used for files which are smaller
* than 1 MB.
* name, block name and data to write sends all that data to the container
* using a single RPC. This API is designed to be used for files which are
* smaller than 1 MB.
*
* @param client - client that communicates with the container.
* @param blockID - ID of the block
@ -238,12 +238,12 @@ public final class ContainerProtocolCalls {
BlockID blockID, byte[] data, String traceID)
throws IOException {
KeyData containerKeyData =
KeyData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
BlockData containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.build();
PutKeyRequestProto.Builder createKeyRequest =
PutKeyRequestProto.newBuilder()
.setKeyData(containerKeyData);
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
.setBlockData(containerBlockData);
KeyValue keyValue =
KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
@ -255,7 +255,7 @@ public final class ContainerProtocolCalls {
PutSmallFileRequestProto putSmallFileRequest =
PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk)
.setKey(createKeyRequest).setData(ByteString.copyFrom(data))
.setBlock(createBlockRequest).setData(ByteString.copyFrom(data))
.build();
String id = client.getPipeline().getLeader().getUuidString();
@ -387,12 +387,12 @@ public final class ContainerProtocolCalls {
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
BlockID blockID, String traceID) throws IOException {
GetKeyRequestProto.Builder getKey = GetKeyRequestProto
GetBlockRequestProto.Builder getBlock = GetBlockRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
GetSmallFileRequestProto
.newBuilder().setKey(getKey)
.newBuilder().setBlock(getBlock)
.build();
String id = client.getPipeline().getLeader().getUuidString();

View File

@ -31,7 +31,7 @@ import java.util.ArrayList;
/**
* Helper class to convert Protobuf to Java classes.
*/
public class KeyData {
public class BlockData {
private final BlockID blockID;
private final Map<String, String> metadata;
@ -43,9 +43,10 @@ public class KeyData {
* When #elements == 1, chunkList refers to the only element.
* When #elements > 1, chunkList refers to the list.
*
* Please note : when we are working with keys, we don't care what they point
* to. So we We don't read chunkinfo nor validate them. It is responsibility
* of higher layer like ozone. We just read and write data from network.
* Please note : when we are working with blocks, we don't care what they
* point to. So we We don't read chunkinfo nor validate them. It is
* responsibility of higher layer like ozone. We just read and write data
* from network.
*/
private Object chunkList;
@ -55,44 +56,45 @@ public class KeyData {
private long size;
/**
* Constructs a KeyData Object.
* Constructs a BlockData Object.
*
* @param blockID
*/
public KeyData(BlockID blockID) {
public BlockData(BlockID blockID) {
this.blockID = blockID;
this.metadata = new TreeMap<>();
this.size = 0;
}
/**
* Returns a keyData object from the protobuf data.
* Returns a blockData object from the protobuf data.
*
* @param data - Protobuf data.
* @return - KeyData
* @return - BlockData
* @throws IOException
*/
public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws
IOException {
KeyData keyData = new KeyData(BlockID.getFromProtobuf(data.getBlockID()));
BlockData blockData = new BlockData(
BlockID.getFromProtobuf(data.getBlockID()));
for (int x = 0; x < data.getMetadataCount(); x++) {
keyData.addMetadata(data.getMetadata(x).getKey(),
blockData.addMetadata(data.getMetadata(x).getKey(),
data.getMetadata(x).getValue());
}
keyData.setChunks(data.getChunksList());
blockData.setChunks(data.getChunksList());
if (data.hasSize()) {
Preconditions.checkArgument(data.getSize() == keyData.getSize());
Preconditions.checkArgument(data.getSize() == blockData.getSize());
}
return keyData;
return blockData;
}
/**
* Returns a Protobuf message from KeyData.
* Returns a Protobuf message from BlockData.
* @return Proto Buf Message.
*/
public ContainerProtos.KeyData getProtoBufMessage() {
ContainerProtos.KeyData.Builder builder =
ContainerProtos.KeyData.newBuilder();
public ContainerProtos.BlockData getProtoBufMessage() {
ContainerProtos.BlockData.Builder builder =
ContainerProtos.BlockData.newBuilder();
builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf());
for (Map.Entry<String, String> entry : metadata.entrySet()) {
ContainerProtos.KeyValue.Builder keyValBuilder =

View File

@ -49,13 +49,13 @@ package hadoop.hdds.datanode;
* 5. ListContainer - Returns the list of containers on this
* datanode. This will be used by tests and tools.
*
* 6. PutKey - Given a valid container, creates a key.
* 6. PutBlock - Given a valid container, creates a block.
*
* 7. GetKey - Allows user to read the metadata of a Key.
* 7. GetBlock - Allows user to read the metadata of a block.
*
* 8. DeleteKey - Deletes a given key.
* 8. DeleteBlock - Deletes a given block.
*
* 9. ListKey - Returns a list of keys that are present inside
* 9. ListBlock - Returns a list of blocks that are present inside
* a given container.
*
* 10. ReadChunk - Allows us to read a chunk.
@ -64,13 +64,13 @@ package hadoop.hdds.datanode;
*
* 12. WriteChunk - Allows us to write a chunk
*
* 13. ListChunk - Given a Container/Key returns the list of Chunks.
* 13. ListChunk - Given a Container/Block returns the list of Chunks.
*
* 14. CompactChunk - Re-writes a chunk based on Offsets.
*
* 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk.
* 15. PutSmallFile - A single RPC that combines both putBlock and WriteChunk.
*
* 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
* 16. GetSmallFile - A single RPC that combines both getBlock and ReadChunk.
*
* 17. CloseContainer - Closes an open container and makes it immutable.
*
@ -84,10 +84,10 @@ enum Type {
DeleteContainer = 4;
ListContainer = 5;
PutKey = 6;
GetKey = 7;
DeleteKey = 8;
ListKey = 9;
PutBlock = 6;
GetBlock = 7;
DeleteBlock = 8;
ListBlock = 9;
ReadChunk = 10;
DeleteChunk = 11;
@ -95,7 +95,7 @@ enum Type {
ListChunk = 13;
CompactChunk = 14;
/** Combines Key and Chunk Operation into Single RPC. */
/** Combines Block and Chunk Operation into Single RPC. */
PutSmallFile = 15;
GetSmallFile = 16;
CloseContainer = 17;
@ -115,7 +115,7 @@ enum Result {
CONTAINER_NOT_FOUND = 9;
IO_EXCEPTION = 10;
UNABLE_TO_READ_METADATA_DB = 11;
NO_SUCH_KEY = 12;
NO_SUCH_BLOCK = 12;
OVERWRITE_FLAG_REQUIRED = 13;
UNABLE_TO_FIND_DATA_DIR = 14;
INVALID_WRITE_SIZE = 15;
@ -185,10 +185,10 @@ message ContainerCommandRequestProto {
optional ListContainerRequestProto listContainer = 9;
optional CloseContainerRequestProto closeContainer = 10;
optional PutKeyRequestProto putKey = 11;
optional GetKeyRequestProto getKey = 12;
optional DeleteKeyRequestProto deleteKey = 13;
optional ListKeyRequestProto listKey = 14;
optional PutBlockRequestProto putBlock = 11;
optional GetBlockRequestProto getBlock = 12;
optional DeleteBlockRequestProto deleteBlock = 13;
optional ListBlockRequestProto listBlock = 14;
optional ReadChunkRequestProto readChunk = 15;
optional WriteChunkRequestProto writeChunk = 16;
@ -215,10 +215,10 @@ message ContainerCommandResponseProto {
optional ListContainerResponseProto listContainer = 9;
optional CloseContainerResponseProto closeContainer = 10;
optional PutKeyResponseProto putKey = 11;
optional GetKeyResponseProto getKey = 12;
optional DeleteKeyResponseProto deleteKey = 13;
optional ListKeyResponseProto listKey = 14;
optional PutBlockResponseProto putBlock = 11;
optional GetBlockResponseProto getBlock = 12;
optional DeleteBlockResponseProto deleteBlock = 13;
optional ListBlockResponseProto listBlock = 14;
optional WriteChunkResponseProto writeChunk = 15;
optional ReadChunkResponseProto readChunk = 16;
@ -294,7 +294,7 @@ message CloseContainerResponseProto {
optional int64 containerID = 2;
}
message KeyData {
message BlockData {
required DatanodeBlockID blockID = 1;
optional int64 flags = 2; // for future use.
repeated KeyValue metadata = 3;
@ -302,25 +302,25 @@ message KeyData {
optional int64 size = 5;
}
// Key Messages.
message PutKeyRequestProto {
required KeyData keyData = 1;
// Block Messages.
message PutBlockRequestProto {
required BlockData blockData = 1;
}
message PutKeyResponseProto {
message PutBlockResponseProto {
required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}
message GetKeyRequestProto {
message GetBlockRequestProto {
required DatanodeBlockID blockID = 1;
}
message GetKeyResponseProto {
required KeyData keyData = 1;
message GetBlockResponseProto {
required BlockData blockData = 1;
}
message DeleteKeyRequestProto {
message DeleteBlockRequestProto {
required DatanodeBlockID blockID = 1;
}
@ -333,17 +333,17 @@ message GetCommittedBlockLengthResponseProto {
required int64 blockLength = 2;
}
message DeleteKeyResponseProto {
message DeleteBlockResponseProto {
}
message ListKeyRequestProto {
message ListBlockRequestProto {
optional int64 startLocalID = 2;
required uint32 count = 3;
}
message ListKeyResponseProto {
repeated KeyData keyData = 1;
message ListBlockResponseProto {
repeated BlockData blockData = 1;
}
// Chunk Operations
@ -401,11 +401,11 @@ message ListChunkResponseProto {
repeated ChunkInfo chunkData = 1;
}
/** For small file access combines write chunk and putKey into a single
/** For small file access combines write chunk and putBlock into a single
RPC */
message PutSmallFileRequestProto {
required PutKeyRequestProto key = 1;
required PutBlockRequestProto block = 1;
required ChunkInfo chunkInfo = 2;
required bytes data = 3;
}
@ -416,7 +416,7 @@ message PutSmallFileResponseProto {
}
message GetSmallFileRequestProto {
required GetKeyRequestProto key = 1;
required GetBlockRequestProto block = 1;
}
message GetSmallFileResponseProto {

View File

@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import java.util.ArrayList;
import java.util.Collections;
@ -33,9 +33,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
/**
* Map: containerId -> (localId -> {@link KeyData}).
* Map: containerId {@literal ->} (localId {@literal ->} {@link BlockData}).
* The outer container map does not entail locking for a better performance.
* The inner {@link KeyDataMap} is synchronized.
* The inner {@link BlockDataMap} is synchronized.
*
* This class will maintain list of open keys per container when closeContainer
* command comes, it should autocommit all open keys of a open container before
@ -43,16 +43,16 @@ import java.util.function.Function;
*/
public class OpenContainerBlockMap {
/**
* Map: localId -> KeyData.
* Map: localId {@literal ->} BlockData.
*
* In order to support {@link #getAll()}, the update operations are
* synchronized.
*/
static class KeyDataMap {
private final ConcurrentMap<Long, KeyData> blocks =
static class BlockDataMap {
private final ConcurrentMap<Long, BlockData> blocks =
new ConcurrentHashMap<>();
KeyData get(long localId) {
BlockData get(long localId) {
return blocks.get(localId);
}
@ -61,12 +61,12 @@ public class OpenContainerBlockMap {
return blocks.size();
}
synchronized KeyData computeIfAbsent(
long localId, Function<Long, KeyData> f) {
synchronized BlockData computeIfAbsent(
long localId, Function<Long, BlockData> f) {
return blocks.computeIfAbsent(localId, f);
}
synchronized List<KeyData> getAll() {
synchronized List<BlockData> getAll() {
return new ArrayList<>(blocks.values());
}
}
@ -79,7 +79,7 @@ public class OpenContainerBlockMap {
*
* For now, we will track all open blocks of a container in the blockMap.
*/
private final ConcurrentMap<Long, KeyDataMap> containers =
private final ConcurrentMap<Long, BlockDataMap> containers =
new ConcurrentHashMap<>();
/**
@ -94,9 +94,9 @@ public class OpenContainerBlockMap {
public void addChunk(BlockID blockID, ChunkInfo info) {
Preconditions.checkNotNull(info);
containers.computeIfAbsent(blockID.getContainerID(), id -> new KeyDataMap())
.computeIfAbsent(blockID.getLocalID(), id -> new KeyData(blockID))
.addChunk(info);
containers.computeIfAbsent(blockID.getContainerID(),
id -> new BlockDataMap()).computeIfAbsent(blockID.getLocalID(),
id -> new BlockData(blockID)).addChunk(info);
}
/**
@ -113,21 +113,21 @@ public class OpenContainerBlockMap {
}
/**
* Returns the list of open to the openContainerBlockMap.
* Returns the list of open blocks to the openContainerBlockMap.
* @param containerId container id
* @return List of open Keys(blocks)
* @return List of open blocks
*/
public List<KeyData> getOpenKeys(long containerId) {
public List<BlockData> getOpenBlocks(long containerId) {
return Optional.ofNullable(containers.get(containerId))
.map(KeyDataMap::getAll)
.map(BlockDataMap::getAll)
.orElseGet(Collections::emptyList);
}
/**
* removes the block from the block map.
* @param blockID
* @param blockID - block ID
*/
public void removeFromKeyMap(BlockID blockID) {
public void removeFromBlockMap(BlockID blockID) {
Preconditions.checkNotNull(blockID);
containers.computeIfPresent(blockID.getContainerID(), (containerId, blocks)
-> blocks.removeAndGetSize(blockID.getLocalID()) == 0? null: blocks);
@ -136,16 +136,16 @@ public class OpenContainerBlockMap {
/**
* Returns true if the block exists in the map, false otherwise.
*
* @param blockID
* @param blockID - Block ID.
* @return True, if it exists, false otherwise
*/
public boolean checkIfBlockExists(BlockID blockID) {
KeyDataMap keyDataMap = containers.get(blockID.getContainerID());
BlockDataMap keyDataMap = containers.get(blockID.getContainerID());
return keyDataMap != null && keyDataMap.get(blockID.getLocalID()) != null;
}
@VisibleForTesting
KeyDataMap getKeyDataMap(long containerId) {
BlockDataMap getBlockDataMap(long containerId) {
return containers.get(containerId);
}
}

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.container.common.helpers
.DeletedContainerBlocksSummary;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
@ -199,7 +199,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
}
int newDeletionBlocks = 0;
MetadataStore containerDB = KeyUtils.getDB(containerData, conf);
MetadataStore containerDB = BlockUtils.getDB(containerData, conf);
for (Long blk : delTX.getLocalIDList()) {
BatchOperation batch = new BatchOperation();
byte[] blkBytes = Longs.toByteArray(blk);

View File

@ -103,10 +103,10 @@ import java.util.stream.Collectors;
* implementation. For example, synchronization between writeChunk and
* createContainer in {@link ContainerStateMachine}.
*
* PutKey is synchronized with WriteChunk operations, PutKey for a block is
* executed only after all the WriteChunk preceding the PutKey have finished.
* PutBlock is synchronized with WriteChunk operations, PutBlock for a block is
* executed only after all the WriteChunk preceding the PutBlock have finished.
*
* CloseContainer is synchronized with WriteChunk and PutKey operations,
* CloseContainer is synchronized with WriteChunk and PutBlock operations,
* CloseContainer for a container is processed after all the preceding write
* operations for the container have finished.
* */
@ -443,7 +443,7 @@ public class ContainerStateMachine extends BaseStateMachine {
/**
* This class maintains maps and provide utilities to enforce synchronization
* among createContainer, writeChunk, putKey and closeContainer.
* among createContainer, writeChunk, putBlock and closeContainer.
*/
private class StateMachineHelper {
@ -453,7 +453,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final ConcurrentHashMap<Long, CommitChunkFutureMap>
block2ChunkMap;
// Map for putKey futures
// Map for putBlock futures
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
blockCommitMap;
@ -505,11 +505,11 @@ public class ContainerStateMachine extends BaseStateMachine {
// The following section handles applyTransaction transactions
// on a container
private CompletableFuture<Message> handlePutKey(
private CompletableFuture<Message> handlePutBlock(
ContainerCommandRequestProto requestProto) {
List<CompletableFuture<Message>> futureList = new ArrayList<>();
long localId =
requestProto.getPutKey().getKeyData().getBlockID().getLocalID();
requestProto.getPutBlock().getBlockData().getBlockID().getLocalID();
// Need not wait for create container future here as it has already
// finished.
if (block2ChunkMap.get(localId) != null) {
@ -518,18 +518,18 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture<Message> effectiveFuture =
runCommandAfterFutures(futureList, requestProto);
CompletableFuture<Message> putKeyFuture =
CompletableFuture<Message> putBlockFuture =
effectiveFuture.thenApply(message -> {
blockCommitMap.remove(localId);
return message;
});
blockCommitMap.put(localId, putKeyFuture);
return putKeyFuture;
blockCommitMap.put(localId, putBlockFuture);
return putBlockFuture;
}
// Close Container should be executed only if all pending WriteType
// container cmds get executed. Transactions which can return a future
// are WriteChunk and PutKey.
// are WriteChunk and PutBlock.
private CompletableFuture<Message> handleCloseContainer(
ContainerCommandRequestProto requestProto) {
List<CompletableFuture<Message>> futureList = new ArrayList<>();
@ -539,7 +539,7 @@ public class ContainerStateMachine extends BaseStateMachine {
block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
futureList.addAll(blockCommitMap.values());
// There are pending write Chunk/PutKey type requests
// There are pending write Chunk/PutBlock type requests
// Queue this closeContainer request behind all these requests
CompletableFuture<Message> closeContainerFuture =
runCommandAfterFutures(futureList, requestProto);
@ -615,8 +615,8 @@ public class ContainerStateMachine extends BaseStateMachine {
return handleChunkCommit(requestProto, index);
case CloseContainer:
return handleCloseContainer(requestProto);
case PutKey:
return handlePutKey(requestProto);
case PutBlock:
return handlePutBlock(requestProto);
case CreateContainer:
return handleCreateContainer(requestProto);
default:

View File

@ -21,12 +21,12 @@ package org.apache.hadoop.ozone.container.keyvalue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.utils.MetaStoreIterator;
import org.apache.hadoop.utils.MetadataKeyFilters;
@ -48,7 +48,7 @@ import java.util.NoSuchElementException;
* {@link MetadataKeyFilters#getNormalKeyFilter()}
*/
@InterfaceAudience.Public
public class KeyValueBlockIterator implements BlockIterator<KeyData> {
public class KeyValueBlockIterator implements BlockIterator<BlockData> {
private static final Logger LOG = LoggerFactory.getLogger(
KeyValueBlockIterator.class);
@ -57,7 +57,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> {
private static KeyPrefixFilter defaultBlockFilter = MetadataKeyFilters
.getNormalKeyFilter();
private KeyPrefixFilter blockFilter;
private KeyData nextBlock;
private BlockData nextBlock;
private long containerId;
/**
@ -91,7 +91,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> {
containerData;
keyValueContainerData.setDbFile(KeyValueContainerLocationUtil
.getContainerDBFile(metdataPath, containerId));
MetadataStore metadataStore = KeyUtils.getDB(keyValueContainerData, new
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, new
OzoneConfiguration());
blockIterator = metadataStore.iterator();
blockFilter = filter;
@ -103,9 +103,9 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> {
* @throws IOException
*/
@Override
public KeyData nextBlock() throws IOException, NoSuchElementException {
public BlockData nextBlock() throws IOException, NoSuchElementException {
if (nextBlock != null) {
KeyData currentBlock = nextBlock;
BlockData currentBlock = nextBlock;
nextBlock = null;
return currentBlock;
}
@ -124,7 +124,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> {
if (blockIterator.hasNext()) {
KeyValue block = blockIterator.next();
if (blockFilter.filterKey(null, block.getKey(), null)) {
nextBlock = KeyUtils.getKeyData(block.getValue());
nextBlock = BlockUtils.getBlockData(block.getValue());
LOG.trace("Block matching with filter found: blockID is : {} for " +
"containerID {}", nextBlock.getLocalID(), containerId);
return true;

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers
.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
@ -293,7 +293,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
try {
MetadataStore db = KeyUtils.getDB(containerData, config);
MetadataStore db = BlockUtils.getDB(containerData, config);
db.compactDB();
} catch (StorageContainerException ex) {
throw ex;

View File

@ -48,10 +48,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -62,13 +62,13 @@ import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.util.AutoCloseableLock;
@ -117,7 +117,7 @@ public class KeyValueHandler extends Handler {
KeyValueHandler.class);
private final ContainerType containerType;
private final KeyManager keyManager;
private final BlockManager blockManager;
private final ChunkManager chunkManager;
private final BlockDeletingService blockDeletingService;
private final VolumeChoosingPolicy volumeChoosingPolicy;
@ -129,7 +129,7 @@ public class KeyValueHandler extends Handler {
VolumeSet volSet, ContainerMetrics metrics) {
super(config, contSet, volSet, metrics);
containerType = ContainerType.KeyValueContainer;
keyManager = new KeyManagerImpl(config);
blockManager = new BlockManagerImpl(config);
chunkManager = new ChunkManagerImpl();
long svcInterval = config
.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
@ -187,13 +187,13 @@ public class KeyValueHandler extends Handler {
return handleUnsupportedOp(request);
case CloseContainer:
return handleCloseContainer(request, kvContainer);
case PutKey:
return handlePutKey(request, kvContainer);
case GetKey:
return handleGetKey(request, kvContainer);
case DeleteKey:
return handleDeleteKey(request, kvContainer);
case ListKey:
case PutBlock:
return handlePutBlock(request, kvContainer);
case GetBlock:
return handleGetBlock(request, kvContainer);
case DeleteBlock:
return handleDeleteBlock(request, kvContainer);
case ListBlock:
return handleUnsupportedOp(request);
case ReadChunk:
return handleReadChunk(request, kvContainer);
@ -222,8 +222,8 @@ public class KeyValueHandler extends Handler {
}
@VisibleForTesting
public KeyManager getKeyManager() {
return this.keyManager;
public BlockManager getBlockManager() {
return this.blockManager;
}
/**
@ -413,7 +413,7 @@ public class KeyValueHandler extends Handler {
// remove the container from open block map once, all the blocks
// have been committed and the container is closed
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
commitPendingKeys(kvContainer);
commitPendingBlocks(kvContainer);
kvContainer.close();
// make sure the the container open keys from BlockMap gets removed
openContainerBlockMap.removeContainer(kvData.getContainerID());
@ -429,13 +429,13 @@ public class KeyValueHandler extends Handler {
}
/**
* Handle Put Key operation. Calls KeyManager to process the request.
* Handle Put Block operation. Calls BlockManager to process the request.
*/
ContainerCommandResponseProto handlePutKey(
ContainerCommandResponseProto handlePutBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
long blockLength;
if (!request.hasPutKey()) {
if (!request.hasPutBlock()) {
LOG.debug("Malformed Put Key request. trace ID: {}",
request.getTraceID());
return ContainerUtils.malformedRequest(request);
@ -444,11 +444,11 @@ public class KeyValueHandler extends Handler {
try {
checkContainerOpen(kvContainer);
KeyData keyData = KeyData.getFromProtoBuf(
request.getPutKey().getKeyData());
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
blockLength = commitKey(keyData, kvContainer);
metrics.incContainerBytesStats(Type.PutKey, numBytes);
BlockData blockData = BlockData.getFromProtoBuf(
request.getPutBlock().getBlockData());
long numBytes = blockData.getProtoBufMessage().toByteArray().length;
blockLength = commitKey(blockData, kvContainer);
metrics.incContainerBytesStats(Type.PutBlock, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@ -457,46 +457,46 @@ public class KeyValueHandler extends Handler {
request);
}
return KeyUtils.putKeyResponseSuccess(request, blockLength);
return BlockUtils.putBlockResponseSuccess(request, blockLength);
}
private void commitPendingKeys(KeyValueContainer kvContainer)
private void commitPendingBlocks(KeyValueContainer kvContainer)
throws IOException {
long containerId = kvContainer.getContainerData().getContainerID();
List<KeyData> pendingKeys =
this.openContainerBlockMap.getOpenKeys(containerId);
for(KeyData keyData : pendingKeys) {
commitKey(keyData, kvContainer);
List<BlockData> pendingBlocks =
this.openContainerBlockMap.getOpenBlocks(containerId);
for(BlockData blockData : pendingBlocks) {
commitKey(blockData, kvContainer);
}
}
private long commitKey(KeyData keyData, KeyValueContainer kvContainer)
private long commitKey(BlockData blockData, KeyValueContainer kvContainer)
throws IOException {
Preconditions.checkNotNull(keyData);
long length = keyManager.putKey(kvContainer, keyData);
Preconditions.checkNotNull(blockData);
long length = blockManager.putBlock(kvContainer, blockData);
//update the open key Map in containerManager
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID());
return length;
}
/**
* Handle Get Key operation. Calls KeyManager to process the request.
* Handle Get Block operation. Calls BlockManager to process the request.
*/
ContainerCommandResponseProto handleGetKey(
ContainerCommandResponseProto handleGetBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
if (!request.hasGetKey()) {
if (!request.hasGetBlock()) {
LOG.debug("Malformed Get Key request. trace ID: {}",
request.getTraceID());
return ContainerUtils.malformedRequest(request);
}
KeyData responseData;
BlockData responseData;
try {
BlockID blockID = BlockID.getFromProtobuf(
request.getGetKey().getBlockID());
responseData = keyManager.getKey(kvContainer, blockID);
request.getGetBlock().getBlockID());
responseData = blockManager.getBlock(kvContainer, blockID);
long numBytes = responseData.getProtoBufMessage().toByteArray().length;
metrics.incContainerBytesStats(Type.GetKey, numBytes);
metrics.incContainerBytesStats(Type.GetBlock, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@ -506,12 +506,12 @@ public class KeyValueHandler extends Handler {
request);
}
return KeyUtils.getKeyDataResponse(request, responseData);
return BlockUtils.getBlockDataResponse(request, responseData);
}
/**
* Handles GetCommittedBlockLength operation.
* Calls KeyManager to process the request.
* Calls BlockManager to process the request.
*/
ContainerCommandResponseProto handleGetCommittedBlockLength(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
@ -530,7 +530,7 @@ public class KeyValueHandler extends Handler {
String msg = "Block " + blockID + " is not committed yet.";
throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED);
}
blockLength = keyManager.getCommittedBlockLength(kvContainer, blockID);
blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@ -539,16 +539,16 @@ public class KeyValueHandler extends Handler {
IO_EXCEPTION), request);
}
return KeyUtils.getBlockLengthResponse(request, blockLength);
return BlockUtils.getBlockLengthResponse(request, blockLength);
}
/**
* Handle Delete Key operation. Calls KeyManager to process the request.
* Handle Delete Block operation. Calls BlockManager to process the request.
*/
ContainerCommandResponseProto handleDeleteKey(
ContainerCommandResponseProto handleDeleteBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
if (!request.hasDeleteKey()) {
if (!request.hasDeleteBlock()) {
LOG.debug("Malformed Delete Key request. trace ID: {}",
request.getTraceID());
return ContainerUtils.malformedRequest(request);
@ -558,9 +558,9 @@ public class KeyValueHandler extends Handler {
checkContainerOpen(kvContainer);
BlockID blockID = BlockID.getFromProtobuf(
request.getDeleteKey().getBlockID());
request.getDeleteBlock().getBlockID());
keyManager.deleteKey(kvContainer, blockID);
blockManager.deleteBlock(kvContainer, blockID);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@ -569,7 +569,7 @@ public class KeyValueHandler extends Handler {
request);
}
return KeyUtils.getKeyResponseSuccess(request);
return BlockUtils.getBlockResponseSuccess(request);
}
/**
@ -698,7 +698,7 @@ public class KeyValueHandler extends Handler {
/**
* Handle Put Small File operation. Writes the chunk and associated key
* using a single RPC. Calls KeyManager and ChunkManager to process the
* using a single RPC. Calls BlockManager and ChunkManager to process the
* request.
*/
ContainerCommandResponseProto handlePutSmallFile(
@ -715,11 +715,11 @@ public class KeyValueHandler extends Handler {
try {
checkContainerOpen(kvContainer);
BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getKey()
.getKeyData().getBlockID());
KeyData keyData = KeyData.getFromProtoBuf(
putSmallFileReq.getKey().getKeyData());
Preconditions.checkNotNull(keyData);
BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock()
.getBlockData().getBlockID());
BlockData blockData = BlockData.getFromProtoBuf(
putSmallFileReq.getBlock().getBlockData());
Preconditions.checkNotNull(blockData);
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
putSmallFileReq.getChunkInfo());
@ -732,8 +732,8 @@ public class KeyValueHandler extends Handler {
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
chunks.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunks);
keyManager.putKey(kvContainer, keyData);
blockData.setChunks(chunks);
blockManager.putBlock(kvContainer, blockData);
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
} catch (StorageContainerException ex) {
@ -749,7 +749,7 @@ public class KeyValueHandler extends Handler {
/**
* Handle Get Small File operation. Gets a data stream using a key. This
* helps in reducing the RPC overhead for small files. Calls KeyManager and
* helps in reducing the RPC overhead for small files. Calls BlockManager and
* ChunkManager to process the request.
*/
ContainerCommandResponseProto handleGetSmallFile(
@ -764,9 +764,9 @@ public class KeyValueHandler extends Handler {
GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
try {
BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getKey()
BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
.getBlockID());
KeyData responseData = keyManager.getKey(kvContainer, blockID);
BlockData responseData = blockManager.getBlock(kvContainer, blockID);
ContainerProtos.ChunkInfo chunkInfo = null;
ByteString dataBuf = ByteString.EMPTY;

View File

@ -26,15 +26,15 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyResponseProto;
.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
GetCommittedBlockLengthResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
PutKeyResponseProto;
PutBlockResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
@ -42,17 +42,17 @@ import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.NO_SUCH_KEY;
.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNABLE_TO_READ_METADATA_DB;
/**
* Utils functions to help key functions.
* Utils functions to help block functions.
*/
public final class KeyUtils {
public final class BlockUtils {
/** Never constructed. **/
private KeyUtils() {
private BlockUtils() {
}
/**
@ -108,64 +108,64 @@ public final class KeyUtils {
}
/**
* Parses the {@link KeyData} from a bytes array.
* Parses the {@link BlockData} from a bytes array.
*
* @param bytes key data in bytes.
* @return key data.
* @param bytes Block data in bytes.
* @return Block data.
* @throws IOException if the bytes array is malformed or invalid.
*/
public static KeyData getKeyData(byte[] bytes) throws IOException {
public static BlockData getBlockData(byte[] bytes) throws IOException {
try {
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(
ContainerProtos.BlockData blockData = ContainerProtos.BlockData.parseFrom(
bytes);
KeyData data = KeyData.getFromProtoBuf(keyData);
BlockData data = BlockData.getFromProtoBuf(blockData);
return data;
} catch (IOException e) {
throw new StorageContainerException("Failed to parse key data from the" +
" bytes array.", NO_SUCH_KEY);
throw new StorageContainerException("Failed to parse block data from " +
"the bytes array.", NO_SUCH_BLOCK);
}
}
/**
* Returns putKey response success.
* Returns putBlock response success.
* @param msg - Request.
* @return Response.
*/
public static ContainerCommandResponseProto putKeyResponseSuccess(
public static ContainerCommandResponseProto putBlockResponseSuccess(
ContainerCommandRequestProto msg, long blockLength) {
GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength,
msg.getPutKey().getKeyData().getBlockID());
PutKeyResponseProto.Builder putKeyResponse =
PutKeyResponseProto.newBuilder();
msg.getPutBlock().getBlockData().getBlockID());
PutBlockResponseProto.Builder putKeyResponse =
PutBlockResponseProto.newBuilder();
putKeyResponse
.setCommittedBlockLength(committedBlockLengthResponseBuilder);
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setPutKey(putKeyResponse);
builder.setPutBlock(putKeyResponse);
return builder.build();
}
/**
* Returns successful keyResponse.
* Returns successful blockResponse.
* @param msg - Request.
* @return Response.
*/
public static ContainerCommandResponseProto getKeyResponseSuccess(
public static ContainerCommandResponseProto getBlockResponseSuccess(
ContainerCommandRequestProto msg) {
return ContainerUtils.getSuccessResponse(msg);
}
public static ContainerCommandResponseProto getKeyDataResponse(
ContainerCommandRequestProto msg, KeyData data) {
GetKeyResponseProto.Builder getKey = ContainerProtos
.GetKeyResponseProto
public static ContainerCommandResponseProto getBlockDataResponse(
ContainerCommandRequestProto msg, BlockData data) {
GetBlockResponseProto.Builder getBlock = ContainerProtos
.GetBlockResponseProto
.newBuilder();
getKey.setKeyData(data.getProtoBufMessage());
getBlock.setBlockData(data.getProtoBufMessage());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setGetKey(getKey);
builder.setGetBlock(getBlock);
return builder.build();
}
@ -187,8 +187,8 @@ public final class KeyUtils {
}
private static GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder(
long blockLength, ContainerProtos.DatanodeBlockID blockID) {
getCommittedBlockLengthResponseBuilder(long blockLength,
ContainerProtos.DatanodeBlockID blockID) {
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder = ContainerProtos.
GetCommittedBlockLengthResponseProto.newBuilder();

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
@ -116,7 +116,7 @@ public final class KeyValueContainerUtil {
File chunksPath = new File(containerData.getChunksPath());
// Close the DB connection and remove the DB handler from cache
KeyUtils.removeDB(containerData, conf);
BlockUtils.removeDB(containerData, conf);
// Delete the Container MetaData path.
FileUtils.deleteDirectory(containerMetaDataPath);
@ -175,16 +175,16 @@ public final class KeyValueContainerUtil {
}
kvContainerData.setDbFile(dbFile);
MetadataStore metadata = KeyUtils.getDB(kvContainerData, config);
MetadataStore metadata = BlockUtils.getDB(kvContainerData, config);
long bytesUsed = 0;
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
.getRangeKVs(null, Integer.MAX_VALUE,
MetadataKeyFilters.getNormalKeyFilter());
bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
KeyData keyData;
BlockData blockData;
try {
keyData = KeyUtils.getKeyData(e.getValue());
return keyData.getSize();
blockData = BlockUtils.getBlockData(e.getValue());
return blockData.getSize();
} catch (IOException ex) {
return 0L;
}

View File

@ -69,7 +69,7 @@ public final class SmallFileUtils {
ContainerProtos.ReadChunkResponseProto.newBuilder();
readChunkresponse.setChunkData(info.getProtoBufMessage());
readChunkresponse.setData(ByteString.copyFrom(data));
readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().getBlockID());
readChunkresponse.setBlockID(msg.getGetSmallFile().getBlock().getBlockID());
ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
ContainerProtos.GetSmallFileResponseProto.newBuilder();

View File

@ -25,11 +25,11 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
@ -40,44 +40,44 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
/**
* This class is for performing key related operations on the KeyValue
* This class is for performing block related operations on the KeyValue
* Container.
*/
public class KeyManagerImpl implements KeyManager {
public class BlockManagerImpl implements BlockManager {
static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
static final Logger LOG = LoggerFactory.getLogger(BlockManagerImpl.class);
private Configuration config;
/**
* Constructs a key Manager.
* Constructs a Block Manager.
*
* @param conf - Ozone configuration
*/
public KeyManagerImpl(Configuration conf) {
public BlockManagerImpl(Configuration conf) {
Preconditions.checkNotNull(conf, "Config cannot be null");
this.config = conf;
}
/**
* Puts or overwrites a key.
* Puts or overwrites a block.
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @return length of the key.
* @param container - Container for which block need to be added.
* @param data - BlockData.
* @return length of the block.
* @throws IOException
*/
public long putKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
public long putBlock(Container container, BlockData data) throws IOException {
Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
"cannot be negative");
// We are not locking the key manager since LevelDb serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
MetadataStore db = BlockUtils.getDB((KeyValueContainerData) container
.getContainerData(), config);
// This is a post condition that acts as a hint to the user.
@ -92,40 +92,41 @@ public class KeyManagerImpl implements KeyManager {
}
/**
* Gets an existing key.
* Gets an existing block.
*
* @param container - Container from which key need to be get.
* @param blockID - BlockID of the key.
* @param container - Container from which block need to be fetched.
* @param blockID - BlockID of the block.
* @return Key Data.
* @throws IOException
*/
public KeyData getKey(Container container, BlockID blockID)
public BlockData getBlock(Container container, BlockID blockID)
throws IOException {
Preconditions.checkNotNull(blockID,
"BlockID cannot be null in GetKet request");
"BlockID cannot be null in GetBlock request");
Preconditions.checkNotNull(blockID.getContainerID(),
"Container name cannot be null");
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(containerData, config);
MetadataStore db = BlockUtils.getDB(containerData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the key.",
NO_SUCH_KEY);
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
}
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
return KeyData.getFromProtoBuf(keyData);
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
return BlockData.getFromProtoBuf(blockData);
}
/**
* Returns the length of the committed block.
*
* @param container - Container from which key need to be get.
* @param blockID - BlockID of the key.
* @param container - Container from which block need to be fetched.
* @param blockID - BlockID of the block.
* @return length of the block.
* @throws IOException in case, the block key does not exist in db.
*/
@ -134,27 +135,28 @@ public class KeyManagerImpl implements KeyManager {
throws IOException {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(containerData, config);
MetadataStore db = BlockUtils.getDB(containerData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the key.",
NO_SUCH_KEY);
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
}
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
return keyData.getSize();
ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData);
return blockData.getSize();
}
/**
* Deletes an existing Key.
* Deletes an existing block.
*
* @param container - Container from which key need to be deleted.
* @param container - Container from which block need to be deleted.
* @param blockID - ID of the block.
* @throws StorageContainerException
*/
public void deleteKey(Container container, BlockID blockID) throws
public void deleteBlock(Container container, BlockID blockID) throws
IOException {
Preconditions.checkNotNull(blockID, "block ID cannot be null.");
Preconditions.checkState(blockID.getContainerID() >= 0,
@ -164,36 +166,36 @@ public class KeyManagerImpl implements KeyManager {
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(cData, config);
MetadataStore db = BlockUtils.getDB(cData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
// Note : There is a race condition here, since get and delete
// are not atomic. Leaving it here since the impact is refusing
// to delete a key which might have just gotten inserted after
// to delete a Block which might have just gotten inserted after
// the get check.
byte[] kKey = Longs.toByteArray(blockID.getLocalID());
byte[] kData = db.get(kKey);
if (kData == null) {
throw new StorageContainerException("Unable to find the key.",
NO_SUCH_KEY);
throw new StorageContainerException("Unable to find the block.",
NO_SUCH_BLOCK);
}
db.delete(kKey);
// Decrement keycount here
// Decrement blockcount here
container.getContainerData().decrKeyCount();
}
/**
* List keys in a container.
* List blocks in a container.
*
* @param container - Container from which keys need to be listed.
* @param container - Container from which blocks need to be listed.
* @param startLocalID - Key to start from, 0 to begin.
* @param count - Number of keys to return.
* @return List of Keys that match the criteria.
* @param count - Number of blocks to return.
* @return List of Blocks that match the criteria.
*/
@Override
public List<KeyData> listKey(Container container, long startLocalID, int
public List<BlockData> listBlock(Container container, long startLocalID, int
count) throws IOException {
Preconditions.checkNotNull(container, "container cannot be null");
Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
@ -201,17 +203,17 @@ public class KeyManagerImpl implements KeyManager {
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
container.readLock();
List<KeyData> result = null;
List<BlockData> result = null;
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(cData, config);
MetadataStore db = BlockUtils.getDB(cData, config);
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
startKeyInBytes, count, null);
for (Map.Entry<byte[], byte[]> entry : range) {
KeyData value = KeyUtils.getKeyData(entry.getValue());
KeyData data = new KeyData(value.getBlockID());
BlockData value = BlockUtils.getBlockData(entry.getValue());
BlockData data = new BlockData(value.getBlockID());
result.add(data);
}
return result;
@ -221,7 +223,7 @@ public class KeyManagerImpl implements KeyManager {
* Shutdown KeyValueContainerManager.
*/
public void shutdown() {
KeyUtils.shutdownCache(ContainerCache.getInstance(config));
BlockUtils.shutdownCache(ContainerCache.getInstance(config));
}
}

View File

@ -17,6 +17,5 @@
*/
package org.apache.hadoop.ozone.container.keyvalue.impl;
/**
This package contains chunk manager and key manager implementation for
keyvalue container type.
**/
* Chunk manager and block manager implementations for keyvalue container type.
*/

View File

@ -19,55 +19,55 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import java.io.IOException;
import java.util.List;
/**
* KeyManager is for performing key related operations on the container.
* BlockManager is for performing key related operations on the container.
*/
public interface KeyManager {
public interface BlockManager {
/**
* Puts or overwrites a key.
* Puts or overwrites a block.
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @return length of the Key.
* @param container - Container for which block need to be added.
* @param data - Block Data.
* @return length of the Block.
* @throws IOException
*/
long putKey(Container container, KeyData data) throws IOException;
long putBlock(Container container, BlockData data) throws IOException;
/**
* Gets an existing key.
* Gets an existing block.
*
* @param container - Container from which key need to be get.
* @param blockID - BlockID of the Key.
* @return Key Data.
* @param container - Container from which block need to be get.
* @param blockID - BlockID of the Block.
* @return Block Data.
* @throws IOException
*/
KeyData getKey(Container container, BlockID blockID) throws IOException;
BlockData getBlock(Container container, BlockID blockID) throws IOException;
/**
* Deletes an existing Key.
* Deletes an existing block.
*
* @param container - Container from which key need to be deleted.
* @param container - Container from which block need to be deleted.
* @param blockID - ID of the block.
* @throws StorageContainerException
*/
void deleteKey(Container container, BlockID blockID) throws IOException;
void deleteBlock(Container container, BlockID blockID) throws IOException;
/**
* List keys in a container.
* List blocks in a container.
*
* @param container - Container from which keys need to be listed.
* @param startLocalID - Key to start from, 0 to begin.
* @param count - Number of keys to return.
* @return List of Keys that match the criteria.
* @param container - Container from which blocks need to be listed.
* @param startLocalID - Block to start from, 0 to begin.
* @param count - Number of blocks to return.
* @return List of Blocks that match the criteria.
*/
List<KeyData> listKey(Container container, long startLocalID, int count)
List<BlockData> listBlock(Container container, long startLocalID, int count)
throws IOException;
/**

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.interfaces;
/**
* Chunk manager and block manager interfaces for keyvalue container type.
*/

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ratis.shaded.com.google.protobuf
.InvalidProtocolBufferException;
@ -72,7 +72,7 @@ public class BlockDeletingService extends BackgroundService{
private static final Logger LOG =
LoggerFactory.getLogger(BlockDeletingService.class);
ContainerSet containerSet;
private ContainerSet containerSet;
private ContainerDeletionChoosingPolicy containerDeletionPolicy;
private final Configuration conf;
@ -185,7 +185,7 @@ public class BlockDeletingService extends BackgroundService{
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
MetadataStore meta = KeyUtils.getDB(
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData, conf);
// # of blocks to delete is throttled
KeyPrefixFilter filter =
@ -211,8 +211,8 @@ public class BlockDeletingService extends BackgroundService{
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.debug("Deleting block {}", blockName);
try {
ContainerProtos.KeyData data =
ContainerProtos.KeyData.parseFrom(entry.getValue());
ContainerProtos.BlockData data =
ContainerProtos.BlockData.parseFrom(entry.getValue());
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
File chunkFile = dataDir.toPath()
.resolve(chunkInfo.getChunkName()).toFile();

View File

@ -23,13 +23,13 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
@ -50,7 +50,7 @@ import static org.mockito.Mockito.mock;
/**
* This class is used to test key related operations on the container.
*/
public class TestKeyManagerImpl {
public class TestBlockManagerImpl {
private OzoneConfiguration config;
private String scmId = UUID.randomUUID().toString();
@ -58,8 +58,8 @@ public class TestKeyManagerImpl {
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
private KeyData keyData;
private KeyManagerImpl keyManager;
private BlockData blockData;
private BlockManagerImpl blockManager;
private BlockID blockID;
@Rule
@ -88,104 +88,124 @@ public class TestKeyManagerImpl {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
// Creating KeyData
// Creating BlockData
blockID = new BlockID(1L, 1L);
keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
blockData = new BlockData(blockID);
blockData.addMetadata("VOLUME", "ozone");
blockData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
blockData.setChunks(chunkList);
// Create KeyValueContainerManager
keyManager = new KeyManagerImpl(config);
blockManager = new BlockManagerImpl(config);
}
@Test
public void testPutAndGetKey() throws Exception {
public void testPutAndGetBlock() throws Exception {
assertEquals(0, keyValueContainer.getContainerData().getKeyCount());
//Put Key
keyManager.putKey(keyValueContainer, keyData);
//Put Block
blockManager.putBlock(keyValueContainer, blockData);
assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
//Get Key
KeyData fromGetKeyData = keyManager.getKey(keyValueContainer,
keyData.getBlockID());
//Get Block
BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer,
blockData.getBlockID());
assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID());
assertEquals(keyData.getChunks().size(), fromGetKeyData.getChunks().size());
assertEquals(keyData.getMetadata().size(), fromGetKeyData.getMetadata()
assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID());
assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID());
assertEquals(blockData.getChunks().size(),
fromGetBlockData.getChunks().size());
assertEquals(blockData.getMetadata().size(), fromGetBlockData.getMetadata()
.size());
}
@Test
public void testDeleteKey() throws Exception {
public void testDeleteBlock() throws Exception {
try {
assertEquals(0, keyValueContainer.getContainerData().getKeyCount());
//Put Key
keyManager.putKey(keyValueContainer, keyData);
assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
//Delete Key
keyManager.deleteKey(keyValueContainer, blockID);
assertEquals(0, keyValueContainer.getContainerData().getKeyCount());
assertEquals(0,
keyValueContainer.getContainerData().getKeyCount());
//Put Block
blockManager.putBlock(keyValueContainer, blockData);
assertEquals(1,
keyValueContainer.getContainerData().getKeyCount());
//Delete Block
blockManager.deleteBlock(keyValueContainer, blockID);
assertEquals(0,
keyValueContainer.getContainerData().getKeyCount());
try {
keyManager.getKey(keyValueContainer, blockID);
fail("testDeleteKey");
blockManager.getBlock(keyValueContainer, blockID);
fail("testDeleteBlock");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Unable to find the key", ex);
GenericTestUtils.assertExceptionContains(
"Unable to find the block", ex);
}
} catch (IOException ex) {
fail("testDeleteKey failed");
fail("testDeleteBlock failed");
}
}
@Test
public void testListKey() throws Exception {
public void testListBlock() throws Exception {
try {
keyManager.putKey(keyValueContainer, keyData);
List<KeyData> listKeyData = keyManager.listKey(
blockManager.putBlock(keyValueContainer, blockData);
List<BlockData> listBlockData = blockManager.listBlock(
keyValueContainer, 1, 10);
assertNotNull(listKeyData);
assertTrue(listKeyData.size() == 1);
assertNotNull(listBlockData);
assertTrue(listBlockData.size() == 1);
for (long i = 2; i <= 10; i++) {
blockID = new BlockID(1L, i);
keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
blockData = new BlockData(blockID);
blockData.addMetadata("VOLUME", "ozone");
blockData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
keyManager.putKey(keyValueContainer, keyData);
blockData.setChunks(chunkList);
blockManager.putBlock(keyValueContainer, blockData);
}
listKeyData = keyManager.listKey(
listBlockData = blockManager.listBlock(
keyValueContainer, 1, 10);
assertNotNull(listKeyData);
assertTrue(listKeyData.size() == 10);
assertNotNull(listBlockData);
assertTrue(listBlockData.size() == 10);
} catch (IOException ex) {
fail("testListKey failed");
fail("testListBlock failed");
}
}
@Test
public void testGetNoSuchKey() throws Exception {
public void testGetNoSuchBlock() throws Exception {
try {
keyData = new KeyData(new BlockID(1L, 2L));
keyManager.getKey(keyValueContainer, new BlockID(1L, 2L));
fail("testGetNoSuchKey failed");
assertEquals(0,
keyValueContainer.getContainerData().getKeyCount());
//Put Block
blockManager.putBlock(keyValueContainer, blockData);
assertEquals(1,
keyValueContainer.getContainerData().getKeyCount());
//Delete Block
blockManager.deleteBlock(keyValueContainer, blockID);
assertEquals(0,
keyValueContainer.getContainerData().getKeyCount());
try {
//Since the block has been deleted, we should not be able to find it
blockManager.getBlock(keyValueContainer, blockID);
fail("testGetNoSuchBlock failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);
assertEquals(ContainerProtos.Result.NO_SUCH_KEY, ex.getResult());
GenericTestUtils.assertExceptionContains(
"Unable to find the block", ex);
assertEquals(ContainerProtos.Result.NO_SUCH_BLOCK, ex.getResult());
}
} catch (IOException ex) {
fail("testGetNoSuchBlock failed");
}
}
}

View File

@ -88,7 +88,7 @@ public class TestChunkManagerImpl {
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
data = "testing write chunks".getBytes();
// Creating KeyData
// Creating BlockData
blockID = new BlockID(1L, 1L);
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, data.length);

View File

@ -27,11 +27,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
@ -114,8 +114,8 @@ public class TestKeyValueBlockIterator {
int counter = 0;
while(keyValueBlockIterator.hasNext()) {
KeyData keyData = keyValueBlockIterator.nextBlock();
assertEquals(keyData.getLocalID(), counter++);
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
assertFalse(keyValueBlockIterator.hasNext());
@ -123,8 +123,8 @@ public class TestKeyValueBlockIterator {
keyValueBlockIterator.seekToFirst();
counter = 0;
while(keyValueBlockIterator.hasNext()) {
KeyData keyData = keyValueBlockIterator.nextBlock();
assertEquals(keyData.getLocalID(), counter++);
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
assertFalse(keyValueBlockIterator.hasNext());
@ -214,8 +214,8 @@ public class TestKeyValueBlockIterator {
int counter = 5;
while(keyValueBlockIterator.hasNext()) {
KeyData keyData = keyValueBlockIterator.nextBlock();
assertEquals(keyData.getLocalID(), counter++);
BlockData blockData = keyValueBlockIterator.nextBlock();
assertEquals(blockData.getLocalID(), counter++);
}
}
@ -250,7 +250,7 @@ public class TestKeyValueBlockIterator {
container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
.randomUUID().toString());
MetadataStore metadataStore = KeyUtils.getDB(containerData, conf);
MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo("chunkfile", 0, 1024);
@ -258,18 +258,18 @@ public class TestKeyValueBlockIterator {
for (int i=0; i<normalBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
KeyData keyData = new KeyData(blockID);
keyData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), keyData
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
}
for (int i=normalBlocks; i<deletedBlocks; i++) {
BlockID blockID = new BlockID(containerId, i);
KeyData keyData = new KeyData(blockID);
keyData.setChunks(chunkList);
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
metadataStore.put(DFSUtil.string2Bytes(OzoneConsts
.DELETING_KEY_PREFIX + blockID.getLocalID()), keyData
.DELETING_KEY_PREFIX + blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
}
}

View File

@ -28,14 +28,14 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerLifeCycleState;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.utils.MetadataStore;
@ -117,11 +117,11 @@ public class TestKeyValueContainer {
addBlocks(blockCount);
blockIterator = keyValueContainer.blockIterator();
assertTrue(blockIterator.hasNext());
KeyData keyData;
BlockData blockData;
int blockCounter = 0;
while(blockIterator.hasNext()) {
keyData = blockIterator.nextBlock();
assertEquals(blockCounter++, keyData.getBlockID().getLocalID());
blockData = blockIterator.nextBlock();
assertEquals(blockCounter++, blockData.getBlockID().getLocalID());
}
assertEquals(blockCount, blockCounter);
}
@ -129,20 +129,20 @@ public class TestKeyValueContainer {
private void addBlocks(int count) throws Exception {
long containerId = keyValueContainerData.getContainerID();
MetadataStore metadataStore = KeyUtils.getDB(keyValueContainer
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainer
.getContainerData(), conf);
for (int i=0; i < count; i++) {
// Creating KeyData
// Creating BlockData
BlockID blockID = new BlockID(containerId, i);
KeyData keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
BlockData blockData = new BlockData(blockID);
blockData.addMetadata("VOLUME", "ozone");
blockData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), keyData
blockData.setChunks(chunkList);
metadataStore.put(Longs.toByteArray(blockID.getLocalID()), blockData
.getProtoBufMessage().toByteArray());
}
@ -189,7 +189,7 @@ public class TestKeyValueContainer {
int numberOfKeysToWrite = 12;
//write one few keys to check the key count after import
MetadataStore metadataStore = KeyUtils.getDB(keyValueContainerData, conf);
MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, conf);
for (int i = 0; i < numberOfKeysToWrite; i++) {
metadataStore.put(("test" + i).getBytes(), "test".getBytes());
}

View File

@ -142,31 +142,31 @@ public class TestKeyValueHandler {
Mockito.verify(handler, times(1)).handleCloseContainer(
any(ContainerCommandRequestProto.class), any());
// Test Put Key Request handling
ContainerCommandRequestProto putKeyRequest =
getDummyCommandRequestProto(ContainerProtos.Type.PutKey);
dispatcher.dispatch(putKeyRequest);
Mockito.verify(handler, times(1)).handlePutKey(
// Test Put Block Request handling
ContainerCommandRequestProto putBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.PutBlock);
dispatcher.dispatch(putBlockRequest);
Mockito.verify(handler, times(1)).handlePutBlock(
any(ContainerCommandRequestProto.class), any());
// Test Get Key Request handling
ContainerCommandRequestProto getKeyRequest =
getDummyCommandRequestProto(ContainerProtos.Type.GetKey);
dispatcher.dispatch(getKeyRequest);
Mockito.verify(handler, times(1)).handleGetKey(
// Test Get Block Request handling
ContainerCommandRequestProto getBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.GetBlock);
dispatcher.dispatch(getBlockRequest);
Mockito.verify(handler, times(1)).handleGetBlock(
any(ContainerCommandRequestProto.class), any());
// Test Delete Key Request handling
ContainerCommandRequestProto deleteKeyRequest =
getDummyCommandRequestProto(ContainerProtos.Type.DeleteKey);
dispatcher.dispatch(deleteKeyRequest);
Mockito.verify(handler, times(1)).handleDeleteKey(
// Test Delete Block Request handling
ContainerCommandRequestProto deleteBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock);
dispatcher.dispatch(deleteBlockRequest);
Mockito.verify(handler, times(1)).handleDeleteBlock(
any(ContainerCommandRequestProto.class), any());
// Test List Key Request handling
ContainerCommandRequestProto listKeyRequest =
getDummyCommandRequestProto(ContainerProtos.Type.ListKey);
dispatcher.dispatch(listKeyRequest);
// Test List Block Request handling
ContainerCommandRequestProto listBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.ListBlock);
dispatcher.dispatch(listBlockRequest);
Mockito.verify(handler, times(2)).handleUnsupportedOp(
any(ContainerCommandRequestProto.class));

View File

@ -281,10 +281,10 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
groupInputStream.streamOffset[i] = length;
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, datanodeBlockID, requestId);
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId);
List<ContainerProtos.ChunkInfo> chunks =
response.getKeyData().getChunksList();
response.getBlockData().getChunksList();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -165,10 +165,10 @@ public class TestStorageContainerManagerHelper {
DatanodeDetails leadDN = containerWithPipeline.getPipeline().getLeader();
OzoneContainer containerServer =
getContainerServerByDatanodeUuid(leadDN.getUuidString());
KeyValueContainerData containerData = (KeyValueContainerData) containerServer
.getContainerSet()
KeyValueContainerData containerData =
(KeyValueContainerData) containerServer.getContainerSet()
.getContainer(containerID).getContainerData();
return KeyUtils.getDB(containerData, conf);
return BlockUtils.getDB(containerData, conf);
}
private OzoneContainer getContainerServerByDatanodeUuid(String dnUUID)

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.junit.AfterClass;
@ -469,9 +469,9 @@ public class TestOzoneRestClient {
containerID, new File(containerPath));
long valueLength = 0;
while (keyValueBlockIterator.hasNext()) {
KeyData keyData = keyValueBlockIterator.nextBlock();
if (keyData.getBlockID().getLocalID() == localID) {
List<ContainerProtos.ChunkInfo> chunks = keyData.getChunks();
BlockData blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunks) {
valueLength += chunk.getLen();
}

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.om.OzoneManager;
@ -603,10 +603,10 @@ public class TestOzoneRpcClient {
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
containerID, new File(containerPath));
while (keyValueBlockIterator.hasNext()) {
KeyData keyData = keyValueBlockIterator.nextBlock();
if (keyData.getBlockID().getLocalID() == localID) {
BlockData blockData = keyValueBlockIterator.nextBlock();
if (blockData.getBlockID().getLocalID() == localID) {
long length = 0;
List<ContainerProtos.ChunkInfo> chunks = keyData.getChunks();
List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
}

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
@ -241,18 +241,18 @@ public final class ContainerTestHelper {
setDataChecksum(info, data);
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(info.getProtoBufMessage());
keyData.setChunks(newList);
putRequest.setKeyData(keyData.getProtoBufMessage());
blockData.setChunks(newList);
putRequest.setBlockData(blockData.getProtoBufMessage());
smallFileRequest.setChunkInfo(info.getProtoBufMessage());
smallFileRequest.setData(ByteString.copyFrom(data));
smallFileRequest.setKey(putRequest);
smallFileRequest.setBlock(putRequest);
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
@ -266,17 +266,17 @@ public final class ContainerTestHelper {
public static ContainerCommandRequestProto getReadSmallFileRequest(
Pipeline pipeline, ContainerProtos.PutKeyRequestProto putKey)
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putKey)
throws Exception {
ContainerProtos.GetSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.GetSmallFileRequestProto.newBuilder();
ContainerCommandRequestProto getKey = getKeyRequest(pipeline, putKey);
smallFileRequest.setKey(getKey.getGetKey());
ContainerCommandRequestProto getKey = getBlockRequest(pipeline, putKey);
smallFileRequest.setBlock(getKey.getGetBlock());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.GetSmallFile);
request.setContainerID(getKey.getGetKey().getBlockID().getContainerID());
request.setContainerID(getKey.getGetBlock().getBlockID().getContainerID());
request.setGetSmallFile(smallFileRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
@ -421,58 +421,58 @@ public final class ContainerTestHelper {
}
/**
* Returns the PutKeyRequest for test purpose.
* Returns the PutBlockRequest for test purpose.
* @param pipeline - pipeline.
* @param writeRequest - Write Chunk Request.
* @return - Request
*/
public static ContainerCommandRequestProto getPutKeyRequest(
public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest) {
LOG.trace("putKey: {} to pipeline={}",
LOG.trace("putBlock: {} to pipeline={}",
writeRequest.getBlockID());
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
KeyData keyData = new KeyData(
BlockData blockData = new BlockData(
BlockID.getFromProtobuf(writeRequest.getBlockID()));
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(writeRequest.getChunkData());
keyData.setChunks(newList);
putRequest.setKeyData(keyData.getProtoBufMessage());
blockData.setChunks(newList);
putRequest.setBlockData(blockData.getProtoBufMessage());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutKey);
request.setContainerID(keyData.getContainerID());
request.setPutKey(putRequest);
request.setCmdType(ContainerProtos.Type.PutBlock);
request.setContainerID(blockData.getContainerID());
request.setPutBlock(putRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
return request.build();
}
/**
* Gets a GetKeyRequest for test purpose.
* Gets a GetBlockRequest for test purpose.
* @param pipeline - pipeline
* @param putKeyRequest - putKeyRequest.
* @param putBlockRequest - putBlockRequest.
* @return - Request
* immediately.
*/
public static ContainerCommandRequestProto getKeyRequest(
Pipeline pipeline, ContainerProtos.PutKeyRequestProto putKeyRequest) {
public static ContainerCommandRequestProto getBlockRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putBlockRequest) {
ContainerProtos.DatanodeBlockID blockID =
putKeyRequest.getKeyData().getBlockID();
putBlockRequest.getBlockData().getBlockID();
LOG.trace("getKey: blockID={}", blockID);
ContainerProtos.GetKeyRequestProto.Builder getRequest =
ContainerProtos.GetKeyRequestProto.newBuilder();
ContainerProtos.GetBlockRequestProto.Builder getRequest =
ContainerProtos.GetBlockRequestProto.newBuilder();
getRequest.setBlockID(blockID);
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.GetKey);
request.setCmdType(ContainerProtos.Type.GetBlock);
request.setContainerID(blockID.getContainerID());
request.setGetKey(getRequest);
request.setGetBlock(getRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
return request.build();
@ -484,32 +484,32 @@ public final class ContainerTestHelper {
* @param request - Request
* @param response - Response
*/
public static void verifyGetKey(ContainerCommandRequestProto request,
public static void verifyGetBlock(ContainerCommandRequestProto request,
ContainerCommandResponseProto response, int expectedChunksCount) {
Assert.assertEquals(request.getTraceID(), response.getTraceID());
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertEquals(expectedChunksCount,
response.getGetKey().getKeyData().getChunksCount());
response.getGetBlock().getBlockData().getChunksCount());
}
/**
* @param pipeline - pipeline.
* @param putKeyRequest - putKeyRequest.
* @param putBlockRequest - putBlockRequest.
* @return - Request
*/
public static ContainerCommandRequestProto getDeleteKeyRequest(
Pipeline pipeline, ContainerProtos.PutKeyRequestProto putKeyRequest) {
ContainerProtos.DatanodeBlockID blockID = putKeyRequest.getKeyData()
public static ContainerCommandRequestProto getDeleteBlockRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putBlockRequest) {
ContainerProtos.DatanodeBlockID blockID = putBlockRequest.getBlockData()
.getBlockID();
LOG.trace("deleteKey: name={}", blockID);
ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
ContainerProtos.DeleteKeyRequestProto.newBuilder();
LOG.trace("deleteBlock: name={}", blockID);
ContainerProtos.DeleteBlockRequestProto.Builder delRequest =
ContainerProtos.DeleteBlockRequestProto.newBuilder();
delRequest.setBlockID(blockID);
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.DeleteKey);
request.setCmdType(ContainerProtos.Type.DeleteBlock);
request.setContainerID(blockID.getContainerID());
request.setDeleteKey(delRequest);
request.setDeleteBlock(delRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
return request.build();

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@ -100,17 +100,20 @@ public class TestContainerReplication {
DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
// Put Key to the test container
ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
.getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
// Put Block to the test container
ContainerCommandRequestProto putBlockRequest = ContainerTestHelper
.getPutBlockRequest(sourcePipelines, requestProto.getWriteChunk());
ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
ContainerProtos.BlockData blockData =
putBlockRequest.getPutBlock().getBlockData();
ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
ContainerCommandResponseProto response =
client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
Assert.assertTrue(
putBlockRequest.getTraceID().equals(response.getTraceID()));
HddsDatanodeService destinationDatanode =
chooseDatanodeWithoutContainer(sourcePipelines,
@ -147,8 +150,8 @@ public class TestContainerReplication {
KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
.getHandler(ContainerType.KeyValueContainer);
KeyData key = handler.getKeyManager()
.getKey(container, BlockID.getFromProtobuf(blockID));
BlockData key = handler.getBlockManager()
.getBlock(container, BlockID.getFromProtobuf(blockID));
Assert.assertNotNull(key);
Assert.assertEquals(1, key.getChunks().size());
@ -164,7 +167,8 @@ public class TestContainerReplication {
return datanode;
}
}
throw new AssertionError("No datanode outside of the pipeline");
throw new AssertionError(
"No datanode outside of the pipeline");
}
static OzoneConfiguration newOzoneConfiguration() {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -33,9 +34,8 @@ import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingP
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
.BlockDeletingService;
@ -117,13 +117,13 @@ public class TestBlockDeletingService {
containerSet.addContainer(container);
data = (KeyValueContainerData) containerSet.getContainer(
containerID).getContainerData();
MetadataStore metadata = KeyUtils.getDB(data, conf);
MetadataStore metadata = BlockUtils.getDB(data, conf);
for (int j = 0; j<numOfBlocksPerContainer; j++) {
BlockID blockID =
ContainerTestHelper.getTestBlockID(containerID);
String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
blockID.getLocalID();
KeyData kd = new KeyData(blockID);
BlockData kd = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
for (int k = 0; k<numOfChunksPerBlock; k++) {
// offset doesn't matter here
@ -200,7 +200,7 @@ public class TestBlockDeletingService {
containerSet.listContainer(0L, 1, containerData);
Assert.assertEquals(1, containerData.size());
MetadataStore meta = KeyUtils.getDB(
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf);
Map<Long, Container> containerMap = containerSet.getContainerMap();
// NOTE: this test assumes that all the container is KetValueContainer and
@ -309,7 +309,7 @@ public class TestBlockDeletingService {
// get container meta data
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, 1, containerData);
MetadataStore meta = KeyUtils.getDB(
MetadataStore meta = BlockUtils.getDB(
(KeyValueContainerData) containerData.get(0), conf);
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);

View File

@ -33,19 +33,20 @@ import java.util.concurrent.ThreadLocalRandom;
/**
* Tests to test block deleting service.
*/
public class TestKeyData {
static final Logger LOG = LoggerFactory.getLogger(TestKeyData.class);
public class TestBlockData {
static final Logger LOG = LoggerFactory.getLogger(TestBlockData.class);
@Rule
public TestRule timeout = new Timeout(10000);
static ContainerProtos.ChunkInfo buildChunkInfo(String name, long offset, long len) {
static ContainerProtos.ChunkInfo buildChunkInfo(String name, long offset,
long len) {
return ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(name).setOffset(offset).setLen(len).build();
}
@Test
public void testAddAndRemove() {
final KeyData computed = new KeyData(null);
final BlockData computed = new BlockData(null);
final List<ContainerProtos.ChunkInfo> expected = new ArrayList<>();
assertChunks(expected, computed);
@ -61,14 +62,17 @@ public class TestKeyData {
}
private static int chunkCount = 0;
static ContainerProtos.ChunkInfo addChunk(List<ContainerProtos.ChunkInfo> expected, long offset) {
static ContainerProtos.ChunkInfo addChunk(
List<ContainerProtos.ChunkInfo> expected, long offset) {
final long length = ThreadLocalRandom.current().nextLong(1000);
final ContainerProtos.ChunkInfo info = buildChunkInfo("c" + ++chunkCount, offset, length);
final ContainerProtos.ChunkInfo info =
buildChunkInfo("c" + ++chunkCount, offset, length);
expected.add(info);
return info;
}
static long assertAddChunk(List<ContainerProtos.ChunkInfo> expected, KeyData computed, long offset) {
static long assertAddChunk(List<ContainerProtos.ChunkInfo> expected,
BlockData computed, long offset) {
final ContainerProtos.ChunkInfo info = addChunk(expected, offset);
LOG.info("addChunk: " + toString(info));
computed.addChunk(info);
@ -77,7 +81,8 @@ public class TestKeyData {
}
static void removeChunk(List<ContainerProtos.ChunkInfo> expected, KeyData computed) {
static void removeChunk(List<ContainerProtos.ChunkInfo> expected,
BlockData computed) {
final int i = ThreadLocalRandom.current().nextInt(expected.size());
final ContainerProtos.ChunkInfo info = expected.remove(i);
LOG.info("removeChunk: " + toString(info));
@ -85,10 +90,13 @@ public class TestKeyData {
assertChunks(expected, computed);
}
static void assertChunks(List<ContainerProtos.ChunkInfo> expected, KeyData computed) {
static void assertChunks(List<ContainerProtos.ChunkInfo> expected,
BlockData computed) {
final List<ContainerProtos.ChunkInfo> computedChunks = computed.getChunks();
Assert.assertEquals("expected=" + expected + "\ncomputed=" + computedChunks, expected, computedChunks);
Assert.assertEquals(expected.stream().mapToLong(i -> i.getLen()).sum(), computed.getSize());
Assert.assertEquals("expected=" + expected + "\ncomputed=" +
computedChunks, expected, computedChunks);
Assert.assertEquals(expected.stream().mapToLong(i -> i.getLen()).sum(),
computed.getSize());
}
static String toString(ContainerProtos.ChunkInfo info) {
@ -96,14 +104,14 @@ public class TestKeyData {
}
static String toString(List<ContainerProtos.ChunkInfo> infos) {
return infos.stream().map(TestKeyData::toString)
return infos.stream().map(TestBlockData::toString)
.reduce((left, right) -> left + ", " + right)
.orElse("");
}
@Test
public void testSetChunks() {
final KeyData computed = new KeyData(null);
final BlockData computed = new BlockData(null);
final List<ContainerProtos.ChunkInfo> expected = new ArrayList<>();
assertChunks(expected, computed);

View File

@ -25,12 +25,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.junit.AfterClass;
@ -71,14 +71,14 @@ public class TestCloseContainerHandler {
private final static String DATANODE_UUID = UUID.randomUUID().toString();
private static final String baseDir = MiniDFSCluster.getBaseDirectory();
private static final String volume1 = baseDir + "disk1";
private static final String volume2 = baseDir + "disk2";
private static final String BASE_DIR = MiniDFSCluster.getBaseDirectory();
private static final String VOLUME_1 = BASE_DIR + "disk1";
private static final String VOLUME_2 = BASE_DIR + "disk2";
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
String dataDirKey = volume1 + "," + volume2;
String dataDirKey = VOLUME_1 + "," + VOLUME_2;
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
containerSet = new ContainerSet();
DatanodeDetails datanodeDetails =
@ -160,31 +160,31 @@ public class TestCloseContainerHandler {
getTestBlockID(testContainerID);
Pipeline pipeline = createSingleNodePipeline();
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
// the key should exist in the map
// the block should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getKeyDataMap(testContainerID)
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
for (ChunkInfo i : chunkList) {
chunkProtoList.add(i.getProtoBufMessage());
}
keyData.setChunks(chunkProtoList);
ContainerProtos.PutKeyRequestProto.Builder putKeyRequestProto =
ContainerProtos.PutKeyRequestProto.newBuilder();
putKeyRequestProto.setKeyData(keyData.getProtoBufMessage());
blockData.setChunks(chunkProtoList);
ContainerProtos.PutBlockRequestProto.Builder putBlockRequestProto =
ContainerProtos.PutBlockRequestProto.newBuilder();
putBlockRequestProto.setBlockData(blockData.getProtoBufMessage());
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutKey);
request.setCmdType(ContainerProtos.Type.PutBlock);
request.setContainerID(blockID.getContainerID());
request.setPutKey(putKeyRequestProto);
request.setPutBlock(putBlockRequestProto);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
dispatcher.dispatch(request.build());
//the open key should be removed from Map
//the open block should be removed from Map
Assert.assertNull(
openContainerBlockMap.getKeyDataMap(testContainerID));
openContainerBlockMap.getBlockDataMap(testContainerID));
}
@Test
@ -197,10 +197,10 @@ public class TestCloseContainerHandler {
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
// the key should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getKeyDataMap(testContainerID)
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
Assert.assertTrue(
openContainerBlockMap.getKeyDataMap(testContainerID)
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()).getChunks().size() == 3);
ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
ContainerProtos.DeleteChunkRequestProto.newBuilder();
@ -220,7 +220,7 @@ public class TestCloseContainerHandler {
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
dispatcher.dispatch(request.build());
Assert.assertTrue(
openContainerBlockMap.getKeyDataMap(testContainerID)
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()).getChunks().size() == 2);
}
@ -235,14 +235,14 @@ public class TestCloseContainerHandler {
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
Container container = containerSet.getContainer(testContainerID);
KeyData keyData = openContainerBlockMap.
getKeyDataMap(testContainerID).get(blockID.getLocalID());
BlockData blockData = openContainerBlockMap.
getBlockDataMap(testContainerID).get(blockID.getLocalID());
// the key should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getKeyDataMap(testContainerID)
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
Assert.assertTrue(
keyData.getChunks().size() == chunkList.size());
blockData.getChunks().size() == chunkList.size());
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CloseContainer);
@ -253,8 +253,9 @@ public class TestCloseContainerHandler {
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
dispatcher.dispatch(request.build());
Assert.assertNull(
openContainerBlockMap.getKeyDataMap(testContainerID));
openContainerBlockMap.getBlockDataMap(testContainerID));
// Make sure the key got committed
Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID));
Assert.assertNotNull(handler.getBlockManager()
.getBlock(container, blockID));
}
}

View File

@ -29,20 +29,20 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataStore;
import org.junit.After;
@ -97,7 +97,7 @@ public class TestContainerPersistence {
private static ContainerSet containerSet;
private static VolumeSet volumeSet;
private static VolumeChoosingPolicy volumeChoosingPolicy;
private static KeyManager keyManager;
private static BlockManager blockManager;
private static ChunkManager chunkManager;
@Rule
public ExpectedException exception = ExpectedException.none();
@ -126,7 +126,7 @@ public class TestContainerPersistence {
public void setupPaths() throws IOException {
containerSet = new ContainerSet();
volumeSet = new VolumeSet(DATANODE_UUID, conf);
keyManager = new KeyManagerImpl(conf);
blockManager = new BlockManagerImpl(conf);
chunkManager = new ChunkManagerImpl();
for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
@ -152,15 +152,15 @@ public class TestContainerPersistence {
return ContainerTestHelper.getTestContainerID();
}
private Container addContainer(ContainerSet containerSet, long containerID)
private Container addContainer(ContainerSet cSet, long cID)
throws IOException {
KeyValueContainerData data = new KeyValueContainerData(containerID,
KeyValueContainerData data = new KeyValueContainerData(cID,
ContainerTestHelper.CONTAINER_MAX_SIZE);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
containerSet.addContainer(container);
cSet.addContainer(container);
return container;
}
@ -184,7 +184,7 @@ public class TestContainerPersistence {
MetadataStore store = null;
try {
store = KeyUtils.getDB(kvData, conf);
store = BlockUtils.getDB(kvData, conf);
Assert.assertNotNull(store);
} finally {
if (store != null) {
@ -227,19 +227,19 @@ public class TestContainerPersistence {
Assert.assertFalse(containerSet.getContainerMap()
.containsKey(testContainerID1));
// Adding key to a deleted container should fail.
// Adding block to a deleted container should fail.
exception.expect(StorageContainerException.class);
exception.expectMessage("Error opening DB.");
BlockID blockID1 = ContainerTestHelper.getTestBlockID(testContainerID1);
KeyData someKey1 = new KeyData(blockID1);
BlockData someKey1 = new BlockData(blockID1);
someKey1.setChunks(new LinkedList<ContainerProtos.ChunkInfo>());
keyManager.putKey(container1, someKey1);
blockManager.putBlock(container1, someKey1);
// Deleting a non-empty container should fail.
BlockID blockID2 = ContainerTestHelper.getTestBlockID(testContainerID2);
KeyData someKey2 = new KeyData(blockID2);
BlockData someKey2 = new BlockData(blockID2);
someKey2.setChunks(new LinkedList<ContainerProtos.ChunkInfo>());
keyManager.putKey(container2, someKey2);
blockManager.putBlock(container2, someKey2);
exception.expect(StorageContainerException.class);
exception.expectMessage(
@ -325,7 +325,8 @@ public class TestContainerPersistence {
if (container == null) {
container = addContainer(containerSet, testContainerID);
}
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
ChunkInfo info = getChunk(
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@ -348,8 +349,8 @@ public class TestContainerPersistence {
}
/**
* Writes many chunks of the same key into different chunk files and verifies
* that we have that data in many files.
* Writes many chunks of the same block into different chunk files and
* verifies that we have that data in many files.
*
* @throws IOException
* @throws NoSuchAlgorithmException
@ -425,7 +426,8 @@ public class TestContainerPersistence {
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
ChunkInfo info = getChunk(
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@ -456,7 +458,8 @@ public class TestContainerPersistence {
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
ChunkInfo info = getChunk(
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@ -500,7 +503,8 @@ public class TestContainerPersistence {
for (int x = 0; x < chunkCount; x++) {
// we are writing to the same chunk file but at different offsets.
long offset = x * datalen;
ChunkInfo info = getChunk(blockID.getLocalID(), 0, offset, datalen);
ChunkInfo info = getChunk(
blockID.getLocalID(), 0, offset, datalen);
byte[] data = getData(datalen);
oldSha.update(data);
setDataChecksum(info, data);
@ -531,7 +535,8 @@ public class TestContainerPersistence {
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
ChunkInfo info = getChunk(
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@ -542,37 +547,38 @@ public class TestContainerPersistence {
}
/**
* Tests a put key and read key.
* Tests a put block and read block.
*
* @throws IOException
* @throws NoSuchAlgorithmException
*/
@Test
public void testPutKey() throws IOException, NoSuchAlgorithmException {
public void testPutBlock() throws IOException, NoSuchAlgorithmException {
long testContainerID = getTestContainerID();
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = writeChunkHelper(blockID);
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
keyManager.putKey(container, keyData);
KeyData readKeyData = keyManager.getKey(container, keyData.getBlockID());
blockData.setChunks(chunkList);
blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID());
ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(0));
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
}
/**
* Tests a put key and read key.
* Tests a put block and read block.
*
* @throws IOException
* @throws NoSuchAlgorithmException
*/
@Test
public void testPutKeyWithLotsOfChunks() throws IOException,
public void testPutBlockWithLotsOfChunks() throws IOException,
NoSuchAlgorithmException {
final int chunkCount = 2;
final int datalen = 1024;
@ -603,66 +609,67 @@ public class TestContainerPersistence {
long writeCount = container.getContainerData().getWriteCount();
Assert.assertEquals(chunkCount, writeCount);
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
for (ChunkInfo i : chunkList) {
chunkProtoList.add(i.getProtoBufMessage());
}
keyData.setChunks(chunkProtoList);
keyManager.putKey(container, keyData);
KeyData readKeyData = keyManager.getKey(container, keyData.getBlockID());
blockData.setChunks(chunkProtoList);
blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID());
ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(readKeyData
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
.getChunks().size() - 1));
Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum());
}
/**
* Deletes a key and tries to read it back.
* Deletes a block and tries to read it back.
*
* @throws IOException
* @throws NoSuchAlgorithmException
*/
@Test
public void testDeleteKey() throws IOException, NoSuchAlgorithmException {
public void testDeleteBlock() throws IOException, NoSuchAlgorithmException {
long testContainerID = getTestContainerID();
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = writeChunkHelper(blockID);
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
keyManager.putKey(container, keyData);
keyManager.deleteKey(container, blockID);
blockData.setChunks(chunkList);
blockManager.putBlock(container, blockData);
blockManager.deleteBlock(container, blockID);
exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the key.");
keyManager.getKey(container, keyData.getBlockID());
exception.expectMessage("Unable to find the block.");
blockManager.getBlock(container, blockData.getBlockID());
}
/**
* Tries to Deletes a key twice.
* Tries to Deletes a block twice.
*
* @throws IOException
* @throws NoSuchAlgorithmException
*/
@Test
public void testDeleteKeyTwice() throws IOException,
public void testDeleteBlockTwice() throws IOException,
NoSuchAlgorithmException {
long testContainerID = getTestContainerID();
Container container = addContainer(containerSet, testContainerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = writeChunkHelper(blockID);
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
keyManager.putKey(container, keyData);
keyManager.deleteKey(container, blockID);
blockData.setChunks(chunkList);
blockManager.putBlock(container, blockData);
blockManager.deleteBlock(container, blockID);
exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the key.");
keyManager.deleteKey(container, blockID);
exception.expectMessage("Unable to find the block.");
blockManager.deleteBlock(container, blockID);
}
/**
@ -722,8 +729,9 @@ public class TestContainerPersistence {
try {
container.update(newMetadata, false);
} catch (StorageContainerException ex) {
Assert.assertEquals("Updating a closed container without force option " +
"is not allowed. ContainerID: " + testContainerID, ex.getMessage());
Assert.assertEquals("Updating a closed container without " +
"force option is not allowed. ContainerID: " +
testContainerID, ex.getMessage());
}
// Update with force flag, it should be success.
@ -741,53 +749,55 @@ public class TestContainerPersistence {
}
private KeyData writeKeyHelper(BlockID blockID)
private BlockData writeBlockHelper(BlockID blockID)
throws IOException, NoSuchAlgorithmException {
ChunkInfo info = writeChunkHelper(blockID);
KeyData keyData = new KeyData(blockID);
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
return keyData;
blockData.setChunks(chunkList);
return blockData;
}
@Test
public void testListKey() throws Exception {
public void testListBlock() throws Exception {
long testContainerID = getTestContainerID();
Container container = addContainer(containerSet, testContainerID);
List<BlockID> expectedKeys = new ArrayList<>();
List<BlockID> expectedBlocks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
BlockID blockID = new BlockID(testContainerID, i);
expectedKeys.add(blockID);
KeyData kd = writeKeyHelper(blockID);
keyManager.putKey(container, kd);
expectedBlocks.add(blockID);
BlockData kd = writeBlockHelper(blockID);
blockManager.putBlock(container, kd);
}
// List all keys
List<KeyData> result = keyManager.listKey(container, 0, 100);
// List all blocks
List<BlockData> result = blockManager.listBlock(
container, 0, 100);
Assert.assertEquals(10, result.size());
int index = 0;
for (int i = index; i < result.size(); i++) {
KeyData data = result.get(i);
BlockData data = result.get(i);
Assert.assertEquals(testContainerID, data.getContainerID());
Assert.assertEquals(expectedKeys.get(i).getLocalID(), data.getLocalID());
Assert.assertEquals(expectedBlocks.get(i).getLocalID(),
data.getLocalID());
index++;
}
// List key with startKey filter
long k6 = expectedKeys.get(6).getLocalID();
result = keyManager.listKey(container, k6, 100);
// List block with startBlock filter
long k6 = expectedBlocks.get(6).getLocalID();
result = blockManager.listBlock(container, k6, 100);
Assert.assertEquals(4, result.size());
for (int i = 6; i < 10; i++) {
Assert.assertEquals(expectedKeys.get(i).getLocalID(),
Assert.assertEquals(expectedBlocks.get(i).getLocalID(),
result.get(i - 6).getLocalID());
}
// Count must be >0
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Count must be a positive number.");
keyManager.listKey(container, 0, -1);
blockManager.listBlock(container, 0, -1);
}
}

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@ -69,6 +69,9 @@ import static org.apache.hadoop.hdds
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
/**
* Tests for Block deletion.
*/
public class TestBlockDeletion {
private static OzoneConfiguration conf = null;
private static ObjectStore store;
@ -229,7 +232,7 @@ public class TestBlockDeletion {
throws IOException {
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
try {
MetadataStore db = KeyUtils.getDB((KeyValueContainerData)
MetadataStore db = BlockUtils.getDB((KeyValueContainerData)
dnContainerSet.getContainer(blockID.getContainerID())
.getContainerData(), conf);
Assert.assertNotNull(db.get(Longs.toByteArray(blockID.getLocalID())));
@ -244,7 +247,7 @@ public class TestBlockDeletion {
throws IOException {
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
try {
MetadataStore db = KeyUtils.getDB((KeyValueContainerData)
MetadataStore db = BlockUtils.getDB((KeyValueContainerData)
dnContainerSet.getContainer(blockID.getContainerID())
.getContainerData(), conf);
Assert.assertNull(db.get(Longs.toByteArray(blockID.getLocalID())));

View File

@ -68,7 +68,8 @@ public class TestOzoneContainer {
conf.set(HDDS_DATANODE_DIR_KEY, tempFolder.getRoot().getPath());
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
conf.setBoolean(
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
container = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf, null);
@ -129,7 +130,7 @@ public class TestOzoneContainer {
static void runTestOzoneContainerViaDataNode(
long testContainerID, XceiverClientSpi client) throws Exception {
ContainerProtos.ContainerCommandRequestProto
request, writeChunkRequest, putKeyRequest,
request, writeChunkRequest, putBlockRequest,
updateRequest1, updateRequest2;
ContainerProtos.ContainerCommandResponseProto response,
updateResponse1, updateResponse2;
@ -138,46 +139,50 @@ public class TestOzoneContainer {
Pipeline pipeline = client.getPipeline();
createContainerForTesting(client, testContainerID);
writeChunkRequest = writeChunkForContainer(client, testContainerID, 1024);
writeChunkRequest = writeChunkForContainer(client, testContainerID,
1024);
// Read Chunk
request = ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
request = ContainerTestHelper.getReadChunkRequest(
pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Put Key
putKeyRequest = ContainerTestHelper.getPutKeyRequest(pipeline, writeChunkRequest
.getWriteChunk());
// Put Block
putBlockRequest = ContainerTestHelper.getPutBlockRequest(
pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest);
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert
.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
Assert.assertTrue(putBlockRequest.getTraceID()
.equals(response.getTraceID()));
// Get Key
request = ContainerTestHelper.getKeyRequest(pipeline, putKeyRequest.getPutKey());
// Get Block
request = ContainerTestHelper.
getBlockRequest(pipeline, putBlockRequest.getPutBlock());
response = client.sendCommand(request);
int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
ContainerTestHelper.verifyGetKey(request, response, chunksCount);
int chunksCount = putBlockRequest.getPutBlock().getBlockData().
getChunksCount();
ContainerTestHelper.verifyGetBlock(request, response, chunksCount);
// Delete Key
// Delete Block
request =
ContainerTestHelper.getDeleteKeyRequest(pipeline, putKeyRequest.getPutKey());
ContainerTestHelper.getDeleteBlockRequest(
pipeline, putBlockRequest.getPutBlock());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
//Delete Chunk
request = ContainerTestHelper.getDeleteChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
request = ContainerTestHelper.getDeleteChunkRequest(
pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
@ -249,7 +254,7 @@ public class TestOzoneContainer {
final ContainerProtos.ContainerCommandRequestProto getSmallFileRequest
= ContainerTestHelper.getReadSmallFileRequest(client.getPipeline(),
smallFileRequest.getPutSmallFile().getKey());
smallFileRequest.getPutSmallFile().getBlock());
response = client.sendCommand(getSmallFileRequest);
Assert.assertArrayEquals(
smallFileRequest.getPutSmallFile().getData().toByteArray(),
@ -269,7 +274,7 @@ public class TestOzoneContainer {
XceiverClientGrpc client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto
writeChunkRequest, putKeyRequest, request;
writeChunkRequest, putBlockRequest, request;
try {
OzoneConfiguration conf = newOzoneConfiguration();
@ -283,18 +288,19 @@ public class TestOzoneContainer {
long containerID = ContainerTestHelper.getTestContainerID();
createContainerForTesting(client, containerID);
writeChunkRequest = writeChunkForContainer(client, containerID, 1024);
writeChunkRequest = writeChunkForContainer(client, containerID,
1024);
putKeyRequest = ContainerTestHelper.getPutKeyRequest(client.getPipeline(),
writeChunkRequest.getWriteChunk());
// Put key before closing.
response = client.sendCommand(putKeyRequest);
putBlockRequest = ContainerTestHelper.getPutBlockRequest(
client.getPipeline(), writeChunkRequest.getWriteChunk());
// Put block before closing.
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putKeyRequest.getTraceID().equals(response.getTraceID()));
putBlockRequest.getTraceID().equals(response.getTraceID()));
// Close the contianer.
request = ContainerTestHelper.getCloseContainer(
@ -325,25 +331,26 @@ public class TestOzoneContainer {
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Put key will fail on a closed container.
response = client.sendCommand(putKeyRequest);
// Put block will fail on a closed container.
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert
.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
Assert.assertTrue(putBlockRequest.getTraceID()
.equals(response.getTraceID()));
// Get key must work on the closed container.
request = ContainerTestHelper.getKeyRequest(client.getPipeline(),
putKeyRequest.getPutKey());
// Get block must work on the closed container.
request = ContainerTestHelper.getBlockRequest(client.getPipeline(),
putBlockRequest.getPutBlock());
response = client.sendCommand(request);
int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
ContainerTestHelper.verifyGetKey(request, response, chunksCount);
int chunksCount = putBlockRequest.getPutBlock().getBlockData()
.getChunksCount();
ContainerTestHelper.verifyGetBlock(request, response, chunksCount);
// Delete Key must fail on a closed container.
// Delete block must fail on a closed container.
request =
ContainerTestHelper.getDeleteKeyRequest(client.getPipeline(),
putKeyRequest.getPutKey());
ContainerTestHelper.getDeleteBlockRequest(client.getPipeline(),
putBlockRequest.getPutBlock());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
@ -365,7 +372,7 @@ public class TestOzoneContainer {
XceiverClientGrpc client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto request,
writeChunkRequest, putKeyRequest;
writeChunkRequest, putBlockRequest;
try {
OzoneConfiguration conf = newOzoneConfiguration();
@ -378,17 +385,18 @@ public class TestOzoneContainer {
long containerID = ContainerTestHelper.getTestContainerID();
createContainerForTesting(client, containerID);
writeChunkRequest = writeChunkForContainer(client, containerID, 1024);
writeChunkRequest = writeChunkForContainer(
client, containerID, 1024);
putKeyRequest = ContainerTestHelper.getPutKeyRequest(client.getPipeline(),
writeChunkRequest.getWriteChunk());
putBlockRequest = ContainerTestHelper.getPutBlockRequest(
client.getPipeline(), writeChunkRequest.getWriteChunk());
// Put key before deleting.
response = client.sendCommand(putKeyRequest);
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putKeyRequest.getTraceID().equals(response.getTraceID()));
putBlockRequest.getTraceID().equals(response.getTraceID()));
// Container cannot be deleted forcibly because
// the container is not closed.
@ -529,7 +537,7 @@ public class TestOzoneContainer {
writeChunkForContainer(XceiverClientSpi client,
long containerID, int dataLen) throws Exception {
// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);;
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(client.getPipeline(),
blockID, dataLen);

View File

@ -89,7 +89,7 @@ public class TestContainerStateMachine {
// add putKey request
ContainerCommandRequestProto putKeyProto = ContainerTestHelper
.getPutKeyRequest(pipeline, writeChunkProto.getWriteChunk());
.getPutBlockRequest(pipeline, writeChunkProto.getWriteChunk());
RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
TransactionContext createContainerCtxt =

View File

@ -1209,8 +1209,8 @@ public class TestOzoneManager {
//Disabling this test
@Ignore("Disabling this test until Open Key is fixed.")
public void testExpiredOpenKey() throws Exception {
// BackgroundService openKeyCleanUpService = ((KeyManagerImpl)cluster
// .getOzoneManager().getKeyManager()).getOpenKeyCleanupService();
// BackgroundService openKeyCleanUpService = ((BlockManagerImpl)cluster
// .getOzoneManager().getBlockManager()).getOpenKeyCleanupService();
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);

View File

@ -103,7 +103,7 @@ public class TestContainerSmallFile {
}
@Test
public void testInvalidKeyRead() throws Exception {
public void testInvalidBlockRead() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
@ -116,7 +116,7 @@ public class TestContainerSmallFile {
container.getContainerInfo().getContainerID(), traceID);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the key");
thrown.expectMessage("Unable to find the block");
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());

View File

@ -107,7 +107,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
// Now, explicitly make a putKey request for the block.
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper
.getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
.getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
client.sendCommand(putKeyRequest);
response = ContainerProtocolCalls
.getCommittedBlockLength(client, blockID, traceID);
@ -155,7 +155,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
}
@Test
public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
@ -174,7 +174,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
Assert.assertTrue(sce.getMessage().contains("Unable to find the block"));
}
xceiverClientManager.releaseClient(client);
}
@ -216,7 +216,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
@Test
public void tesPutKeyResposne() throws Exception {
ContainerProtos.PutKeyResponseProto response;
ContainerProtos.PutBlockResponseProto response;
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
@ -239,8 +239,8 @@ public class TestGetCommittedBlockLengthAndPutKey {
// Now, explicitly make a putKey request for the block.
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper
.getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest).getPutKey();
.getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest).getPutBlock();
// make sure the block ids in the request and response are same.
// This will also ensure that closing the container committed the block
// on the Datanodes.

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@ -278,29 +278,29 @@ public class TestKeys {
}
static void runTestPutKey(PutHelper helper) throws Exception {
final ClientProtocol client = helper.client;
final ClientProtocol helperClient = helper.client;
helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
List<OzoneKey> keyList = client
List<OzoneKey> keyList = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(), null,
null, 10);
Assert.assertEquals(1, keyList.size());
// test list key using a more efficient call
String newkeyName = OzoneUtils.getRequestID().toLowerCase();
OzoneOutputStream ozoneOutputStream = client
OzoneOutputStream ozoneOutputStream = helperClient
.createKey(helper.getVol().getName(), helper.getBucket().getName(),
newkeyName, 0, replicationType, replicationFactor);
ozoneOutputStream.close();
keyList = client
keyList = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(), null,
null, 10);
Assert.assertEquals(2, keyList.size());
// test new put key with invalid volume/bucket name
try {
ozoneOutputStream = client
ozoneOutputStream = helperClient
.createKey("invalid-volume", helper.getBucket().getName(), newkeyName,
0, replicationType, replicationFactor);
ozoneOutputStream.close();
@ -312,7 +312,7 @@ public class TestKeys {
}
try {
ozoneOutputStream = client
ozoneOutputStream = helperClient
.createKey(helper.getVol().getName(), "invalid-bucket", newkeyName, 0,
replicationType, replicationFactor);
ozoneOutputStream.close();
@ -380,7 +380,7 @@ public class TestKeys {
}
static void runTestPutAndGetKey(PutHelper helper) throws Exception {
final ClientProtocol client = helper.client;
final ClientProtocol helperClient = helper.client;
String keyName = helper.putKey();
assertNotNull(helper.getBucket());
@ -427,7 +427,8 @@ public class TestKeys {
// test new get key with invalid volume/bucket name
try {
client.getKey("invalid-volume", helper.getBucket().getName(), keyName);
helperClient.getKey(
"invalid-volume", helper.getBucket().getName(), keyName);
fail("Get key should have thrown " + "when using invalid volume name.");
} catch (IOException e) {
GenericTestUtils
@ -435,7 +436,8 @@ public class TestKeys {
}
try {
client.getKey(helper.getVol().getName(), "invalid-bucket", keyName);
helperClient.getKey(
helper.getVol().getName(), "invalid-bucket", keyName);
fail("Get key should have thrown " + "when using invalid bucket name.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
@ -476,7 +478,7 @@ public class TestKeys {
}
static void runTestPutAndListKey(PutHelper helper) throws Exception {
ClientProtocol client = helper.client;
ClientProtocol helperClient = helper.client;
helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
@ -495,7 +497,7 @@ public class TestKeys {
List<OzoneKey> keyList1 =
IteratorUtils.toList(helper.getBucket().listKeys(null, null));
// test list key using a more efficient call
List<OzoneKey> keyList2 = client
List<OzoneKey> keyList2 = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(), null,
null, 100);
@ -515,7 +517,7 @@ public class TestKeys {
}
// test maxLength parameter of list keys
keyList2 = client
keyList2 = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(), null,
null, 1);
Assert.assertEquals(1, keyList2.size());
@ -523,7 +525,7 @@ public class TestKeys {
// test startKey parameter of list keys
keyList1 = IteratorUtils
.toList(helper.getBucket().listKeys("list-key", "list-key4"));
keyList2 = client
keyList2 = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(),
"list-key", "list-key4", 100);
Assert.assertEquals(5, keyList1.size());
@ -532,7 +534,7 @@ public class TestKeys {
// test prefix parameter of list keys
keyList1 =
IteratorUtils.toList(helper.getBucket().listKeys("list-key2", null));
keyList2 = client
keyList2 = helperClient
.listKeys(helper.getVol().getName(), helper.getBucket().getName(),
"list-key2", null, 100);
Assert.assertTrue(
@ -542,7 +544,7 @@ public class TestKeys {
// test new list keys with invalid volume/bucket name
try {
client.listKeys("invalid-volume", helper.getBucket().getName(),
helperClient.listKeys("invalid-volume", helper.getBucket().getName(),
null, null, 100);
fail("List keys should have thrown when using invalid volume name.");
} catch (IOException e) {
@ -551,7 +553,7 @@ public class TestKeys {
}
try {
client.listKeys(helper.getVol().getName(), "invalid-bucket", null,
helperClient.listKeys(helper.getVol().getName(), "invalid-bucket", null,
null, 100);
fail("List keys should have thrown when using invalid bucket name.");
} catch (IOException e) {
@ -697,10 +699,10 @@ public class TestKeys {
.KeyValueContainer);
KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
.getContainer(location.getBlockID().getContainerID());
KeyData blockInfo = keyValueHandler
.getKeyManager().getKey(container, location.getBlockID());
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
BlockData blockInfo = keyValueHandler
.getBlockManager().getBlock(container, location.getBlockID());
KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData();
File dataDir = new File(containerData.getChunksPath());
for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
File chunkFile = dataDir.toPath()

View File

@ -54,7 +54,7 @@ public class BucketManagerImpl implements BucketManager {
/**
* MetadataDB is maintained in MetadataManager and shared between
* BucketManager and VolumeManager. (and also by KeyManager)
* BucketManager and VolumeManager. (and also by BlockManager)
*
* BucketManager uses MetadataDB to store bucket level information.
*

View File

@ -102,7 +102,7 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
}
/**
* Returns Fake blocks to the KeyManager so we get blocks in the Database.
* Returns Fake blocks to the BlockManager so we get blocks in the Database.
* @param size - size of the block.
* @param type Replication Type
* @param factor - Replication factor

View File

@ -48,17 +48,16 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutKeyRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -141,7 +140,7 @@ public class BenchMarkDatanodeDispatcher {
long containerID = containers.get(y);
BlockID blockID = new BlockID(containerID, key);
dispatcher
.dispatch(getPutKeyCommand(blockID, chunkName));
.dispatch(getPutBlockCommand(blockID, chunkName));
dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName));
}
}
@ -213,38 +212,39 @@ public class BenchMarkDatanodeDispatcher {
return builder.build();
}
private ContainerCommandRequestProto getPutKeyCommand(
private ContainerCommandRequestProto getPutBlockCommand(
BlockID blockID, String chunkKey) {
PutKeyRequestProto.Builder putKeyRequest = PutKeyRequestProto
PutBlockRequestProto.Builder putBlockRequest = PutBlockRequestProto
.newBuilder()
.setKeyData(getKeyData(blockID, chunkKey));
.setBlockData(getBlockData(blockID, chunkKey));
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder();
request.setCmdType(ContainerProtos.Type.PutKey)
request.setCmdType(ContainerProtos.Type.PutBlock)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setPutKey(putKeyRequest);
.setPutBlock(putBlockRequest);
return request.build();
}
private ContainerCommandRequestProto getGetKeyCommand(BlockID blockID) {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto.newBuilder()
private ContainerCommandRequestProto getGetBlockCommand(BlockID blockID) {
GetBlockRequestProto.Builder readBlockRequest =
GetBlockRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(ContainerProtos.Type.GetKey)
.setCmdType(ContainerProtos.Type.GetBlock)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setGetKey(readKeyRequest);
.setGetBlock(readBlockRequest);
return request.build();
}
private ContainerProtos.KeyData getKeyData(
private ContainerProtos.BlockData getBlockData(
BlockID blockID, String chunkKey) {
ContainerProtos.KeyData.Builder builder = ContainerProtos.KeyData
ContainerProtos.BlockData.Builder builder = ContainerProtos.BlockData
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addChunks(getChunkInfo(blockID, chunkKey));
@ -275,16 +275,16 @@ public class BenchMarkDatanodeDispatcher {
}
@Benchmark
public void putKey(BenchMarkDatanodeDispatcher bmdd) {
public void putBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID();
String chunkKey = getNewChunkToWrite();
bmdd.dispatcher.dispatch(getPutKeyCommand(blockID, chunkKey));
bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey));
}
@Benchmark
public void getKey(BenchMarkDatanodeDispatcher bmdd) {
public void getBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID();
bmdd.dispatcher.dispatch(getGetKeyCommand(blockID));
bmdd.dispatcher.dispatch(getGetBlockCommand(blockID));
}
// Chunks writes from benchmark only reaches certain containers