HDDS-629. Make ApplyTransaction calls in ContainerStateMachine idempotent. Contributed by Shashikant Banerjee.

This commit is contained in:
Jitendra Pandey 2018-10-15 11:52:38 -07:00
parent a619d120a6
commit b51bc6b93f
6 changed files with 203 additions and 23 deletions

View File

@ -241,9 +241,12 @@ public class KeyValueHandler extends Handler {
newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
containerSet.addContainer(newContainer);
} else {
throw new StorageContainerException("Container already exists with " +
"container Id " + containerID, ContainerProtos.Result
.CONTAINER_EXISTS);
// The create container request for an already existing container can
// arrive in case the ContainerStateMachine reapplies the transaction
// on datanode restart. Just log a warning msg here.
LOG.warn("Container already exists." +
"container Id " + containerID);
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@ -370,6 +373,7 @@ public class KeyValueHandler extends Handler {
/**
* Handles Close Container Request. An open container is closed.
* Close Container call is idempotent.
*/
ContainerCommandResponseProto handleCloseContainer(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

View File

@ -231,21 +231,18 @@ public final class ChunkUtils {
*
* @param chunkFile - chunkFile to write data into.
* @param info - chunk info.
* @return boolean isOverwrite
* @throws StorageContainerException
* @return true if the chunkFile exists and chunkOffset < chunkFile length,
* false otherwise.
*/
public static boolean validateChunkForOverwrite(File chunkFile,
ChunkInfo info) throws StorageContainerException {
ChunkInfo info) {
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
if (isOverWriteRequested(chunkFile, info)) {
if (!isOverWritePermitted(info)) {
log.error("Rejecting write chunk request. Chunk overwrite " +
log.warn("Duplicate write chunk request. Chunk overwrite " +
"without explicit request. {}", info.toString());
throw new StorageContainerException("Rejecting write chunk request. " +
"OverWrite flag required." + info.toString(),
OVERWRITE_FLAG_REQUIRED);
}
return true;
}

View File

@ -89,11 +89,33 @@ public class BlockManagerImpl implements BlockManager {
Preconditions.checkNotNull(db, "DB cannot be null here");
long blockCommitSequenceId = data.getBlockCommitSequenceId();
byte[] blockCommitSequenceIdKey =
DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey);
// default blockCommitSequenceId for any block is 0. It the putBlock
// request is not coming via Ratis(for test scenarios), it will be 0.
// In such cases, we should overwrite the block as well
if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) {
if (blockCommitSequenceId <= Longs
.fromByteArray(blockCommitSequenceIdValue)) {
// Since the blockCommitSequenceId stored in the db is greater than
// equal to blockCommitSequenceId to be updated, it means the putBlock
// transaction is reapplied in the ContainerStateMachine on restart.
// It also implies that the given block must already exist in the db.
// just log and return
LOG.warn("blockCommitSequenceId " + Longs
.fromByteArray(blockCommitSequenceIdValue)
+ " in the Container Db is greater than" + " the supplied value "
+ blockCommitSequenceId + " .Ignoring it");
return data.getSize();
}
}
// update the blockData as well as BlockCommitSequenceId here
BatchOperation batch = new BatchOperation();
batch.put(Longs.toByteArray(data.getLocalID()),
data.getProtoBufMessage().toByteArray());
batch.put(DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX),
batch.put(blockCommitSequenceIdKey,
Longs.toByteArray(blockCommitSequenceId));
db.writeBatch(batch);
container.updateBlockCommitSequenceId(blockCommitSequenceId);

View File

@ -87,6 +87,32 @@ public class ChunkManagerImpl implements ChunkManager {
switch (stage) {
case WRITE_DATA:
if (isOverwrite) {
// if the actual chunk file already exists here while writing the temp
// chunk file, then it means the same ozone client request has
// generated two raft log entries. This can happen either because
// retryCache expired in Ratis (or log index mismatch/corruption in
// Ratis). This can be solved by two approaches as of now:
// 1. Read the complete data in the actual chunk file ,
// verify the data integrity and in case it mismatches , either
// 2. Delete the chunk File and write the chunk again. For now,
// let's rewrite the chunk file
// TODO: once the checksum support for write chunks gets plugged in,
// the checksum needs to be verified for the actual chunk file and
// the data to be written here which should be efficient and
// it matches we can safely return without rewriting.
LOG.warn("ChunkFile already exists" + chunkFile + ".Deleting it.");
FileUtil.fullyDelete(chunkFile);
}
if (tmpChunkFile.exists()) {
// If the tmp chunk file already exists it means the raft log got
// appended, but later on the log entry got truncated in Ratis leaving
// behind garbage.
// TODO: once the checksum support for data chunks gets plugged in,
// instead of rewriting the chunk here, let's compare the checkSums
LOG.warn(
"tmpChunkFile already exists" + tmpChunkFile + "Overwriting it.");
}
// Initially writes to temporary chunk file.
ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
// No need to increment container stats here, as still data is not
@ -95,6 +121,15 @@ public class ChunkManagerImpl implements ChunkManager {
case COMMIT_DATA:
// commit the data, means move chunk data from temporary chunk file
// to actual chunk file.
if (isOverwrite) {
// if the actual chunk file already exists , it implies the write
// chunk transaction in the containerStateMachine is getting
// reapplied. This can happen when a node restarts.
// TODO: verify the checkSums for the existing chunkFile and the
// chunkInfo to be committed here
LOG.warn("ChunkFile already exists" + chunkFile);
return;
}
commitChunk(tmpChunkFile, chunkFile);
// Increment container stats here, as we commit the data.
containerData.incrBytesUsed(info.getLen());
@ -200,6 +235,14 @@ public class ChunkManagerImpl implements ChunkManager {
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
// if the chunk file does not exist, it might have already been deleted.
// The call might be because of reapply of transactions on datanode
// restart.
if (!chunkFile.exists()) {
LOG.warn("Chunk file doe not exist. chunk info :" + info.toString());
return;
}
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
FileUtil.fullyDelete(chunkFile);
containerData.decrBytesUsed(chunkFile.length());

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.protocolPB.
StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
/**
* Tests the idempotent operations in ContainerStateMachine.
*/
public class TestContainerStateMachineIdempotency {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration ozoneConfig;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static XceiverClientManager xceiverClientManager;
private static String containerOwner = "OZONE";
@BeforeClass
public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster =
MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
xceiverClientManager = new XceiverClientManager(ozoneConfig);
}
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
}
@Test
public void testContainerStateMachineIdempotency() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
try {
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
// call create Container again
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper
.getWriteChunkRequest(container.getPipeline(), blockID,
data.length);
client.sendCommand(writeChunkRequest);
//Make the write chunk request again without requesting for overWrite
client.sendCommand(writeChunkRequest);
// Now, explicitly make a putKey request for the block.
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper
.getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
client.sendCommand(putKeyRequest).getPutBlock();
// send the putBlock again
client.sendCommand(putKeyRequest);
// close container call
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
} catch (IOException ioe) {
Assert.fail("Container operation failed" + ioe);
}
xceiverClientManager.releaseClient(client);
}
}

View File

@ -462,15 +462,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
try {
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
} catch (StorageContainerException ex) {
Assert.assertTrue(ex.getMessage().contains(
"Rejecting write chunk request. OverWrite flag required"));
Assert.assertEquals(ex.getResult(),
ContainerProtos.Result.OVERWRITE_FLAG_REQUIRED);
}
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@ -478,7 +470,7 @@ public class TestContainerPersistence {
Assert.assertEquals(datalen, bytesUsed);
long bytesWrite = container.getContainerData().getWriteBytes();
Assert.assertEquals(datalen * 2, bytesWrite);
Assert.assertEquals(datalen * 3, bytesWrite);
}
/**
@ -748,10 +740,11 @@ public class TestContainerPersistence {
}
private BlockData writeBlockHelper(BlockID blockID)
private BlockData writeBlockHelper(BlockID blockID, int i)
throws IOException, NoSuchAlgorithmException {
ChunkInfo info = writeChunkHelper(blockID);
BlockData blockData = new BlockData(blockID);
blockData.setBlockCommitSequenceId((long) i);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(info.getProtoBufMessage());
blockData.setChunks(chunkList);
@ -766,7 +759,7 @@ public class TestContainerPersistence {
for (int i = 0; i < 10; i++) {
BlockID blockID = new BlockID(testContainerID, i);
expectedBlocks.add(blockID);
BlockData kd = writeBlockHelper(blockID);
BlockData kd = writeBlockHelper(blockID, i);
blockManager.putBlock(container, kd);
}