HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
509f31b109
commit
ef5de29243
|
@ -96,6 +96,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
private ExcludeList excludeList;
|
||||
private final RetryPolicy retryPolicy;
|
||||
private int retryCount;
|
||||
private long offset;
|
||||
/**
|
||||
* A constructor for testing purpose only.
|
||||
*/
|
||||
|
@ -121,6 +122,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
|
||||
this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
retryCount = 0;
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -149,6 +151,10 @@ public class KeyOutputStream extends OutputStream {
|
|||
return locationInfoList;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
@SuppressWarnings("parameternumber")
|
||||
public KeyOutputStream(OpenKeySession handler,
|
||||
|
@ -316,6 +322,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
current.writeOnRetry(len);
|
||||
} else {
|
||||
current.write(b, off, writeLen);
|
||||
offset += writeLen;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// for the current iteration, totalDataWritten - currentPos gives the
|
||||
|
@ -326,8 +333,12 @@ public class KeyOutputStream extends OutputStream {
|
|||
// The len specified here is the combined sum of the data length of
|
||||
// the buffers
|
||||
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
|
||||
writeLen = retry ? (int) len :
|
||||
(int) (current.getWrittenDataLength() - currentPos);
|
||||
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
|
||||
writeLen = retry ? (int) len : dataWritten;
|
||||
// In retry path, the data written is already accounted in offset.
|
||||
if (!retry) {
|
||||
offset += writeLen;
|
||||
}
|
||||
LOG.debug("writeLen {}, total len {}", writeLen, len);
|
||||
handleException(current, currentStreamIndex, ioe);
|
||||
}
|
||||
|
@ -345,20 +356,24 @@ public class KeyOutputStream extends OutputStream {
|
|||
* from the streamEntries list for the container which is closed.
|
||||
* @param containerID id of the closed container
|
||||
* @param pipelineId id of the associated pipeline
|
||||
* @param streamIndex index of the stream
|
||||
*/
|
||||
private void discardPreallocatedBlocks(long containerID,
|
||||
PipelineID pipelineId) {
|
||||
// currentStreamIndex < streamEntries.size() signifies that, there are still
|
||||
PipelineID pipelineId, int streamIndex) {
|
||||
// streamIndex < streamEntries.size() signifies that, there are still
|
||||
// pre allocated blocks available.
|
||||
if (currentStreamIndex < streamEntries.size()) {
|
||||
|
||||
// This will be called only to discard the next subsequent unused blocks
|
||||
// in the sreamEntryList.
|
||||
if (streamIndex < streamEntries.size()) {
|
||||
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
|
||||
streamEntries.listIterator(currentStreamIndex);
|
||||
streamEntries.listIterator(streamIndex);
|
||||
while (streamEntryIterator.hasNext()) {
|
||||
BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
|
||||
Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
|
||||
if (((pipelineId != null && streamEntry.getPipeline().getId()
|
||||
.equals(pipelineId)) || (containerID != -1
|
||||
&& streamEntry.getBlockID().getContainerID() == containerID))
|
||||
&& streamEntry.getCurrentPosition() == 0) {
|
||||
&& streamEntry.getBlockID().getContainerID() == containerID))) {
|
||||
streamEntryIterator.remove();
|
||||
}
|
||||
}
|
||||
|
@ -396,7 +411,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
private void handleException(BlockOutputStreamEntry streamEntry,
|
||||
int streamIndex, IOException exception) throws IOException {
|
||||
Throwable t = checkForException(exception);
|
||||
boolean retryFailure = checkForRetryFailure(exception);
|
||||
boolean retryFailure = checkForRetryFailure(t);
|
||||
boolean closedContainerException = false;
|
||||
if (!retryFailure) {
|
||||
closedContainerException = checkIfContainerIsClosed(t);
|
||||
|
@ -411,16 +426,14 @@ public class KeyOutputStream extends OutputStream {
|
|||
+ "uncommitted data length is {}", exception,
|
||||
totalSuccessfulFlushedData, bufferedDataLen);
|
||||
Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
|
||||
Preconditions.checkArgument(
|
||||
streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
|
||||
== bufferedDataLen);
|
||||
Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
|
||||
long containerId = streamEntry.getBlockID().getContainerID();
|
||||
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
|
||||
Preconditions.checkNotNull(failedServers);
|
||||
if (!failedServers.isEmpty()) {
|
||||
excludeList.addDatanodes(failedServers);
|
||||
}
|
||||
if (checkIfContainerIsClosed(t)) {
|
||||
if (closedContainerException) {
|
||||
excludeList.addConatinerId(ContainerID.valueof(containerId));
|
||||
} else if (retryFailure || t instanceof TimeoutException) {
|
||||
pipelineId = streamEntry.getPipeline().getId();
|
||||
|
@ -428,22 +441,15 @@ public class KeyOutputStream extends OutputStream {
|
|||
}
|
||||
// just clean up the current stream.
|
||||
streamEntry.cleanup(retryFailure);
|
||||
if (bufferedDataLen > 0) {
|
||||
// If the data is still cached in the underlying stream, we need to
|
||||
// allocate new block and write this data in the datanode.
|
||||
currentStreamIndex += 1;
|
||||
handleRetry(exception, bufferedDataLen);
|
||||
}
|
||||
if (totalSuccessfulFlushedData == 0) {
|
||||
streamEntries.remove(streamIndex);
|
||||
currentStreamIndex -= 1;
|
||||
}
|
||||
|
||||
// discard all sunsequent blocks the containers and pipelines which
|
||||
// are in the exclude list so that, the very next retry should never
|
||||
// write data on the closed container/pipeline
|
||||
if (closedContainerException) {
|
||||
// discard subsequent pre allocated blocks from the streamEntries list
|
||||
// from the closed container
|
||||
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
|
||||
null);
|
||||
null, streamIndex + 1);
|
||||
} else {
|
||||
// In case there is timeoutException or Watch for commit happening over
|
||||
// majority or the client connection failure to the leader in the
|
||||
|
@ -451,7 +457,19 @@ public class KeyOutputStream extends OutputStream {
|
|||
// Next block allocation will happen with excluding this specific pipeline
|
||||
// This will ensure if 2 way commit happens , it cannot span over multiple
|
||||
// blocks
|
||||
discardPreallocatedBlocks(-1, pipelineId);
|
||||
discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
|
||||
}
|
||||
if (bufferedDataLen > 0) {
|
||||
// If the data is still cached in the underlying stream, we need to
|
||||
// allocate new block and write this data in the datanode.
|
||||
currentStreamIndex += 1;
|
||||
handleRetry(exception, bufferedDataLen);
|
||||
// reset the retryCount after handling the exception
|
||||
retryCount = 0;
|
||||
}
|
||||
if (totalSuccessfulFlushedData == 0) {
|
||||
streamEntries.remove(streamIndex);
|
||||
currentStreamIndex -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -618,7 +636,9 @@ public class KeyOutputStream extends OutputStream {
|
|||
if (keyArgs != null) {
|
||||
// in test, this could be null
|
||||
removeEmptyBlocks();
|
||||
keyArgs.setDataSize(getKeyLength());
|
||||
long length = getKeyLength();
|
||||
Preconditions.checkArgument(offset == length);
|
||||
keyArgs.setDataSize(length);
|
||||
keyArgs.setLocationInfoList(getLocationInfoList());
|
||||
// When the key is multipart upload part file upload, we should not
|
||||
// commit the key, as this is not an actual key, this is a just a
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.hadoop.ozone.client.rpc;
|
||||
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
|
@ -235,6 +236,8 @@ public class TestBlockOutputStreamWithFailures {
|
|||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof ContainerNotOpenException);
|
||||
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// commitInfoMap will remain intact as there is no server failure
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
|
@ -372,6 +375,8 @@ public class TestBlockOutputStreamWithFailures {
|
|||
key.close();
|
||||
Assert
|
||||
.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
|
@ -506,6 +511,8 @@ public class TestBlockOutputStreamWithFailures {
|
|||
key.flush();
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof AlreadyClosedException);
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
key.close();
|
||||
Assert
|
||||
|
@ -534,11 +541,698 @@ public class TestBlockOutputStreamWithFailures {
|
|||
validateData(keyName, data1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWithPrimeSizedData() throws Exception {
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long totalOpCount = metrics.getTotalOpCount();
|
||||
String keyName = getKeyName();
|
||||
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
|
||||
int dataLength = 167;
|
||||
// write data more than 1 chunk
|
||||
byte[] data1 =
|
||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||
.getBytes(UTF_8);
|
||||
key.write(data1);
|
||||
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
||||
== pendingWriteChunkCount + 1);
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
||||
== pendingPutBlockCount);
|
||||
Assert.assertEquals(writeChunkCount + 1,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 1,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
|
||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
||||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
|
||||
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(0,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
|
||||
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 2,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 1,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 3,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
// Close the containers on the Datanode and write more data
|
||||
ContainerTestHelper.waitForContainerClose(key, cluster);
|
||||
key.write(data1);
|
||||
|
||||
// As a part of handling the exception, 2 failed writeChunks will be
|
||||
// rewritten plus 1 putBlocks for flush
|
||||
// and one flush for partial chunk
|
||||
key.flush();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof ContainerNotOpenException);
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
|
||||
// commitInfoMap will remain intact as there is no server failure
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
key.close();
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
Assert
|
||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 6,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 3,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 9,
|
||||
metrics.getTotalOpCount());
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
||||
// Written the same data twice
|
||||
String dataString = new String(data1, UTF_8);
|
||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionDuringClose() throws Exception {
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long totalOpCount = metrics.getTotalOpCount();
|
||||
String keyName = getKeyName();
|
||||
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
|
||||
int dataLength = 167;
|
||||
// write data more than 1 chunk
|
||||
byte[] data1 =
|
||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||
.getBytes(UTF_8);
|
||||
key.write(data1);
|
||||
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
||||
== pendingWriteChunkCount + 1);
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
||||
== pendingPutBlockCount);
|
||||
Assert.assertEquals(writeChunkCount + 1,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 1,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
|
||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
||||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
|
||||
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(0,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
|
||||
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 2,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 1,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 3,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
// Close the containers on the Datanode and write more data
|
||||
ContainerTestHelper.waitForContainerClose(key, cluster);
|
||||
key.write(data1);
|
||||
|
||||
// commitInfoMap will remain intact as there is no server failure
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
// now close the stream, It will hit exception
|
||||
key.close();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof ContainerNotOpenException);
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
Assert
|
||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 6,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 3,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 9,
|
||||
metrics.getTotalOpCount());
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
||||
// Written the same data twice
|
||||
String dataString = new String(data1, UTF_8);
|
||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWatchForCommitWithSingleNodeRatis() throws Exception {
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long totalOpCount = metrics.getTotalOpCount();
|
||||
String keyName = getKeyName();
|
||||
OzoneOutputStream key =
|
||||
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
|
||||
int dataLength = maxFlushSize + 50;
|
||||
// write data more than 1 chunk
|
||||
byte[] data1 =
|
||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||
.getBytes(UTF_8);
|
||||
key.write(data1);
|
||||
|
||||
// since its hitting the full bufferCondition, it will call watchForCommit
|
||||
// and completes atleast putBlock for first flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
||||
<= pendingWriteChunkCount + 2);
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
||||
<= pendingPutBlockCount + 1);
|
||||
Assert.assertEquals(writeChunkCount + 4,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 2,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 6,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
|
||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
||||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
// we have just written data more than flush Size(2 chunks), at this time
|
||||
// buffer pool will have 4 buffers allocated worth of chunk size
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
// writtenDataLength as well flushedDataLength will be updated here
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(maxFlushSize,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
// since data equals to maxBufferSize is written, this will be a blocking
|
||||
// call and hence will wait for atleast flushSize worth of data to get
|
||||
// ack'd by all servers right here
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
|
||||
|
||||
// watchForCommit will clean up atleast one entry from the map where each
|
||||
// entry corresponds to flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 5,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 3,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 8,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// flush is a sync call, all pending operations will complete
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
// Close the containers on the Datanode and write more data
|
||||
ContainerTestHelper.waitForContainerClose(key, cluster);
|
||||
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
|
||||
// once exception is hit
|
||||
key.write(data1);
|
||||
|
||||
// As a part of handling the exception, 4 failed writeChunks will be
|
||||
// rewritten plus one partial chunk plus two putBlocks for flushSize
|
||||
// and one flush for partial chunk
|
||||
key.flush();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof ContainerNotOpenException);
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// commitInfoMap will remain intact as there is no server failure
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
key.close();
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
Assert
|
||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 14,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 8,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 22,
|
||||
metrics.getTotalOpCount());
|
||||
// Written the same data twice
|
||||
String dataString = new String(data1, UTF_8);
|
||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long totalOpCount = metrics.getTotalOpCount();
|
||||
String keyName = getKeyName();
|
||||
OzoneOutputStream key =
|
||||
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
|
||||
int dataLength = maxFlushSize + 50;
|
||||
// write data more than 1 chunk
|
||||
byte[] data1 =
|
||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||
.getBytes(UTF_8);
|
||||
key.write(data1);
|
||||
// since its hitting the full bufferCondition, it will call watchForCommit
|
||||
// and completes at least putBlock for first flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
||||
<= pendingWriteChunkCount + 2);
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
||||
<= pendingPutBlockCount + 1);
|
||||
Assert.assertEquals(writeChunkCount + 4,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 2,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 6,
|
||||
metrics.getTotalOpCount());
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
|
||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
||||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
// we have just written data more than flush Size(2 chunks), at this time
|
||||
// buffer pool will have 3 buffers allocated worth of chunk size
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
// writtenDataLength as well flushedDataLength will be updated here
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(maxFlushSize,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
// since data equals to maxBufferSize is written, this will be a blocking
|
||||
// call and hence will wait for atleast flushSize worth of data to get
|
||||
// ack'd by all servers right here
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
|
||||
|
||||
// watchForCommit will clean up atleast flushSize worth of data buffer
|
||||
// where each entry corresponds to flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 5,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 3,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 8,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
Pipeline pipeline = raftClient.getPipeline();
|
||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
||||
|
||||
// again write data with more than max buffer limit. This will call
|
||||
// watchForCommit again. No write will happen in the current block and
|
||||
// data will be rewritten to the next block.
|
||||
|
||||
key.write(data1);
|
||||
|
||||
key.flush();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof AlreadyClosedException);
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
key.close();
|
||||
Assert
|
||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
|
||||
// in total, there are 14 full write chunks, 5 before the failure injection,
|
||||
// 4 chunks after which we detect the failure and then 5 again on the next
|
||||
// block
|
||||
Assert.assertEquals(writeChunkCount + 14,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
// 3 flushes at flushSize boundaries before failure injection + 2
|
||||
// flush failed + 3 more flushes for the next block
|
||||
Assert.assertEquals(putBlockCount + 8,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 22,
|
||||
metrics.getTotalOpCount());
|
||||
// Written the same data twice
|
||||
String dataString = new String(data1, UTF_8);
|
||||
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
|
||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatanodeFailureWithPreAllocation() throws Exception {
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.WriteChunk);
|
||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
||||
ContainerProtos.Type.PutBlock);
|
||||
long totalOpCount = metrics.getTotalOpCount();
|
||||
String keyName = getKeyName();
|
||||
OzoneOutputStream key =
|
||||
createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
|
||||
ReplicationFactor.ONE);
|
||||
int dataLength = maxFlushSize + 50;
|
||||
// write data more than 1 chunk
|
||||
byte[] data1 =
|
||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||
.getBytes(UTF_8);
|
||||
key.write(data1);
|
||||
// since its hitting the full bufferCondition, it will call watchForCommit
|
||||
// and completes at least putBlock for first flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
||||
<= pendingWriteChunkCount + 2);
|
||||
Assert.assertTrue(
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
||||
<= pendingPutBlockCount + 1);
|
||||
Assert.assertEquals(writeChunkCount + 4,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 2,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 6,
|
||||
metrics.getTotalOpCount());
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3);
|
||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
||||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
// we have just written data more than flush Size(2 chunks), at this time
|
||||
// buffer pool will have 3 buffers allocated worth of chunk size
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
// writtenDataLength as well flushedDataLength will be updated here
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(maxFlushSize,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
// since data equals to maxBufferSize is written, this will be a blocking
|
||||
// call and hence will wait for atleast flushSize worth of data to get
|
||||
// ack'd by all servers right here
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
|
||||
|
||||
// watchForCommit will clean up atleast flushSize worth of data buffer
|
||||
// where each entry corresponds to flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(writeChunkCount + 5,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(putBlockCount + 3,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 8,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
Pipeline pipeline = raftClient.getPipeline();
|
||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
||||
|
||||
// again write data with more than max buffer limit. This will call
|
||||
// watchForCommit again. No write will happen and
|
||||
|
||||
key.write(data1);
|
||||
|
||||
key.flush();
|
||||
|
||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||
.getIoException()) instanceof AlreadyClosedException);
|
||||
|
||||
// Make sure the retryCount is reset after the exception is handled
|
||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||
|
||||
// now close the stream, It will update the ack length after watchForCommit
|
||||
key.close();
|
||||
Assert
|
||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||
// make sure the bufferPool is empty
|
||||
Assert
|
||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
||||
|
||||
// in total, there are 14 full write chunks, 5 before the failure injection,
|
||||
// 4 chunks after which we detect the failure and then 5 again on the next
|
||||
// block
|
||||
Assert.assertEquals(writeChunkCount + 14,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
||||
|
||||
// 3 flushes at flushSize boundaries before failure injection + 2
|
||||
// flush failed + 3 more flushes for the next block
|
||||
Assert.assertEquals(putBlockCount + 8,
|
||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 22,
|
||||
metrics.getTotalOpCount());
|
||||
// Written the same data twice
|
||||
String dataString = new String(data1, UTF_8);
|
||||
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
|
||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||
}
|
||||
|
||||
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
||||
long size) throws Exception {
|
||||
return ContainerTestHelper
|
||||
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
|
||||
return createKey(keyName, type, size, ReplicationFactor.THREE);
|
||||
}
|
||||
|
||||
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
||||
long size, ReplicationFactor factor) throws Exception {
|
||||
return ContainerTestHelper
|
||||
.createKey(keyName, type, factor, size, objectStore, volumeName,
|
||||
bucketName);
|
||||
}
|
||||
|
||||
private void validateData(String keyName, byte[] data) throws Exception {
|
||||
ContainerTestHelper
|
||||
.validateData(keyName, data, objectStore, volumeName, bucketName);
|
||||
|
|
|
@ -700,6 +700,15 @@ public final class ContainerTestHelper {
|
|||
.createKey(keyName, size, type, factor, new HashMap<>());
|
||||
}
|
||||
|
||||
public static OzoneOutputStream createKey(String keyName,
|
||||
ReplicationType type,
|
||||
org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
|
||||
ObjectStore objectStore, String volumeName, String bucketName)
|
||||
throws Exception {
|
||||
return objectStore.getVolume(volumeName).getBucket(bucketName)
|
||||
.createKey(keyName, size, type, factor, new HashMap<>());
|
||||
}
|
||||
|
||||
public static void validateData(String keyName, byte[] data,
|
||||
ObjectStore objectStore, String volumeName, String bucketName)
|
||||
throws Exception {
|
||||
|
|
Loading…
Reference in New Issue