diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java index a4304009add..6c40921c174 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.metrics2.MetricsSystem; @@ -37,7 +38,9 @@ public class XceiverClientMetrics { .getSimpleName(); private @Metric MutableCounterLong pendingOps; + private @Metric MutableCounterLong totalOps; private MutableCounterLong[] pendingOpsArray; + private MutableCounterLong[] opsArray; private MutableRate[] containerOpsLatency; private MetricsRegistry registry; @@ -46,12 +49,17 @@ public class XceiverClientMetrics { this.registry = new MetricsRegistry(SOURCE_NAME); this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; + this.opsArray = new MutableCounterLong[numEnumEntries]; this.containerOpsLatency = new MutableRate[numEnumEntries]; for (int i = 0; i < numEnumEntries; i++) { pendingOpsArray[i] = registry.newCounter( "numPending" + ContainerProtos.Type.forNumber(i + 1), "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops", (long) 0); + opsArray[i] = registry + .newCounter("opCount" + ContainerProtos.Type.forNumber(i + 1), + "number of" + ContainerProtos.Type.forNumber(i + 1) + " ops", + (long) 0); containerOpsLatency[i] = registry.newRate( ContainerProtos.Type.forNumber(i + 1) + "Latency", @@ -68,6 +76,8 @@ public class XceiverClientMetrics { public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { pendingOps.incr(); + totalOps.incr(); + opsArray[type.ordinal()].incr(); pendingOpsArray[type.ordinal()].incr(); } @@ -85,6 +95,16 @@ public class XceiverClientMetrics { return pendingOpsArray[type.ordinal()].value(); } + @VisibleForTesting + public long getTotalOpCount() { + return totalOps.value(); + } + + @VisibleForTesting + public long getContainerOpCountMetrics(ContainerProtos.Type type) { + return opsArray[type.ordinal()].value(); + } + public void unRegister() { MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 65241bfa1ac..a2e65e2009f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.security.x509.SecurityConfig; import io.opentracing.Scope; import io.opentracing.util.GlobalTracer; +import org.apache.hadoop.util.Time; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftRetryFailureException; @@ -101,6 +103,8 @@ public final class 
XceiverClientRatis extends XceiverClientSpi { // create a separate RaftClient for watchForCommit API private RaftClient watchClient; + private XceiverClientMetrics metrics; + /** * Constructs a client. */ @@ -116,6 +120,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { watchClient = null; this.tlsConfig = tlsConfig; this.clientRequestTimeout = timeout; + metrics = XceiverClientManager.getXceiverClientMetrics(); } private void updateCommitInfosMap( @@ -199,6 +204,12 @@ public final class XceiverClientRatis extends XceiverClientSpi { return Objects.requireNonNull(client.get(), "client is null"); } + + @VisibleForTesting + public ConcurrentHashMap getCommitInfoMap() { + return commitInfoMap; + } + private CompletableFuture sendRequestAsync( ContainerCommandRequestProto request) { try (Scope scope = GlobalTracer.get() @@ -301,47 +312,52 @@ public final class XceiverClientRatis extends XceiverClientSpi { public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request) { XceiverClientReply asyncReply = new XceiverClientReply(null); + long requestTime = Time.monotonicNowNanos(); CompletableFuture raftClientReply = sendRequestAsync(request); + metrics.incrPendingContainerOpsMetrics(request.getCmdType()); CompletableFuture containerCommandResponse = - raftClientReply.whenComplete((reply, e) -> LOG.debug( - "received reply {} for request: cmdType={} containerID={}" - + " pipelineID={} traceID={} exception: {}", reply, - request.getCmdType(), request.getContainerID(), - request.getPipelineID(), request.getTraceID(), e)) - .thenApply(reply -> { - try { - // we need to handle RaftRetryFailure Exception - RaftRetryFailureException raftRetryFailureException = - reply.getRetryFailureException(); - if (raftRetryFailureException != null) { - // in case of raft retry failure, the raft client is - // not able to connect to the leader hence the pipeline - // can not be used but this instance of RaftClient will close - // and refreshed again. In case the client cannot connect to - // leader, getClient call will fail. + raftClientReply.whenComplete((reply, e) -> { + LOG.debug("received reply {} for request: cmdType={} containerID={}" + + " pipelineID={} traceID={} exception: {}", reply, + request.getCmdType(), request.getContainerID(), + request.getPipelineID(), request.getTraceID(), e); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + }).thenApply(reply -> { + try { + // we need to handle RaftRetryFailure Exception + RaftRetryFailureException raftRetryFailureException = + reply.getRetryFailureException(); + if (raftRetryFailureException != null) { + // in case of raft retry failure, the raft client is + // not able to connect to the leader hence the pipeline + // can not be used but this instance of RaftClient will close + // and refreshed again. In case the client cannot connect to + // leader, getClient call will fail. - // No need to set the failed Server ID here. Ozone client - // will directly exclude this pipeline in next allocate block - // to SCM as in this case, it is the raft client which is not - // able to connect to leader in the pipeline, though the - // pipeline can still be functional. 
- throw new CompletionException(raftRetryFailureException); - } - ContainerCommandResponseProto response = - ContainerCommandResponseProto - .parseFrom(reply.getMessage().getContent()); - UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId()); - if (response.getResult() == ContainerProtos.Result.SUCCESS) { - updateCommitInfosMap(reply.getCommitInfos()); - } - asyncReply.setLogIndex(reply.getLogIndex()); - addDatanodetoReply(serverId, asyncReply); - return response; - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); + // No need to set the failed Server ID here. Ozone client + // will directly exclude this pipeline in next allocate block + // to SCM as in this case, it is the raft client which is not + // able to connect to leader in the pipeline, though the + // pipeline can still be functional. + throw new CompletionException(raftRetryFailureException); + } + ContainerCommandResponseProto response = + ContainerCommandResponseProto + .parseFrom(reply.getMessage().getContent()); + UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId()); + if (response.getResult() == ContainerProtos.Result.SUCCESS) { + updateCommitInfosMap(reply.getCommitInfos()); + } + asyncReply.setLogIndex(reply.getLogIndex()); + addDatanodetoReply(serverId, asyncReply); + return response; + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); asyncReply.setResponse(containerCommandResponse); return asyncReply; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 13913ee7a35..cfbb6ae9daf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -45,7 +46,15 @@ import java.util.Collections; import java.util.UUID; import java.util.List; import java.util.ArrayList; -import java.util.concurrent.*; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls @@ -104,15 +113,25 @@ public class BlockOutputStream extends OutputStream { // by all servers private long totalAckDataLength; + // List containing buffers for which the putBlock call will + // update the length in the datanodes. This list will just maintain + // references to the buffers in the BufferPool which will be cleared + // when the watchForCommit acknowledges a putBlock logIndex has been + // committed on all datanodes. This list will be a place holder for buffers + // which got written between successive putBlock calls. 
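To make the bookkeeping described above concrete, here is a minimal, self-contained sketch of the same idea under assumed names (CommitWatcherSketch and its methods are illustrative, not actual Ozone classes): buffers written between two putBlock calls are tracked as one window, the window is keyed by the log index that the putBlock returns, and its bytes are only counted as acknowledged and released once that index commits on all replicas.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative sketch only; names do not match the real Ozone classes.
class CommitWatcherSketch {
  // buffers written since the last putBlock call (the "bufferList" idea)
  private List<ByteBuffer> pending = new ArrayList<>();
  // putBlock log index -> buffers whose data that putBlock covers
  private final ConcurrentSkipListMap<Long, List<ByteBuffer>> byIndex =
      new ConcurrentSkipListMap<>();

  void onWriteChunk(ByteBuffer buffer) {
    pending.add(buffer); // keep a reference only, no copy
  }

  void onPutBlock(long logIndex) {
    byIndex.put(logIndex, pending); // hand the window over to the map
    pending = new ArrayList<>();    // start a fresh window
  }

  // called once logIndex is committed on all replicas; returns how many
  // bytes can now be acknowledged (the sum of the buffer positions)
  long onCommitAck(long logIndex) {
    List<ByteBuffer> done = byIndex.remove(logIndex);
    return done == null ? 0
        : done.stream().mapToLong(ByteBuffer::position).sum();
  }
}
```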
+ private List bufferList; + // future Map to hold up all putBlock futures private ConcurrentHashMap> futureMap; - // map containing mapping for putBlock logIndex to to flushedDataLength Map. // The map should maintain the keys (logIndexes) in order so that while // removing we always end up updating incremented data flushed length. - private ConcurrentSkipListMap commitIndex2flushedDataMap; + // Also, corresponding to the logIndex, the corresponding list of buffers will + // be released from the buffer pool. + private ConcurrentSkipListMap> + commitIndex2flushedDataMap; private List failedServers; @@ -167,13 +186,15 @@ public class BlockOutputStream extends OutputStream { totalDataFlushedLength = 0; writtenDataLength = 0; failedServers = Collections.emptyList(); + bufferList = null; } + public BlockID getBlockID() { return blockID; } - public long getTotalSuccessfulFlushedData() { + public long getTotalAckDataLength() { return totalAckDataLength; } @@ -185,6 +206,31 @@ public class BlockOutputStream extends OutputStream { return failedServers; } + @VisibleForTesting + public XceiverClientSpi getXceiverClient() { + return xceiverClient; + } + + @VisibleForTesting + public long getTotalDataFlushedLength() { + return totalDataFlushedLength; + } + + @VisibleForTesting + public BufferPool getBufferPool() { + return bufferPool; + } + + @VisibleForTesting + public IOException getIoException() { + return ioException; + } + + @VisibleForTesting + public Map> getCommitIndex2flushedDataMap() { + return commitIndex2flushedDataMap; + } + @Override public void write(int b) throws IOException { checkOpen(); @@ -206,9 +252,9 @@ public class BlockOutputStream extends OutputStream { if (len == 0) { return; } + while (len > 0) { int writeLen; - // Allocate a buffer if needed. The buffer will be allocated only // once as needed and will be reused again for multiple blockOutputStream // entries. @@ -224,8 +270,8 @@ public class BlockOutputStream extends OutputStream { len -= writeLen; writtenDataLength += writeLen; if (shouldFlush()) { - totalDataFlushedLength += streamBufferFlushSize; - handlePartialFlush(); + updateFlushLength(); + executePutBlock(); } // Data in the bufferPool can not exceed streamBufferMaxSize if (isBufferPoolFull()) { @@ -235,7 +281,11 @@ public class BlockOutputStream extends OutputStream { } private boolean shouldFlush() { - return writtenDataLength % streamBufferFlushSize == 0; + return bufferPool.computeBufferData() % streamBufferFlushSize == 0; + } + + private void updateFlushLength() { + totalDataFlushedLength += writtenDataLength - totalDataFlushedLength; } private boolean isBufferPoolFull() { @@ -264,17 +314,17 @@ public class BlockOutputStream extends OutputStream { len -= writeLen; count++; writtenDataLength += writeLen; - if (shouldFlush()) { + // we should not call isBufferFull/shouldFlush here. + // The buffer might already be full as whole data is already cached in + // the buffer. We should just validate + // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to + // call for handling full buffer/flush buffer condition. + if (writtenDataLength % streamBufferFlushSize == 0) { // reset the position to zero as now we will be reading the // next buffer in the list - totalDataFlushedLength += streamBufferFlushSize; - handlePartialFlush(); + updateFlushLength(); + executePutBlock(); } - - // we should not call isBufferFull here. The buffer might already be full - // as whole data is already cached in the buffer. 
We should just validate
-      // if we wrote data of size streamBufferMaxSize to call for handling
-      // full buffer condition.
      if (writtenDataLength == streamBufferMaxSize) {
        handleFullBuffer();
      }
@@ -289,25 +339,22 @@
    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
    for (long index : indexes) {
      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      long length = commitIndex2flushedDataMap.remove(index);
-
-      // totalAckDataLength replicated yet should always be less than equal to
-      // the current length being returned from commitIndex2flushedDataMap.
-      // The below precondition would ensure commitIndex2flushedDataMap entries
-      // are removed in order of the insertion to the map.
-      Preconditions.checkArgument(totalAckDataLength < length);
-      totalAckDataLength = length;
+      List buffers = commitIndex2flushedDataMap.remove(index);
+      long length = buffers.stream().mapToLong(value -> {
+        int pos = value.position();
+        Preconditions.checkArgument(pos <= chunkSize);
+        return pos;
+      }).sum();
+      // totalAckDataLength should always be incremented by the length
+      // that the entry just removed from commitIndex2flushedDataMap
+      // accounts for.
+      totalAckDataLength += length;
      LOG.debug("Total data successfully replicated: " + totalAckDataLength);
      futureMap.remove(totalAckDataLength);
      // Flush has been committed to required servers successful.
-      // just release the current buffer from the buffer pool.
-
-      // every entry removed from the putBlock future Map signifies
-      // streamBufferFlushSize/chunkSize no of chunks successfully committed.
-      // Release the buffers from the buffer pool to be reused again.
-      int chunkCount = (int) (streamBufferFlushSize / chunkSize);
-      for (int i = 0; i < chunkCount; i++) {
-        bufferPool.releaseBuffer();
+      // just release the buffers from the buffer pool that correspond
+      // to the data committed by this putBlock call.
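+      // The buffers in this list are in write order, which is also the
+      // pool's allocation order, so releasing them one at a time preserves
+      // the FIFO invariant that BufferPool.releaseBuffer() now asserts.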
+ for (ByteBuffer byteBuffer : buffers) { + bufferPool.releaseBuffer(byteBuffer); } } } @@ -325,9 +372,10 @@ public class BlockOutputStream extends OutputStream { waitOnFlushFutures(); } } catch (InterruptedException | ExecutionException e) { - adjustBuffersOnException(); - throw new IOException( + ioException = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + adjustBuffersOnException(); + throw ioException; } if (!commitIndex2flushedDataMap.isEmpty()) { watchForCommit( @@ -389,10 +437,14 @@ public class BlockOutputStream extends OutputStream { } private CompletableFuture handlePartialFlush() + ContainerCommandResponseProto> executePutBlock() throws IOException { checkOpen(); long flushPos = totalDataFlushedLength; + Preconditions.checkNotNull(bufferList); + List byteBufferList = bufferList; + bufferList = null; + Preconditions.checkNotNull(byteBufferList); String requestId = traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID; CompletableFuture { @@ -446,9 +503,12 @@ public class BlockOutputStream extends OutputStream { try { handleFlush(); } catch (InterruptedException | ExecutionException e) { - adjustBuffersOnException(); - throw new IOException( + // just set the exception here as well in order to maintain sanctity of + // ioException field + ioException = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + adjustBuffersOnException(); + throw ioException; } } } @@ -456,6 +516,15 @@ public class BlockOutputStream extends OutputStream { private void writeChunk(ByteBuffer buffer) throws IOException { + // This data in the buffer will be pushed to datanode and a reference will + // be added to the bufferList. Once putBlock gets executed, this list will + // be marked null. Hence, during first writeChunk call after every putBlock + // call or during the first call to writeChunk here, the list will be null. + + if (bufferList == null) { + bufferList = new ArrayList<>(); + } + bufferList.add(buffer); // Please note : We are not flipping the slice when we write since // the slices are pointing the currentBuffer start and end as needed for // the chunk write. Also please note, Duplicate does not create a @@ -472,20 +541,36 @@ public class BlockOutputStream extends OutputStream { checkOpen(); // flush the last chunk data residing on the currentBuffer if (totalDataFlushedLength < writtenDataLength) { - ByteBuffer currentBuffer = bufferPool.getBuffer(); - int pos = currentBuffer.position(); - writeChunk(currentBuffer); - totalDataFlushedLength += pos; - handlePartialFlush(); + ByteBuffer currentBuffer = bufferPool.getCurrentBuffer(); + Preconditions.checkArgument(currentBuffer.position() > 0); + if (currentBuffer.position() != chunkSize) { + writeChunk(currentBuffer); + } + // This can be a partially filled chunk. Since we are flushing the buffer + // here, we just limit this buffer to the current position. So that next + // write will happen in new buffer + updateFlushLength(); + executePutBlock(); } waitOnFlushFutures(); + if (!commitIndex2flushedDataMap.isEmpty()) { + // wait for the last commit index in the commitIndex2flushedDataMap + // to get committed to all or majority of nodes in case timeout + // happens. 
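+        // Note: commitIndex2flushedDataMap is a ConcurrentSkipListMap, so
+        // the highest pending index is equivalently available via lastKey().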
+ long lastIndex = + commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v) + .max().getAsLong(); + LOG.debug( + "waiting for last flush Index " + lastIndex + " to catch up"); + watchForCommit(lastIndex); + } + // just check again if the exception is hit while waiting for the // futures to ensure flush has indeed succeeded // irrespective of whether the commitIndex2flushedDataMap is empty // or not, ensure there is no exception set checkOpen(); - } @Override @@ -494,21 +579,11 @@ public class BlockOutputStream extends OutputStream { && bufferPool != null && bufferPool.getSize() > 0) { try { handleFlush(); - if (!commitIndex2flushedDataMap.isEmpty()) { - // wait for the last commit index in the commitIndex2flushedDataMap - // to get committed to all or majority of nodes in case timeout - // happens. - long lastIndex = - commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v) - .max().getAsLong(); - LOG.debug( - "waiting for last flush Index " + lastIndex + " to catch up"); - watchForCommit(lastIndex); - } } catch (InterruptedException | ExecutionException e) { - adjustBuffersOnException(); - throw new IOException( + ioException = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + adjustBuffersOnException(); + throw ioException; } finally { cleanup(false); } @@ -564,6 +639,10 @@ public class BlockOutputStream extends OutputStream { futureMap.clear(); } futureMap = null; + if (bufferList != null) { + bufferList.clear(); + } + bufferList = null; if (commitIndex2flushedDataMap != null) { commitIndex2flushedDataMap.clear(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java index 541e6bd1758..17788c70a6c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java @@ -41,7 +41,7 @@ public class BufferPool { currentBufferIndex = -1; } - public ByteBuffer getBuffer() { + public ByteBuffer getCurrentBuffer() { return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex); } @@ -56,7 +56,7 @@ public class BufferPool { * */ public ByteBuffer allocateBufferIfNeeded() { - ByteBuffer buffer = getBuffer(); + ByteBuffer buffer = getCurrentBuffer(); if (buffer != null && buffer.hasRemaining()) { return buffer; } @@ -74,11 +74,14 @@ public class BufferPool { return buffer; } - public void releaseBuffer() { + public void releaseBuffer(ByteBuffer byteBuffer) { // always remove from head of the list and append at last ByteBuffer buffer = bufferList.remove(0); + // Ensure the buffer to be removed is always at the head of the list. 
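For context, here is a minimal standalone sketch of the rotate-on-release pool that this check protects, under assumed names (FifoBufferPoolSketch is illustrative, not the real BufferPool API): buffers are handed out left to right, and a release must present the oldest (head) buffer, which is then cleared and recycled at the tail.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; simplified from the pool in this patch.
class FifoBufferPoolSketch {
  private final List<ByteBuffer> buffers = new ArrayList<>();
  private final int capacity;
  private final int bufferSize;
  private int currentIndex = -1;

  FifoBufferPoolSketch(int capacity, int bufferSize) {
    this.capacity = capacity;
    this.bufferSize = bufferSize;
  }

  // Returns a buffer with room to write, allocating lazily up to capacity.
  // Callers are expected to release the head before the pool is exhausted.
  ByteBuffer allocateIfNeeded() {
    if (currentIndex >= 0 && buffers.get(currentIndex).hasRemaining()) {
      return buffers.get(currentIndex);
    }
    if (buffers.size() < capacity) {
      buffers.add(ByteBuffer.allocate(bufferSize));
    }
    return buffers.get(++currentIndex);
  }

  // Releases must happen in FIFO order: the released buffer has to be the
  // current head, which is cleared and appended for reuse.
  void release(ByteBuffer expected) {
    ByteBuffer head = buffers.remove(0);
    if (!head.equals(expected)) {
      throw new IllegalArgumentException("out-of-order release");
    }
    head.clear();
    buffers.add(head);   // recycle at the tail
    currentIndex--;      // everything shifted left by one slot
  }
}
```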
+ Preconditions.checkArgument(buffer.equals(byteBuffer)); buffer.clear(); bufferList.add(buffer); + Preconditions.checkArgument(currentBufferIndex >= 0); currentBufferIndex--; } @@ -90,6 +93,7 @@ public class BufferPool { public void checkBufferPoolEmpty() { Preconditions.checkArgument(computeBufferData() == 0); } + public long computeBufferData() { return bufferList.stream().mapToInt(value -> value.position()) .sum(); @@ -99,8 +103,12 @@ public class BufferPool { return bufferList.size(); } - ByteBuffer getBuffer(int index) { + public ByteBuffer getBuffer(int index) { return bufferList.get(index); } + int getCurrentBufferIndex() { + return currentBufferIndex; + } + } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java index a49f8ae79f1..07aa536c4e5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java @@ -74,8 +74,8 @@ public class BlockID { @Override public String toString() { - return new StringBuffer().append(getContainerBlockID().toString()) - .append(" bcId: ") + return new StringBuilder().append(getContainerBlockID().toString()) + .append(" bcsId: ") .append(blockCommitSequenceId) .toString(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index b6de8abda27..fb700da001c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io; import java.io.IOException; import java.io.OutputStream; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -148,11 +149,11 @@ public final class BlockOutputStreamEntry extends OutputStream { } } - long getTotalSuccessfulFlushedData() throws IOException { + long getTotalAckDataLength() { if (outputStream != null) { BlockOutputStream out = (BlockOutputStream) this.outputStream; blockID = out.getBlockID(); - return out.getTotalSuccessfulFlushedData(); + return out.getTotalAckDataLength(); } else { // For a pre allocated block for which no write has been initiated, // the OutputStream will be null here. @@ -295,6 +296,7 @@ public final class BlockOutputStreamEntry extends OutputStream { } } + @VisibleForTesting public OutputStream getOutputStream() { return outputStream; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 3bd572dceff..78f03d21333 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -64,6 +64,13 @@ import java.util.concurrent.TimeoutException; */ public class KeyOutputStream extends OutputStream { + /** + * Defines stream action while calling handleFlushOrClose. 
+ */ + enum StreamAction { + FLUSH, CLOSE, FULL + } + public static final Logger LOG = LoggerFactory.getLogger(KeyOutputStream.class); @@ -326,8 +333,7 @@ public class KeyOutputStream extends OutputStream { } if (current.getRemaining() <= 0) { // since the current block is already written close the stream. - handleFlushOrClose(true); - currentStreamIndex += 1; + handleFlushOrClose(StreamAction.FULL); } len -= writeLen; off += writeLen; @@ -393,19 +399,21 @@ public class KeyOutputStream extends OutputStream { boolean retryFailure = checkForRetryFailure(exception); boolean closedContainerException = false; if (!retryFailure) { - closedContainerException = checkIfContainerIsClosed(exception); + closedContainerException = checkIfContainerIsClosed(t); } PipelineID pipelineId = null; long totalSuccessfulFlushedData = - streamEntry.getTotalSuccessfulFlushedData(); + streamEntry.getTotalAckDataLength(); //set the correct length for the current stream streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long bufferedDataLen = computeBufferData(); - LOG.warn("Encountered exception {}", exception); - LOG.info( - "The last committed block length is {}, uncommitted data length is {}", + LOG.warn("Encountered exception {}. The last committed block length is {}, " + + "uncommitted data length is {}", exception, totalSuccessfulFlushedData, bufferedDataLen); Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize); + Preconditions.checkArgument( + streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData + == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); @@ -498,7 +506,7 @@ public class KeyOutputStream extends OutputStream { return t instanceof ContainerNotOpenException; } - private Throwable checkForException(IOException ioe) throws IOException { + public Throwable checkForException(IOException ioe) throws IOException { Throwable t = ioe.getCause(); while (t != null) { for (Class cls : OzoneClientUtils @@ -513,7 +521,7 @@ public class KeyOutputStream extends OutputStream { } private long getKeyLength() { - return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition()) + return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()) .sum(); } @@ -535,16 +543,24 @@ public class KeyOutputStream extends OutputStream { @Override public void flush() throws IOException { checkNotClosed(); - handleFlushOrClose(false); + handleFlushOrClose(StreamAction.FLUSH); } /** - * Close or Flush the latest outputStream. - * @param close Flag which decides whether to call close or flush on the + * Close or Flush the latest outputStream depending upon the action. + * This function gets called when while write is going on, the current stream + * gets full or explicit flush or close request is made by client. when the + * stream gets full and we try to close the stream , we might end up hitting + * an exception in the exception handling path, we write the data residing in + * in the buffer pool to a new Block. In cases, as such, when the data gets + * written to new stream , it will be at max half full. In such cases, we + * should just write the data and not close the stream as the block won't be + * completely full. + * @param op Flag which decides whether to call close or flush on the * outputStream. * @throws IOException In case, flush or close fails with exception. 
*/ - private void handleFlushOrClose(boolean close) throws IOException { + private void handleFlushOrClose(StreamAction op) throws IOException { if (streamEntries.size() == 0) { return; } @@ -561,10 +577,21 @@ public class KeyOutputStream extends OutputStream { if (failedServers != null && !failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } - if (close) { + switch (op) { + case CLOSE: entry.close(); - } else { + break; + case FULL: + if (entry.getRemaining() == 0) { + entry.close(); + currentStreamIndex++; + } + break; + case FLUSH: entry.flush(); + break; + default: + throw new IOException("Invalid Operation"); } } catch (IOException ioe) { handleException(entry, streamIndex, ioe); @@ -587,7 +614,7 @@ public class KeyOutputStream extends OutputStream { } closed = true; try { - handleFlushOrClose(true); + handleFlushOrClose(StreamAction.CLOSE); if (keyArgs != null) { // in test, this could be null removeEmptyBlocks(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 7b65c4689cd..e94f0ac3e50 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -249,6 +250,7 @@ public interface MiniOzoneCluster { protected Optional streamBufferFlushSize = Optional.empty(); protected Optional streamBufferMaxSize = Optional.empty(); protected Optional blockSize = Optional.empty(); + protected Optional streamBufferSizeUnit = Optional.empty(); // Use relative smaller number of handlers for testing protected int numOfOmHandlers = 20; protected int numOfScmHandlers = 20; @@ -434,6 +436,11 @@ public interface MiniOzoneCluster { return this; } + public Builder setStreamBufferSizeUnit(StorageUnit unit) { + this.streamBufferSizeUnit = Optional.of(unit); + return this; + } + public Builder setOMServiceId(String serviceId) { this.omServiceId = serviceId; return this; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 521a4f1c058..e746f3370fd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -446,14 +446,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (!blockSize.isPresent()) { blockSize = Optional.of(2 * streamBufferMaxSize.get()); } + + if (!streamBufferSizeUnit.isPresent()) { + streamBufferSizeUnit = Optional.of(StorageUnit.MB); + } conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, - chunkSize.get(), StorageUnit.MB); + chunkSize.get(), streamBufferSizeUnit.get()); conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, - streamBufferFlushSize.get(), StorageUnit.MB); + streamBufferFlushSize.get(), streamBufferSizeUnit.get()); conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, - streamBufferMaxSize.get(), 
StorageUnit.MB); + streamBufferMaxSize.get(), streamBufferSizeUnit.get()); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(), - StorageUnit.MB); + streamBufferSizeUnit.get()); configureTrace(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java new file mode 100644 index 00000000000..32bef12ae70 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -0,0 +1,690 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests BlockOutputStream class. + */ +public class TestBlockOutputStream { + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf = new OzoneConfiguration(); + private static OzoneClient client; + private static ObjectStore objectStore; + private static int chunkSize; + private static int flushSize; + private static int maxFlushSize; + private static int blockSize; + private static String volumeName; + private static String bucketName; + private static String keyString; + + /** + * Create a MiniDFSCluster for testing. + *
<p>
+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + conf.setQuietMode(false); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, + StorageUnit.MB); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "testblockoutputstream"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testBufferCaching() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = 50; + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data less than a chunk size, the data will just sit + // in the buffer, with only one buffer being allocated in the buffer pool + + Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize()); + //Just the writtenDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + // no data will be flushed till now + Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(pendingWriteChunkCount, + XceiverClientManager.getXceiverClientMetrics() + .getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + 
Assert.assertEquals(pendingPutBlockCount, + XceiverClientManager.getXceiverClientMetrics() + .getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // commitIndex2FlushedData Map will be empty here + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().isEmpty()); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + // flush is a sync call, all pending operations will complete + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + // we have just written data less than a chunk size, the data will just sit + // in the buffer, with only one buffer being allocated in the buffer pool + + Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(0, + blockOutputStream.getBufferPool().getBuffer(0).position()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + // flush ensures watchForCommit updates the total length acknowledged + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + + // now close the stream, It will update the ack length after watchForCommit + key.close(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 2, + metrics.getTotalOpCount()); + + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + } + + @Test + public void testFlushChunk() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = flushSize; + // write data equal to 2 chunks + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertEquals(pendingWriteChunkCount + 2, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount + 1, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertTrue(key.getOutputStream() instanceof 
KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data equal flush Size = 2 chunks, at this time + // buffer pool will have 2 buffers allocated worth of chunk size + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + // flush is a sync call, all pending operations will complete + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + // flush ensures watchForCommit updates the total length acknowledged + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + } + + @Test + public void testMultiChunkWrite() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + 
ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = chunkSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertEquals(pendingWriteChunkCount + 1, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data equal flush Size > 1 chunk, at this time + // buffer pool will have 2 buffers allocated worth of chunk size + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + // since data written is still less than flushLength, flushLength will + // still be 0. + Assert.assertEquals(0, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + // Now do a flush. This will flush the data and update the flush length and + // the map. 
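+    // flush writes the partially filled second chunk (50 bytes) and issues
+    // a putBlock for all 150 bytes written; flush is synchronous, so the
+    // pending counters below drop back to their baseline values.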
+ key.flush(); + Assert.assertEquals(writeChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + // flush ensures watchForCommit updates the total length acknowledged + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 3, + metrics.getTotalOpCount()); + validateData(keyName, data1); + } + + @Test + public void testMultiChunkWrite2() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = flushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertEquals(pendingWriteChunkCount + 2, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount + 1, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flush Size(2 chunks), at 
this time + // buffer pool will have 3 buffers allocated worth of chunk size + + Assert.assertEquals(3, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(flushSize, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + Assert.assertEquals(flushSize, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); + key.close(); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 5, + metrics.getTotalOpCount()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + } + + @Test + public void testFullBufferCondition() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + // since its hitting the full bufferCondition, it will call watchForCommit + // and completes atleast putBlock for first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // 
writtenDataLength as well flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equals to maxBufferSize is written, this will be a blocking + // call and hence will wait for atleast flushSize worth of data to get + // ack'd by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up atleast one entry from the map where each + // entry corresponds to flushSize worth of data + + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // now close the stream, It will update the ack length after watchForCommit + key.close(); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + } + + @Test + public void testWriteWithExceedingMaxBufferLimit() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + 
KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + // since it hits the full buffer condition, it will call watchForCommit + // and complete at least the putBlock for the first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well as flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equal to maxBufferSize is written, this will be a blocking + // call and hence will wait for at least flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up at least one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map.
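+ // (flush is synchronous: once it returns, the pending op counters checked + // below are expected to be back at their baseline values)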
+ key.flush(); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + // now close the stream; it will update the ack length after watchForCommit + key.close(); + Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + validateData(keyName, data1); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java new file mode 100644 index 00000000000..671f16caa11 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientMetrics; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.ratis.protocol.AlreadyClosedException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests failure detection and handling in BlockOutputStream class. + */ +public class TestBlockOutputStreamWithFailures { + + private static MiniOzoneCluster cluster; + private OzoneConfiguration conf = new OzoneConfiguration(); + private OzoneClient client; + private ObjectStore objectStore; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private String volumeName; + private String bucketName; + private String keyString; + + /** + * Create a MiniOzoneCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @Before + public void init() throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + conf.setQuietMode(false); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, + StorageUnit.MB); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(7) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "testblockoutputstream"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + private String getKeyName() { + return UUID.randomUUID().toString(); + } + + /** + * Shutdown MiniOzoneCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testWatchForCommitWithCloseContainerException() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data of more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + + // since it hits the full buffer condition, it will call watchForCommit + // and complete at least the putBlock for the first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream)
stream; + + // we have just written data more than flushSize (2 chunks), at this time + // buffer pool will have 4 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well as flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equal to maxBufferSize is written, this will be a blocking + // call and hence will wait for at least flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up at least one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + // flush is a sync call; all pending operations will have completed + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // Close the containers on the Datanode and write more data + ContainerTestHelper.waitForContainerClose(key, cluster); + // 4 writeChunks (maxFlushSize worth of data) and 2 putBlocks will be + // discarded here once the exception is hit + key.write(data1); + + // As a part of handling the exception, the 4 failed writeChunks will be + // rewritten plus one partial chunk, plus two putBlocks for flushSize + // and one flush for the partial chunk + key.flush(); + + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof ContainerNotOpenException); + + // commitInfoMap will remain intact as there is no server failure + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + // now close the stream; it will update the ack length after watchForCommit + key.close(); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
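+ // the retry after the ContainerNotOpenException allocated a new block, so + // the key is expected to hold two stream entries now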
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + // the same data was written twice, so validate the concatenation + String dataString = new String(data1, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + + @Test + public void testWatchForCommitDatanodeFailure() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data of more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since it hits the full buffer condition, it will call watchForCommit + // and complete at least the putBlock for the first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flushSize (2 chunks), at this time + // buffer pool will have 4 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well as flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + // maxFlushSize worth of data has crossed the flush boundary, so the + // flushed length will equal maxFlushSize
+ Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equal to maxBufferSize is written, this will be a blocking + // call and hence will wait for at least flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up at least flushSize worth of data from the + // buffer, where each map entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // the flush has completed and all data is acked, so the map should have + // been cleaned up entirely + Assert.assertEquals(0, + blockOutputStream.getCommitIndex2flushedDataMap().size()); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + + // again write data with more than max buffer limit. This will call + // watchForCommit again.
Since the commit can now only happen two-way, the + // commitInfoMap will get updated only for the servers which are alive + key.write(data1); + + key.flush(); + Assert.assertEquals(2, raftClient.getCommitInfoMap().size()); + + // now close the stream; it will update the ack length after watchForCommit + key.close(); + Assert + .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength()); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + + // in total, there are 8 full write chunks + 2 partial chunks written + Assert.assertEquals(writeChunkCount + 10, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + // 4 flushes at flushSize boundaries + 2 flushes for partial chunks + Assert.assertEquals(putBlockCount + 6, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 16, + metrics.getTotalOpCount()); + // the same data was written twice, so validate the concatenation + String dataString = new String(data1, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + + @Test + public void test2DatanodesFailure() throws Exception { + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingWriteChunkCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.WriteChunk); + long pendingPutBlockCount = metrics.getContainerOpsMetrics( + ContainerProtos.Type.PutBlock); + long totalOpCount = metrics.getTotalOpCount(); + String keyName = getKeyName(); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + int dataLength = maxFlushSize + 50; + // write data of more than 1 chunk + byte[] data1 = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(data1); + // since it hits the full buffer condition, it will call watchForCommit + // and complete at least the putBlock for the first flushSize worth of data + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk) + <= pendingWriteChunkCount + 2); + Assert.assertTrue( + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock) + <= pendingPutBlockCount + 1); + Assert.assertEquals(writeChunkCount + 4, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 6, + metrics.getTotalOpCount()); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream(); + + Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); + OutputStream stream = keyOutputStream.getStreamEntries().get(0) + .getOutputStream(); + Assert.assertTrue(stream instanceof BlockOutputStream); + BlockOutputStream blockOutputStream = (BlockOutputStream) stream; + + // we have just written data more than flushSize (2 chunks), at this time + // buffer pool
will have 4 buffers allocated worth of chunk size + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + // writtenDataLength as well as flushedDataLength will be updated here + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(maxFlushSize, + blockOutputStream.getTotalDataFlushedLength()); + + // since data equal to maxBufferSize is written, this will be a blocking + // call and hence will wait for at least flushSize worth of data to get + // acked by all servers right here + Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize); + + // watchForCommit will clean up at least one entry from the map where each + // entry corresponds to flushSize worth of data + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); + + // Now do a flush. This will flush the data and update the flush length and + // the map. + key.flush(); + + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 5, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 8, + metrics.getTotalOpCount()); + + // Since the data in the buffer is already flushed, flush here will have + // no impact on the counters and data structures + + Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + + Assert.assertEquals(dataLength, + blockOutputStream.getTotalDataFlushedLength()); + // flush will make sure one more entry gets updated in the map + Assert.assertTrue( + blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); + + XceiverClientRatis raftClient = + (XceiverClientRatis) blockOutputStream.getXceiverClient(); + Assert.assertEquals(3, raftClient.getCommitInfoMap().size()); + Pipeline pipeline = raftClient.getPipeline(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + // again write data with more than max buffer limit. This will call + // watchForCommit again.
With two of the three replicas down, the commit can no + // longer complete, so the write below will eventually fail + + // 4 writeChunks (maxFlushSize worth of data) and 2 putBlocks will be + // discarded here once the exception is hit + key.write(data1); + + // As a part of handling the exception, the 4 failed writeChunks will be + // rewritten plus one partial chunk, plus two putBlocks for flushSize + // and one flush for the partial chunk + key.flush(); + Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream + .getIoException()) instanceof AlreadyClosedException); + // now close the stream; it will update the ack length after watchForCommit + key.close(); + // make sure the bufferPool is empty + Assert + .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); + Assert + .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); + Assert.assertEquals(pendingWriteChunkCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(pendingPutBlockCount, + metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(writeChunkCount + 14, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + Assert.assertEquals(putBlockCount + 8, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); + Assert.assertEquals(totalOpCount + 22, + metrics.getTotalOpCount()); + validateData(keyName, data1); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 3124e772eef..8be1cccb50f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -338,17 +338,8 @@ public class TestCloseContainerHandlingByClient { private void waitForContainerClose(OzoneOutputStream outputStream) throws Exception { - KeyOutputStream keyOutputStream = - (KeyOutputStream) outputStream.getOutputStream(); - List<OmKeyLocationInfo> locationInfoList = - keyOutputStream.getLocationInfoList(); - List<Long> containerIdList = new ArrayList<>(); - for (OmKeyLocationInfo info : locationInfoList) { - containerIdList.add(info.getContainerID()); - } - Assert.assertTrue(!containerIdList.isEmpty()); ContainerTestHelper - .waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + .waitForContainerClose(outputStream, cluster); } @Ignore // test needs to be fixed after close container is handled for diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 0b97a00430f..f08a07f62e2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.common.Checksum; @@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; @@ -719,6 +721,21 @@ public final class ContainerTestHelper { return String.format("%1$" + length + "s", string); } + + public static void waitForContainerClose(OzoneOutputStream outputStream, + MiniOzoneCluster cluster) throws Exception { + KeyOutputStream keyOutputStream = + (KeyOutputStream) outputStream.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + keyOutputStream.getLocationInfoList(); + List<Long> containerIdList = new ArrayList<>(); + for (OmKeyLocationInfo info : locationInfoList) { + containerIdList.add(info.getContainerID()); + } + Assert.assertFalse(containerIdList.isEmpty()); + ContainerTestHelper + .waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + } + public static void waitForContainerClose(MiniOzoneCluster cluster, Long... containerIdList) throws ContainerNotFoundException, PipelineNotFoundException, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index bb4e6768171..14db90d8cdb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -134,7 +134,7 @@ public class TestBlockDeletion { String keyName = UUID.randomUUID().toString(); OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length, - ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>()); + ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>()); for (int i = 0; i < 100; i++) { out.write(value.getBytes()); }
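Note for reviewers: every test in this patch follows the same baseline-then-delta pattern around the new counters, capturing the op counts before the write and asserting exact deltas afterwards. A minimal sketch of that pattern, using only the getters added by this patch (the MetricsSnapshot helper class itself is hypothetical and not part of the change):

// Hypothetical helper (not in this patch): factors out the
// baseline-then-delta metrics checks repeated in the tests above.
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.junit.Assert;

final class MetricsSnapshot {
  private final long writeChunks;
  private final long putBlocks;
  private final long totalOps;

  MetricsSnapshot(XceiverClientMetrics metrics) {
    // capture the baseline before the I/O under test runs
    writeChunks = metrics.getContainerOpCountMetrics(
        ContainerProtos.Type.WriteChunk);
    putBlocks = metrics.getContainerOpCountMetrics(
        ContainerProtos.Type.PutBlock);
    totalOps = metrics.getTotalOpCount();
  }

  // Assert exact deltas against the baseline. Assumes no other op types
  // ran in between, so the total must move by the sum of both deltas.
  void assertDeltas(XceiverClientMetrics metrics, long dWriteChunks,
      long dPutBlocks) {
    Assert.assertEquals(writeChunks + dWriteChunks,
        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
    Assert.assertEquals(putBlocks + dPutBlocks,
        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
    Assert.assertEquals(totalOps + dWriteChunks + dPutBlocks,
        metrics.getTotalOpCount());
  }
}

With such a helper, the tail of testFullBufferCondition, for example, would reduce to snapshot.assertDeltas(metrics, 4, 2) followed by the pending-count checks.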