diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 78f03d21333..c04105c3843 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -96,6 +96,7 @@ public class KeyOutputStream extends OutputStream { private ExcludeList excludeList; private final RetryPolicy retryPolicy; private int retryCount; + private long offset; /** * A constructor for testing purpose only. */ @@ -121,6 +122,7 @@ public class KeyOutputStream extends OutputStream { .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; retryCount = 0; + offset = 0; } @VisibleForTesting @@ -149,6 +151,10 @@ public class KeyOutputStream extends OutputStream { return locationInfoList; } + @VisibleForTesting + public int getRetryCount() { + return retryCount; + } @SuppressWarnings("parameternumber") public KeyOutputStream(OpenKeySession handler, @@ -316,6 +322,7 @@ public class KeyOutputStream extends OutputStream { current.writeOnRetry(len); } else { current.write(b, off, writeLen); + offset += writeLen; } } catch (IOException ioe) { // for the current iteration, totalDataWritten - currentPos gives the @@ -326,8 +333,12 @@ public class KeyOutputStream extends OutputStream { // The len specified here is the combined sum of the data length of // the buffers Preconditions.checkState(!retry || len <= streamBufferMaxSize); - writeLen = retry ? (int) len : - (int) (current.getWrittenDataLength() - currentPos); + int dataWritten = (int) (current.getWrittenDataLength() - currentPos); + writeLen = retry ? (int) len : dataWritten; + // In retry path, the data written is already accounted in offset. + if (!retry) { + offset += writeLen; + } LOG.debug("writeLen {}, total len {}", writeLen, len); handleException(current, currentStreamIndex, ioe); } @@ -345,20 +356,24 @@ public class KeyOutputStream extends OutputStream { * from the streamEntries list for the container which is closed. * @param containerID id of the closed container * @param pipelineId id of the associated pipeline + * @param streamIndex index of the stream */ private void discardPreallocatedBlocks(long containerID, - PipelineID pipelineId) { - // currentStreamIndex < streamEntries.size() signifies that, there are still + PipelineID pipelineId, int streamIndex) { + // streamIndex < streamEntries.size() signifies that, there are still // pre allocated blocks available. - if (currentStreamIndex < streamEntries.size()) { + + // This will be called only to discard the next subsequent unused blocks + // in the sreamEntryList. + if (streamIndex < streamEntries.size()) { ListIterator streamEntryIterator = - streamEntries.listIterator(currentStreamIndex); + streamEntries.listIterator(streamIndex); while (streamEntryIterator.hasNext()) { BlockOutputStreamEntry streamEntry = streamEntryIterator.next(); + Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0); if (((pipelineId != null && streamEntry.getPipeline().getId() .equals(pipelineId)) || (containerID != -1 - && streamEntry.getBlockID().getContainerID() == containerID)) - && streamEntry.getCurrentPosition() == 0) { + && streamEntry.getBlockID().getContainerID() == containerID))) { streamEntryIterator.remove(); } } @@ -396,7 +411,7 @@ public class KeyOutputStream extends OutputStream { private void handleException(BlockOutputStreamEntry streamEntry, int streamIndex, IOException exception) throws IOException { Throwable t = checkForException(exception); - boolean retryFailure = checkForRetryFailure(exception); + boolean retryFailure = checkForRetryFailure(t); boolean closedContainerException = false; if (!retryFailure) { closedContainerException = checkIfContainerIsClosed(t); @@ -411,16 +426,14 @@ public class KeyOutputStream extends OutputStream { + "uncommitted data length is {}", exception, totalSuccessfulFlushedData, bufferedDataLen); Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); - Preconditions.checkArgument( - streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData - == bufferedDataLen); + Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } - if (checkIfContainerIsClosed(t)) { + if (closedContainerException) { excludeList.addConatinerId(ContainerID.valueof(containerId)); } else if (retryFailure || t instanceof TimeoutException) { pipelineId = streamEntry.getPipeline().getId(); @@ -428,22 +441,15 @@ public class KeyOutputStream extends OutputStream { } // just clean up the current stream. streamEntry.cleanup(retryFailure); - if (bufferedDataLen > 0) { - // If the data is still cached in the underlying stream, we need to - // allocate new block and write this data in the datanode. - currentStreamIndex += 1; - handleRetry(exception, bufferedDataLen); - } - if (totalSuccessfulFlushedData == 0) { - streamEntries.remove(streamIndex); - currentStreamIndex -= 1; - } + // discard all sunsequent blocks the containers and pipelines which + // are in the exclude list so that, the very next retry should never + // write data on the closed container/pipeline if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), - null); + null, streamIndex + 1); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the @@ -451,7 +457,19 @@ public class KeyOutputStream extends OutputStream { // Next block allocation will happen with excluding this specific pipeline // This will ensure if 2 way commit happens , it cannot span over multiple // blocks - discardPreallocatedBlocks(-1, pipelineId); + discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1); + } + if (bufferedDataLen > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate new block and write this data in the datanode. + currentStreamIndex += 1; + handleRetry(exception, bufferedDataLen); + // reset the retryCount after handling the exception + retryCount = 0; + } + if (totalSuccessfulFlushedData == 0) { + streamEntries.remove(streamIndex); + currentStreamIndex -= 1; } } @@ -618,7 +636,9 @@ public class KeyOutputStream extends OutputStream { if (keyArgs != null) { // in test, this could be null removeEmptyBlocks(); - keyArgs.setDataSize(getKeyLength()); + long length = getKeyLength(); + Preconditions.checkArgument(offset == length); + keyArgs.setDataSize(length); keyArgs.setLocationInfoList(getLocationInfoList()); // When the key is multipart upload part file upload, we should not // commit the key, as this is not an actual key, this is a just a diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 671f16caa11..54cdff02287 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -235,6 +236,8 @@ public class TestBlockOutputStreamWithFailures { Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // commitInfoMap will remain intact as there is no server failure Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); // now close the stream, It will update the ack length after watchForCommit @@ -372,6 +375,8 @@ public class TestBlockOutputStreamWithFailures { key.close(); Assert .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); @@ -506,6 +511,8 @@ public class TestBlockOutputStreamWithFailures { key.flush(); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof AlreadyClosedException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -534,11 +541,698 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, data1); } + @Test + public void testFailureWithPrimeSizedData() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = 167; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + == pendingWriteChunkCount + 1); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + == pendingPutBlockCount); + Assert.assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 1, + metrics.getTotalOpCount()); + + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(0, + blockOutputStream.getTotalDataFlushedLength()); + + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); + + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // Close the containers on the Datanode and write more data + ContainerTestHelper.waitForContainerClose(key, cluster); + key.write(data1); + + // As a part of handling the exception, 2 failed writeChunks will be + // rewritten plus 1 putBlocks for flush + // and one flush for partial chunk + key.flush(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof ContainerNotOpenException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + + // commitInfoMap will remain intact as there is no server failure + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 6, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 9, + metrics.getTotalOpCount()); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + // Written the same data twice + String dataString = new String(data1, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes()); + } + + @Test + public void testExceptionDuringClose() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = 167; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + == pendingWriteChunkCount + 1); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + == pendingPutBlockCount); + Assert.assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 1, + metrics.getTotalOpCount()); + + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(0, + blockOutputStream.getTotalDataFlushedLength()); + + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0); + + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // Close the containers on the Datanode and write more data + ContainerTestHelper.waitForContainerClose(key, cluster); + key.write(data1); + + // commitInfoMap will remain intact as there is no server failure + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // now close the stream, It will hit exception + key.close(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof ContainerNotOpenException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 6, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 9, + metrics.getTotalOpCount()); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + // Written the same data twice + String dataString = new String(data1, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes()); + } + + @Test + public void testWatchForCommitWithSingleNodeRatis() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes atleast putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 4 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // ack'd by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // flush is a sync call, all pending operations will complete + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + // Close the containers on the Datanode and write more data + ContainerTestHelper.waitForContainerClose(key, cluster); + // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here + // once exception is hit + key.write(data1); + + // As a part of handling the exception, 4 failed writeChunks will be + // rewritten plus one partial chunk plus two putBlocks for flushSize + // and one flush for partial chunk + key.flush(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof ContainerNotOpenException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // commitInfoMap will remain intact as there is no server failure + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + // Written the same data twice + String dataString = new String(data1, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes()); + } + + @Test + public void testDatanodeFailureWithSingleNodeRatis() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes at least putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // ack'd by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast flushSize worth of data buffer + // where each entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + + // again write data with more than max buffer limit. This will call + // watchForCommit again. No write will happen in the current block and + // data will be rewritten to the next block. + + key.write(data1); + + key.flush(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof AlreadyClosedException); + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // in total, there are 14 full write chunks, 5 before the failure injection, + // 4 chunks after which we detect the failure and then 5 again on the next + // block + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + // 3 flushes at flushSize boundaries before failure injection + 2 + // flush failed + 3 more flushes for the next block + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + // Written the same data twice + String dataString = new String(data1, UTF_8); + cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); + validateData(keyName, dataString.concat(dataString).getBytes()); + } + + @Test + public void testDatanodeFailureWithPreAllocation() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, 3 * blockSize, + ReplicationFactor.ONE); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes at least putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // ack'd by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast flushSize worth of data buffer + // where each entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() == 0); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + + // again write data with more than max buffer limit. This will call + // watchForCommit again. No write will happen and + + key.write(data1); + + key.flush(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof AlreadyClosedException); + + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // in total, there are 14 full write chunks, 5 before the failure injection, + // 4 chunks after which we detect the failure and then 5 again on the next + // block + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + + // 3 flushes at flushSize boundaries before failure injection + 2 + // flush failed + 3 more flushes for the next block + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + // Written the same data twice + String dataString = new String(data1, UTF_8); + cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); + validateData(keyName, dataString.concat(dataString).getBytes()); + } + private OzoneOutputStream createKey(String keyName, ReplicationType type, long size) throws Exception { - return ContainerTestHelper - .createKey(keyName, type, size, objectStore, volumeName, bucketName); + return createKey(keyName, type, size, ReplicationFactor.THREE); } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size, ReplicationFactor factor) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, factor, size, objectStore, volumeName, + bucketName); + } + private void validateData(String keyName, byte[] data) throws Exception { ContainerTestHelper .validateData(keyName, data, objectStore, volumeName, bucketName); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 7f2d93d0710..0b618a0d705 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -700,6 +700,15 @@ public final class ContainerTestHelper { .createKey(keyName, size, type, factor, new HashMap<>()); } + public static OzoneOutputStream createKey(String keyName, + ReplicationType type, + org.apache.hadoop.hdds.client.ReplicationFactor factor, long size, + ObjectStore objectStore, String volumeName, String bucketName) + throws Exception { + return objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey(keyName, size, type, factor, new HashMap<>()); + } + public static void validateData(String keyName, byte[] data, ObjectStore objectStore, String volumeName, String bucketName) throws Exception {