diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java index 94a4b9486e0..eb215d63a46 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java @@ -102,4 +102,10 @@ public class ExcludeList { }); return excludeList; } + + public void clear() { + datanodes.clear(); + containerIds.clear(); + pipelineIds.clear(); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 0d9529f8eae..c1f195f14ca 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -295,60 +295,66 @@ public class KeyOutputStream extends OutputStream { throws IOException { int succeededAllocates = 0; while (len > 0) { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - - // length(len) will be in int range if the call is happening through - // write API of blockOutputStream. Length can be in long range if it comes - // via Exception path. - int writeLen = Math.min((int)len, (int) current.getRemaining()); - long currentPos = current.getWrittenDataLength(); try { - if (retry) { - current.writeOnRetry(len); - } else { - current.write(b, off, writeLen); - offset += writeLen; + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. + try { + allocateNewBlock(currentStreamIndex); + succeededAllocates += 1; + } catch (IOException ioe) { + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + + " blocks for this write."); + throw ioe; + } } - } catch (IOException ioe) { - // for the current iteration, totalDataWritten - currentPos gives the - // amount of data already written to the buffer + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - // In the retryPath, the total data to be written will always be equal - // to or less than the max length of the buffer allocated. - // The len specified here is the combined sum of the data length of - // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); - int dataWritten = (int) (current.getWrittenDataLength() - currentPos); - writeLen = retry ? (int) len : dataWritten; - // In retry path, the data written is already accounted in offset. - if (!retry) { - offset += writeLen; + // length(len) will be in int range if the call is happening through + // write API of blockOutputStream. Length can be in long range if it comes + // via Exception path. + int writeLen = Math.min((int) len, (int) current.getRemaining()); + long currentPos = current.getWrittenDataLength(); + try { + if (retry) { + current.writeOnRetry(len); + } else { + current.write(b, off, writeLen); + offset += writeLen; + } + } catch (IOException ioe) { + // for the current iteration, totalDataWritten - currentPos gives the + // amount of data already written to the buffer + + // In the retryPath, the total data to be written will always be equal + // to or less than the max length of the buffer allocated. + // The len specified here is the combined sum of the data length of + // the buffers + Preconditions.checkState(!retry || len <= streamBufferMaxSize); + int dataWritten = (int) (current.getWrittenDataLength() - currentPos); + writeLen = retry ? (int) len : dataWritten; + // In retry path, the data written is already accounted in offset. + if (!retry) { + offset += writeLen; + } + LOG.debug("writeLen {}, total len {}", writeLen, len); + handleException(current, currentStreamIndex, ioe); } - LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, ioe); + if (current.getRemaining() <= 0) { + // since the current block is already written close the stream. + handleFlushOrClose(StreamAction.FULL); + } + len -= writeLen; + off += writeLen; + } catch (Exception e) { + markStreamClosed(); + throw e; } - if (current.getRemaining() <= 0) { - // since the current block is already written close the stream. - handleFlushOrClose(StreamAction.FULL); - } - len -= writeLen; - off += writeLen; } } @@ -365,7 +371,7 @@ public class KeyOutputStream extends OutputStream { // pre allocated blocks available. // This will be called only to discard the next subsequent unused blocks - // in the sreamEntryList. + // in the streamEntryList. if (streamIndex < streamEntries.size()) { ListIterator streamEntryIterator = streamEntries.listIterator(streamIndex); @@ -398,6 +404,20 @@ public class KeyOutputStream extends OutputStream { } } } + + private void cleanup() { + if (excludeList != null) { + excludeList.clear(); + excludeList = null; + } + if (bufferPool != null) { + bufferPool.clearBufferPool(); + } + + if (streamEntries != null) { + streamEntries.clear(); + } + } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -418,8 +438,7 @@ public class KeyOutputStream extends OutputStream { closedContainerException = checkIfContainerIsClosed(t); } PipelineID pipelineId = null; - long totalSuccessfulFlushedData = - streamEntry.getTotalAckDataLength(); + long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); @@ -450,8 +469,8 @@ public class KeyOutputStream extends OutputStream { if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), - null, streamIndex + 1); + discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, + streamIndex + 1); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the @@ -475,6 +494,11 @@ public class KeyOutputStream extends OutputStream { } } + private void markStreamClosed() { + cleanup(); + closed = true; + } + private void handleRetry(IOException exception, long len) throws IOException { RetryPolicy.RetryAction action; try { @@ -586,40 +610,46 @@ public class KeyOutputStream extends OutputStream { return; } while (true) { - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); - if (entry != null) { - try { - Collection failedServers = entry.getFailedServers(); - // failed servers can be null in case there is no data written in the - // stream - if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); - } - switch (op) { - case CLOSE: - entry.close(); - break; - case FULL: - if (entry.getRemaining() == 0) { - entry.close(); - currentStreamIndex++; + try { + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? size - 1 : currentStreamIndex; + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + if (entry != null) { + try { + Collection failedServers = + entry.getFailedServers(); + // failed servers can be null in case there is no data written in the + // stream + if (failedServers != null && !failedServers.isEmpty()) { + excludeList.addDatanodes(failedServers); } - break; - case FLUSH: - entry.flush(); - break; - default: - throw new IOException("Invalid Operation"); + switch (op) { + case CLOSE: + entry.close(); + break; + case FULL: + if (entry.getRemaining() == 0) { + entry.close(); + currentStreamIndex++; + } + break; + case FLUSH: + entry.flush(); + break; + default: + throw new IOException("Invalid Operation"); + } + } catch (IOException ioe) { + handleException(entry, streamIndex, ioe); + continue; } - } catch (IOException ioe) { - handleException(entry, streamIndex, ioe); - continue; } + break; + } catch (Exception e) { + markStreamClosed(); + throw e; } - break; } } @@ -658,7 +688,7 @@ public class KeyOutputStream extends OutputStream { } catch (IOException ioe) { throw ioe; } finally { - bufferPool.clearBufferPool(); + cleanup(); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 32bef12ae70..399b977d333 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -189,6 +189,7 @@ public class TestBlockOutputStream { // flush ensures watchForCommit updates the total length acknowledged Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); @@ -208,7 +209,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -263,6 +264,7 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -302,7 +304,7 @@ public class TestBlockOutputStream { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -397,6 +399,7 @@ public class TestBlockOutputStream { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -454,6 +457,7 @@ public class TestBlockOutputStream { blockOutputStream.getCommitIndex2flushedDataMap().size()); Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); key.close(); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -471,7 +475,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -536,6 +540,7 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -570,7 +575,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -638,6 +643,7 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -673,7 +679,7 @@ public class TestBlockOutputStream { metrics.getTotalOpCount()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index f228dad69ef..89a2af966a1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -234,6 +234,7 @@ public class TestBlockOutputStreamWithFailures { // and one flush for partial chunk key.flush(); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); @@ -249,7 +250,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -372,6 +373,7 @@ public class TestBlockOutputStreamWithFailures { key.flush(); Assert.assertEquals(2, raftClient.getCommitInfoMap().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -382,7 +384,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -515,13 +517,14 @@ public class TestBlockOutputStreamWithFailures { // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit + + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -538,7 +541,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -637,6 +640,7 @@ public class TestBlockOutputStreamWithFailures { // and one flush for partial chunk key.flush(); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -652,7 +656,6 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -663,7 +666,7 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -774,7 +777,6 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -785,7 +787,7 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -911,6 +913,7 @@ public class TestBlockOutputStreamWithFailures { Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // commitInfoMap will remain intact as there is no server failure Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); // make sure the bufferPool is empty @@ -919,7 +922,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1046,6 +1049,7 @@ public class TestBlockOutputStreamWithFailures { Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -1054,7 +1058,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1071,6 +1075,7 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); @@ -1198,7 +1203,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 381cf14e4ce..5cb6dbc0474 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -19,10 +19,12 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -66,6 +68,7 @@ public class TestOzoneClientRetriesOnException { private String volumeName; private String bucketName; private String keyString; + private XceiverClientManager xceiverClientManager; /** * Create a MiniDFSCluster for testing. @@ -84,8 +87,6 @@ public class TestOzoneClientRetriesOnException { conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); - conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) @@ -100,6 +101,7 @@ public class TestOzoneClientRetriesOnException { //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); objectStore = client.getObjectStore(); + xceiverClientManager = new XceiverClientManager(conf); keyString = UUID.randomUUID().toString(); volumeName = "testblockoutputstreamwithretries"; bucketName = volumeName; @@ -152,8 +154,9 @@ public class TestOzoneClientRetriesOnException { .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); - key.close(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + key.close(); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); validateData(keyName, data1); } @@ -171,13 +174,8 @@ public class TestOzoneClientRetriesOnException { byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); - key.write(data1); - - OutputStream stream = entries.get(0).getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - List pipelineList = new ArrayList<>(); long containerID; + List containerList = new ArrayList<>(); for (BlockOutputStreamEntry entry : entries) { containerID = entry.getBlockID().getContainerID(); ContainerInfo container = @@ -186,18 +184,40 @@ public class TestOzoneClientRetriesOnException { Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); - pipelineList.add(pipeline.getId()); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + if (!containerList.contains(containerID)) { + xceiverClient.sendCommand(ContainerTestHelper + .getCreateContainerRequest(containerID, pipeline)); + } + xceiverClientManager.releaseClient(xceiverClient, false); } - ContainerTestHelper.waitForPipelineClose(key, cluster, false); + key.write(data1); + OutputStream stream = entries.get(0).getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + ContainerTestHelper.waitForContainerClose(key, cluster); try { key.write(data1); + Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof GroupMismatchException); + .getIoException()) instanceof ContainerNotOpenException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " + "allowed retries number: 3")); } + try { + key.flush(); + Assert.fail("Expected exception not thrown"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains("Stream is closed")); + } + try { + key.close(); + } catch (IOException ioe) { + Assert.fail("Expected should not be thrown"); + } } private OzoneOutputStream createKey(String keyName, ReplicationType type, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java new file mode 100644 index 00000000000..823ee485d52 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.*; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.GroupMismatchException; +import org.apache.ratis.protocol.RaftRetryFailureException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * This class verifies the watchForCommit Handling by client. + */ +public class TestWatchForCommit { + + private MiniOzoneCluster cluster; + private OzoneConfiguration conf; + private OzoneClient client; + private ObjectStore objectStore; + private String volumeName; + private String bucketName; + private String keyString; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static String containerOwner = "OZONE"; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + private void startCluster(OzoneConfiguration conf) throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + 1, TimeUnit.SECONDS); + + conf.setQuietMode(false); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "watchforcommithandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + storageContainerLocationClient = cluster + .getStorageContainerLocationClient(); + } + + + /** + * Shutdown MiniDFSCluster. + */ + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + @Test + public void testWatchForCommitWithKeyWrite() throws Exception { + // in this case, watch request should fail with RaftRetryFailureException + // and will be captured in keyOutputStream and the failover will happen + // to a different block + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); + startCluster(conf); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes atleast putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This will call + // watchForCommit again. Since the commit will happen 2 way, the + // commitInfoMap will get updated for servers which are alive + + // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here + // once exception is hit + key.write(data1); + + // As a part of handling the exception, 4 failed writeChunks will be + // rewritten plus one partial chunk plus two putBlocks for flushSize + // and one flush for partial chunk + key.flush(); + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + .getIoException()) instanceof RaftRetryFailureException); + // Make sure the retryCount is reset after the exception is handled + Assert.assertTrue(keyOutputStream.getRetryCount() == 0); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + shutdown(); + } + + @Test + public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, + containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + try { + // just watch for a lo index which in not updated in the commitInfo Map + client.watchForCommit(index + 1, 3000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue( + HddsClientUtils.checkForException(e) instanceof TimeoutException); + } + // After releasing the client, this connection should be closed + // and any container operations should fail + clientManager.releaseClient(client, false); + shutdown(); + } + + @Test + public void testWatchForCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, + containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This wi + try { + // just watch for a lo index which in not updated in the commitInfo Map + client.watchForCommit(index + 1, 20000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof RaftRetryFailureException); + } + clientManager.releaseClient(client, false); + shutdown(); + } + + @Test + public void test2WayCommitForRetryfailure() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, + containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = client.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + client.watchForCommit(reply.getLogIndex(), 20000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(client, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert.assertTrue( + logCapturer.getOutput().contains("RaftRetryFailureException")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void test2WayCommitForTimeoutException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, + containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + XceiverClientReply reply = + client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + reply = client.sendCommandAsync(ContainerTestHelper + .getCloseContainer(pipeline, + container1.getContainerInfo().getContainerID())); + reply.getResponse().get(); + client.watchForCommit(reply.getLogIndex(), 3000); + + // commitInfo Map will be reduced to 2 here + Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); + clientManager.releaseClient(client, false); + Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); + Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); + Assert + .assertTrue(logCapturer.getOutput().contains("Committed by majority")); + logCapturer.stopCapturing(); + shutdown(); + } + + @Test + public void testWatchForCommitForGroupMismatchException() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); + + // mark the node stale early so that pipeline gets destroyed quickly + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + startCluster(conf); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + ContainerWithPipeline container1 = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, + containerOwner); + XceiverClientSpi client = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, client.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + client.getPipeline()); + Pipeline pipeline = client.getPipeline(); + XceiverClientRatis ratisClient = (XceiverClientRatis) client; + long containerId = container1.getContainerInfo().getContainerID(); + XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper + .getCreateContainerRequest(containerId, client.getPipeline())); + reply.getResponse().get(); + Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); + List pipelineList = new ArrayList<>(); + pipelineList.add(pipeline); + ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); + try { + // just watch for a lo index which in not updated in the commitInfo Map + //client.watchForCommit(reply.getLogIndex() + 1, 20000); + reply = client.sendCommandAsync(ContainerTestHelper + .getCreateContainerRequest(containerId, client.getPipeline())); + reply.getResponse().get(); + Assert.fail("Expected exception not thrown"); + } catch(Exception e) { + Assert.assertTrue(HddsClientUtils + .checkForException(e) instanceof GroupMismatchException); + } + clientManager.releaseClient(client, false); + shutdown(); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} + diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 93807b4c2cd..a1fd17ccd63 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -727,7 +727,9 @@ public final class ContainerTestHelper { keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { - containerIdList.add(info.getContainerID()); + long id = info.getContainerID(); + if (!containerIdList.contains(id)) + containerIdList.add(id); } Assert.assertTrue(!containerIdList.isEmpty()); waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));