HDDS-697. update and validate the BCSID for PutSmallFile/GetSmallFile command. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2018-11-01 10:21:25 +05:30
parent c5eb237e3e
commit b13c56742a
8 changed files with 85 additions and 32 deletions

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
PutSmallFileResponseProto;
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
@ -231,10 +233,11 @@ public final class ContainerProtocolCalls {
* @param blockID - ID of the block
* @param data - Data to be written into the container.
* @param traceID - Trace ID for logging purpose.
* @return container protocol writeSmallFile response
* @throws IOException
*/
public static void writeSmallFile(XceiverClientSpi client,
BlockID blockID, byte[] data, String traceID)
public static PutSmallFileResponseProto writeSmallFile(
XceiverClientSpi client, BlockID blockID, byte[] data, String traceID)
throws IOException {
BlockData containerBlockData =
@ -268,6 +271,7 @@ public final class ContainerProtocolCalls {
.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response);
return response.getPutSmallFile();
}
/**

View File

@ -413,7 +413,7 @@ message PutSmallFileRequestProto {
message PutSmallFileResponseProto {
required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}
message GetSmallFileRequestProto {

View File

@ -481,10 +481,11 @@ public class ContainerStateMachine extends BaseStateMachine {
getRequestProto(trx.getStateMachineLogEntry().getLogData());
Type cmdType = requestProto.getCmdType();
CompletableFuture<Message> future;
if (cmdType == Type.PutBlock) {
if (cmdType == Type.PutBlock || cmdType == Type.PutSmallFile) {
BlockData blockData;
ContainerProtos.BlockData blockDataProto =
requestProto.getPutBlock().getBlockData();
ContainerProtos.BlockData blockDataProto = cmdType == Type.PutBlock ?
requestProto.getPutBlock().getBlockData() :
requestProto.getPutSmallFile().getBlock().getBlockData();
// set the blockCommitSequenceId
try {
@ -499,9 +500,20 @@ public class ContainerStateMachine extends BaseStateMachine {
ContainerProtos.PutBlockRequestProto
.newBuilder(requestProto.getPutBlock())
.setBlockData(blockData.getProtoBufMessage()).build();
ContainerCommandRequestProto containerCommandRequestProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setPutBlock(putBlockRequestProto).build();
ContainerCommandRequestProto containerCommandRequestProto;
if (cmdType == Type.PutSmallFile) {
ContainerProtos.PutSmallFileRequestProto smallFileRequestProto =
ContainerProtos.PutSmallFileRequestProto
.newBuilder(requestProto.getPutSmallFile())
.setBlock(putBlockRequestProto).build();
containerCommandRequestProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setPutSmallFile(smallFileRequestProto).build();
} else {
containerCommandRequestProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setPutBlock(putBlockRequestProto).build();
}
future = CompletableFuture
.supplyAsync(() -> runCommand(containerCommandRequestProto),
getCommandExecutor(requestProto));

View File

@ -756,8 +756,6 @@ public class KeyValueHandler extends Handler {
try {
BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
.getBlockID());
// TODO: add bcsId as a part of getSmallFile transaction
// by default its 0
BlockData responseData = blockManager.getBlock(kvContainer, blockID);
ContainerProtos.ChunkInfo chunkInfo = null;

View File

@ -188,7 +188,7 @@ public final class BlockUtils {
return builder.build();
}
private static GetCommittedBlockLengthResponseProto.Builder
public static GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder(long blockLength,
ContainerProtos.DatanodeBlockID blockID) {
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder

View File

@ -47,6 +47,13 @@ public final class SmallFileUtils {
ContainerCommandRequestProto msg) {
ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
ContainerProtos.PutSmallFileResponseProto.newBuilder();
ContainerProtos.BlockData blockData =
msg.getPutSmallFile().getBlock().getBlockData();
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder = BlockUtils
.getCommittedBlockLengthResponseBuilder(blockData.getSize(),
blockData.getBlockID());
getResponse.setCommittedBlockLength(committedBlockLengthResponseBuilder);
ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setCmdType(ContainerProtos.Type.PutSmallFile);

View File

@ -23,16 +23,9 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerAction.Action;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerAction.Reason;
import org.apache.hadoop.hdds.scm.container.
common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
@ -88,7 +81,6 @@ public class TestContainerStateMachineFailures {
File baseDir = new File(path);
baseDir.mkdirs();
chunkSize = (int) OzoneConsts.MB;
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
@ -140,7 +132,6 @@ public class TestContainerStateMachineFailures {
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
long containerID = omKeyLocationInfo.getContainerID();
// delete the container dir
FileUtil.fullyDelete(new File(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
@ -171,15 +162,5 @@ public class TestContainerStateMachineFailures {
Assert.assertTrue(((StorageContainerException) ioe.getCause()).getResult()
== ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
StorageContainerDatanodeProtocolProtos.ContainerAction action =
StorageContainerDatanodeProtocolProtos.ContainerAction.newBuilder()
.setContainerID(containerID).setAction(Action.CLOSE)
.setReason(Reason.CONTAINER_UNHEALTHY)
.build();
// Make sure the container close action is initiated to SCM.
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext()
.getAllPendingContainerActions().contains(action));
}
}
}

View File

@ -152,7 +152,58 @@ public class TestContainerSmallFile {
xceiverClientManager.releaseClient(client);
}
@Test
public void testReadWriteWithBCSId() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID);
BlockID blockID1 = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
ContainerProtos.PutSmallFileResponseProto responseProto =
ContainerProtocolCalls
.writeSmallFile(client, blockID1, "data123".getBytes(), traceID);
long bcsId = responseProto.getCommittedBlockLength().getBlockID()
.getBlockCommitSequenceId();
try {
blockID1.setBlockCommitSequenceId(bcsId + 1);
//read a file with higher bcsId than the container bcsId
ContainerProtocolCalls
.readSmallFile(client, blockID1, traceID);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert
.assertTrue(sce.getResult() == ContainerProtos.Result.UNKNOWN_BCSID);
}
// write a new block again to bump up the container bcsId
BlockID blockID2 = ContainerTestHelper
.getTestBlockID(container.getContainerInfo().getContainerID());
ContainerProtocolCalls
.writeSmallFile(client, blockID2, "data123".getBytes(), traceID);
try {
blockID1.setBlockCommitSequenceId(bcsId + 1);
//read a file with higher bcsId than the committed bcsId for the block
ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert
.assertTrue(sce.getResult() == ContainerProtos.Result.BCSID_MISMATCH);
}
blockID1.setBlockCommitSequenceId(bcsId);
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client);
}
}