diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java index eb215d63a46..94a4b9486e0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java @@ -102,10 +102,4 @@ public class ExcludeList { }); return excludeList; } - - public void clear() { - datanodes.clear(); - containerIds.clear(); - pipelineIds.clear(); - } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index c1f195f14ca..0d9529f8eae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -295,66 +295,60 @@ public class KeyOutputStream extends OutputStream { throws IOException { int succeededAllocates = 0; while (len > 0) { - try { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " - + "allocated " + succeededAllocates - + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); - - // length(len) will be in int range if the call is happening through - // write API of blockOutputStream. Length can be in long range if it comes - // via Exception path. - int writeLen = Math.min((int) len, (int) current.getRemaining()); - long currentPos = current.getWrittenDataLength(); + if (streamEntries.size() <= currentStreamIndex) { + Preconditions.checkNotNull(omClient); + // allocate a new block, if a exception happens, log an error and + // throw exception to the caller directly, and the write fails. try { - if (retry) { - current.writeOnRetry(len); - } else { - current.write(b, off, writeLen); - offset += writeLen; - } + allocateNewBlock(currentStreamIndex); + succeededAllocates += 1; } catch (IOException ioe) { - // for the current iteration, totalDataWritten - currentPos gives the - // amount of data already written to the buffer - - // In the retryPath, the total data to be written will always be equal - // to or less than the max length of the buffer allocated. - // The len specified here is the combined sum of the data length of - // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); - int dataWritten = (int) (current.getWrittenDataLength() - currentPos); - writeLen = retry ? (int) len : dataWritten; - // In retry path, the data written is already accounted in offset. - if (!retry) { - offset += writeLen; - } - LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, ioe); + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + " blocks for this write."); + throw ioe; } - if (current.getRemaining() <= 0) { - // since the current block is already written close the stream. - handleFlushOrClose(StreamAction.FULL); - } - len -= writeLen; - off += writeLen; - } catch (Exception e) { - markStreamClosed(); - throw e; } + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); + + // length(len) will be in int range if the call is happening through + // write API of blockOutputStream. Length can be in long range if it comes + // via Exception path. + int writeLen = Math.min((int)len, (int) current.getRemaining()); + long currentPos = current.getWrittenDataLength(); + try { + if (retry) { + current.writeOnRetry(len); + } else { + current.write(b, off, writeLen); + offset += writeLen; + } + } catch (IOException ioe) { + // for the current iteration, totalDataWritten - currentPos gives the + // amount of data already written to the buffer + + // In the retryPath, the total data to be written will always be equal + // to or less than the max length of the buffer allocated. + // The len specified here is the combined sum of the data length of + // the buffers + Preconditions.checkState(!retry || len <= streamBufferMaxSize); + int dataWritten = (int) (current.getWrittenDataLength() - currentPos); + writeLen = retry ? (int) len : dataWritten; + // In retry path, the data written is already accounted in offset. + if (!retry) { + offset += writeLen; + } + LOG.debug("writeLen {}, total len {}", writeLen, len); + handleException(current, currentStreamIndex, ioe); + } + if (current.getRemaining() <= 0) { + // since the current block is already written close the stream. + handleFlushOrClose(StreamAction.FULL); + } + len -= writeLen; + off += writeLen; } } @@ -371,7 +365,7 @@ public class KeyOutputStream extends OutputStream { // pre allocated blocks available. // This will be called only to discard the next subsequent unused blocks - // in the streamEntryList. + // in the sreamEntryList. if (streamIndex < streamEntries.size()) { ListIterator streamEntryIterator = streamEntries.listIterator(streamIndex); @@ -404,20 +398,6 @@ public class KeyOutputStream extends OutputStream { } } } - - private void cleanup() { - if (excludeList != null) { - excludeList.clear(); - excludeList = null; - } - if (bufferPool != null) { - bufferPool.clearBufferPool(); - } - - if (streamEntries != null) { - streamEntries.clear(); - } - } /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -438,7 +418,8 @@ public class KeyOutputStream extends OutputStream { closedContainerException = checkIfContainerIsClosed(t); } PipelineID pipelineId = null; - long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); + long totalSuccessfulFlushedData = + streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); @@ -469,8 +450,8 @@ public class KeyOutputStream extends OutputStream { if (closedContainerException) { // discard subsequent pre allocated blocks from the streamEntries list // from the closed container - discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, - streamIndex + 1); + discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), + null, streamIndex + 1); } else { // In case there is timeoutException or Watch for commit happening over // majority or the client connection failure to the leader in the @@ -494,11 +475,6 @@ public class KeyOutputStream extends OutputStream { } } - private void markStreamClosed() { - cleanup(); - closed = true; - } - private void handleRetry(IOException exception, long len) throws IOException { RetryPolicy.RetryAction action; try { @@ -610,46 +586,40 @@ public class KeyOutputStream extends OutputStream { return; } while (true) { - try { - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); - if (entry != null) { - try { - Collection failedServers = - entry.getFailedServers(); - // failed servers can be null in case there is no data written in the - // stream - if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); - } - switch (op) { - case CLOSE: - entry.close(); - break; - case FULL: - if (entry.getRemaining() == 0) { - entry.close(); - currentStreamIndex++; - } - break; - case FLUSH: - entry.flush(); - break; - default: - throw new IOException("Invalid Operation"); - } - } catch (IOException ioe) { - handleException(entry, streamIndex, ioe); - continue; + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? size - 1 : currentStreamIndex; + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + if (entry != null) { + try { + Collection failedServers = entry.getFailedServers(); + // failed servers can be null in case there is no data written in the + // stream + if (failedServers != null && !failedServers.isEmpty()) { + excludeList.addDatanodes(failedServers); } + switch (op) { + case CLOSE: + entry.close(); + break; + case FULL: + if (entry.getRemaining() == 0) { + entry.close(); + currentStreamIndex++; + } + break; + case FLUSH: + entry.flush(); + break; + default: + throw new IOException("Invalid Operation"); + } + } catch (IOException ioe) { + handleException(entry, streamIndex, ioe); + continue; } - break; - } catch (Exception e) { - markStreamClosed(); - throw e; } + break; } } @@ -688,7 +658,7 @@ public class KeyOutputStream extends OutputStream { } catch (IOException ioe) { throw ioe; } finally { - cleanup(); + bufferPool.clearBufferPool(); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 399b977d333..32bef12ae70 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -189,7 +189,6 @@ public class TestBlockOutputStream { // flush ensures watchForCommit updates the total length acknowledged Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); @@ -209,7 +208,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -264,7 +263,6 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // flush is a sync call, all pending operations will complete Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -304,7 +302,7 @@ public class TestBlockOutputStream { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -399,7 +397,6 @@ public class TestBlockOutputStream { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -457,7 +454,6 @@ public class TestBlockOutputStream { blockOutputStream.getCommitIndex2flushedDataMap().size()); Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); key.close(); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); @@ -475,7 +471,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -540,7 +536,6 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -575,7 +570,7 @@ public class TestBlockOutputStream { .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -643,7 +638,6 @@ public class TestBlockOutputStream { // Now do a flush. This will flush the data and update the flush length and // the map. key.flush(); - Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -679,7 +673,7 @@ public class TestBlockOutputStream { metrics.getTotalOpCount()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 89a2af966a1..f228dad69ef 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -234,7 +234,6 @@ public class TestBlockOutputStreamWithFailures { // and one flush for partial chunk key.flush(); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); @@ -250,7 +249,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -373,7 +372,6 @@ public class TestBlockOutputStreamWithFailures { key.flush(); Assert.assertEquals(2, raftClient.getCommitInfoMap().size()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -384,7 +382,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -517,14 +515,13 @@ public class TestBlockOutputStreamWithFailures { // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // now close the stream, It will update the ack length after watchForCommit - - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); key.close(); Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -541,7 +538,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); } @@ -640,7 +637,6 @@ public class TestBlockOutputStreamWithFailures { // and one flush for partial chunk key.flush(); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream .getIoException()) instanceof ContainerNotOpenException); // Make sure the retryCount is reset after the exception is handled @@ -656,6 +652,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -666,7 +663,7 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -777,6 +774,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -787,7 +785,7 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount()); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); // Written the same data twice String dataString = new String(data1, UTF_8); validateData(keyName, dataString.concat(dataString).getBytes()); @@ -913,7 +911,6 @@ public class TestBlockOutputStreamWithFailures { Assert.assertTrue(keyOutputStream.getRetryCount() == 0); // commitInfoMap will remain intact as there is no server failure Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); // make sure the bufferPool is empty @@ -922,7 +919,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1049,7 +1046,6 @@ public class TestBlockOutputStreamWithFailures { Assert.assertEquals(1, raftClient.getCommitInfoMap().size()); // Make sure the retryCount is reset after the exception is handled Assert.assertTrue(keyOutputStream.getRetryCount() == 0); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert @@ -1058,7 +1054,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, @@ -1075,7 +1071,6 @@ public class TestBlockOutputStreamWithFailures { metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // Written the same data twice String dataString = new String(data1, UTF_8); cluster.restartHddsDatanode(pipeline.getNodes().get(0), true); @@ -1203,7 +1198,7 @@ public class TestBlockOutputStreamWithFailures { Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(pendingPutBlockCount, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 5cb6dbc0474..381cf14e4ce 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.client.rpc; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -68,7 +66,6 @@ public class TestOzoneClientRetriesOnException { private String volumeName; private String bucketName; private String keyString; - private XceiverClientManager xceiverClientManager; /** * Create a MiniDFSCluster for testing. @@ -87,6 +84,8 @@ public class TestOzoneClientRetriesOnException { conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); + conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s"); conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) @@ -101,7 +100,6 @@ public class TestOzoneClientRetriesOnException { //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); objectStore = client.getObjectStore(); - xceiverClientManager = new XceiverClientManager(conf); keyString = UUID.randomUUID().toString(); volumeName = "testblockoutputstreamwithretries"; bucketName = volumeName; @@ -154,9 +152,8 @@ public class TestOzoneClientRetriesOnException { .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() .contains(pipeline.getId())); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); key.close(); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); validateData(keyName, data1); } @@ -174,8 +171,13 @@ public class TestOzoneClientRetriesOnException { byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); + key.write(data1); + + OutputStream stream = entries.get(0).getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + List pipelineList = new ArrayList<>(); long containerID; - List containerList = new ArrayList<>(); for (BlockOutputStreamEntry entry : entries) { containerID = entry.getBlockID().getContainerID(); ContainerInfo container = @@ -184,40 +186,18 @@ public class TestOzoneClientRetriesOnException { Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - if (!containerList.contains(containerID)) { - xceiverClient.sendCommand(ContainerTestHelper - .getCreateContainerRequest(containerID, pipeline)); - } - xceiverClientManager.releaseClient(xceiverClient, false); + pipelineList.add(pipeline.getId()); } - key.write(data1); - OutputStream stream = entries.get(0).getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - ContainerTestHelper.waitForContainerClose(key, cluster); + ContainerTestHelper.waitForPipelineClose(key, cluster, false); try { key.write(data1); - Assert.fail("Expected exception not thrown"); } catch (IOException ioe) { Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream - .getIoException()) instanceof ContainerNotOpenException); + .getIoException()) instanceof GroupMismatchException); Assert.assertTrue(ioe.getMessage().contains( "Retry request failed. retries get failed due to exceeded maximum " + "allowed retries number: 3")); } - try { - key.flush(); - Assert.fail("Expected exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe.getMessage().contains("Stream is closed")); - } - try { - key.close(); - } catch (IOException ioe) { - Assert.fail("Expected should not be thrown"); - } } private OzoneOutputStream createKey(String keyName, ReplicationType type, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java deleted file mode 100644 index 823ee485d52..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ /dev/null @@ -1,512 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.client.rpc; - -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.*; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.KeyOutputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.protocol.GroupMismatchException; -import org.apache.ratis.protocol.RaftRetryFailureException; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; - -/** - * This class verifies the watchForCommit Handling by client. - */ -public class TestWatchForCommit { - - private MiniOzoneCluster cluster; - private OzoneConfiguration conf; - private OzoneClient client; - private ObjectStore objectStore; - private String volumeName; - private String bucketName; - private String keyString; - private int chunkSize; - private int flushSize; - private int maxFlushSize; - private int blockSize; - private StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static String containerOwner = "OZONE"; - - /** - * Create a MiniDFSCluster for testing. - *

- * Ozone is made active by setting OZONE_ENABLED = true - * - * @throws IOException - */ - private void startCluster(OzoneConfiguration conf) throws Exception { - chunkSize = 100; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - - conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration( - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, - 1, TimeUnit.SECONDS); - - conf.setQuietMode(false); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(7) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) - .setStreamBufferMaxSize(maxFlushSize) - .setStreamBufferSizeUnit(StorageUnit.BYTES) - .build(); - cluster.waitForClusterToBeReady(); - //the easiest way to create an open container is creating a key - client = OzoneClientFactory.getClient(conf); - objectStore = client.getObjectStore(); - keyString = UUID.randomUUID().toString(); - volumeName = "watchforcommithandlingtest"; - bucketName = volumeName; - objectStore.createVolume(volumeName); - objectStore.getVolume(volumeName).createBucket(bucketName); - storageContainerLocationClient = cluster - .getStorageContainerLocationClient(); - } - - - /** - * Shutdown MiniDFSCluster. - */ - private void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - private String getKeyName() { - return UUID.randomUUID().toString(); - } - - @Test - public void testWatchForCommitWithKeyWrite() throws Exception { - // in this case, watch request should fail with RaftRetryFailureException - // and will be captured in keyOutputStream and the failover will happen - // to a different block - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2); - startCluster(conf); - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics( - ContainerProtos.Type.PutBlock); - long pendingWriteChunkCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.WriteChunk); - long pendingPutBlockCount = metrics.getContainerOpsMetrics( - ContainerProtos.Type.PutBlock); - long totalOpCount = metrics.getTotalOpCount(); - String keyName = getKeyName(); - OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 50; - // write data more than 1 chunk - byte[] data1 = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); - key.write(data1); - // since its hitting the full bufferCondition, it will call watchForCommit - // and completes atleast putBlock for first flushSize worth of data - Assert.assertTrue( - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) - <= pendingWriteChunkCount + 2); - Assert.assertTrue( - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) - <= pendingPutBlockCount + 1); - Assert.assertEquals(writeChunkCount + 4, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 2, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 6, - metrics.getTotalOpCount()); - Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); - - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - OutputStream stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - - // we have just written data more than flush Size(2 chunks), at this time - // buffer pool will have 3 buffers allocated worth of chunk size - - Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); - // writtenDataLength as well flushedDataLength will be updated here - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - - Assert.assertEquals(maxFlushSize, - blockOutputStream.getTotalDataFlushedLength()); - - // since data equals to maxBufferSize is written, this will be a blocking - // call and hence will wait for atleast flushSize worth of data to get - // acked by all servers right here - Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); - - // watchForCommit will clean up atleast one entry from the map where each - // entry corresponds to flushSize worth of data - Assert.assertTrue( - blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); - - // Now do a flush. This will flush the data and update the flush length and - // the map. - key.flush(); - - Assert.assertEquals(pendingWriteChunkCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(pendingPutBlockCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(writeChunkCount + 5, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 3, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 8, - metrics.getTotalOpCount()); - - // Since the data in the buffer is already flushed, flush here will have - // no impact on the counters and data structures - - Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - - Assert.assertEquals(dataLength, - blockOutputStream.getTotalDataFlushedLength()); - // flush will make sure one more entry gets updated in the map - Assert.assertTrue( - blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); - - XceiverClientRatis raftClient = - (XceiverClientRatis) blockOutputStream.getXceiverClient(); - Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); - Pipeline pipeline = raftClient.getPipeline(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - // again write data with more than max buffer limit. This will call - // watchForCommit again. Since the commit will happen 2 way, the - // commitInfoMap will get updated for servers which are alive - - // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here - // once exception is hit - key.write(data1); - - // As a part of handling the exception, 4 failed writeChunks will be - // rewritten plus one partial chunk plus two putBlocks for flushSize - // and one flush for partial chunk - key.flush(); - Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream - .getIoException()) instanceof RaftRetryFailureException); - // Make sure the retryCount is reset after the exception is handled - Assert.assertTrue(keyOutputStream.getRetryCount() == 0); - // now close the stream, It will update the ack length after watchForCommit - key.close(); - Assert - .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); - Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - Assert.assertEquals(pendingWriteChunkCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(pendingPutBlockCount, - metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(writeChunkCount + 14, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 8, - metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 22, - metrics.getTotalOpCount()); - Assert - .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); - // make sure the bufferPool is empty - Assert - .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); - Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); - validateData(keyName, data1); - shutdown(); - } - - @Test - public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - startCluster(conf); - XceiverClientManager clientManager = new XceiverClientManager(conf); - ContainerWithPipeline container1 = - storageContainerLocationClient.allocateContainer( - HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, - containerOwner); - XceiverClientSpi client = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - client.getPipeline())); - reply.getResponse().get(); - long index = reply.getLogIndex(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - try { - // just watch for a lo index which in not updated in the commitInfo Map - client.watchForCommit(index + 1, 3000); - Assert.fail("expected exception not thrown"); - } catch (Exception e) { - Assert.assertTrue( - HddsClientUtils.checkForException(e) instanceof TimeoutException); - } - // After releasing the client, this connection should be closed - // and any container operations should fail - clientManager.releaseClient(client, false); - shutdown(); - } - - @Test - public void testWatchForCommitForRetryfailure() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); - startCluster(conf); - XceiverClientManager clientManager = new XceiverClientManager(conf); - ContainerWithPipeline container1 = - storageContainerLocationClient.allocateContainer( - HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, - containerOwner); - XceiverClientSpi client = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - client.getPipeline())); - reply.getResponse().get(); - long index = reply.getLogIndex(); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); - // again write data with more than max buffer limit. This wi - try { - // just watch for a lo index which in not updated in the commitInfo Map - client.watchForCommit(index + 1, 20000); - Assert.fail("expected exception not thrown"); - } catch (Exception e) { - Assert.assertTrue(HddsClientUtils - .checkForException(e) instanceof RaftRetryFailureException); - } - clientManager.releaseClient(client, false); - shutdown(); - } - - @Test - public void test2WayCommitForRetryfailure() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = - storageContainerLocationClient.allocateContainer( - HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, - containerOwner); - XceiverClientSpi client = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; - XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - client.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = client.sendCommandAsync(ContainerTestHelper - .getCloseContainer(pipeline, - container1.getContainerInfo().getContainerID())); - reply.getResponse().get(); - client.watchForCommit(reply.getLogIndex(), 20000); - - // commitInfo Map will be reduced to 2 here - Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(client, false); - Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); - Assert.assertTrue( - logCapturer.getOutput().contains("RaftRetryFailureException")); - Assert - .assertTrue(logCapturer.getOutput().contains("Committed by majority")); - logCapturer.stopCapturing(); - shutdown(); - } - - @Test - public void test2WayCommitForTimeoutException() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = - storageContainerLocationClient.allocateContainer( - HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, - containerOwner); - XceiverClientSpi client = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; - XceiverClientReply reply = - client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest( - container1.getContainerInfo().getContainerID(), - client.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); - reply = client.sendCommandAsync(ContainerTestHelper - .getCloseContainer(pipeline, - container1.getContainerInfo().getContainerID())); - reply.getResponse().get(); - client.watchForCommit(reply.getLogIndex(), 3000); - - // commitInfo Map will be reduced to 2 here - Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); - clientManager.releaseClient(client, false); - Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed")); - Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException")); - Assert - .assertTrue(logCapturer.getOutput().contains("Committed by majority")); - logCapturer.stopCapturing(); - shutdown(); - } - - @Test - public void testWatchForCommitForGroupMismatchException() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, - TimeUnit.SECONDS); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); - - // mark the node stale early so that pipeline gets destroyed quickly - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - startCluster(conf); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - ContainerWithPipeline container1 = - storageContainerLocationClient.allocateContainer( - HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, - containerOwner); - XceiverClientSpi client = clientManager - .acquireClient(container1.getPipeline()); - Assert.assertEquals(1, client.getRefcount()); - Assert.assertEquals(container1.getPipeline(), - client.getPipeline()); - Pipeline pipeline = client.getPipeline(); - XceiverClientRatis ratisClient = (XceiverClientRatis) client; - long containerId = container1.getContainerInfo().getContainerID(); - XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper - .getCreateContainerRequest(containerId, client.getPipeline())); - reply.getResponse().get(); - Assert.assertEquals(3, ratisClient.getCommitInfoMap().size()); - List pipelineList = new ArrayList<>(); - pipelineList.add(pipeline); - ContainerTestHelper.waitForPipelineClose(pipelineList, cluster); - try { - // just watch for a lo index which in not updated in the commitInfo Map - //client.watchForCommit(reply.getLogIndex() + 1, 20000); - reply = client.sendCommandAsync(ContainerTestHelper - .getCreateContainerRequest(containerId, client.getPipeline())); - reply.getResponse().get(); - Assert.fail("Expected exception not thrown"); - } catch(Exception e) { - Assert.assertTrue(HddsClientUtils - .checkForException(e) instanceof GroupMismatchException); - } - clientManager.releaseClient(client, false); - shutdown(); - } - - private OzoneOutputStream createKey(String keyName, ReplicationType type, - long size) throws Exception { - return ContainerTestHelper - .createKey(keyName, type, size, objectStore, volumeName, bucketName); - } - - private void validateData(String keyName, byte[] data) throws Exception { - ContainerTestHelper - .validateData(keyName, data, objectStore, volumeName, bucketName); - } -} - diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index a1fd17ccd63..93807b4c2cd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -727,9 +727,7 @@ public final class ContainerTestHelper { keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { - long id = info.getContainerID(); - if (!containerIdList.contains(id)) - containerIdList.add(id); + containerIdList.add(info.getContainerID()); } Assert.assertTrue(!containerIdList.isEmpty()); waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));