diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 00c4d027159..3a92cf475a1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,15 +261,24 @@ public class KeyOutputStream extends OutputStream { if (!retryFailure) { closedContainerException = checkIfContainerIsClosed(t); } - PipelineID pipelineId = null; + Pipeline pipeline = streamEntry.getPipeline(); + PipelineID pipelineId = pipeline.getId(); long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData(); - LOG.debug( - "Encountered exception {}. 
The last committed block length is {}, " - + "uncommitted data length is {} retry count {}", exception, - totalSuccessfulFlushedData, bufferedDataLen, retryCount); + if (closedContainerException) { + LOG.debug( + "Encountered exception {}. The last committed block length is {}, " + + "uncommitted data length is {} retry count {}", exception, + totalSuccessfulFlushedData, bufferedDataLen, retryCount); + } else { + LOG.warn( + "Encountered exception {} on the pipeline {}. " + + "The last committed block length is {}, " + + "uncommitted data length is {} retry count {}", exception, + pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount); + } Preconditions.checkArgument( bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize()); Preconditions.checkArgument( @@ -282,8 +293,8 @@ public class KeyOutputStream extends OutputStream { if (closedContainerException) { excludeList.addConatinerId(ContainerID.valueof(containerId)); } else if (retryFailure || t instanceof TimeoutException - || t instanceof GroupMismatchException) { - pipelineId = streamEntry.getPipeline().getId(); + || t instanceof GroupMismatchException + || t instanceof NotReplicatedException) { excludeList.addPipeline(pipelineId); } // just clean up the current stream. 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index dfccb98ea02..7a69e273821 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.ratis.protocol.GroupMismatchException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.junit.After; import org.junit.Assert; @@ -75,7 +76,8 @@ public class TestBlockOutputStreamWithFailures { * * @throws IOException */ - @Before public void init() throws Exception { + @Before + public void init() throws Exception { chunkSize = 100; flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; @@ -110,13 +112,15 @@ public class TestBlockOutputStreamWithFailures { /** * Shutdown MiniDFSCluster. 
*/ - @After public void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } } - @Test public void testWatchForCommitWithCloseContainerException() + @Test + public void testWatchForCommitWithCloseContainerException() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -256,7 +260,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testWatchForCommitDatanodeFailure() throws Exception { + @Test + public void testWatchForCommitDatanodeFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -388,7 +393,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void test2DatanodesFailure() throws Exception { + @Test + public void test2DatanodesFailure() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -494,8 +500,15 @@ public class TestBlockOutputStreamWithFailures { // rewritten plus one partial chunk plus two putBlocks for flushSize // and one flush for partial chunk key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream - .getIoException()) instanceof RaftRetryFailureException); + + // Since 2 datanodes went down, if the pipeline gets destroyed quickly, + // it will hit GroupMismatchException; otherwise, it will fail with + // RaftRetryFailureException. + Assert.assertTrue((HddsClientUtils. 
+ checkForException(blockOutputStream + .getIoException()) instanceof RaftRetryFailureException) + || HddsClientUtils.checkForException( + blockOutputStream.getIoException()) instanceof GroupMismatchException); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit @@ -524,7 +537,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, data1); } - @Test public void testFailureWithPrimeSizedData() throws Exception { + @Test + public void testFailureWithPrimeSizedData() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -644,7 +658,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testExceptionDuringClose() throws Exception { + @Test + public void testExceptionDuringClose() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -758,7 +773,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception { + @Test + public void testWatchForCommitWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -898,7 +914,8 @@ public class TestBlockOutputStreamWithFailures { validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception { + @Test + public void testDatanodeFailureWithSingleNodeRatis() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount = @@ -1037,7 +1054,8 @@ public class TestBlockOutputStreamWithFailures { 
validateData(keyName, dataString.concat(dataString).getBytes()); } - @Test public void testDatanodeFailureWithPreAllocation() throws Exception { + @Test + public void testDatanodeFailureWithPreAllocation() throws Exception { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); long writeChunkCount =