HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.

(cherry picked from commit ef5de29243)
Author: Shashikant Banerjee, 2019-04-01 16:09:58 +05:30
Parent: f0640e2eba
Commit: bb20c80955
3 changed files with 751 additions and 28 deletions

KeyOutputStream.java

@@ -96,6 +96,7 @@ public class KeyOutputStream extends OutputStream {
   private ExcludeList excludeList;
   private final RetryPolicy retryPolicy;
   private int retryCount;
+  private long offset;
   /**
    * A constructor for testing purpose only.
    */
@@ -121,6 +122,7 @@ public class KeyOutputStream extends OutputStream {
         .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
     this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
     retryCount = 0;
+    offset = 0;
   }
   @VisibleForTesting
@@ -149,6 +151,10 @@ public class KeyOutputStream extends OutputStream {
     return locationInfoList;
   }
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
   @SuppressWarnings("parameternumber")
   public KeyOutputStream(OpenKeySession handler,
@@ -316,6 +322,7 @@ public class KeyOutputStream extends OutputStream {
           current.writeOnRetry(len);
         } else {
           current.write(b, off, writeLen);
+          offset += writeLen;
         }
       } catch (IOException ioe) {
         // for the current iteration, totalDataWritten - currentPos gives the
@@ -326,8 +333,12 @@ public class KeyOutputStream extends OutputStream {
         // The len specified here is the combined sum of the data length of
         // the buffers
         Preconditions.checkState(!retry || len <= streamBufferMaxSize);
-        writeLen = retry ? (int) len :
-            (int) (current.getWrittenDataLength() - currentPos);
+        int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+        writeLen = retry ? (int) len : dataWritten;
+        // In the retry path, the data written is already accounted for in offset.
+        if (!retry) {
+          offset += writeLen;
+        }
         LOG.debug("writeLen {}, total len {}", writeLen, len);
         handleException(current, currentStreamIndex, ioe);
       }
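The two hunks above introduce a single piece of bookkeeping: offset counts every byte the caller has successfully handed to the block streams, on both the plain write path and the exception path. A minimal standalone sketch of that accounting, with simplified stand-ins for the real KeyOutputStream fields (not a drop-in excerpt):

    // Sketch: how offset advances on success vs. on a failed write.
    long offset = 0;                  // bytes handed to block streams so far

    void onWriteSuccess(int writeLen) {
      offset += writeLen;             // normal path: count the whole write
    }

    void onWriteFailure(boolean retry, long writtenDataLength,
        long posBeforeWrite) {
      // Only the bytes the failing stream actually absorbed are new.
      int dataWritten = (int) (writtenDataLength - posBeforeWrite);
      if (!retry) {
        offset += dataWritten;        // retry path already counted these bytes
      }
    }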
@@ -345,20 +356,24 @@ public class KeyOutputStream extends OutputStream {
    * from the streamEntries list for the container which is closed.
    * @param containerID id of the closed container
    * @param pipelineId id of the associated pipeline
+   * @param streamIndex index of the stream
    */
   private void discardPreallocatedBlocks(long containerID,
-      PipelineID pipelineId) {
-    // currentStreamIndex < streamEntries.size() signifies that, there are still
+      PipelineID pipelineId, int streamIndex) {
+    // streamIndex < streamEntries.size() signifies that there are still
     // pre allocated blocks available.
-    if (currentStreamIndex < streamEntries.size()) {
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (streamIndex < streamEntries.size()) {
       ListIterator<BlockOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex);
+          streamEntries.listIterator(streamIndex);
       while (streamEntryIterator.hasNext()) {
         BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
         if (((pipelineId != null && streamEntry.getPipeline().getId()
             .equals(pipelineId)) || (containerID != -1
-            && streamEntry.getBlockID().getContainerID() == containerID))
-            && streamEntry.getCurrentPosition() == 0) {
+            && streamEntry.getBlockID().getContainerID() == containerID))) {
           streamEntryIterator.remove();
         }
       }
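For context, here is the pruning pattern discardPreallocatedBlocks now follows, as a small self-contained sketch; Entry and its fields are illustrative stand-ins for BlockOutputStreamEntry, not the real types:

    import java.util.List;
    import java.util.ListIterator;

    final class PreallocatedBlockPruner {
      static final class Entry {        // stand-in for BlockOutputStreamEntry
        long containerId;
        String pipelineId;
        long position;                  // pre-allocated blocks have position 0
      }

      // Remove every unused entry at or after startIndex that belongs to the
      // closed container or the failed pipeline.
      static void discard(List<Entry> entries, long containerId,
          String pipelineId, int startIndex) {
        if (startIndex < entries.size()) {
          ListIterator<Entry> it = entries.listIterator(startIndex);
          while (it.hasNext()) {
            Entry e = it.next();
            assert e.position == 0;     // only untouched blocks sit past here
            if ((pipelineId != null && pipelineId.equals(e.pipelineId))
                || (containerId != -1 && e.containerId == containerId)) {
              it.remove();
            }
          }
        }
      }
    }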
@@ -396,7 +411,7 @@ public class KeyOutputStream extends OutputStream {
   private void handleException(BlockOutputStreamEntry streamEntry,
       int streamIndex, IOException exception) throws IOException {
     Throwable t = checkForException(exception);
-    boolean retryFailure = checkForRetryFailure(exception);
+    boolean retryFailure = checkForRetryFailure(t);
     boolean closedContainerException = false;
     if (!retryFailure) {
       closedContainerException = checkIfContainerIsClosed(t);
@@ -411,16 +426,14 @@ public class KeyOutputStream extends OutputStream {
         + "uncommitted data length is {}", exception,
         totalSuccessfulFlushedData, bufferedDataLen);
     Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
-    Preconditions.checkArgument(
-        streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
-            == bufferedDataLen);
+    Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
-    if (checkIfContainerIsClosed(t)) {
+    if (closedContainerException) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
     } else if (retryFailure || t instanceof TimeoutException) {
       pipelineId = streamEntry.getPipeline().getId();
@@ -428,22 +441,15 @@ public class KeyOutputStream extends OutputStream {
     }
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
-    if (bufferedDataLen > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode.
-      currentStreamIndex += 1;
-      handleRetry(exception, bufferedDataLen);
-    }
-    if (totalSuccessfulFlushedData == 0) {
-      streamEntries.remove(streamIndex);
-      currentStreamIndex -= 1;
-    }
-    // discard all sunsequent blocks the containers and pipelines which
-    // are in the exclude list so that, the very next retry should never
-    // write data on the closed container/pipeline
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
       discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
-          null);
+          null, streamIndex + 1);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
@@ -451,7 +457,19 @@ public class KeyOutputStream extends OutputStream {
       // Next block allocation will happen with excluding this specific pipeline
       // This will ensure if 2 way commit happens , it cannot span over multiple
       // blocks
-      discardPreallocatedBlocks(-1, pipelineId);
+      discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
     }
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
+      handleRetry(exception, bufferedDataLen);
+      // reset the retryCount after handling the exception
+      retryCount = 0;
+    }
+    if (totalSuccessfulFlushedData == 0) {
+      streamEntries.remove(streamIndex);
+      currentStreamIndex -= 1;
+    }
   }
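Pulling the relocated pieces together, the reworked handleException now runs in a fixed order: exclude the failed resources, prune the now-unusable pre-allocated blocks, replay buffered data on a fresh block, and only then drop an entry that never flushed anything. A condensed sketch of that ordering (field and method names taken from the diff, control flow simplified; not a drop-in implementation):

    // Sketch of the post-change ordering inside handleException.
    void onFailure(IOException cause, int streamIndex) throws IOException {
      // 1. A retry must never land on the dead container or pipeline.
      discardPreallocatedBlocks(containerId, pipelineId, streamIndex + 1);
      // 2. Replay whatever is still cached in the client buffers.
      if (bufferedDataLen > 0) {
        currentStreamIndex += 1;
        handleRetry(cause, bufferedDataLen);
        retryCount = 0;   // a handled exception resets the retry budget
      }
      // 3. An entry that never flushed a byte is dropped outright.
      if (totalSuccessfulFlushedData == 0) {
        streamEntries.remove(streamIndex);
        currentStreamIndex -= 1;
      }
    }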
@@ -618,7 +636,9 @@ public class KeyOutputStream extends OutputStream {
     if (keyArgs != null) {
       // in test, this could be null
       removeEmptyBlocks();
-      keyArgs.setDataSize(getKeyLength());
+      long length = getKeyLength();
+      Preconditions.checkArgument(offset == length);
+      keyArgs.setDataSize(length);
       keyArgs.setLocationInfoList(getLocationInfoList());
       // When the key is multipart upload part file upload, we should not
       // commit the key, as this is not an actual key, this is a just a
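The commit-time check ties the new counter back to the block metadata: by the time the key is committed, offset (bytes accepted from the caller) must equal getKeyLength() (bytes recorded against blocks). A toy illustration of the relation, assuming getKeyLength() sums the per-entry flushed positions — the diff only shows its call site, so that shape is an assumption:

    // Invariant at commit time: nothing is left buffered, so the two agree.
    long keyLength = streamEntries.stream()
        .mapToLong(BlockOutputStreamEntry::getCurrentPosition)  // assumed shape
        .sum();
    Preconditions.checkArgument(offset == keyLength);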

TestBlockOutputStreamWithFailures.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.client.rpc;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -235,6 +236,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // commitInfoMap will remain intact as there is no server failure
     Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
     // now close the stream, It will update the ack length after watchForCommit
@@ -372,6 +375,8 @@ public class TestBlockOutputStreamWithFailures {
     key.close();
     Assert
         .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -506,6 +511,8 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof AlreadyClosedException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -534,11 +541,698 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, data1);
   }
@Test
public void testFailureWithPrimeSizedData() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = 167;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
== pendingWriteChunkCount + 1);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
== pendingPutBlockCount);
Assert.assertEquals(writeChunkCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 1,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(0,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// the data is already acked, so no entry remains in the map after the flush
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// Close the containers on the Datanode and write more data
ContainerTestHelper.waitForContainerClose(key, cluster);
key.write(data1);
// As a part of handling the exception, 2 failed writeChunks will be
// rewritten plus 1 putBlocks for flush
// and one flush for partial chunk
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 6,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
}
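The 167-byte payload is deliberately prime, so it never lines up with a chunk or flush boundary and always leaves a partial chunk buffered until flush or close. A quick arithmetic check, assuming a 100-byte chunk size for this test (an assumption inferred from the two-buffer pool above; the real value comes from the cluster configuration):

    int chunkSize = 100;                        // assumed test chunk size
    int dataLength = 167;
    int fullChunks = dataLength / chunkSize;    // 1 chunk goes out as WriteChunk
    int partialBytes = dataLength % chunkSize;  // 67 bytes stay buffered until
                                                // key.flush() or key.close()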
@Test
public void testExceptionDuringClose() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = 167;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
== pendingWriteChunkCount + 1);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
== pendingPutBlockCount);
Assert.assertEquals(writeChunkCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 1,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(0,
blockOutputStream.getTotalDataFlushedLength());
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 1,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// the data is already acked, so no entry remains in the map after the flush
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// Close the containers on the Datanode and write more data
ContainerTestHelper.waitForContainerClose(key, cluster);
key.write(data1);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// now close the stream, It will hit exception
key.close();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 6,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
}
@Test
public void testWatchForCommitWithSingleNodeRatis() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since it's hitting the full buffer condition, it will call watchForCommit
// and complete at least a putBlock for the first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written more data than flushSize (2 chunks); at this time
// the buffer pool will have 4 buffers allocated, each worth a chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well as flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equal to maxBufferSize is written, this will be a blocking
// call and hence will wait for at least flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up at least one entry from the map where each
// entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// flush will make sure one more entry gets updated in the map
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// Close the containers on the Datanode and write more data
ContainerTestHelper.waitForContainerClose(key, cluster);
// 4 writeChunks (worth maxFlushSize) plus 2 putBlocks will be discarded
// here once the exception is hit
key.write(data1);
// As a part of handling the exception, 4 failed writeChunks will be
// rewritten plus one partial chunk plus two putBlocks for flushSize
// and one flush for partial chunk
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// commitInfoMap will remain intact as there is no server failure
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
}
@Test
public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since it's hitting the full buffer condition, it will call watchForCommit
// and complete at least a putBlock for the first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written more data than flushSize (2 chunks); at this time
// the buffer pool will have 4 buffers allocated, each worth a chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well as flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equal to maxBufferSize is written, this will be a blocking
// call and hence will wait for at least flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up at least flushSize worth of data buffer
// where each entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// the data is already acked, so no entry remains in the map after the flush
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
Pipeline pipeline = raftClient.getPipeline();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
// again write data with more than max buffer limit. This will call
// watchForCommit again. No write will happen in the current block and
// data will be rewritten to the next block.
key.write(data1);
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException);
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// in total, there are 14 full write chunks, 5 before the failure injection,
// 4 chunks after which we detect the failure and then 5 again on the next
// block
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
// 3 flushes at flushSize boundaries before failure injection + 2
// flush failed + 3 more flushes for the next block
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
validateData(keyName, dataString.concat(dataString).getBytes());
}
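The final metric assertions in this test are just arithmetic over the comments above; a worked check of the expected deltas, restating the test's own breakdown (no new behavior assumed):

    int chunksBeforeFailure = 5;      // written before the datanode shutdown
    int chunksFailedThenSeen = 4;     // in flight when the failure is detected
    int chunksOnNextBlock = 5;        // rewritten data plus the partial chunk
    int writeChunkDelta = chunksBeforeFailure + chunksFailedThenSeen
        + chunksOnNextBlock;          // 14, matching writeChunkCount + 14

    int putBlocksBeforeFailure = 3;   // flushes at flushSize boundaries
    int putBlocksFailed = 2;
    int putBlocksOnNextBlock = 3;
    int putBlockDelta = putBlocksBeforeFailure + putBlocksFailed
        + putBlocksOnNextBlock;       // 8, matching putBlockCount + 8
    int totalOpDelta = writeChunkDelta + putBlockDelta;  // 22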
@Test
public void testDatanodeFailureWithPreAllocation() throws Exception {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long putBlockCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.PutBlock);
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.WriteChunk);
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
ContainerProtos.Type.PutBlock);
long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
ReplicationFactor.ONE);
int dataLength = maxFlushSize + 50;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
// since it's hitting the full buffer condition, it will call watchForCommit
// and complete at least a putBlock for the first flushSize worth of data
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
<= pendingWriteChunkCount + 2);
Assert.assertTrue(
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
Assert.assertEquals(writeChunkCount + 4,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 6,
metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3);
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
.getOutputStream();
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
// we have just written more data than flushSize (2 chunks); at this time
// the buffer pool will have 4 buffers allocated, each worth a chunk size
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well as flushedDataLength will be updated here
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(maxFlushSize,
blockOutputStream.getTotalDataFlushedLength());
// since data equal to maxBufferSize is written, this will be a blocking
// call and hence will wait for at least flushSize worth of data to get
// ack'd by all servers right here
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
// watchForCommit will clean up at least flushSize worth of data buffer
// where each entry corresponds to flushSize worth of data
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(writeChunkCount + 5,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(putBlockCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
// the data is already acked, so no entry remains in the map after the flush
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
Pipeline pipeline = raftClient.getPipeline();
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
// again write data with more than max buffer limit. This will call
// watchForCommit again. No write will happen in the current block and
// data will be rewritten to the next block.
key.write(data1);
key.flush();
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
.getIoException()) instanceof AlreadyClosedException);
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
// in total, there are 14 full write chunks, 5 before the failure injection,
// 4 chunks after which we detect the failure and then 5 again on the next
// block
Assert.assertEquals(writeChunkCount + 14,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
// 3 flushes at flushSize boundaries before failure injection + 2
// flush failed + 3 more flushes for the next block
Assert.assertEquals(putBlockCount + 8,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 22,
metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
validateData(keyName, dataString.concat(dataString).getBytes());
}
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
-    return ContainerTestHelper
-        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+    return createKey(keyName, type, size, ReplicationFactor.THREE);
   }
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size, ReplicationFactor factor) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, factor, size, objectStore, volumeName,
+            bucketName);
+  }
   private void validateData(String keyName, byte[] data) throws Exception {
     ContainerTestHelper
         .validateData(keyName, data, objectStore, volumeName, bucketName);

ContainerTestHelper.java

@@ -700,6 +700,15 @@ public final class ContainerTestHelper {
         .createKey(keyName, size, type, factor, new HashMap<>());
   }
+  public static OzoneOutputStream createKey(String keyName,
+      ReplicationType type,
+      org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
+      ObjectStore objectStore, String volumeName, String bucketName)
+      throws Exception {
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey(keyName, size, type, factor, new HashMap<>());
+  }
   public static void validateData(String keyName, byte[] data,
       ObjectStore objectStore, String volumeName, String bucketName)
       throws Exception {
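The new helper simply threads an explicit ReplicationFactor through to the bucket API, which is what lets the tests above create factor-ONE keys. A hedged usage sketch (the key, volume, and bucket names here are illustrative):

    // e.g. from a test: create a RATIS key on a single-node (factor ONE) pipeline
    OzoneOutputStream key = ContainerTestHelper.createKey(
        "key1", ReplicationType.RATIS,
        org.apache.hadoop.hdds.client.ReplicationFactor.ONE,
        0, objectStore, "vol1", "bucket1");
    key.write(data);
    key.close();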