Revert "HDDS-1373. KeyOutputStream, close after write request fails after retries, runs into IllegalArgumentException..(#729)"
This reverts commit df2ae27f3e
.
This commit is contained in:
parent
df2ae27f3e
commit
082f1e0437
|
@ -102,10 +102,4 @@ public class ExcludeList {
|
||||||
});
|
});
|
||||||
return excludeList;
|
return excludeList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
|
||||||
datanodes.clear();
|
|
||||||
containerIds.clear();
|
|
||||||
pipelineIds.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,66 +295,60 @@ public class KeyOutputStream extends OutputStream {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int succeededAllocates = 0;
|
int succeededAllocates = 0;
|
||||||
while (len > 0) {
|
while (len > 0) {
|
||||||
try {
|
if (streamEntries.size() <= currentStreamIndex) {
|
||||||
if (streamEntries.size() <= currentStreamIndex) {
|
Preconditions.checkNotNull(omClient);
|
||||||
Preconditions.checkNotNull(omClient);
|
// allocate a new block, if a exception happens, log an error and
|
||||||
// allocate a new block, if a exception happens, log an error and
|
// throw exception to the caller directly, and the write fails.
|
||||||
// throw exception to the caller directly, and the write fails.
|
|
||||||
try {
|
|
||||||
allocateNewBlock(currentStreamIndex);
|
|
||||||
succeededAllocates += 1;
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.error("Try to allocate more blocks for write failed, already "
|
|
||||||
+ "allocated " + succeededAllocates
|
|
||||||
+ " blocks for this write.");
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// in theory, this condition should never violate due the check above
|
|
||||||
// still do a sanity check.
|
|
||||||
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
|
|
||||||
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
|
|
||||||
|
|
||||||
// length(len) will be in int range if the call is happening through
|
|
||||||
// write API of blockOutputStream. Length can be in long range if it comes
|
|
||||||
// via Exception path.
|
|
||||||
int writeLen = Math.min((int) len, (int) current.getRemaining());
|
|
||||||
long currentPos = current.getWrittenDataLength();
|
|
||||||
try {
|
try {
|
||||||
if (retry) {
|
allocateNewBlock(currentStreamIndex);
|
||||||
current.writeOnRetry(len);
|
succeededAllocates += 1;
|
||||||
} else {
|
|
||||||
current.write(b, off, writeLen);
|
|
||||||
offset += writeLen;
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
// for the current iteration, totalDataWritten - currentPos gives the
|
LOG.error("Try to allocate more blocks for write failed, already "
|
||||||
// amount of data already written to the buffer
|
+ "allocated " + succeededAllocates + " blocks for this write.");
|
||||||
|
throw ioe;
|
||||||
// In the retryPath, the total data to be written will always be equal
|
|
||||||
// to or less than the max length of the buffer allocated.
|
|
||||||
// The len specified here is the combined sum of the data length of
|
|
||||||
// the buffers
|
|
||||||
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
|
|
||||||
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
|
|
||||||
writeLen = retry ? (int) len : dataWritten;
|
|
||||||
// In retry path, the data written is already accounted in offset.
|
|
||||||
if (!retry) {
|
|
||||||
offset += writeLen;
|
|
||||||
}
|
|
||||||
LOG.debug("writeLen {}, total len {}", writeLen, len);
|
|
||||||
handleException(current, currentStreamIndex, ioe);
|
|
||||||
}
|
}
|
||||||
if (current.getRemaining() <= 0) {
|
|
||||||
// since the current block is already written close the stream.
|
|
||||||
handleFlushOrClose(StreamAction.FULL);
|
|
||||||
}
|
|
||||||
len -= writeLen;
|
|
||||||
off += writeLen;
|
|
||||||
} catch (Exception e) {
|
|
||||||
markStreamClosed();
|
|
||||||
throw e;
|
|
||||||
}
|
}
|
||||||
|
// in theory, this condition should never violate due the check above
|
||||||
|
// still do a sanity check.
|
||||||
|
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
|
||||||
|
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
|
||||||
|
|
||||||
|
// length(len) will be in int range if the call is happening through
|
||||||
|
// write API of blockOutputStream. Length can be in long range if it comes
|
||||||
|
// via Exception path.
|
||||||
|
int writeLen = Math.min((int)len, (int) current.getRemaining());
|
||||||
|
long currentPos = current.getWrittenDataLength();
|
||||||
|
try {
|
||||||
|
if (retry) {
|
||||||
|
current.writeOnRetry(len);
|
||||||
|
} else {
|
||||||
|
current.write(b, off, writeLen);
|
||||||
|
offset += writeLen;
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// for the current iteration, totalDataWritten - currentPos gives the
|
||||||
|
// amount of data already written to the buffer
|
||||||
|
|
||||||
|
// In the retryPath, the total data to be written will always be equal
|
||||||
|
// to or less than the max length of the buffer allocated.
|
||||||
|
// The len specified here is the combined sum of the data length of
|
||||||
|
// the buffers
|
||||||
|
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
|
||||||
|
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
|
||||||
|
writeLen = retry ? (int) len : dataWritten;
|
||||||
|
// In retry path, the data written is already accounted in offset.
|
||||||
|
if (!retry) {
|
||||||
|
offset += writeLen;
|
||||||
|
}
|
||||||
|
LOG.debug("writeLen {}, total len {}", writeLen, len);
|
||||||
|
handleException(current, currentStreamIndex, ioe);
|
||||||
|
}
|
||||||
|
if (current.getRemaining() <= 0) {
|
||||||
|
// since the current block is already written close the stream.
|
||||||
|
handleFlushOrClose(StreamAction.FULL);
|
||||||
|
}
|
||||||
|
len -= writeLen;
|
||||||
|
off += writeLen;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,7 +365,7 @@ public class KeyOutputStream extends OutputStream {
|
||||||
// pre allocated blocks available.
|
// pre allocated blocks available.
|
||||||
|
|
||||||
// This will be called only to discard the next subsequent unused blocks
|
// This will be called only to discard the next subsequent unused blocks
|
||||||
// in the streamEntryList.
|
// in the sreamEntryList.
|
||||||
if (streamIndex < streamEntries.size()) {
|
if (streamIndex < streamEntries.size()) {
|
||||||
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
|
ListIterator<BlockOutputStreamEntry> streamEntryIterator =
|
||||||
streamEntries.listIterator(streamIndex);
|
streamEntries.listIterator(streamIndex);
|
||||||
|
@ -404,20 +398,6 @@ public class KeyOutputStream extends OutputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanup() {
|
|
||||||
if (excludeList != null) {
|
|
||||||
excludeList.clear();
|
|
||||||
excludeList = null;
|
|
||||||
}
|
|
||||||
if (bufferPool != null) {
|
|
||||||
bufferPool.clearBufferPool();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamEntries != null) {
|
|
||||||
streamEntries.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* It performs following actions :
|
* It performs following actions :
|
||||||
* a. Updates the committed length at datanode for the current stream in
|
* a. Updates the committed length at datanode for the current stream in
|
||||||
|
@ -438,7 +418,8 @@ public class KeyOutputStream extends OutputStream {
|
||||||
closedContainerException = checkIfContainerIsClosed(t);
|
closedContainerException = checkIfContainerIsClosed(t);
|
||||||
}
|
}
|
||||||
PipelineID pipelineId = null;
|
PipelineID pipelineId = null;
|
||||||
long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
|
long totalSuccessfulFlushedData =
|
||||||
|
streamEntry.getTotalAckDataLength();
|
||||||
//set the correct length for the current stream
|
//set the correct length for the current stream
|
||||||
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
|
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
|
||||||
long bufferedDataLen = computeBufferData();
|
long bufferedDataLen = computeBufferData();
|
||||||
|
@ -469,8 +450,8 @@ public class KeyOutputStream extends OutputStream {
|
||||||
if (closedContainerException) {
|
if (closedContainerException) {
|
||||||
// discard subsequent pre allocated blocks from the streamEntries list
|
// discard subsequent pre allocated blocks from the streamEntries list
|
||||||
// from the closed container
|
// from the closed container
|
||||||
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
|
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
|
||||||
streamIndex + 1);
|
null, streamIndex + 1);
|
||||||
} else {
|
} else {
|
||||||
// In case there is timeoutException or Watch for commit happening over
|
// In case there is timeoutException or Watch for commit happening over
|
||||||
// majority or the client connection failure to the leader in the
|
// majority or the client connection failure to the leader in the
|
||||||
|
@ -494,11 +475,6 @@ public class KeyOutputStream extends OutputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markStreamClosed() {
|
|
||||||
cleanup();
|
|
||||||
closed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleRetry(IOException exception, long len) throws IOException {
|
private void handleRetry(IOException exception, long len) throws IOException {
|
||||||
RetryPolicy.RetryAction action;
|
RetryPolicy.RetryAction action;
|
||||||
try {
|
try {
|
||||||
|
@ -610,46 +586,40 @@ public class KeyOutputStream extends OutputStream {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
int size = streamEntries.size();
|
||||||
int size = streamEntries.size();
|
int streamIndex =
|
||||||
int streamIndex =
|
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
|
||||||
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
|
BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
|
||||||
BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
|
if (entry != null) {
|
||||||
if (entry != null) {
|
try {
|
||||||
try {
|
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
|
||||||
Collection<DatanodeDetails> failedServers =
|
// failed servers can be null in case there is no data written in the
|
||||||
entry.getFailedServers();
|
// stream
|
||||||
// failed servers can be null in case there is no data written in the
|
if (failedServers != null && !failedServers.isEmpty()) {
|
||||||
// stream
|
excludeList.addDatanodes(failedServers);
|
||||||
if (failedServers != null && !failedServers.isEmpty()) {
|
|
||||||
excludeList.addDatanodes(failedServers);
|
|
||||||
}
|
|
||||||
switch (op) {
|
|
||||||
case CLOSE:
|
|
||||||
entry.close();
|
|
||||||
break;
|
|
||||||
case FULL:
|
|
||||||
if (entry.getRemaining() == 0) {
|
|
||||||
entry.close();
|
|
||||||
currentStreamIndex++;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case FLUSH:
|
|
||||||
entry.flush();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IOException("Invalid Operation");
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
handleException(entry, streamIndex, ioe);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
switch (op) {
|
||||||
|
case CLOSE:
|
||||||
|
entry.close();
|
||||||
|
break;
|
||||||
|
case FULL:
|
||||||
|
if (entry.getRemaining() == 0) {
|
||||||
|
entry.close();
|
||||||
|
currentStreamIndex++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case FLUSH:
|
||||||
|
entry.flush();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IOException("Invalid Operation");
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
handleException(entry, streamIndex, ioe);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
} catch (Exception e) {
|
|
||||||
markStreamClosed();
|
|
||||||
throw e;
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,7 +658,7 @@ public class KeyOutputStream extends OutputStream {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
cleanup();
|
bufferPool.clearBufferPool();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -189,7 +189,6 @@ public class TestBlockOutputStream {
|
||||||
// flush ensures watchForCommit updates the total length acknowledged
|
// flush ensures watchForCommit updates the total length acknowledged
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
|
|
||||||
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
// now close the stream, It will update the ack length after watchForCommit
|
||||||
key.close();
|
key.close();
|
||||||
|
|
||||||
|
@ -209,7 +208,7 @@ public class TestBlockOutputStream {
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,7 +263,6 @@ public class TestBlockOutputStream {
|
||||||
// Now do a flush. This will flush the data and update the flush length and
|
// Now do a flush. This will flush the data and update the flush length and
|
||||||
// the map.
|
// the map.
|
||||||
key.flush();
|
key.flush();
|
||||||
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
|
||||||
// flush is a sync call, all pending operations will complete
|
// flush is a sync call, all pending operations will complete
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
|
@ -304,7 +302,7 @@ public class TestBlockOutputStream {
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||||
Assert.assertEquals(totalOpCount + 3,
|
Assert.assertEquals(totalOpCount + 3,
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,7 +397,6 @@ public class TestBlockOutputStream {
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||||
Assert.assertEquals(totalOpCount + 3,
|
Assert.assertEquals(totalOpCount + 3,
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,7 +454,6 @@ public class TestBlockOutputStream {
|
||||||
blockOutputStream.getCommitIndex2flushedDataMap().size());
|
blockOutputStream.getCommitIndex2flushedDataMap().size());
|
||||||
|
|
||||||
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
|
||||||
key.close();
|
key.close();
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
|
@ -475,7 +471,7 @@ public class TestBlockOutputStream {
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -540,7 +536,6 @@ public class TestBlockOutputStream {
|
||||||
// Now do a flush. This will flush the data and update the flush length and
|
// Now do a flush. This will flush the data and update the flush length and
|
||||||
// the map.
|
// the map.
|
||||||
key.flush();
|
key.flush();
|
||||||
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -575,7 +570,7 @@ public class TestBlockOutputStream {
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,7 +638,6 @@ public class TestBlockOutputStream {
|
||||||
// Now do a flush. This will flush the data and update the flush length and
|
// Now do a flush. This will flush the data and update the flush length and
|
||||||
// the map.
|
// the map.
|
||||||
key.flush();
|
key.flush();
|
||||||
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -679,7 +673,7 @@ public class TestBlockOutputStream {
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -234,7 +234,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
// and one flush for partial chunk
|
// and one flush for partial chunk
|
||||||
key.flush();
|
key.flush();
|
||||||
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||||
.getIoException()) instanceof ContainerNotOpenException);
|
.getIoException()) instanceof ContainerNotOpenException);
|
||||||
|
|
||||||
|
@ -250,7 +249,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -373,7 +372,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
key.flush();
|
key.flush();
|
||||||
Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
|
Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
|
||||||
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
// now close the stream, It will update the ack length after watchForCommit
|
||||||
key.close();
|
key.close();
|
||||||
Assert
|
Assert
|
||||||
|
@ -384,7 +382,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -517,14 +515,13 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
// Make sure the retryCount is reset after the exception is handled
|
// Make sure the retryCount is reset after the exception is handled
|
||||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
// now close the stream, It will update the ack length after watchForCommit
|
||||||
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
key.close();
|
key.close();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -541,7 +538,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -640,7 +637,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
// and one flush for partial chunk
|
// and one flush for partial chunk
|
||||||
key.flush();
|
key.flush();
|
||||||
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||||
.getIoException()) instanceof ContainerNotOpenException);
|
.getIoException()) instanceof ContainerNotOpenException);
|
||||||
// Make sure the retryCount is reset after the exception is handled
|
// Make sure the retryCount is reset after the exception is handled
|
||||||
|
@ -656,6 +652,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -666,7 +663,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||||
Assert.assertEquals(totalOpCount + 9,
|
Assert.assertEquals(totalOpCount + 9,
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
|
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
||||||
// Written the same data twice
|
// Written the same data twice
|
||||||
String dataString = new String(data1, UTF_8);
|
String dataString = new String(data1, UTF_8);
|
||||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||||
|
@ -777,6 +774,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -787,7 +785,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||||
Assert.assertEquals(totalOpCount + 9,
|
Assert.assertEquals(totalOpCount + 9,
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
|
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
||||||
// Written the same data twice
|
// Written the same data twice
|
||||||
String dataString = new String(data1, UTF_8);
|
String dataString = new String(data1, UTF_8);
|
||||||
validateData(keyName, dataString.concat(dataString).getBytes());
|
validateData(keyName, dataString.concat(dataString).getBytes());
|
||||||
|
@ -913,7 +911,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||||
// commitInfoMap will remain intact as there is no server failure
|
// commitInfoMap will remain intact as there is no server failure
|
||||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
// now close the stream, It will update the ack length after watchForCommit
|
||||||
key.close();
|
key.close();
|
||||||
// make sure the bufferPool is empty
|
// make sure the bufferPool is empty
|
||||||
|
@ -922,7 +919,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -1049,7 +1046,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
|
||||||
// Make sure the retryCount is reset after the exception is handled
|
// Make sure the retryCount is reset after the exception is handled
|
||||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
// now close the stream, It will update the ack length after watchForCommit
|
||||||
key.close();
|
key.close();
|
||||||
Assert
|
Assert
|
||||||
|
@ -1058,7 +1054,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
@ -1075,7 +1071,6 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||||
Assert.assertEquals(totalOpCount + 22,
|
Assert.assertEquals(totalOpCount + 22,
|
||||||
metrics.getTotalOpCount());
|
metrics.getTotalOpCount());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
|
||||||
// Written the same data twice
|
// Written the same data twice
|
||||||
String dataString = new String(data1, UTF_8);
|
String dataString = new String(data1, UTF_8);
|
||||||
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
|
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
|
||||||
|
@ -1203,7 +1198,7 @@ public class TestBlockOutputStreamWithFailures {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
||||||
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
|
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
Assert.assertEquals(pendingWriteChunkCount,
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
Assert.assertEquals(pendingPutBlockCount,
|
||||||
|
|
|
@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.client.rpc;
|
||||||
import org.apache.hadoop.conf.StorageUnit;
|
import org.apache.hadoop.conf.StorageUnit;
|
||||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
|
|
||||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||||
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
|
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
@ -68,7 +66,6 @@ public class TestOzoneClientRetriesOnException {
|
||||||
private String volumeName;
|
private String volumeName;
|
||||||
private String bucketName;
|
private String bucketName;
|
||||||
private String keyString;
|
private String keyString;
|
||||||
private XceiverClientManager xceiverClientManager;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a MiniDFSCluster for testing.
|
* Create a MiniDFSCluster for testing.
|
||||||
|
@ -87,6 +84,8 @@ public class TestOzoneClientRetriesOnException {
|
||||||
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
|
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
|
||||||
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
|
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
|
||||||
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
|
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
|
||||||
|
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
|
||||||
|
conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
|
||||||
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
|
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
|
||||||
conf.setQuietMode(false);
|
conf.setQuietMode(false);
|
||||||
cluster = MiniOzoneCluster.newBuilder(conf)
|
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||||
|
@ -101,7 +100,6 @@ public class TestOzoneClientRetriesOnException {
|
||||||
//the easiest way to create an open container is creating a key
|
//the easiest way to create an open container is creating a key
|
||||||
client = OzoneClientFactory.getClient(conf);
|
client = OzoneClientFactory.getClient(conf);
|
||||||
objectStore = client.getObjectStore();
|
objectStore = client.getObjectStore();
|
||||||
xceiverClientManager = new XceiverClientManager(conf);
|
|
||||||
keyString = UUID.randomUUID().toString();
|
keyString = UUID.randomUUID().toString();
|
||||||
volumeName = "testblockoutputstreamwithretries";
|
volumeName = "testblockoutputstreamwithretries";
|
||||||
bucketName = volumeName;
|
bucketName = volumeName;
|
||||||
|
@ -154,9 +152,8 @@ public class TestOzoneClientRetriesOnException {
|
||||||
.getIoException()) instanceof GroupMismatchException);
|
.getIoException()) instanceof GroupMismatchException);
|
||||||
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
|
Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
|
||||||
.contains(pipeline.getId()));
|
.contains(pipeline.getId()));
|
||||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
|
||||||
key.close();
|
key.close();
|
||||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
|
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
|
||||||
validateData(keyName, data1);
|
validateData(keyName, data1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,8 +171,13 @@ public class TestOzoneClientRetriesOnException {
|
||||||
byte[] data1 =
|
byte[] data1 =
|
||||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
||||||
.getBytes(UTF_8);
|
.getBytes(UTF_8);
|
||||||
|
key.write(data1);
|
||||||
|
|
||||||
|
OutputStream stream = entries.get(0).getOutputStream();
|
||||||
|
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||||
|
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||||
|
List<PipelineID> pipelineList = new ArrayList<>();
|
||||||
long containerID;
|
long containerID;
|
||||||
List<Long> containerList = new ArrayList<>();
|
|
||||||
for (BlockOutputStreamEntry entry : entries) {
|
for (BlockOutputStreamEntry entry : entries) {
|
||||||
containerID = entry.getBlockID().getContainerID();
|
containerID = entry.getBlockID().getContainerID();
|
||||||
ContainerInfo container =
|
ContainerInfo container =
|
||||||
|
@ -184,40 +186,18 @@ public class TestOzoneClientRetriesOnException {
|
||||||
Pipeline pipeline =
|
Pipeline pipeline =
|
||||||
cluster.getStorageContainerManager().getPipelineManager()
|
cluster.getStorageContainerManager().getPipelineManager()
|
||||||
.getPipeline(container.getPipelineID());
|
.getPipeline(container.getPipelineID());
|
||||||
XceiverClientSpi xceiverClient =
|
pipelineList.add(pipeline.getId());
|
||||||
xceiverClientManager.acquireClient(pipeline);
|
|
||||||
if (!containerList.contains(containerID)) {
|
|
||||||
xceiverClient.sendCommand(ContainerTestHelper
|
|
||||||
.getCreateContainerRequest(containerID, pipeline));
|
|
||||||
}
|
|
||||||
xceiverClientManager.releaseClient(xceiverClient, false);
|
|
||||||
}
|
}
|
||||||
key.write(data1);
|
ContainerTestHelper.waitForPipelineClose(key, cluster, false);
|
||||||
OutputStream stream = entries.get(0).getOutputStream();
|
|
||||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
|
||||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
|
||||||
ContainerTestHelper.waitForContainerClose(key, cluster);
|
|
||||||
try {
|
try {
|
||||||
key.write(data1);
|
key.write(data1);
|
||||||
Assert.fail("Expected exception not thrown");
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
|
||||||
.getIoException()) instanceof ContainerNotOpenException);
|
.getIoException()) instanceof GroupMismatchException);
|
||||||
Assert.assertTrue(ioe.getMessage().contains(
|
Assert.assertTrue(ioe.getMessage().contains(
|
||||||
"Retry request failed. retries get failed due to exceeded maximum "
|
"Retry request failed. retries get failed due to exceeded maximum "
|
||||||
+ "allowed retries number: 3"));
|
+ "allowed retries number: 3"));
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
key.flush();
|
|
||||||
Assert.fail("Expected exception not thrown");
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
Assert.assertTrue(ioe.getMessage().contains("Stream is closed"));
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
key.close();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
Assert.fail("Expected should not be thrown");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
||||||
|
|
|
@ -1,511 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with this
|
|
||||||
* work for additional information regarding copyright ownership. The ASF
|
|
||||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.client.rpc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.StorageUnit;
|
|
||||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
||||||
import org.apache.hadoop.hdds.scm.*;
|
|
||||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
|
||||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
|
||||||
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
|
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
||||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
|
||||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
|
||||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
|
||||||
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
|
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
|
||||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.ratis.protocol.GroupMismatchException;
|
|
||||||
import org.apache.ratis.protocol.RaftRetryFailureException;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
|
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class verifies the watchForCommit Handling by client.
|
|
||||||
*/
|
|
||||||
public class TestWatchForCommit {
|
|
||||||
|
|
||||||
private MiniOzoneCluster cluster;
|
|
||||||
private OzoneConfiguration conf;
|
|
||||||
private OzoneClient client;
|
|
||||||
private ObjectStore objectStore;
|
|
||||||
private String volumeName;
|
|
||||||
private String bucketName;
|
|
||||||
private String keyString;
|
|
||||||
private int chunkSize;
|
|
||||||
private int flushSize;
|
|
||||||
private int maxFlushSize;
|
|
||||||
private int blockSize;
|
|
||||||
private StorageContainerLocationProtocolClientSideTranslatorPB
|
|
||||||
storageContainerLocationClient;
|
|
||||||
private static String containerOwner = "OZONE";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a MiniDFSCluster for testing.
|
|
||||||
* <p>
|
|
||||||
* Ozone is made active by setting OZONE_ENABLED = true
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void startCluster(OzoneConfiguration conf) throws Exception {
|
|
||||||
chunkSize = 100;
|
|
||||||
flushSize = 2 * chunkSize;
|
|
||||||
maxFlushSize = 2 * flushSize;
|
|
||||||
blockSize = 2 * maxFlushSize;
|
|
||||||
|
|
||||||
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
|
|
||||||
conf.setTimeDuration(
|
|
||||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
|
|
||||||
1, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
conf.setQuietMode(false);
|
|
||||||
cluster = MiniOzoneCluster.newBuilder(conf)
|
|
||||||
.setNumDatanodes(7)
|
|
||||||
.setBlockSize(blockSize)
|
|
||||||
.setChunkSize(chunkSize)
|
|
||||||
.setStreamBufferFlushSize(flushSize)
|
|
||||||
.setStreamBufferMaxSize(maxFlushSize)
|
|
||||||
.setStreamBufferSizeUnit(StorageUnit.BYTES)
|
|
||||||
.build();
|
|
||||||
cluster.waitForClusterToBeReady();
|
|
||||||
//the easiest way to create an open container is creating a key
|
|
||||||
client = OzoneClientFactory.getClient(conf);
|
|
||||||
objectStore = client.getObjectStore();
|
|
||||||
keyString = UUID.randomUUID().toString();
|
|
||||||
volumeName = "watchforcommithandlingtest";
|
|
||||||
bucketName = volumeName;
|
|
||||||
objectStore.createVolume(volumeName);
|
|
||||||
objectStore.getVolume(volumeName).createBucket(bucketName);
|
|
||||||
storageContainerLocationClient = cluster
|
|
||||||
.getStorageContainerLocationClient();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Shutdown MiniDFSCluster.
|
|
||||||
*/
|
|
||||||
private void shutdown() {
|
|
||||||
if (cluster != null) {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getKeyName() {
|
|
||||||
return UUID.randomUUID().toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWatchForCommitWithKeyWrite() throws Exception {
|
|
||||||
// in this case, watch request should fail with RaftRetryFailureException
|
|
||||||
// and will be captured in keyOutputStream and the failover will happen
|
|
||||||
// to a different block
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
|
|
||||||
startCluster(conf);
|
|
||||||
XceiverClientMetrics metrics =
|
|
||||||
XceiverClientManager.getXceiverClientMetrics();
|
|
||||||
long writeChunkCount = metrics.getContainerOpCountMetrics(
|
|
||||||
ContainerProtos.Type.WriteChunk);
|
|
||||||
long putBlockCount = metrics.getContainerOpCountMetrics(
|
|
||||||
ContainerProtos.Type.PutBlock);
|
|
||||||
long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
|
|
||||||
ContainerProtos.Type.WriteChunk);
|
|
||||||
long pendingPutBlockCount = metrics.getContainerOpsMetrics(
|
|
||||||
ContainerProtos.Type.PutBlock);
|
|
||||||
long totalOpCount = metrics.getTotalOpCount();
|
|
||||||
String keyName = getKeyName();
|
|
||||||
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
|
|
||||||
int dataLength = maxFlushSize + 50;
|
|
||||||
// write data more than 1 chunk
|
|
||||||
byte[] data1 =
|
|
||||||
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
|
|
||||||
.getBytes(UTF_8);
|
|
||||||
key.write(data1);
|
|
||||||
// since its hitting the full bufferCondition, it will call watchForCommit
|
|
||||||
// and completes atleast putBlock for first flushSize worth of data
|
|
||||||
Assert.assertTrue(
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
|
|
||||||
<= pendingWriteChunkCount + 2);
|
|
||||||
Assert.assertTrue(
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
|
|
||||||
<= pendingPutBlockCount + 1);
|
|
||||||
Assert.assertEquals(writeChunkCount + 4,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
|
||||||
Assert.assertEquals(putBlockCount + 2,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
|
||||||
Assert.assertEquals(totalOpCount + 6,
|
|
||||||
metrics.getTotalOpCount());
|
|
||||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
|
||||||
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
|
|
||||||
|
|
||||||
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
|
|
||||||
OutputStream stream = keyOutputStream.getStreamEntries().get(0)
|
|
||||||
.getOutputStream();
|
|
||||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
|
||||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
|
||||||
|
|
||||||
// we have just written data more than flush Size(2 chunks), at this time
|
|
||||||
// buffer pool will have 3 buffers allocated worth of chunk size
|
|
||||||
|
|
||||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
|
||||||
// writtenDataLength as well flushedDataLength will be updated here
|
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
|
||||||
|
|
||||||
Assert.assertEquals(maxFlushSize,
|
|
||||||
blockOutputStream.getTotalDataFlushedLength());
|
|
||||||
|
|
||||||
// since data equals to maxBufferSize is written, this will be a blocking
|
|
||||||
// call and hence will wait for atleast flushSize worth of data to get
|
|
||||||
// acked by all servers right here
|
|
||||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
|
|
||||||
|
|
||||||
// watchForCommit will clean up atleast one entry from the map where each
|
|
||||||
// entry corresponds to flushSize worth of data
|
|
||||||
Assert.assertTrue(
|
|
||||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
|
|
||||||
|
|
||||||
// Now do a flush. This will flush the data and update the flush length and
|
|
||||||
// the map.
|
|
||||||
key.flush();
|
|
||||||
|
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
|
||||||
Assert.assertEquals(writeChunkCount + 5,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
|
||||||
Assert.assertEquals(putBlockCount + 3,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
|
||||||
Assert.assertEquals(totalOpCount + 8,
|
|
||||||
metrics.getTotalOpCount());
|
|
||||||
|
|
||||||
// Since the data in the buffer is already flushed, flush here will have
|
|
||||||
// no impact on the counters and data structures
|
|
||||||
|
|
||||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
|
||||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
|
||||||
|
|
||||||
Assert.assertEquals(dataLength,
|
|
||||||
blockOutputStream.getTotalDataFlushedLength());
|
|
||||||
// flush will make sure one more entry gets updated in the map
|
|
||||||
Assert.assertTrue(
|
|
||||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
|
|
||||||
|
|
||||||
XceiverClientRatis raftClient =
|
|
||||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
|
||||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
|
||||||
Pipeline pipeline = raftClient.getPipeline();
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
|
|
||||||
// again write data with more than max buffer limit. This will call
|
|
||||||
// watchForCommit again. Since the commit will happen 2 way, the
|
|
||||||
// commitInfoMap will get updated for servers which are alive
|
|
||||||
|
|
||||||
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
|
|
||||||
// once exception is hit
|
|
||||||
key.write(data1);
|
|
||||||
|
|
||||||
// As a part of handling the exception, 4 failed writeChunks will be
|
|
||||||
// rewritten plus one partial chunk plus two putBlocks for flushSize
|
|
||||||
// and one flush for partial chunk
|
|
||||||
key.flush();
|
|
||||||
Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
|
|
||||||
.getIoException()) instanceof RaftRetryFailureException);
|
|
||||||
// Make sure the retryCount is reset after the exception is handled
|
|
||||||
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
|
|
||||||
// now close the stream, It will update the ack length after watchForCommit
|
|
||||||
key.close();
|
|
||||||
Assert
|
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
|
||||||
Assert
|
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
Assert.assertEquals(pendingWriteChunkCount,
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
|
||||||
Assert.assertEquals(pendingPutBlockCount,
|
|
||||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
|
|
||||||
Assert.assertEquals(writeChunkCount + 14,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
|
|
||||||
Assert.assertEquals(putBlockCount + 8,
|
|
||||||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
|
||||||
Assert.assertEquals(totalOpCount + 22,
|
|
||||||
metrics.getTotalOpCount());
|
|
||||||
Assert
|
|
||||||
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
|
|
||||||
// make sure the bufferPool is empty
|
|
||||||
Assert
|
|
||||||
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
|
|
||||||
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
|
|
||||||
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
|
|
||||||
validateData(keyName, data1);
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
|
|
||||||
startCluster(conf);
|
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
|
||||||
ContainerWithPipeline container1 =
|
|
||||||
storageContainerLocationClient.allocateContainer(
|
|
||||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
|
|
||||||
containerOwner);
|
|
||||||
XceiverClientSpi client = clientManager
|
|
||||||
.acquireClient(container1.getPipeline());
|
|
||||||
Assert.assertEquals(1, client.getRefcount());
|
|
||||||
Assert.assertEquals(container1.getPipeline(),
|
|
||||||
client.getPipeline());
|
|
||||||
Pipeline pipeline = client.getPipeline();
|
|
||||||
XceiverClientReply reply =
|
|
||||||
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
|
|
||||||
container1.getContainerInfo().getContainerID(),
|
|
||||||
client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
long index = reply.getLogIndex();
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
|
|
||||||
try {
|
|
||||||
// just watch for a lo index which in not updated in the commitInfo Map
|
|
||||||
client.watchForCommit(index + 1, 3000);
|
|
||||||
Assert.fail("expected exception not thrown");
|
|
||||||
} catch (Exception e) {
|
|
||||||
Assert.assertTrue(
|
|
||||||
HddsClientUtils.checkForException(e) instanceof TimeoutException);
|
|
||||||
}
|
|
||||||
// After releasing the client, this connection should be closed
|
|
||||||
// and any container operations should fail
|
|
||||||
clientManager.releaseClient(client, false);
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWatchForCommitForRetryfailure() throws Exception {
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
|
|
||||||
startCluster(conf);
|
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
|
||||||
ContainerWithPipeline container1 =
|
|
||||||
storageContainerLocationClient.allocateContainer(
|
|
||||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
|
|
||||||
containerOwner);
|
|
||||||
XceiverClientSpi client = clientManager
|
|
||||||
.acquireClient(container1.getPipeline());
|
|
||||||
Assert.assertEquals(1, client.getRefcount());
|
|
||||||
Assert.assertEquals(container1.getPipeline(),
|
|
||||||
client.getPipeline());
|
|
||||||
Pipeline pipeline = client.getPipeline();
|
|
||||||
XceiverClientReply reply =
|
|
||||||
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
|
|
||||||
container1.getContainerInfo().getContainerID(),
|
|
||||||
client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
long index = reply.getLogIndex();
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
|
|
||||||
// again write data with more than max buffer limit. This wi
|
|
||||||
try {
|
|
||||||
// just watch for a lo index which in not updated in the commitInfo Map
|
|
||||||
client.watchForCommit(index + 1, 20000);
|
|
||||||
Assert.fail("expected exception not thrown");
|
|
||||||
} catch (Exception e) {
|
|
||||||
Assert.assertTrue(HddsClientUtils
|
|
||||||
.checkForException(e) instanceof RaftRetryFailureException);
|
|
||||||
}
|
|
||||||
clientManager.releaseClient(client, false);
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test2WayCommitForRetryfailure() throws Exception {
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
|
|
||||||
startCluster(conf);
|
|
||||||
GenericTestUtils.LogCapturer logCapturer =
|
|
||||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
|
||||||
|
|
||||||
ContainerWithPipeline container1 =
|
|
||||||
storageContainerLocationClient.allocateContainer(
|
|
||||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
|
|
||||||
containerOwner);
|
|
||||||
XceiverClientSpi client = clientManager
|
|
||||||
.acquireClient(container1.getPipeline());
|
|
||||||
Assert.assertEquals(1, client.getRefcount());
|
|
||||||
Assert.assertEquals(container1.getPipeline(),
|
|
||||||
client.getPipeline());
|
|
||||||
Pipeline pipeline = client.getPipeline();
|
|
||||||
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
|
|
||||||
XceiverClientReply reply =
|
|
||||||
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
|
|
||||||
container1.getContainerInfo().getContainerID(),
|
|
||||||
client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
|
||||||
reply = client.sendCommandAsync(ContainerTestHelper
|
|
||||||
.getCloseContainer(pipeline,
|
|
||||||
container1.getContainerInfo().getContainerID()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
client.watchForCommit(reply.getLogIndex(), 20000);
|
|
||||||
|
|
||||||
// commitInfo Map will be reduced to 2 here
|
|
||||||
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
|
|
||||||
clientManager.releaseClient(client, false);
|
|
||||||
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
|
|
||||||
Assert.assertTrue(
|
|
||||||
logCapturer.getOutput().contains("RaftRetryFailureException"));
|
|
||||||
Assert
|
|
||||||
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
|
|
||||||
logCapturer.stopCapturing();
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test2WayCommitForTimeoutException() throws Exception {
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
|
|
||||||
startCluster(conf);
|
|
||||||
GenericTestUtils.LogCapturer logCapturer =
|
|
||||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
|
||||||
|
|
||||||
ContainerWithPipeline container1 =
|
|
||||||
storageContainerLocationClient.allocateContainer(
|
|
||||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
|
|
||||||
containerOwner);
|
|
||||||
XceiverClientSpi client = clientManager
|
|
||||||
.acquireClient(container1.getPipeline());
|
|
||||||
Assert.assertEquals(1, client.getRefcount());
|
|
||||||
Assert.assertEquals(container1.getPipeline(),
|
|
||||||
client.getPipeline());
|
|
||||||
Pipeline pipeline = client.getPipeline();
|
|
||||||
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
|
|
||||||
XceiverClientReply reply =
|
|
||||||
client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
|
|
||||||
container1.getContainerInfo().getContainerID(),
|
|
||||||
client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
|
|
||||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
|
||||||
reply = client.sendCommandAsync(ContainerTestHelper
|
|
||||||
.getCloseContainer(pipeline,
|
|
||||||
container1.getContainerInfo().getContainerID()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
client.watchForCommit(reply.getLogIndex(), 3000);
|
|
||||||
|
|
||||||
// commitInfo Map will be reduced to 2 here
|
|
||||||
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
|
|
||||||
clientManager.releaseClient(client, false);
|
|
||||||
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
|
|
||||||
Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
|
|
||||||
Assert
|
|
||||||
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
|
|
||||||
logCapturer.stopCapturing();
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWatchForCommitForGroupMismatchException() throws Exception {
|
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
|
||||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
|
||||||
|
|
||||||
// mark the node stale early so that pipeline gets destroyed quickly
|
|
||||||
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
|
|
||||||
startCluster(conf);
|
|
||||||
GenericTestUtils.LogCapturer logCapturer =
|
|
||||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
|
||||||
|
|
||||||
ContainerWithPipeline container1 =
|
|
||||||
storageContainerLocationClient.allocateContainer(
|
|
||||||
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
|
|
||||||
containerOwner);
|
|
||||||
XceiverClientSpi client = clientManager
|
|
||||||
.acquireClient(container1.getPipeline());
|
|
||||||
Assert.assertEquals(1, client.getRefcount());
|
|
||||||
Assert.assertEquals(container1.getPipeline(),
|
|
||||||
client.getPipeline());
|
|
||||||
Pipeline pipeline = client.getPipeline();
|
|
||||||
XceiverClientRatis ratisClient = (XceiverClientRatis) client;
|
|
||||||
long containerId = container1.getContainerInfo().getContainerID();
|
|
||||||
XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper
|
|
||||||
.getCreateContainerRequest(containerId, client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
|
|
||||||
List<Pipeline> pipelineList = new ArrayList<>();
|
|
||||||
pipelineList.add(pipeline);
|
|
||||||
ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
|
|
||||||
try {
|
|
||||||
// just watch for a lo index which in not updated in the commitInfo Map
|
|
||||||
//client.watchForCommit(reply.getLogIndex() + 1, 20000);
|
|
||||||
reply = client.sendCommandAsync(ContainerTestHelper
|
|
||||||
.getCreateContainerRequest(containerId, client.getPipeline()));
|
|
||||||
reply.getResponse().get();
|
|
||||||
Assert.fail("Expected exception not thrown");
|
|
||||||
} catch(Exception e) {
|
|
||||||
Assert.assertTrue(HddsClientUtils
|
|
||||||
.checkForException(e) instanceof GroupMismatchException);
|
|
||||||
}
|
|
||||||
clientManager.releaseClient(client, false);
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
|
||||||
long size) throws Exception {
|
|
||||||
return ContainerTestHelper
|
|
||||||
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateData(String keyName, byte[] data) throws Exception {
|
|
||||||
ContainerTestHelper
|
|
||||||
.validateData(keyName, data, objectStore, volumeName, bucketName);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -727,9 +727,7 @@ public final class ContainerTestHelper {
|
||||||
keyOutputStream.getLocationInfoList();
|
keyOutputStream.getLocationInfoList();
|
||||||
List<Long> containerIdList = new ArrayList<>();
|
List<Long> containerIdList = new ArrayList<>();
|
||||||
for (OmKeyLocationInfo info : locationInfoList) {
|
for (OmKeyLocationInfo info : locationInfoList) {
|
||||||
long id = info.getContainerID();
|
containerIdList.add(info.getContainerID());
|
||||||
if (!containerIdList.contains(id))
|
|
||||||
containerIdList.add(id);
|
|
||||||
}
|
}
|
||||||
Assert.assertTrue(!containerIdList.isEmpty());
|
Assert.assertTrue(!containerIdList.isEmpty());
|
||||||
waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
|
waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
|
||||||
|
|
Loading…
Reference in New Issue