From 6883fe860f484da2b835f9f57307b84165ed7f6f Mon Sep 17 00:00:00 2001 From: Tsz Wo Nicholas Sze Date: Tue, 4 Sep 2018 17:10:10 -0700 Subject: [PATCH] HDDS-383. Ozone Client should discard preallocated blocks from closed containers. Contributed by Shashikant Banerjee --- .../client/io/ChunkGroupOutputStream.java | 86 ++++++++++++------- .../hadoop/ozone/om/helpers/OmKeyInfo.java | 4 + .../TestCloseContainerHandlingByClient.java | 64 +++++++++----- 3 files changed, 102 insertions(+), 52 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 21406b52c94..3742a9a5d18 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -53,6 +53,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.ListIterator; /** * Maintaining a list of ChunkInputStream. Write based on offset. @@ -81,7 +82,6 @@ public class ChunkGroupOutputStream extends OutputStream { private final int chunkSize; private final String requestID; private boolean closed; - private List locationInfoList; private final RetryPolicy retryPolicy; /** * A constructor for testing purpose only. @@ -97,7 +97,6 @@ public class ChunkGroupOutputStream extends OutputStream { chunkSize = 0; requestID = null; closed = false; - locationInfoList = null; retryPolicy = null; } @@ -118,9 +117,16 @@ public class ChunkGroupOutputStream extends OutputStream { return streamEntries; } - @VisibleForTesting - public long getOpenID() { - return openID; + public List getLocationInfoList() { + List locationInfoList = new ArrayList<>(); + for (ChunkOutputStreamEntry streamEntry : streamEntries) { + OmKeyLocationInfo info = + new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) + .setShouldCreateContainer(false) + .setLength(streamEntry.currentPosition).setOffset(0).build(); + locationInfoList.add(info); + } + return locationInfoList; } public ChunkGroupOutputStream( @@ -146,7 +152,6 @@ public class ChunkGroupOutputStream extends OutputStream { this.xceiverClientManager = xceiverClientManager; this.chunkSize = chunkSize; this.requestID = requestId; - this.locationInfoList = new ArrayList<>(); this.retryPolicy = retryPolicy; LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); @@ -211,18 +216,6 @@ public class ChunkGroupOutputStream extends OutputStream { streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength())); - // reset the original length to zero here. It will be updated as and when - // the data gets written. - subKeyInfo.setLength(0); - locationInfoList.add(subKeyInfo); - } - - private void incrementBlockLength(int index, long length) { - if (locationInfoList != null) { - OmKeyLocationInfo locationInfo = locationInfoList.get(index); - long originalLength = locationInfo.getLength(); - locationInfo.setLength(originalLength + length); - } } @VisibleForTesting @@ -298,7 +291,6 @@ public class ChunkGroupOutputStream extends OutputStream { throw ioe; } } - incrementBlockLength(currentStreamIndex, writeLen); if (current.getRemaining() <= 0) { // since the current block is already written close the stream. handleFlushOrClose(true); @@ -316,12 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream { ContainerProtos.GetCommittedBlockLengthResponseProto responseProto; RetryPolicy.RetryAction action; int numRetries = 0; - - // TODO : At this point of time, we also need to allocate new blocks - // from a different container and may need to nullify - // all the remaining pre-allocated blocks in case they were - // pre-allocated on the same container which got closed now.This needs - // caching the closed container list on the client itself. while (true) { try { responseProto = ContainerProtocolCalls @@ -366,6 +352,43 @@ public class ChunkGroupOutputStream extends OutputStream { } } + /** + * Discards the subsequent pre allocated blocks and removes the streamEntries + * from the streamEntries list for the container which is closed. + * @param containerID id of the closed container + */ + private void discardPreallocatedBlocks(long containerID) { + // currentStreamIndex < streamEntries.size() signifies that, there are still + // pre allocated blocks available. + if (currentStreamIndex < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex); + while (streamEntryIterator.hasNext()) { + if (streamEntryIterator.next().blockID.getContainerID() + == containerID) { + streamEntryIterator.remove(); + } + } + } + } + + /** + * It might be possible that the blocks pre allocated might never get written + * while the stream gets closed normally. In such cases, it would be a good + * idea to trim down the locationInfoList by removing the unused blocks if any + * so as only the used block info gets updated on OzoneManager during close. + */ + private void removeEmptyBlocks() { + if (currentStreamIndex < streamEntries.size()) { + ListIterator streamEntryIterator = + streamEntries.listIterator(currentStreamIndex); + while (streamEntryIterator.hasNext()) { + if (streamEntryIterator.next().currentPosition == 0) { + streamEntryIterator.remove(); + } + } + } + } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -396,7 +419,7 @@ public class ChunkGroupOutputStream extends OutputStream { || streamEntry.currentPosition != buffer.position()) { committedLength = getCommittedBlockLength(streamEntry); // update the length of the current stream - locationInfoList.get(streamIndex).setLength(committedLength); + streamEntry.currentPosition = committedLength; } if (buffer.position() > 0) { @@ -418,10 +441,12 @@ public class ChunkGroupOutputStream extends OutputStream { // written. Remove it from the current stream list. if (committedLength == 0) { streamEntries.remove(streamIndex); - locationInfoList.remove(streamIndex); Preconditions.checkArgument(currentStreamIndex != 0); currentStreamIndex -= 1; } + // discard subsequent pre allocated blocks from the streamEntries list + // from the closed container + discardPreallocatedBlocks(streamEntry.blockID.getContainerID()); } private boolean checkIfContainerIsClosed(IOException ioe) { @@ -433,7 +458,7 @@ public class ChunkGroupOutputStream extends OutputStream { } private long getKeyLength() { - return locationInfoList.parallelStream().mapToLong(e -> e.getLength()) + return streamEntries.parallelStream().mapToLong(e -> e.currentPosition) .sum(); } @@ -506,12 +531,11 @@ public class ChunkGroupOutputStream extends OutputStream { handleFlushOrClose(true); if (keyArgs != null) { // in test, this could be null - Preconditions.checkState(streamEntries.size() == locationInfoList.size()); + removeEmptyBlocks(); Preconditions.checkState(byteOffset == getKeyLength()); keyArgs.setDataSize(byteOffset); - keyArgs.setLocationInfoList(locationInfoList); + keyArgs.setLocationInfoList(getLocationInfoList()); omClient.commitKey(keyArgs, openID); - locationInfoList = null; } else { LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index f6e426539ad..50f4b17508a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -122,6 +122,7 @@ public final class OmKeyInfo { * @throws IOException */ public void updateLocationInfoList(List locationInfoList) { + long latestVersion = getLatestVersionLocations().getVersion(); OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); List currentList = keyLocationInfoGroup.getLocationList(); @@ -134,6 +135,9 @@ public final class OmKeyInfo { // might get closed. The diff of blocks between these two lists here // need to be garbage collected in case the ozone client dies. currentList.removeAll(latestVersionList); + // set each of the locationInfo object to the latest version + locationInfoList.stream().forEach(omKeyLocationInfo -> omKeyLocationInfo + .setCreateVersion(latestVersion)); currentList.addAll(locationInfoList); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 9f126333811..cf38982a6b6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -38,12 +38,10 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.test.GenericTestUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -247,9 +245,8 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites2() throws Exception { - String keyName = "standalone4"; - long dataLength = 0; + long dataLength; OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize); ChunkGroupOutputStream groupOutputStream = @@ -293,11 +290,10 @@ public class TestCloseContainerHandlingByClient { private void waitForContainerClose(String keyName, OzoneOutputStream outputStream, HddsProtos.ReplicationType type) throws Exception { - ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) outputStream.getOutputStream(); List locationInfoList = - getLocationInfos(groupOutputStream, keyName); + groupOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); List pipelineList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { @@ -335,6 +331,46 @@ public class TestCloseContainerHandlingByClient { } } + @Test + public void testDiscardPreallocatedBlocks() throws Exception { + String keyName = "discardpreallocatedblocks"; + OzoneOutputStream key = + createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + // With the initial size provided, it should have pre allocated 4 blocks + Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size()); + String dataString = fixedLengthString(keyString, (1 * blockSize)); + byte[] data = dataString.getBytes(); + key.write(data); + List locationInfos = + new ArrayList<>(groupOutputStream.getLocationInfoList()); + long containerID = locationInfos.get(0).getContainerID(); + List datanodes = + cluster.getStorageContainerManager().getScmContainerManager() + .getContainerWithPipeline(containerID).getPipeline().getMachines(); + Assert.assertEquals(1, datanodes.size()); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + dataString = fixedLengthString(keyString, (1 * blockSize)); + data = dataString.getBytes(); + key.write(data); + Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); + + // the 1st block got written. Now all the containers are closed, so the 2nd + // pre allocated block will be removed from the list and new block should + // have been allocated + Assert.assertTrue( + groupOutputStream.getLocationInfoList().get(0).getBlockID() + .equals(locationInfos.get(0).getBlockID())); + Assert.assertFalse( + groupOutputStream.getLocationInfoList().get(1).getBlockID() + .equals(locationInfos.get(1).getBlockID())); + key.close(); + } + private OzoneOutputStream createKey(String keyName, ReplicationType type, long size) throws Exception { ReplicationFactor factor = @@ -344,20 +380,6 @@ public class TestCloseContainerHandlingByClient { .createKey(keyName, size, type, factor); } - private List getLocationInfos( - ChunkGroupOutputStream groupOutputStream, String keyName) - throws IOException { - long clientId = groupOutputStream.getOpenID(); - OMMetadataManager metadataManager = - cluster.getOzoneManager().getMetadataManager(); - byte[] openKey = metadataManager - .getOpenKeyBytes(volumeName, bucketName, keyName, clientId); - byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey); - OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf( - OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData)); - return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - } - private void validateData(String keyName, byte[] data) throws Exception { byte[] readData = new byte[data.length]; OzoneInputStream is = @@ -427,7 +449,7 @@ public class TestCloseContainerHandlingByClient { String dataString = fixedLengthString(keyString, (3 * chunkSize)); key.write(dataString.getBytes()); List locationInfos = - getLocationInfos(groupOutputStream, keyName); + groupOutputStream.getLocationInfoList(); long containerID = locationInfos.get(0).getContainerID(); List datanodes = cluster.getStorageContainerManager().getScmContainerManager()