HDDS-708. Validate BCSID while reading blocks from containers in datanodes. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2018-10-23 16:52:17 +05:30
parent 0b62983c5a
commit b61846392e
11 changed files with 116 additions and 29 deletions

View File

@ -81,14 +81,17 @@ public final class ContainerProtocolCalls {
* @param xceiverClient client to perform call * @param xceiverClient client to perform call
* @param datanodeBlockID blockID to identify container * @param datanodeBlockID blockID to identify container
* @param traceID container protocol call args * @param traceID container protocol call args
* @param blockCommitSequenceId latest commit Id of the block
* @return container protocol get block response * @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID, String traceID) throws IOException { DatanodeBlockID datanodeBlockID, String traceID,
long blockCommitSequenceId) throws IOException {
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder() .newBuilder()
.setBlockID(datanodeBlockID); .setBlockID(datanodeBlockID)
.setBlockCommitSequenceId(blockCommitSequenceId);
String id = xceiverClient.getPipeline().getLeader().getUuidString(); String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto ContainerCommandRequestProto request = ContainerCommandRequestProto

View File

@ -140,6 +140,8 @@ enum Result {
UNKNOWN_CONTAINER_TYPE = 34; UNKNOWN_CONTAINER_TYPE = 34;
BLOCK_NOT_COMMITTED = 35; BLOCK_NOT_COMMITTED = 35;
CONTAINER_UNHEALTHY = 36; CONTAINER_UNHEALTHY = 36;
UNKNOWN_BCSID = 37;
BCSID_MISMATCH = 38;
} }
/** /**
@ -315,6 +317,7 @@ message PutBlockResponseProto {
message GetBlockRequestProto { message GetBlockRequestProto {
required DatanodeBlockID blockID = 1; required DatanodeBlockID blockID = 1;
optional uint64 blockCommitSequenceId = 2 [default = 0];
} }
message GetBlockResponseProto { message GetBlockResponseProto {
@ -333,7 +336,7 @@ message GetCommittedBlockLengthRequestProto {
message GetCommittedBlockLengthResponseProto { message GetCommittedBlockLengthResponseProto {
required DatanodeBlockID blockID = 1; required DatanodeBlockID blockID = 1;
required int64 blockLength = 2; required int64 blockLength = 2;
optional uint64 blockCommitSequenceId = 3; optional uint64 blockCommitSequenceId = 3 [default = 0];
} }
message DeleteBlockResponseProto { message DeleteBlockResponseProto {

View File

@ -483,7 +483,8 @@ public class KeyValueHandler extends Handler {
try { try {
BlockID blockID = BlockID.getFromProtobuf( BlockID blockID = BlockID.getFromProtobuf(
request.getGetBlock().getBlockID()); request.getGetBlock().getBlockID());
responseData = blockManager.getBlock(kvContainer, blockID); responseData = blockManager.getBlock(kvContainer, blockID,
request.getGetBlock().getBlockCommitSequenceId());
long numBytes = responseData.getProtoBufMessage().toByteArray().length; long numBytes = responseData.getProtoBufMessage().toByteArray().length;
metrics.incContainerBytesStats(Type.GetBlock, numBytes); metrics.incContainerBytesStats(Type.GetBlock, numBytes);
@ -722,6 +723,7 @@ public class KeyValueHandler extends Handler {
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
chunks.add(chunkInfo.getProtoBufMessage()); chunks.add(chunkInfo.getProtoBufMessage());
blockData.setChunks(chunks); blockData.setChunks(chunks);
// TODO: add bcsId as a part of putSmallFile transaction
blockManager.putBlock(kvContainer, blockData); blockManager.putBlock(kvContainer, blockData);
metrics.incContainerBytesStats(Type.PutSmallFile, data.length); metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
@ -755,7 +757,9 @@ public class KeyValueHandler extends Handler {
try { try {
BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
.getBlockID()); .getBlockID());
BlockData responseData = blockManager.getBlock(kvContainer, blockID); // TODO: add bcsId as a part of getSmallFile transaction
// by default its 0
BlockData responseData = blockManager.getBlock(kvContainer, blockID, 0);
ContainerProtos.ChunkInfo chunkInfo = null; ContainerProtos.ChunkInfo chunkInfo = null;
ByteString dataBuf = ByteString.EMPTY; ByteString dataBuf = ByteString.EMPTY;

View File

@ -45,7 +45,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
/** /**
* This class is for performing block related operations on the KeyValue * This class is for performing block related operations on the KeyValue
* Container. * Container.
@ -90,24 +91,23 @@ public class BlockManagerImpl implements BlockManager {
// Should never fail. // Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here"); Preconditions.checkNotNull(db, "DB cannot be null here");
long blockCommitSequenceId = data.getBlockCommitSequenceId(); long bcsId = data.getBlockCommitSequenceId();
byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey); long containerBCSId = ((KeyValueContainerData) container.getContainerData())
.getBlockCommitSequenceId();
// default blockCommitSequenceId for any block is 0. It the putBlock // default blockCommitSequenceId for any block is 0. It the putBlock
// request is not coming via Ratis(for test scenarios), it will be 0. // request is not coming via Ratis(for test scenarios), it will be 0.
// In such cases, we should overwrite the block as well // In such cases, we should overwrite the block as well
if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) { if (bcsId != 0) {
if (blockCommitSequenceId <= Longs if (bcsId <= containerBCSId) {
.fromByteArray(blockCommitSequenceIdValue)) {
// Since the blockCommitSequenceId stored in the db is greater than // Since the blockCommitSequenceId stored in the db is greater than
// equal to blockCommitSequenceId to be updated, it means the putBlock // equal to blockCommitSequenceId to be updated, it means the putBlock
// transaction is reapplied in the ContainerStateMachine on restart. // transaction is reapplied in the ContainerStateMachine on restart.
// It also implies that the given block must already exist in the db. // It also implies that the given block must already exist in the db.
// just log and return // just log and return
LOG.warn("blockCommitSequenceId " + Longs LOG.warn("blockCommitSequenceId " + containerBCSId
.fromByteArray(blockCommitSequenceIdValue)
+ " in the Container Db is greater than" + " the supplied value " + " in the Container Db is greater than" + " the supplied value "
+ blockCommitSequenceId + " .Ignoring it"); + bcsId + " .Ignoring it");
return data.getSize(); return data.getSize();
} }
} }
@ -116,9 +116,9 @@ public class BlockManagerImpl implements BlockManager {
batch.put(Longs.toByteArray(data.getLocalID()), batch.put(Longs.toByteArray(data.getLocalID()),
data.getProtoBufMessage().toByteArray()); data.getProtoBufMessage().toByteArray());
batch.put(blockCommitSequenceIdKey, batch.put(blockCommitSequenceIdKey,
Longs.toByteArray(blockCommitSequenceId)); Longs.toByteArray(bcsId));
db.writeBatch(batch); db.writeBatch(batch);
container.updateBlockCommitSequenceId(blockCommitSequenceId); container.updateBlockCommitSequenceId(bcsId);
// Increment keycount here // Increment keycount here
container.getContainerData().incrKeyCount(); container.getContainerData().incrKeyCount();
return data.getSize(); return data.getSize();
@ -129,10 +129,12 @@ public class BlockManagerImpl implements BlockManager {
* *
* @param container - Container from which block need to be fetched. * @param container - Container from which block need to be fetched.
* @param blockID - BlockID of the block. * @param blockID - BlockID of the block.
* @param bcsId latest commit Id of the block
* @return Key Data. * @return Key Data.
* @throws IOException * @throws IOException
*/ */
public BlockData getBlock(Container container, BlockID blockID) @Override
public BlockData getBlock(Container container, BlockID blockID, long bcsId)
throws IOException { throws IOException {
Preconditions.checkNotNull(blockID, Preconditions.checkNotNull(blockID,
"BlockID cannot be null in GetBlock request"); "BlockID cannot be null in GetBlock request");
@ -145,6 +147,14 @@ public class BlockManagerImpl implements BlockManager {
// This is a post condition that acts as a hint to the user. // This is a post condition that acts as a hint to the user.
// Should never fail. // Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here"); Preconditions.checkNotNull(db, "DB cannot be null here");
long containerBCSId = containerData.getBlockCommitSequenceId();
if (containerBCSId < bcsId) {
throw new StorageContainerException(
"Unable to find the block with bcsID " + bcsId + " .Container "
+ container.getContainerData().getContainerID() + " bcsId is "
+ containerBCSId + ".", UNKNOWN_BCSID);
}
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID())); byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
if (kData == null) { if (kData == null) {
throw new StorageContainerException("Unable to find the block.", throw new StorageContainerException("Unable to find the block.",
@ -152,6 +162,12 @@ public class BlockManagerImpl implements BlockManager {
} }
ContainerProtos.BlockData blockData = ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData); ContainerProtos.BlockData.parseFrom(kData);
long id = blockData.getBlockCommitSequenceId();
if (id < bcsId) {
throw new StorageContainerException(
"bcsId " + bcsId + " mismatches with existing block Id "
+ id + " for block " + blockID + ".", BCSID_MISMATCH);
}
return BlockData.getFromProtoBuf(blockData); return BlockData.getFromProtoBuf(blockData);
} }

View File

@ -45,10 +45,12 @@ public interface BlockManager {
* *
* @param container - Container from which block need to be get. * @param container - Container from which block need to be get.
* @param blockID - BlockID of the Block. * @param blockID - BlockID of the Block.
* @param bcsId latest commit id of the block
* @return Block Data. * @return Block Data.
* @throws IOException * @throws IOException
*/ */
BlockData getBlock(Container container, BlockID blockID) throws IOException; BlockData getBlock(Container container, BlockID blockID, long bcsId)
throws IOException;
/** /**
* Deletes an existing block. * Deletes an existing block.

View File

@ -113,7 +113,7 @@ public class TestBlockManagerImpl {
assertEquals(1, keyValueContainer.getContainerData().getKeyCount()); assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
//Get Block //Get Block
BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer, BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer,
blockData.getBlockID()); blockData.getBlockID(), 0);
assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID()); assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID());
assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID()); assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID());
@ -139,7 +139,7 @@ public class TestBlockManagerImpl {
assertEquals(0, assertEquals(0,
keyValueContainer.getContainerData().getKeyCount()); keyValueContainer.getContainerData().getKeyCount());
try { try {
blockManager.getBlock(keyValueContainer, blockID); blockManager.getBlock(keyValueContainer, blockID, 0);
fail("testDeleteBlock"); fail("testDeleteBlock");
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
@ -197,7 +197,7 @@ public class TestBlockManagerImpl {
keyValueContainer.getContainerData().getKeyCount()); keyValueContainer.getContainerData().getKeyCount());
try { try {
//Since the block has been deleted, we should not be able to find it //Since the block has been deleted, we should not be able to find it
blockManager.getBlock(keyValueContainer, blockID); blockManager.getBlock(keyValueContainer, blockID, 0);
fail("testGetNoSuchBlock failed"); fail("testGetNoSuchBlock failed");
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(

View File

@ -294,7 +294,8 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf(); .getDatanodeBlockIDProtobuf();
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId); .getBlock(xceiverClient, datanodeBlockID, requestId,
omKeyLocationInfo.getBlockCommitSequenceId());
List<ContainerProtos.ChunkInfo> chunks = List<ContainerProtos.ChunkInfo> chunks =
response.getBlockData().getChunksList(); response.getBlockData().getChunksList();
for (ContainerProtos.ChunkInfo chunk : chunks) { for (ContainerProtos.ChunkInfo chunk : chunks) {

View File

@ -151,7 +151,7 @@ public class TestContainerReplication {
.getHandler(ContainerType.KeyValueContainer); .getHandler(ContainerType.KeyValueContainer);
BlockData key = handler.getBlockManager() BlockData key = handler.getBlockManager()
.getBlock(container, BlockID.getFromProtobuf(blockID)); .getBlock(container, BlockID.getFromProtobuf(blockID), 0);
Assert.assertNotNull(key); Assert.assertNotNull(key);
Assert.assertEquals(1, key.getChunks().size()); Assert.assertEquals(1, key.getChunks().size());

View File

@ -256,6 +256,6 @@ public class TestCloseContainerHandler {
openContainerBlockMap.getBlockDataMap(testContainerID)); openContainerBlockMap.getBlockDataMap(testContainerID));
// Make sure the key got committed // Make sure the key got committed
Assert.assertNotNull(handler.getBlockManager() Assert.assertNotNull(handler.getBlockManager()
.getBlock(container, blockID)); .getBlock(container, blockID, 0));
} }
} }

View File

@ -73,6 +73,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
@ -556,7 +558,63 @@ public class TestContainerPersistence {
blockData.setChunks(chunkList); blockData.setChunks(chunkList);
blockManager.putBlock(container, blockData); blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager. BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID()); getBlock(container, blockData.getBlockID(), 0);
ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
}
/**
* Tests a put block and read block with invalid bcsId.
*
* @throws IOException
* @throws NoSuchAlgorithmException
*/
@Test
public void testPutBlockWithInvalidBCSId()
throws IOException, NoSuchAlgorithmException {
long testContainerID = getTestContainerID();
Container container = addContainer(containerSet, testContainerID);
BlockID blockID1 = ContainerTestHelper.getTestBlockID(testContainerID);
ChunkInfo info = writeChunkHelper(blockID1);
BlockData blockData = new BlockData(blockID1);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
blockData.setChunks(chunkList);
blockData.setBlockCommitSequenceId(3);
blockManager.putBlock(container, blockData);
chunkList.clear();
// write a 2nd block
BlockID blockID2 = ContainerTestHelper.getTestBlockID(testContainerID);
info = writeChunkHelper(blockID2);
blockData = new BlockData(blockID2);
chunkList.add(info.getProtoBufMessage());
blockData.setChunks(chunkList);
blockData.setBlockCommitSequenceId(4);
blockManager.putBlock(container, blockData);
BlockData readBlockData;
try {
// read with bcsId higher than container bcsId
blockManager.
getBlock(container, blockID1, 5);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert.assertTrue(sce.getResult() == UNKNOWN_BCSID);
}
try {
// read with bcsId lower than container bcsId but greater than committed
// bcsId.
blockManager.
getBlock(container, blockID1, 4);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert.assertTrue(sce.getResult() == BCSID_MISMATCH);
}
readBlockData = blockManager.
getBlock(container, blockData.getBlockID(), 4);
ChunkInfo readChunk = ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
@ -608,7 +666,7 @@ public class TestContainerPersistence {
blockData.setChunks(chunkProtoList); blockData.setChunks(chunkProtoList);
blockManager.putBlock(container, blockData); blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager. BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID()); getBlock(container, blockData.getBlockID(), 0);
ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1); ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
ChunkInfo readChunk = ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
@ -636,7 +694,7 @@ public class TestContainerPersistence {
blockManager.deleteBlock(container, blockID); blockManager.deleteBlock(container, blockID);
exception.expect(StorageContainerException.class); exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the block."); exception.expectMessage("Unable to find the block.");
blockManager.getBlock(container, blockData.getBlockID()); blockManager.getBlock(container, blockData.getBlockID(), 0);
} }
/** /**

View File

@ -699,8 +699,8 @@ public class TestKeys {
.KeyValueContainer); .KeyValueContainer);
KeyValueContainer container = (KeyValueContainer) cm.getContainerSet() KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
.getContainer(location.getBlockID().getContainerID()); .getContainer(location.getBlockID().getContainerID());
BlockData blockInfo = keyValueHandler BlockData blockInfo = keyValueHandler.getBlockManager()
.getBlockManager().getBlock(container, location.getBlockID()); .getBlock(container, location.getBlockID(), 0);
KeyValueContainerData containerData = KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData(); (KeyValueContainerData) container.getContainerData();
File dataDir = new File(containerData.getChunksPath()); File dataDir = new File(containerData.getChunksPath());