HDDS-247. Handle CLOSED_CONTAINER_IO exception in ozoneClient. Contributed by Shashikant Banerjee.

This commit is contained in:
Mukul Kumar Singh 2018-08-28 07:11:36 +05:30
parent 26c2a97c56
commit 3974427f67
6 changed files with 630 additions and 61 deletions

View File

@ -94,6 +94,10 @@ public class ChunkOutputStream extends OutputStream {
this.chunkIndex = 0;
}
public ByteBuffer getBuffer() {
return buffer;
}
@Override
public synchronized void write(int b) throws IOException {
checkOpen();
@ -106,7 +110,8 @@ public class ChunkOutputStream extends OutputStream {
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
public synchronized void write(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
@ -143,25 +148,28 @@ public class ChunkOutputStream extends OutputStream {
@Override
public synchronized void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null &&
buffer != null) {
try {
if (xceiverClientManager != null && xceiverClient != null
&& buffer != null) {
if (buffer.position() > 0) {
writeChunkToContainer();
}
try {
putKey(xceiverClient, containerKeyData.build(), traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
} finally {
cleanup();
}
}
}
public synchronized void cleanup() {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager = null;
xceiverClient = null;
buffer = null;
}
}
}
/**
* Checks if the stream is open. If not, throws an exception.

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@ -46,8 +47,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@ -111,6 +114,11 @@ public class ChunkGroupOutputStream extends OutputStream {
return streamEntries;
}
@VisibleForTesting
public int getOpenID() {
return openID;
}
public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
@ -220,26 +228,9 @@ public class ChunkGroupOutputStream extends OutputStream {
@Override
public synchronized void write(int b) throws IOException {
checkNotClosed();
if (streamEntries.size() <= currentStreamIndex) {
Preconditions.checkNotNull(omClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
try {
allocateNewBlock(currentStreamIndex);
} catch (IOException ioe) {
LOG.error("Allocate block fail when writing.");
throw ioe;
}
}
ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
entry.write(b);
incrementBlockLength(currentStreamIndex, 1);
if (entry.getRemaining() <= 0) {
currentStreamIndex += 1;
}
byteOffset += 1;
byte[] buf = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
}
/**
@ -258,7 +249,10 @@ public class ChunkGroupOutputStream extends OutputStream {
public synchronized void write(byte[] b, int off, int len)
throws IOException {
checkNotClosed();
handleWrite(b, off, len);
}
private void handleWrite(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
}
@ -289,9 +283,20 @@ public class ChunkGroupOutputStream extends OutputStream {
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
int writeLen = Math.min(len, (int) current.getRemaining());
try {
current.write(b, off, writeLen);
} catch (IOException ioe) {
if (checkIfContainerIsClosed(ioe)) {
handleCloseContainerException(current, currentStreamIndex);
continue;
} else {
throw ioe;
}
}
incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
handleFlushOrClose(true);
currentStreamIndex += 1;
}
len -= writeLen;
@ -300,6 +305,90 @@ public class ChunkGroupOutputStream extends OutputStream {
}
}
/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
* datanode.
* b. Reads the data from the underlying buffer and writes it the next stream.
*
* @param streamEntry StreamEntry
* @param streamIndex Index of the entry
* @throws IOException Throws IOexception if Write fails
*/
private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
int streamIndex) throws IOException {
// TODO : If the block is still not committed and is in the
// pending openBlock Map, it will return BLOCK_NOT_COMMITTED
// exception. We should handle this by retrying the same operation
// n times and update the OzoneManager with the actual block length
// written. At this point of time, we also need to allocate new blocks
// from a different container and may need to nullify
// all the remaining pre-allocated blocks in case they were
// pre-allocated on the same container which got closed now.This needs
// caching the closed container list on the client itself.
long committedLength = 0;
ByteBuffer buffer = streamEntry.getBuffer();
if (buffer == null) {
// the buffer here will be null only when closeContainerException is
// hit while calling putKey during close on chunkOutputStream.
// Since closeContainer auto commit pending keys, no need to do
// anything here.
return;
}
// In case where not a single chunk of data has been written to the Datanode
// yet. This block does not yet exist on the datanode but cached on the
// outputStream buffer. No need to call GetCommittedBlockLength here
// for this block associated with the stream here.
if (streamEntry.currentPosition >= chunkSize
|| streamEntry.currentPosition != buffer.position()) {
ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
ContainerProtocolCalls
.getCommittedBlockLength(streamEntry.xceiverClient,
streamEntry.blockID, requestID);
committedLength = responseProto.getBlockLength();
// update the length of the current stream
locationInfoList.get(streamIndex).setLength(committedLength);
}
if (buffer.position() > 0) {
// If the data is still cached in the underlying stream, we need to
// allocate new block and write this data in the datanode. The cached
// data in the buffer does not exceed chunkSize.
Preconditions.checkState(buffer.position() < chunkSize);
currentStreamIndex += 1;
// readjust the byteOffset value to the length actually been written.
byteOffset -= buffer.position();
handleWrite(buffer.array(), 0, buffer.position());
}
// just clean up the current stream. Since the container is already closed,
// it will be auto committed. No need to call close again here.
streamEntry.cleanup();
// This case will arise when while writing the first chunk itself fails.
// In such case, the current block associated with the stream has no data
// written. Remove it from the current stream list.
if (committedLength == 0) {
streamEntries.remove(streamIndex);
locationInfoList.remove(streamIndex);
Preconditions.checkArgument(currentStreamIndex != 0);
currentStreamIndex -= 1;
}
}
private boolean checkIfContainerIsClosed(IOException ioe) {
return Optional.of(ioe.getCause())
.filter(e -> e instanceof StorageContainerException)
.map(e -> (StorageContainerException) e)
.filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
.isPresent();
}
private long getKeyLength() {
return locationInfoList.parallelStream().mapToLong(e -> e.getLength())
.sum();
}
/**
* Contact OM to get a new block. Set the new block with the index (e.g.
* first block has index = 0, second has index = 1 etc.)
@ -317,11 +406,41 @@ public class ChunkGroupOutputStream extends OutputStream {
@Override
public synchronized void flush() throws IOException {
checkNotClosed();
handleFlushOrClose(false);
}
/**
* Close or Flush the latest outputStream.
* @param close Flag which decides whether to call close or flush on the
* outputStream.
* @throws IOException In case, flush or close fails with exception.
*/
private void handleFlushOrClose(boolean close) throws IOException {
if (streamEntries.size() == 0) {
return;
}
for (int i = 0; i <= currentStreamIndex; i++) {
streamEntries.get(i).flush();
int size = streamEntries.size();
int streamIndex =
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
ChunkOutputStreamEntry entry = streamEntries.get(streamIndex);
if (entry != null) {
try {
if (close) {
entry.close();
} else {
entry.flush();
}
} catch (IOException ioe) {
if (checkIfContainerIsClosed(ioe)) {
// This call will allocate a new streamEntry and write the Data.
// Close needs to be retried on the newly allocated streamEntry as
// as well.
handleCloseContainerException(entry, streamIndex);
handleFlushOrClose(close);
} else {
throw ioe;
}
}
}
}
@ -336,16 +455,11 @@ public class ChunkGroupOutputStream extends OutputStream {
return;
}
closed = true;
for (ChunkOutputStreamEntry entry : streamEntries) {
if (entry != null) {
entry.close();
}
}
handleFlushOrClose(true);
if (keyArgs != null) {
// in test, this could be null
long length =
locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum();
Preconditions.checkState(byteOffset == length);
Preconditions.checkState(streamEntries.size() == locationInfoList.size());
Preconditions.checkState(byteOffset == getKeyLength());
keyArgs.setDataSize(byteOffset);
keyArgs.setLocationInfoList(locationInfoList);
omClient.commitKey(keyArgs, openID);
@ -506,6 +620,23 @@ public class ChunkGroupOutputStream extends OutputStream {
this.outputStream.close();
}
}
ByteBuffer getBuffer() throws IOException {
if (this.outputStream instanceof ChunkOutputStream) {
ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
return out.getBuffer();
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
public void cleanup() {
checkStream();
if (this.outputStream instanceof ChunkOutputStream) {
ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
out.cleanup();
}
}
}
/**

View File

@ -125,19 +125,16 @@ public final class OmKeyInfo {
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
List<OmKeyLocationInfo> currentList =
keyLocationInfoGroup.getLocationList();
Preconditions.checkNotNull(keyLocationInfoGroup);
Preconditions.checkState(locationInfoList.size() <= currentList.size());
for (OmKeyLocationInfo current : currentList) {
// For Versioning, while committing the key for the newer version,
// we just need to update the lengths for new blocks. Need to iterate over
// and find the new blocks added in the latest version.
for (OmKeyLocationInfo info : locationInfoList) {
if (info.getBlockID().equals(current.getBlockID())) {
current.setLength(info.getLength());
break;
}
}
}
List<OmKeyLocationInfo> latestVersionList =
keyLocationInfoGroup.getBlocksLatestVersionOnly();
// Updates the latest locationList in the latest version only with
// given locationInfoList here.
// TODO : The original allocated list and the updated list here may vary
// as the containers on the Datanode on which the blocks were pre allocated
// might get closed. The diff of blocks between these two lists here
// need to be garbage collected in case the ozone client dies.
currentList.removeAll(latestVersionList);
currentList.addAll(locationInfoList);
}
/**

View File

@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
/**
* Tests Close Container Exception handling by Ozone Client.
*/
public class TestCloseContainerHandlingByClient {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static OzoneClient client;
private static ObjectStore objectStore;
private static int chunkSize;
private static int blockSize;
private static String volumeName;
private static String bucketName;
private static String keyString;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
chunkSize = (int)OzoneConsts.MB;
blockSize = 4 * chunkSize;
conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "closecontainerexceptionhandlingtest";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
private static String fixedLengthString(String string, int length) {
return String.format("%1$"+length+ "s", string);
}
@Test
public void testBlockWritesWithFlushAndClose() throws Exception {
String keyName = "standalone";
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, 0);
// write data more than 1 chunk
byte[] data =
fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
key.write(data);
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
key.write(data);
key.flush();
key.close();
// read the key from OM again and match the length.The length will still
// be the equal to the original data size.
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
//we have written two blocks
Assert.assertEquals(2, keyLocationInfos.size());
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
Assert.assertEquals(data.length - (data.length % chunkSize),
omKeyLocationInfo.getLength());
Assert.assertEquals(data.length + (data.length % chunkSize),
keyLocationInfos.get(1).getLength());
Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
// Written the same data twice
String dataString = new String(data);
dataString.concat(dataString);
validateData(keyName, dataString.getBytes());
}
@Test
public void testBlockWritesCloseConsistency() throws Exception {
String keyName = "standalone2";
OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
// write data more than 1 chunk
byte[] data =
fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
key.write(data);
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
key.close();
// read the key from OM again and match the length.The length will still
// be the equal to the original data size.
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
// Though we have written only block initially, the close will hit
// closeContainerException and remaining data in the chunkOutputStream
// buffer will be copied into a different allocated block and will be
// committed.
Assert.assertEquals(2, keyLocationInfos.size());
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
Assert.assertEquals(data.length - (data.length % chunkSize),
omKeyLocationInfo.getLength());
Assert.assertEquals(data.length % chunkSize,
keyLocationInfos.get(1).getLength());
Assert.assertEquals(data.length, keyInfo.getDataSize());
validateData(keyName, data);
}
@Test
public void testMultiBlockWrites() throws Exception {
String keyName = "standalone3";
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
// With the initial size provided, it should have preallocated 3 blocks
Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
// write data more than 1 chunk
byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
Assert.assertEquals(data.length, 3 * blockSize);
key.write(data);
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
waitForContainerClose(keyName, key,
HddsProtos.ReplicationType.STAND_ALONE);
// write 1 more block worth of data. It will fail and new block will be
// allocated
key.write(fixedLengthString(keyString, blockSize).getBytes());
key.close();
// read the key from OM again and match the length.The length will still
// be the equal to the original data size.
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
// Though we have written only block initially, the close will hit
// closeContainerException and remaining data in the chunkOutputStream
// buffer will be copied into a different allocated block and will be
// committed.
Assert.assertEquals(4, keyLocationInfos.size());
Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
Assert.assertEquals(blockSize, locationInfo.getLength());
}
}
@Test
public void testMultiBlockWrites2() throws Exception {
String keyName = "standalone4";
long dataLength = 0;
OzoneOutputStream key =
createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
// With the initial size provided, it should have pre allocated 4 blocks
Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
String dataString = fixedLengthString(keyString, (3 * blockSize));
byte[] data = dataString.getBytes();
key.write(data);
// 3 block are completely written to the DataNode in 3 blocks.
// Data of length half of chunkSize resides in the chunkOutput stream buffer
String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2);
key.write(dataString2.getBytes());
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
key.close();
// read the key from OM again and match the length.The length will still
// be the equal to the original data size.
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
// Though we have written only block initially, the close will hit
// closeContainerException and remaining data in the chunkOutputStream
// buffer will be copied into a different allocated block and will be
// committed.
Assert.assertEquals(4, keyLocationInfos.size());
dataLength = 3 * blockSize + (long) (0.5 * chunkSize);
Assert.assertEquals(dataLength, keyInfo.getDataSize());
validateData(keyName, dataString.concat(dataString2).getBytes());
}
private void waitForContainerClose(String keyName,
OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
throws Exception {
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) outputStream.getOutputStream();
int clientId = groupOutputStream.getOpenID();
OMMetadataManager metadataManager =
cluster.getOzoneManager().getMetadataManager();
String objectKey =
metadataManager.getKeyWithDBPrefix(volumeName, bucketName, keyName);
byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientId);
byte[] openKeyData = metadataManager.get(openKey);
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
List<OmKeyLocationInfo> locationInfoList =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
for (long containerID : containerIdList) {
Pipeline pipeline =
cluster.getStorageContainerManager().getScmContainerManager()
.getContainerWithPipeline(containerID).getPipeline();
pipelineList.add(pipeline);
List<DatanodeDetails> datanodes = pipeline.getMachines();
for (DatanodeDetails details : datanodes) {
Assert.assertFalse(ContainerTestHelper
.isContainerClosed(cluster, containerID, details));
// send the order to close the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(details.getUuid(),
new CloseContainerCommand(containerID, type, pipeline.getId()));
}
}
int index = 0;
for (long containerID : containerIdList) {
Pipeline pipeline = pipelineList.get(index);
List<DatanodeDetails> datanodes = pipeline.getMachines();
for (DatanodeDetails datanodeDetails : datanodes) {
GenericTestUtils.waitFor(() -> ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails), 500,
15 * 1000);
//double check if it's really closed (waitFor also throws an exception)
Assert.assertTrue(ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails));
}
index++;
}
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
ReplicationFactor factor =
type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE :
ReplicationFactor.THREE;
return objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey(keyName, size, type, factor);
}
private void validateData(String keyName, byte[] data) throws Exception {
byte[] readData = new byte[data.length];
OzoneInputStream is =
objectStore.getVolume(volumeName).getBucket(bucketName)
.readKey(keyName);
is.read(readData);
MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
sha1.update(data);
MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
sha2.update(readData);
Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
is.close();
}
@Test
public void testBlockWriteViaRatis() throws Exception {
String keyName = "ratis";
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
byte[] data =
fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
key.write(data);
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.THREE)
.setKeyName(keyName).build();
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
// Again Write the Data. This will throw an exception which will be handled
// and new blocks will be allocated
key.write(data);
key.flush();
// The write will fail but exception will be handled and length will be
// updated correctly in OzoneManager once the steam is closed
key.close();
// read the key from OM again and match the length.The length will still
// be the equal to the original data size.
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
//we have written two blocks
Assert.assertEquals(2, keyLocationInfos.size());
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
Assert.assertEquals(data.length - (data.length % chunkSize),
omKeyLocationInfo.getLength());
Assert.assertEquals(data.length + (data.length % chunkSize),
keyLocationInfos.get(1).getLength());
Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
String dataString = new String(data);
dataString.concat(dataString);
validateData(keyName, dataString.getBytes());
}
}

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
@ -604,4 +608,21 @@ public final class ContainerTestHelper {
public static long getTestContainerID() {
return Time.getUtcTime();
}
public static boolean isContainerClosed(MiniOzoneCluster cluster,
long containerID, DatanodeDetails datanode) {
ContainerData containerData;
for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
if (datanode.equals(datanodeService.getDatanodeDetails())) {
Container container =
datanodeService.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
if (container != null) {
containerData = container.getContainerData();
return containerData.isClosed();
}
}
}
return false;
}
}

View File

@ -39,12 +39,12 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Assert;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -124,8 +124,8 @@ public class TestOmBlockVersioning {
// 1st update, version 0
OpenKeySession openKey = ozoneManager.openKey(keyArgs);
// explicitly set the keyLocation list before committing the key.
keyArgs.setLocationInfoList(
openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
.getBlocksLatestVersionOnly());
ozoneManager.commitKey(keyArgs, openKey.getId());
OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
@ -139,8 +139,8 @@ public class TestOmBlockVersioning {
//OmKeyLocationInfo locationInfo =
// ozoneManager.allocateBlock(keyArgs, openKey.getId());
// explicitly set the keyLocation list before committing the key.
keyArgs.setLocationInfoList(
openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
.getBlocksLatestVersionOnly());
ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs);
@ -150,10 +150,14 @@ public class TestOmBlockVersioning {
// 3rd update, version 2
openKey = ozoneManager.openKey(keyArgs);
// this block will be appended to the latest version of version 2.
OmKeyLocationInfo locationInfo =
ozoneManager.allocateBlock(keyArgs, openKey.getId());
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
List<OmKeyLocationInfo> locationInfoList =
openKey.getKeyInfo().getLatestVersionLocations()
.getBlocksLatestVersionOnly();
Assert.assertTrue(locationInfoList.size() == 1);
locationInfoList.add(locationInfo);
keyArgs.setLocationInfoList(locationInfoList);
ozoneManager.commitKey(keyArgs, openKey.getId());