From 671fd6524b2640474de2bc3b8dbaa0a3cf7fcf01 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 13 Nov 2018 23:39:14 +0530 Subject: [PATCH] HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 28 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 65 ++- .../hdds/scm/storage/ChunkOutputStream.java | 448 ++++++++++++++---- .../hdds/scm/XceiverClientAsyncReply.java | 98 ++++ .../hadoop/hdds/scm/XceiverClientSpi.java | 12 +- .../scm/storage/ContainerProtocolCalls.java | 57 ++- .../apache/hadoop/ozone/OzoneConfigKeys.java | 24 +- .../src/main/resources/ozone-default.xml | 26 +- .../keyvalue/impl/BlockManagerImpl.java | 3 + .../hadoop/ozone/client/OzoneClientUtils.java | 27 -- .../client/io/ChunkGroupOutputStream.java | 341 +++++++------ .../hadoop/ozone/client/rpc/RpcClient.java | 27 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 45 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 19 + .../apache/hadoop/ozone/RatisTestHelper.java | 2 +- .../TestCloseContainerHandlingByClient.java | 252 +++------- .../TestContainerStateMachineFailures.java | 20 +- .../rpc/TestFailureHandlingByClient.java | 213 +++++++++ .../ozone/container/ContainerTestHelper.java | 34 ++ .../ozoneimpl/TestOzoneContainer.java | 2 +- .../ozone/scm/TestXceiverClientMetrics.java | 3 +- .../web/TestOzoneRestWithMiniCluster.java | 2 +- .../storage/DistributedStorageHandler.java | 42 +- .../hadoop/ozone/freon/TestDataValidate.java | 6 + .../ozone/freon/TestRandomKeyGenerator.java | 6 + 25 files changed, 1250 insertions(+), 552 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index cc34e27d27b..9acd832a684 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -47,6 +48,7 @@ import java.util.Map; import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A Client for the storageContainer protocol. @@ -163,7 +165,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { // In case the command gets retried on a 2nd datanode, // sendCommandAsyncCall will create a new channel and async stub // in case these don't exist for the specific datanode. - responseProto = sendCommandAsync(request, dn).get(); + responseProto = sendCommandAsync(request, dn).getResponse().get(); if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { break; } @@ -197,13 +199,23 @@ public class XceiverClientGrpc extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture sendCommandAsync( + public XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - return sendCommandAsync(request, pipeline.getFirstNode()); + XceiverClientAsyncReply asyncReply = + sendCommandAsync(request, pipeline.getFirstNode()); + + // TODO : for now make this API sync in nature as async requests are + // served out of order over XceiverClientGrpc. This needs to be fixed + // if this API is to be used for I/O path. Currently, this is not + // used for Read/Write Operation but for tests. + if (!HddsUtils.isReadOnly(request)) { + asyncReply.getResponse().get(); + } + return asyncReply; } - private CompletableFuture sendCommandAsync( + private XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) throws IOException, ExecutionException, InterruptedException { if (closed) { @@ -257,7 +269,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { }); requestObserver.onNext(request); requestObserver.onCompleted(); - return replyFuture; + return new XceiverClientAsyncReply(replyFuture); } private void reconnect(DatanodeDetails dn) @@ -288,6 +300,12 @@ public class XceiverClientGrpc extends XceiverClientSpi { // For stand alone pipeline, there is no notion called destroy pipeline. } + @Override + public void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException { + // there is no notion of watch for commit index in standalone pipeline + }; + /** * Returns pipeline Type. * diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index f38fd3badf4..e4b711afa97 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -50,9 +50,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; /** @@ -192,9 +195,22 @@ public final class XceiverClientRatis extends XceiverClientSpi { getClient().sendAsync(() -> byteString); } - public void watchForCommit(long index, long timeout) throws Exception { - getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED) - .get(timeout, TimeUnit.MILLISECONDS); + @Override + public void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException { + // TODO: Create a new Raft client instance to watch + CompletableFuture replyFuture = getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); + try { + replyFuture.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException toe) { + LOG.warn("3 way commit failed ", toe); + getClient() + .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) + .get(timeout, TimeUnit.MILLISECONDS); + LOG.info("Could not commit " + index + " to all the nodes." + + "Committed by majority."); + } } /** * Sends a given command to server gets a waitable future back. @@ -204,18 +220,37 @@ public final class XceiverClientRatis extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture sendCommandAsync( + public XceiverClientAsyncReply sendCommandAsync( ContainerCommandRequestProto request) { - return sendRequestAsync(request).whenComplete((reply, e) -> - LOG.debug("received reply {} for request: {} exception: {}", request, - reply, e)) - .thenApply(reply -> { - try { - return ContainerCommandResponseProto.parseFrom( - reply.getMessage().getContent()); - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); + XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null); + CompletableFuture raftClientReply = + sendRequestAsync(request); + Collection commitInfos = + new ArrayList<>(); + CompletableFuture containerCommandResponse = + raftClientReply.whenComplete((reply, e) -> LOG + .debug("received reply {} for request: {} exception: {}", request, + reply, e)) + .thenApply(reply -> { + try { + ContainerCommandResponseProto response = + ContainerCommandResponseProto + .parseFrom(reply.getMessage().getContent()); + reply.getCommitInfos().forEach(e -> { + XceiverClientAsyncReply.CommitInfo commitInfo = + new XceiverClientAsyncReply.CommitInfo( + e.getServer().getAddress(), e.getCommitIndex()); + commitInfos.add(commitInfo); + asyncReply.setCommitInfos(commitInfos); + asyncReply.setLogIndex(reply.getLogIndex()); + }); + return response; + } catch (InvalidProtocolBufferException e) { + throw new CompletionException(e); + } + }); + asyncReply.setResponse(containerCommandResponse); + return asyncReply; } + } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 4e881c434c3..bdc6a83dc5f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hdds.scm.storage; - - import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -29,16 +29,24 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.client.BlockID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.UUID; - +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .putBlock; + .putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .writeChunk; + .writeChunkAsync; /** * An {@link OutputStream} used by the REST service in combination with the @@ -57,6 +65,8 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls * through to the container. */ public class ChunkOutputStream extends OutputStream { + public static final Logger LOG = + LoggerFactory.getLogger(ChunkOutputStream.class); private BlockID blockID; private final String key; @@ -64,67 +74,97 @@ public class ChunkOutputStream extends OutputStream { private final BlockData.Builder containerBlockData; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; - private ByteBuffer buffer; private final String streamId; private int chunkIndex; private int chunkSize; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private ByteBuffer buffer; + // The IOException will be set by response handling thread in case there is an + // exception received in the response. If the exception is set, the next + // request will fail upfront. + private IOException ioException; + private ExecutorService responseExecutor; + + // position of the buffer where the last flush was attempted + private int lastFlushPos; + + // position of the buffer till which the flush was successfully + // acknowledged by all nodes in pipeline + private int lastSuccessfulFlushIndex; + + // list to hold up all putBlock futures + private List> + futureList; + // list maintaining commit indexes for putBlocks + private List commitIndexList; /** * Creates a new ChunkOutputStream. * - * @param blockID block ID - * @param key chunk key + * @param blockID block ID + * @param key chunk key * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param traceID container protocol call args - * @param chunkSize chunk size + * @param xceiverClient client to perform container calls + * @param traceID container protocol call args + * @param chunkSize chunk size */ public ChunkOutputStream(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String traceID, int chunkSize) { + XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + String traceID, int chunkSize, long streamBufferFlushSize, + long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) { this.blockID = blockID; this.key = key; this.traceID = traceID; this.chunkSize = chunkSize; - KeyValue keyValue = KeyValue.newBuilder() - .setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = BlockData.newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .addMetadata(keyValue); + KeyValue keyValue = + KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); + this.containerBlockData = + BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; - } + this.streamBufferFlushSize = streamBufferFlushSize; + this.streamBufferMaxSize = streamBufferMaxSize; + this.watchTimeout = watchTimeout; + this.buffer = buffer; + this.ioException = null; - public ByteBuffer getBuffer() { - return buffer; + // A single thread executor handle the responses of async requests + responseExecutor = Executors.newSingleThreadExecutor(); + commitIndexList = new ArrayList<>(); + lastSuccessfulFlushIndex = 0; + futureList = new ArrayList<>(); + lastFlushPos = 0; } public BlockID getBlockID() { return blockID; } + public int getLastSuccessfulFlushIndex() { + return lastSuccessfulFlushIndex; + } + + @Override public void write(int b) throws IOException { checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put((byte)b); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } + byte[] buf = new byte[1]; + buf[0] = (byte) b; + write(buf, 0, 1); } @Override - public void write(byte[] b, int off, int len) - throws IOException { + public void write(byte[] b, int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { @@ -132,93 +172,300 @@ public class ChunkOutputStream extends OutputStream { } checkOpen(); while (len > 0) { - int writeLen = Math.min(chunkSize - buffer.position(), len); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); + int writeLen; + writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len); buffer.put(b, off, writeLen); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); + if (buffer.position() % chunkSize == 0) { + int pos = buffer.position() - chunkSize; + int limit = buffer.position(); + writeChunk(pos, limit); } off += writeLen; len -= writeLen; + if (buffer.position() >= streamBufferFlushSize + && buffer.position() % streamBufferFlushSize == 0) { + + lastFlushPos = buffer.position(); + futureList.add(handlePartialFlush()); + } + if (buffer.position() >= streamBufferMaxSize + && buffer.position() % streamBufferMaxSize == 0) { + handleFullBuffer(); + } + } + } + + /** + * Will be called on the retryPath in case closedContainerException/ + * TimeoutException. + * @param len length of data to write + * @throws IOException if error occured + */ + + // In this case, the data is already cached in the buffer. + public void writeOnRetry(int len) throws IOException { + if (len == 0) { + return; + } + int off = 0; + checkOpen(); + while (len > 0) { + int writeLen; + writeLen = Math.min(chunkSize, len); + if (writeLen == chunkSize) { + int pos = off; + int limit = pos + chunkSize; + writeChunk(pos, limit); + } + off += writeLen; + len -= writeLen; + if (off % streamBufferFlushSize == 0) { + lastFlushPos = off; + futureList.add(handlePartialFlush()); + } + if (off % streamBufferMaxSize == 0) { + handleFullBuffer(); + } + } + } + + private void handleResponse( + ContainerProtos.ContainerCommandResponseProto response, + XceiverClientAsyncReply asyncReply) { + validateResponse(response); + discardBuffer(asyncReply); + } + + private void discardBuffer(XceiverClientAsyncReply asyncReply) { + if (!commitIndexList.isEmpty()) { + long index = commitIndexList.get(0); + if (checkIfBufferDiscardRequired(asyncReply, index)) { + updateFlushIndex(); + } + } + } + + /** + * just update the lastSuccessfulFlushIndex. Since we have allocated + * the buffer more than the streamBufferMaxSize, we can keep on writing + * to the buffer. In case of failure, we will read the data starting from + * lastSuccessfulFlushIndex. + */ + private void updateFlushIndex() { + lastSuccessfulFlushIndex += streamBufferFlushSize; + LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex); + if (!commitIndexList.isEmpty()) { + commitIndexList.remove(0); + futureList.remove(0); + } + + } + /** + * Check if the last commitIndex stored at the beginning of the + * commitIndexList is less than equal to current commitInfo indexes. + * If its true, the buffer has been successfully flushed till the + * last position where flush happened. + */ + private boolean checkIfBufferDiscardRequired( + XceiverClientAsyncReply asyncReply, long commitIndex) { + if (asyncReply.getCommitInfos() != null) { + for (XceiverClientAsyncReply.CommitInfo info : asyncReply + .getCommitInfos()) { + if (info.getCommitIndex() < commitIndex) { + return false; + } + } + } + return true; + } + + /** + * This is a blocking call.It will wait for the flush till the commit index + * at the head of the commitIndexList gets replicated to all or majority. + * @throws IOException + */ + private void handleFullBuffer() throws IOException { + if (!commitIndexList.isEmpty()) { + watchForCommit(commitIndexList.get(0)); + } + } + + /** + * calls watchForCommit API of the Ratis Client. For Standalone client, + * it is a no op. + * @param commitIndex log index to watch for + * @throws IOException IOException in case watch gets timed out + */ + private void watchForCommit(long commitIndex) throws IOException { + checkOpen(); + Preconditions.checkState(!commitIndexList.isEmpty()); + try { + xceiverClient.watchForCommit(commitIndex, watchTimeout); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + LOG.warn("watchForCommit failed for index " + commitIndex, e); + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + } + + private CompletableFuture handlePartialFlush() + throws IOException { + String requestId = + traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID; + try { + XceiverClientAsyncReply asyncReply = + putBlockAsync(xceiverClient, containerBlockData.build(), requestId); + CompletableFuture future = + asyncReply.getResponse(); + + return future.thenApplyAsync(e -> { + handleResponse(e, asyncReply); + // if the ioException is not set, putBlock is successful + if (ioException == null) { + LOG.debug( + "Adding index " + asyncReply.getLogIndex() + " commitList size " + + commitIndexList.size()); + BlockID responseBlockID = BlockID.getFromProtobuf( + e.getPutBlock().getCommittedBlockLength().getBlockID()); + Preconditions.checkState(blockID.getContainerBlockID() + .equals(responseBlockID.getContainerBlockID())); + // updates the bcsId of the block + blockID = responseBlockID; + long index = asyncReply.getLogIndex(); + // for standalone protocol, logIndex will always be 0. + if (index != 0) { + commitIndexList.add(index); + } else { + updateFlushIndex(); + } + } + return e; + }, responseExecutor); + } catch (IOException | InterruptedException | ExecutionException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); } } @Override public void flush() throws IOException { - checkOpen(); - if (buffer.position() > 0) { - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - flushBufferToChunk(rollbackPosition, rollbackLimit); + if (xceiverClientManager != null && xceiverClient != null + && buffer != null) { + checkOpen(); + if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer + .position()) { + try { + + // flush the last chunk data residing on the buffer + if (buffer.position() % chunkSize > 0) { + int pos = buffer.position() - (buffer.position() % chunkSize); + writeChunk(pos, buffer.position()); + } + if (lastFlushPos != buffer.position()) { + lastFlushPos = buffer.position(); + handlePartialFlush(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + combinedFuture.get(); + // just check again if the exception is hit while waiting for the + // futures to ensure flush has indeed succeeded + checkOpen(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + } } } + private void writeChunk(int pos, int limit) throws IOException { + // Please note : We are not flipping the slice when we write since + // the slices are pointing the buffer start and end as needed for + // the chunk write. Also please note, Duplicate does not create a + // copy of data, it only creates metadata that points to the data + // stream. + ByteBuffer chunk = buffer.duplicate(); + chunk.position(pos); + chunk.limit(limit); + writeChunkToContainer(chunk); + } + @Override public void close() throws IOException { if (xceiverClientManager != null && xceiverClient != null && buffer != null) { - if (buffer.position() > 0) { - writeChunkToContainer(); - } try { - ContainerProtos.PutBlockResponseProto responseProto = - putBlock(xceiverClient, containerBlockData.build(), traceID); - BlockID responseBlockID = BlockID.getFromProtobuf( - responseProto.getCommittedBlockLength().getBlockID()); - Preconditions.checkState(blockID.getContainerBlockID() - .equals(responseBlockID.getContainerBlockID())); - // updates the bcsId of the block - blockID = responseBlockID; - } catch (IOException e) { + if (buffer.position() > lastFlushPos) { + int pos = buffer.position() - (buffer.position() % chunkSize); + writeChunk(pos, buffer.position()); + futureList.add(handlePartialFlush()); + } + CompletableFuture combinedFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + + // wait for all the transactions to complete + combinedFuture.get(); + + // irrespective of whether the commitIndexList is empty or not, + // ensure there is no exception set(For Standalone Protocol) + checkOpen(); + if (!commitIndexList.isEmpty()) { + // wait for the last commit index in the commitIndexList to get + // committed to all or majority of nodes in case timeout happens. + long lastIndex = commitIndexList.get(commitIndexList.size() - 1); + LOG.debug( + "waiting for last flush Index " + lastIndex + " to catch up"); + watchForCommit(lastIndex); + updateFlushIndex(); + } + } catch (InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { cleanup(); } } + // clear the buffer + buffer.clear(); + } + + private void validateResponse( + ContainerProtos.ContainerCommandResponseProto responseProto) { + try { + ContainerProtocolCalls.validateContainerResponse(responseProto); + } catch (StorageContainerException sce) { + ioException = new IOException( + "Unexpected Storage Container Exception: " + sce.toString(), sce); + } } public void cleanup() { - xceiverClientManager.releaseClient(xceiverClient); + if (xceiverClientManager != null) { + xceiverClientManager.releaseClient(xceiverClient); + } xceiverClientManager = null; xceiverClient = null; - buffer = null; + if (futureList != null) { + futureList.clear(); + } + futureList = null; + commitIndexList = null; + responseExecutor.shutdown(); } /** - * Checks if the stream is open. If not, throws an exception. + * Checks if the stream is open or exception has occured. + * If not, throws an exception. * * @throws IOException if stream is closed */ private void checkOpen() throws IOException { if (xceiverClient == null) { throw new IOException("ChunkOutputStream has been closed."); - } - } - - /** - * Attempts to flush buffered writes by writing a new chunk to the container. - * If successful, then clears the buffer to prepare to receive writes for a - * new chunk. - * - * @param rollbackPosition position to restore in buffer if write fails - * @param rollbackLimit limit to restore in buffer if write fails - * @throws IOException if there is an I/O error while performing the call - */ - private void flushBufferToChunk(int rollbackPosition, - int rollbackLimit) throws IOException { - boolean success = false; - try { - writeChunkToContainer(); - success = true; - } finally { - if (success) { - buffer.clear(); - } else { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - } + } else if (ioException != null) { + throw ioException; } } @@ -228,23 +475,32 @@ public class ChunkOutputStream extends OutputStream { * * @throws IOException if there is an I/O error while performing the call */ - private void writeChunkToContainer() throws IOException { - buffer.flip(); - ByteString data = ByteString.copyFrom(buffer); - ChunkInfo chunk = ChunkInfo - .newBuilder() - .setChunkName( - DigestUtils.md5Hex(key) + "_stream_" - + streamId + "_chunk_" + ++chunkIndex) - .setOffset(0) - .setLen(data.size()) - .build(); + private void writeChunkToContainer(ByteBuffer chunk) throws IOException { + int effectiveChunkSize = chunk.remaining(); + ByteString data = ByteString.copyFrom(chunk); + ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName( + DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_" + + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build(); + // generate a unique requestId + String requestId = + traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo + .getChunkName(); try { - writeChunk(xceiverClient, chunk, blockID, data, traceID); - } catch (IOException e) { + XceiverClientAsyncReply asyncReply = + writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId); + CompletableFuture future = + asyncReply.getResponse(); + future.thenApplyAsync(e -> { + handleResponse(e, asyncReply); + return e; + }, responseExecutor); + } catch (IOException | InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } - containerBlockData.addChunks(chunk); + LOG.debug( + "writing chunk " + chunkInfo.getChunkName() + " blockID " + blockID + + " length " + chunk.remaining()); + containerBlockData.addChunks(chunkInfo); } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java new file mode 100644 index 00000000000..0d7e1bccff2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * This class represents the Async reply from XceiverClient. + */ +public class XceiverClientAsyncReply { + + private CompletableFuture response; + private Long logIndex; + private Collection commitInfos; + + public XceiverClientAsyncReply( + CompletableFuture response) { + this(response, 0, null); + } + + public XceiverClientAsyncReply( + CompletableFuture response, long index, + Collection commitInfos) { + this.commitInfos = commitInfos; + this.logIndex = index; + this.response = response; + } + + /** + * A class having details about latest commitIndex for each server in the + * Ratis pipeline. For Standalone pipeline, commitInfo will be null. + */ + public static class CommitInfo { + + private final String server; + + private final Long commitIndex; + + public CommitInfo(String server, long commitIndex) { + this.server = server; + this.commitIndex = commitIndex; + } + + public String getServer() { + return server; + } + + public long getCommitIndex() { + return commitIndex; + } + } + + public Collection getCommitInfos() { + return commitInfos; + } + + public CompletableFuture getResponse() { + return response; + } + + public long getLogIndex() { + return logIndex; + } + + public void setCommitInfos(Collection commitInfos) { + this.commitInfos = commitInfos; + } + + public void setLogIndex(Long logIndex) { + this.logIndex = logIndex; + } + + public void setResponse( + CompletableFuture response) { + this.response = response; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index b36315e88b5..9eb49aed9de 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -28,8 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -98,7 +98,10 @@ public abstract class XceiverClientSpi implements Closeable { public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { try { - return sendCommandAsync(request).get(); + XceiverClientAsyncReply reply; + reply = sendCommandAsync(request); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; } catch (ExecutionException | InterruptedException e) { throw new IOException("Failed to command " + request, e); } @@ -111,7 +114,7 @@ public abstract class XceiverClientSpi implements Closeable { * @return Response to the command * @throws IOException */ - public abstract CompletableFuture + public abstract XceiverClientAsyncReply sendCommandAsync(ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException; @@ -132,4 +135,7 @@ public abstract class XceiverClientSpi implements Closeable { * @return - {Stand_Alone, Ratis or Chained} */ public abstract HddsProtos.ReplicationType getPipelineType(); + + public abstract void watchForCommit(long index, long timeout) + throws InterruptedException, ExecutionException, TimeoutException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index c1d90a5c0bd..04f4cbc8d03 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -64,6 +65,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * Implementation of all container protocol calls performed by Container @@ -162,6 +164,31 @@ public final class ContainerProtocolCalls { return response.getPutBlock(); } + /** + * Calls the container protocol to put a container block. + * + * @param xceiverClient client to perform call + * @param containerBlockData block data to identify container + * @param traceID container protocol call args + * @return putBlockResponse + * @throws Exception if there is an error while performing the call + */ + public static XceiverClientAsyncReply putBlockAsync( + XceiverClientSpi xceiverClient, BlockData containerBlockData, + String traceID) + throws IOException, InterruptedException, ExecutionException { + PutBlockRequestProto.Builder createBlockRequest = + PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) + .setContainerID(containerBlockData.getBlockID().getContainerID()) + .setTraceID(traceID).setDatanodeUuid(id) + .setPutBlock(createBlockRequest).build(); + xceiverClient.sendCommand(request); + return xceiverClient.sendCommandAsync(request); + } + /** * Calls the container protocol to read a chunk. * @@ -200,7 +227,7 @@ public final class ContainerProtocolCalls { * @param blockID ID of the block * @param data the data of the chunk to write * @param traceID container protocol call args - * @throws IOException if there is an I/O error while performing the call + * @throws Exception if there is an error while performing the call */ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, ByteString data, String traceID) @@ -223,6 +250,32 @@ public final class ContainerProtocolCalls { validateContainerResponse(response); } + /** + * Calls the container protocol to write a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to write + * @param blockID ID of the block + * @param data the data of the chunk to write + * @param traceID container protocol call args + * @throws IOException if there is an I/O error while performing the call + */ + public static XceiverClientAsyncReply writeChunkAsync( + XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + ByteString data, String traceID) + throws IOException, ExecutionException, InterruptedException { + WriteChunkRequestProto.Builder writeChunkRequest = + WriteChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunk).setData(data); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk) + .setContainerID(blockID.getContainerID()).setTraceID(traceID) + .setDatanodeUuid(id).setWriteChunk(writeChunkRequest).build(); + return xceiverClient.sendCommandAsync(request); + } + /** * Allows writing a small file using single RPC. This takes the container * name, block name and data to write sends all that data to the container @@ -420,7 +473,7 @@ public final class ContainerProtocolCalls { * @param response container protocol call response * @throws IOException if the container protocol call failed */ - private static void validateContainerResponse( + public static void validateContainerResponse( ContainerCommandResponseProto response ) throws StorageContainerException { if (response.getResult() == ContainerProtos.Result.SUCCESS) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 97768173f93..8a5762f3d75 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -112,6 +112,22 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_PROTOCOL = "ozone.client.protocol"; + public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE = + "ozone.client.stream.buffer.flush.size"; + + public static final long OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT = 64; + + public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE = + "ozone.client.stream.buffer.max.size"; + + public static final long OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = 128; + + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT = + "ozone.client.watch.request.timeout"; + + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT = + "30s"; + // This defines the overall connection limit for the connection pool used in // RestClient. public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX = @@ -192,14 +208,6 @@ public final class OzoneConfigKeys { public static final int OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10; - public static final String OZONE_CLIENT_MAX_RETRIES = - "ozone.client.max.retries"; - public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50; - - public static final String OZONE_CLIENT_RETRY_INTERVAL = - "ozone.client.retry.interval"; - public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms"; - public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 2ffc2abb7cf..54bffd5723b 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -335,19 +335,29 @@ - ozone.client.max.retries - 50 + ozone.client.stream.buffer.flush.size + 64 OZONE, CLIENT - Maximum number of retries by Ozone Client on encountering - exception while fetching committed block length. + Size in mb which determines at what buffer position , a partial + flush will be initiated during write. It should be ideally a mutiple + of chunkSize. - ozone.client.retry.interval - 200ms + ozone.client.stream.buffer.max.size + 128 OZONE, CLIENT - Interval between retries by Ozone Client on encountering - exception while fetching committed block length. + Size in mb which determines at what buffer position , + write call be blocked till acknowledgement of the fisrt partial flush + happens by all servers. + + + + ozone.client.watch.request.timeout + 30s + OZONE, CLIENT + Timeout for the watch API in Ratis client to acknowledge + a particular request getting replayed to all servers. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index e2e57006e78..ea0e819173c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -121,6 +121,9 @@ public class BlockManagerImpl implements BlockManager { container.updateBlockCommitSequenceId(bcsId); // Increment keycount here container.getContainerData().incrKeyCount(); + LOG.debug( + "Block " + data.getBlockID() + " successfully committed with bcsId " + + bcsId + " chunk size " + data.getChunks().size()); return data.getSize(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 40e4d83113e..be1449ff36b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -17,23 +17,14 @@ */ package org.apache.hadoop.ozone.client; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.*; import java.util.ArrayList; import java.util.List; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** A utility class for OzoneClient. */ public final class OzoneClientUtils { @@ -94,24 +85,6 @@ public final class OzoneClientUtils { return keyInfo; } - public static RetryPolicy createRetryPolicy(Configuration conf) { - int maxRetryCount = - conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. - OZONE_CLIENT_MAX_RETRIES_DEFAULT); - long retryInterval = conf.getTimeDuration(OzoneConfigKeys. - OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys. - OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - RetryPolicy basePolicy = RetryPolicies - .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval, - TimeUnit.MILLISECONDS); - Map, RetryPolicy> exceptionToPolicyMap = - new HashMap, RetryPolicy>(); - exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy); - RetryPolicy retryPolicy = RetryPolicies - .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, - exceptionToPolicyMap); - return retryPolicy; - } /** * Returns a KeyInfoDetails object constructed using fields of the input * OzoneKeyDetails object. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 450e2dc6885..5dbe9f65d2a 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -24,11 +24,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -41,18 +40,17 @@ import org.apache.hadoop.hdds.scm.container.common.helpers import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.ListIterator; +import java.util.concurrent.TimeoutException; /** * Maintaining a list of ChunkInputStream. Write based on offset. @@ -71,7 +69,6 @@ public class ChunkGroupOutputStream extends OutputStream { // array list's get(index) is O(1) private final ArrayList streamEntries; private int currentStreamIndex; - private long byteOffset; private final OzoneManagerProtocolClientSideTranslatorPB omClient; private final StorageContainerLocationProtocolClientSideTranslatorPB scmClient; @@ -81,7 +78,11 @@ public class ChunkGroupOutputStream extends OutputStream { private final int chunkSize; private final String requestID; private boolean closed; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; + private ByteBuffer buffer; /** * A constructor for testing purpose only. */ @@ -96,7 +97,11 @@ public class ChunkGroupOutputStream extends OutputStream { chunkSize = 0; requestID = null; closed = false; - retryPolicy = null; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + buffer = ByteBuffer.allocate(1); + watchTimeout = 0; + blockSize = 0; } /** @@ -127,35 +132,54 @@ public class ChunkGroupOutputStream extends OutputStream { new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) .setLength(streamEntry.currentPosition).setOffset(0) .build(); + LOG.debug("block written " + streamEntry.blockID + ", length " + + streamEntry.currentPosition + " bcsID " + streamEntry.blockID + .getBlockCommitSequenceId()); locationInfoList.add(info); } return locationInfoList; } - public ChunkGroupOutputStream( - OpenKeySession handler, XceiverClientManager xceiverClientManager, + public ChunkGroupOutputStream(OpenKeySession handler, + XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, - OzoneManagerProtocolClientSideTranslatorPB omClient, - int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type, RetryPolicy retryPolicy) throws IOException { + OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, + String requestId, ReplicationFactor factor, ReplicationType type, + long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; - this.byteOffset = 0; this.omClient = omClient; this.scmClient = scmClient; OmKeyInfo info = handler.getKeyInfo(); - this.keyArgs = new OmKeyArgs.Builder() - .setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()) - .setKeyName(info.getKeyName()) - .setType(type) - .setFactor(factor) - .setDataSize(info.getDataSize()).build(); + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setType(type).setFactor(factor).setDataSize(info.getDataSize()) + .build(); this.openID = handler.getId(); this.xceiverClientManager = xceiverClientManager; this.chunkSize = chunkSize; this.requestID = requestId; - this.retryPolicy = retryPolicy; + this.streamBufferFlushSize = bufferFlushSize * OzoneConsts.MB; + this.streamBufferMaxSize = bufferMaxSize * OzoneConsts.MB; + this.blockSize = size * OzoneConsts.MB; + this.watchTimeout = watchTimeout; + + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(streamBufferFlushSize % chunkSize == 0); + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); + Preconditions.checkState(blockSize % streamBufferMaxSize == 0); + + // This byteBuffer will be used to cache data until all the blockCommits + // (putBlock) gets replicated to all/majority servers. The idea here is to + // allocate the buffer of size blockSize so that as and when a chunk is + // is replicated to all servers, as a part of discarding the buffer, we + // don't necessarily need to run compaction(buffer.compact() on the buffer + // to actually discard the acknowledged data. Compaction is inefficient so + // it would be a better choice to avoid compaction on the happy I/O path. + this.buffer = ByteBuffer.allocate((int) blockSize); } /** @@ -191,12 +215,13 @@ public class ChunkGroupOutputStream extends OutputStream { xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength())); + chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, + streamBufferMaxSize, watchTimeout, buffer)); } @VisibleForTesting public long getByteOffset() { - return byteOffset; + return getKeyLength(); } @@ -223,21 +248,23 @@ public class ChunkGroupOutputStream extends OutputStream { public void write(byte[] b, int off, int len) throws IOException { checkNotClosed(); - handleWrite(b, off, len); + handleWrite(b, off, len, false, buffer.position()); } - private void handleWrite(byte[] b, int off, int len) throws IOException { + private void handleWrite(byte[] b, int off, int len, boolean retry, + int pos) throws IOException { if (b == null) { throw new NullPointerException(); } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } int succeededAllocates = 0; + int initialPos; while (len > 0) { if (streamEntries.size() <= currentStreamIndex) { Preconditions.checkNotNull(omClient); @@ -247,8 +274,8 @@ public class ChunkGroupOutputStream extends OutputStream { allocateNewBlock(currentStreamIndex); succeededAllocates += 1; } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " + - "allocated " + succeededAllocates + " blocks for this write."); + LOG.error("Try to allocate more blocks for write failed, already " + + "allocated " + succeededAllocates + " blocks for this write."); throw ioe; } } @@ -257,12 +284,19 @@ public class ChunkGroupOutputStream extends OutputStream { Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); int writeLen = Math.min(len, (int) current.getRemaining()); + initialPos = pos < buffer.position() ? pos : buffer.position(); try { - current.write(b, off, writeLen); + if (retry) { + current.writeOnRetry(len); + } else { + current.write(b, off, writeLen); + } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { - handleCloseContainerException(current, currentStreamIndex); - continue; + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + // for the current iteration, current pos - initialPos gives the + // amount of data already written to the buffer + writeLen = buffer.position() - initialPos; + handleException(current, currentStreamIndex); } else { throw ioe; } @@ -274,57 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream { } len -= writeLen; off += writeLen; - byteOffset += writeLen; - } - } - - private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry) - throws IOException { - long blockLength; - ContainerProtos.GetCommittedBlockLengthResponseProto responseProto; - RetryPolicy.RetryAction action; - int numRetries = 0; - while (true) { - try { - responseProto = ContainerProtocolCalls - .getCommittedBlockLength(streamEntry.xceiverClient, - streamEntry.blockID, requestID); - blockLength = responseProto.getBlockLength(); - return blockLength; - } catch (StorageContainerException sce) { - try { - action = retryPolicy.shouldRetry(sce, numRetries, 0, true); - } catch (Exception e) { - throw e instanceof IOException ? (IOException) e : new IOException(e); - } - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - if (action.reason != null) { - LOG.error( - "GetCommittedBlockLength request failed. " + action.reason, - sce); - } - throw sce; - } - - // Throw the exception if the thread is interrupted - if (Thread.currentThread().isInterrupted()) { - LOG.warn("Interrupted while trying for connection"); - throw sce; - } - Preconditions.checkArgument( - action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); - try { - Thread.sleep(action.delayMillis); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException( - "Interrupted: action=" + action + ", retry policy=" + retryPolicy) - .initCause(e); - } - numRetries++; - LOG.trace("Retrying GetCommittedBlockLength request. Already tried " - + numRetries + " time(s); retry policy is " + retryPolicy); - continue; - } } } @@ -373,55 +356,35 @@ public class ChunkGroupOutputStream extends OutputStream { * * @param streamEntry StreamEntry * @param streamIndex Index of the entry - * @throws IOException Throws IOexception if Write fails + * @throws IOException Throws IOException if Write fails */ - private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, + private void handleException(ChunkOutputStreamEntry streamEntry, int streamIndex) throws IOException { - long committedLength = 0; - ByteBuffer buffer = streamEntry.getBuffer(); - if (buffer == null) { - // the buffer here will be null only when closeContainerException is - // hit while calling putKey during close on chunkOutputStream. - // Since closeContainer auto commit pending keys, no need to do - // anything here. - return; - } + int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex(); + int currentPos = buffer.position(); - // update currentStreamIndex in case of closed container exception. The - // current stream entry cannot be used for further writes because - // container is closed. - currentStreamIndex += 1; - - // In case where not a single chunk of data has been written to the Datanode - // yet. This block does not yet exist on the datanode but cached on the - // outputStream buffer. No need to call GetCommittedBlockLength here - // for this block associated with the stream here. - if (streamEntry.currentPosition >= chunkSize - || streamEntry.currentPosition != buffer.position()) { - committedLength = getCommittedBlockLength(streamEntry); - // update the length of the current stream - streamEntry.currentPosition = committedLength; + // In case of a failure, read the data from the position till the last + // acknowledgement happened. + if (lastSuccessfulFlushIndex > 0) { + buffer.position(lastSuccessfulFlushIndex); + buffer.limit(currentPos); + buffer.compact(); } if (buffer.position() > 0) { + //set the correct length for the current stream + streamEntry.currentPosition = lastSuccessfulFlushIndex; // If the data is still cached in the underlying stream, we need to - // allocate new block and write this data in the datanode. The cached - // data in the buffer does not exceed chunkSize. - Preconditions.checkState(buffer.position() < chunkSize); - // readjust the byteOffset value to the length actually been written. - byteOffset -= buffer.position(); - handleWrite(buffer.array(), 0, buffer.position()); + // allocate new block and write this data in the datanode. + currentStreamIndex += 1; + handleWrite(buffer.array(), 0, buffer.position(), true, + lastSuccessfulFlushIndex); } - // just clean up the current stream. Since the container is already closed, - // it will be auto committed. No need to call close again here. + // just clean up the current stream. streamEntry.cleanup(); - // This case will arise when while writing the first chunk itself fails. - // In such case, the current block associated with the stream has no data - // written. Remove it from the current stream list. - if (committedLength == 0) { + if (lastSuccessfulFlushIndex == 0) { streamEntries.remove(streamIndex); - Preconditions.checkArgument(currentStreamIndex != 0); currentStreamIndex -= 1; } // discard subsequent pre allocated blocks from the streamEntries list @@ -430,11 +393,15 @@ public class ChunkGroupOutputStream extends OutputStream { } private boolean checkIfContainerIsClosed(IOException ioe) { - return checkIfContainerNotOpenException(ioe) || Optional.of(ioe.getCause()) - .filter(e -> e instanceof StorageContainerException) - .map(e -> (StorageContainerException) e) - .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) - .isPresent(); + if (ioe.getCause() != null) { + return checkIfContainerNotOpenException(ioe) || Optional + .of(ioe.getCause()) + .filter(e -> e instanceof StorageContainerException) + .map(e -> (StorageContainerException) e) + .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) + .isPresent(); + } + return false; } private boolean checkIfContainerNotOpenException(IOException ioe) { @@ -448,6 +415,15 @@ public class ChunkGroupOutputStream extends OutputStream { return false; } + private boolean checkIfTimeoutException(IOException ioe) { + if (ioe.getCause() != null) { + return Optional.of(ioe.getCause()) + .filter(e -> e instanceof TimeoutException).isPresent(); + } else { + return false; + } + } + private long getKeyLength() { return streamEntries.parallelStream().mapToLong(e -> e.currentPosition) .sum(); @@ -495,11 +471,11 @@ public class ChunkGroupOutputStream extends OutputStream { entry.flush(); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { // This call will allocate a new streamEntry and write the Data. // Close needs to be retried on the newly allocated streamEntry as // as well. - handleCloseContainerException(entry, streamIndex); + handleException(entry, streamIndex); handleFlushOrClose(close); } else { throw ioe; @@ -519,16 +495,24 @@ public class ChunkGroupOutputStream extends OutputStream { return; } closed = true; - handleFlushOrClose(true); - if (keyArgs != null) { - // in test, this could be null - removeEmptyBlocks(); - Preconditions.checkState(byteOffset == getKeyLength()); - keyArgs.setDataSize(byteOffset); - keyArgs.setLocationInfoList(getLocationInfoList()); - omClient.commitKey(keyArgs, openID); - } else { - LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); + try { + handleFlushOrClose(true); + if (keyArgs != null) { + // in test, this could be null + removeEmptyBlocks(); + keyArgs.setDataSize(getKeyLength()); + keyArgs.setLocationInfoList(getLocationInfoList()); + omClient.commitKey(keyArgs, openID); + } else { + LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); + } + } catch (IOException ioe) { + throw ioe; + } finally { + if (buffer != null) { + buffer.clear(); + } + buffer = null; } } @@ -544,7 +528,10 @@ public class ChunkGroupOutputStream extends OutputStream { private String requestID; private ReplicationType type; private ReplicationFactor factor; - private RetryPolicy retryPolicy; + private long streamBufferFlushSize; + private long streamBufferMaxSize; + private long blockSize; + private long watchTimeout; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -588,16 +575,31 @@ public class ChunkGroupOutputStream extends OutputStream { return this; } - public ChunkGroupOutputStream build() throws IOException { - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - omClient, chunkSize, requestID, factor, type, retryPolicy); - } - - public Builder setRetryPolicy(RetryPolicy rPolicy) { - this.retryPolicy = rPolicy; + public Builder setStreamBufferFlushSize(long size) { + this.streamBufferFlushSize = size; return this; } + public Builder setStreamBufferMaxSize(long size) { + this.streamBufferMaxSize = size; + return this; + } + + public Builder setBlockSize(long size) { + this.blockSize = size; + return this; + } + + public Builder setWatchTimeout(long timeout) { + this.watchTimeout = timeout; + return this; + } + + public ChunkGroupOutputStream build() throws IOException { + return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, + omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, + streamBufferMaxSize, blockSize, watchTimeout); + } } private static class ChunkOutputStreamEntry extends OutputStream { @@ -613,10 +615,16 @@ public class ChunkGroupOutputStream extends OutputStream { // the current position of this stream 0 <= currentPosition < length private long currentPosition; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private ByteBuffer buffer; + ChunkOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length) { + long length, long streamBufferFlushSize, long streamBufferMaxSize, + long watchTimeout, ByteBuffer buffer) { this.outputStream = null; this.blockID = blockID; this.key = key; @@ -627,6 +635,10 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; + this.streamBufferFlushSize = streamBufferFlushSize; + this.streamBufferMaxSize = streamBufferMaxSize; + this.watchTimeout = watchTimeout; + this.buffer = buffer; } /** @@ -645,6 +657,10 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; + streamBufferFlushSize = 0; + streamBufferMaxSize = 0; + buffer = null; + watchTimeout = 0; } long getLength() { @@ -657,9 +673,10 @@ public class ChunkGroupOutputStream extends OutputStream { private void checkStream() { if (this.outputStream == null) { - this.outputStream = new ChunkOutputStream(blockID, - key, xceiverClientManager, xceiverClient, - requestId, chunkSize); + this.outputStream = + new ChunkOutputStream(blockID, key, xceiverClientManager, + xceiverClient, requestId, chunkSize, streamBufferFlushSize, + streamBufferMaxSize, watchTimeout, buffer); } } @@ -696,15 +713,21 @@ public class ChunkGroupOutputStream extends OutputStream { } } - ByteBuffer getBuffer() throws IOException { + int getLastSuccessfulFlushIndex() throws IOException { if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; - return out.getBuffer(); + blockID = out.getBlockID(); + return out.getLastSuccessfulFlushIndex(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; } throw new IOException("Invalid Output Stream for Key: " + key); } - public void cleanup() { + void cleanup() { checkStream(); if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; @@ -712,6 +735,16 @@ public class ChunkGroupOutputStream extends OutputStream { } } + void writeOnRetry(int len) throws IOException { + checkStream(); + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + out.writeOnRetry(len); + this.currentPosition += len; + } else { + throw new IOException("Invalid Output Stream for Key: " + key); + } + } } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index cbb2e498ccf..826f04b6d64 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -24,18 +24,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; -import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; @@ -72,6 +71,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -94,7 +94,10 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long blockSize; + private final long watchTimeout; /** * Creates RpcClient instance with the given configuration. @@ -135,7 +138,6 @@ public class RpcClient implements ClientProtocol { Client.getRpcTimeout(conf))); this.xceiverClientManager = new XceiverClientManager(conf); - retryPolicy = OzoneClientUtils.createRetryPolicy(conf); int configuredChunkSize = conf.getInt( ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, @@ -149,6 +151,18 @@ public class RpcClient implements ClientProtocol { } else { chunkSize = configuredChunkSize; } + streamBufferFlushSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT); + streamBufferMaxSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT); + blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, + OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); + watchTimeout = + conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -468,7 +482,10 @@ public class RpcClient implements ClientProtocol { .setRequestID(requestId) .setType(HddsProtos.ReplicationType.valueOf(type.toString())) .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) - .setRetryPolicy(retryPolicy) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) + .setBlockSize(blockSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 15bf8d099ab..b352e3680f4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -230,7 +230,10 @@ public interface MiniOzoneCluster { protected Boolean ozoneEnabled = true; protected Boolean randomContainerPort = true; - + protected Optional chunkSize = Optional.empty(); + protected Optional streamBufferFlushSize = Optional.empty(); + protected Optional streamBufferMaxSize = Optional.empty(); + protected Optional blockSize = Optional.empty(); // Use relative smaller number of handlers for testing protected int numOfOmHandlers = 20; protected int numOfScmHandlers = 20; @@ -358,6 +361,46 @@ public interface MiniOzoneCluster { return this; } + /** + * Sets the chunk size. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setChunkSize(int size) { + chunkSize = Optional.of(size); + return this; + } + + /** + * Sets the flush size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setStreamBufferFlushSize(long size) { + streamBufferFlushSize = Optional.of(size); + return this; + } + + /** + * Sets the max size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setStreamBufferMaxSize(long size) { + streamBufferMaxSize = Optional.of(size); + return this; + } + + /** + * Sets the block size for stream buffer. + * + * @return MiniOzoneCluster.Builder + */ + public Builder setBlockSize(long size) { + blockSize = Optional.of(size); + return this; + } + /** * Constructs and returns MiniOzoneCluster. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 37b6fdcd0f2..324e17ba695 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -391,6 +391,25 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { Path metaDir = Paths.get(path, "ozone-meta"); Files.createDirectories(metaDir); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString()); + if (!chunkSize.isPresent()) { + chunkSize = Optional.of(1); + } + if (!streamBufferFlushSize.isPresent()) { + streamBufferFlushSize = Optional.of((long)chunkSize.get()); + } + if (!streamBufferMaxSize.isPresent()) { + streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get()); + } + if (!blockSize.isPresent()) { + blockSize = Optional.of(2 * streamBufferMaxSize.get()); + } + conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, + (int) (chunkSize.get() * OzoneConsts.MB)); + conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + streamBufferFlushSize.get()); + conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + streamBufferMaxSize.get()); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, blockSize.get()); configureTrace(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 871f389b038..0197304fe44 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -50,7 +50,7 @@ public interface RatisTestHelper { /** For testing Ozone with Ratis. */ class RatisTestSuite implements Closeable { - static final RpcType RPC = SupportedRpcType.NETTY; + static final RpcType RPC = SupportedRpcType.GRPC; static final int NUM_DATANODES = 3; private final OzoneConfiguration conf; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 43517ae6cad..935423de12d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.client.rpc; -import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -27,11 +26,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; -import org.apache.hadoop.ozone.HddsDatanodeService; -import org.apache.hadoop.hdds.scm.container.common.helpers. - StorageContainerException; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -55,15 +49,17 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.event.Level; import java.io.IOException; -import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests Close Container Exception handling by Ozone Client. @@ -79,7 +75,6 @@ public class TestCloseContainerHandlingByClient { private static String volumeName; private static String bucketName; private static String keyString; - private static int maxRetries; /** * Create a MiniDFSCluster for testing. @@ -91,15 +86,14 @@ public class TestCloseContainerHandlingByClient { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - maxRetries = 100; - conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries); - conf.set(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, "200ms"); chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; - conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setQuietMode(false); conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(3).build(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -121,44 +115,29 @@ public class TestCloseContainerHandlingByClient { } } - private static String fixedLengthString(String string, int length) { - return String.format("%1$" + length + "s", string); - } - @Test public void testBlockWritesWithFlushAndClose() throws Exception { String keyName = "standalone"; - OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, 0); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.write(data); key.flush(); key.close(); // read the key from OM again and match the length.The length will still // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - //we have written two blocks - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length + (data.length % chunkSize), - keyLocationInfos.get(1).getLength()); Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); // Written the same data twice @@ -170,37 +149,24 @@ public class TestCloseContainerHandlingByClient { @Test public void testBlockWritesCloseConsistency() throws Exception { String keyName = "standalone2"; - OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.close(); // read the key from OM again and match the length.The length will still // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - // Though we have written only block initially, the close will hit - // closeContainerException and remaining data in the chunkOutputStream - // buffer will be copied into a different allocated block and will be - // committed. - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length % chunkSize, - keyLocationInfos.get(1).getLength()); Assert.assertEquals(data.length, keyInfo.getDataSize()); validateData(keyName, data); } @@ -210,29 +176,30 @@ public class TestCloseContainerHandlingByClient { String keyName = "standalone3"; OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize)); + createKey(keyName, ReplicationType.RATIS, (4 * blockSize)); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); - // write data for 3 blocks and 1 more chunk - byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes(); + // write data more than 1 chunk + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize)) + .getBytes(); Assert.assertEquals(data.length, 3 * blockSize); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, - HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); // write 1 more block worth of data. It will fail and new block will be // allocated - key.write(fixedLengthString(keyString, blockSize).getBytes()); + key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize) + .getBytes()); key.close(); // read the key from OM again and match the length.The length will still @@ -253,10 +220,10 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites2() throws Exception { - String keyName = "standalone4"; + String keyName = "ratis2"; long dataLength; OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize); + createKey(keyName, ReplicationType.RATIS, 4 * blockSize); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); @@ -264,21 +231,21 @@ public class TestCloseContainerHandlingByClient { // With the initial size provided, it should have pre allocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); String dataString = - fixedLengthString(keyString, (3 * blockSize + chunkSize)); + ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize)); byte[] data = dataString.getBytes(); key.write(data); // 3 block are completely written to the DataNode in 3 blocks. // Data of length half of chunkSize resides in the chunkOutput stream buffer - String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2); + String dataString2 = + ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2); key.write(dataString2.getBytes()); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) - .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.close(); // read the key from OM again and match the length.The length will still @@ -290,9 +257,8 @@ public class TestCloseContainerHandlingByClient { // closeContainerException and remaining data in the chunkOutputStream // buffer will be copied into a different allocated block and will be // committed. - Assert.assertEquals(5, keyLocationInfos.size()); - dataLength = 3 * blockSize + (long) (1.5 * chunkSize); - Assert.assertEquals(dataLength, keyInfo.getDataSize()); + Assert.assertEquals(dataString.concat(dataString2).getBytes().length, + keyInfo.getDataSize()); validateData(keyName, dataString.concat(dataString2).getBytes()); } @@ -301,14 +267,14 @@ public class TestCloseContainerHandlingByClient { String keyName = "standalone5"; int keyLen = 4 * blockSize; - OzoneOutputStream key = - createKey(keyName, ReplicationType.RATIS, keyLen); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); // write data 3 blocks and one more chunk - byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes(); + byte[] writtenData = + ContainerTestHelper.getFixedLengthString(keyString, keyLen).getBytes(); byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize); Assert.assertEquals(data.length, 3 * blockSize + chunkSize); key.write(data); @@ -316,17 +282,14 @@ public class TestCloseContainerHandlingByClient { Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.RATIS) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, - HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); // write 3 more chunks worth of data. It will fail and new block will be // allocated. This write completes 4 blocks worth of data written to key - data = Arrays - .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); + data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); key.write(data); key.close(); @@ -345,8 +308,6 @@ public class TestCloseContainerHandlingByClient { // closeContainerException and remaining data in the chunkOutputStream // buffer will be copied into a different allocated block and will be // committed. - Assert.assertEquals(5, keyLocationInfos.size()); - Assert.assertEquals(4 * blockSize, keyInfo.getDataSize()); long length = 0; for (OmKeyLocationInfo locationInfo : keyLocationInfos) { length += locationInfo.getLength(); @@ -378,9 +339,9 @@ public class TestCloseContainerHandlingByClient { cluster.getStorageContainerManager().getEventQueue() .fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueof(containerID)); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); @@ -406,8 +367,8 @@ public class TestCloseContainerHandlingByClient { .isContainerPresent(cluster, containerID, dn))) { for (DatanodeDetails datanodeDetails : datanodes) { GenericTestUtils.waitFor(() -> ContainerTestHelper - .isContainerClosed(cluster, containerID, datanodeDetails), - 500, 15 * 1000); + .isContainerClosed(cluster, containerID, datanodeDetails), 500, + 15 * 1000); //double check if it's really closed // (waitFor also throws an exception) Assert.assertTrue(ContainerTestHelper @@ -425,29 +386,31 @@ public class TestCloseContainerHandlingByClient { public void testDiscardPreallocatedBlocks() throws Exception { String keyName = "discardpreallocatedblocks"; OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize); + createKey(keyName, ReplicationType.RATIS, 2 * blockSize); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); // With the initial size provided, it should have pre allocated 4 blocks Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); - String dataString = fixedLengthString(keyString, (1 * blockSize)); + String dataString = + ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); byte[] data = dataString.getBytes(); key.write(data); List locationInfos = new ArrayList<>(groupOutputStream.getLocationInfoList()); long containerID = locationInfos.get(0).getContainerID(); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); List datanodes = pipeline.getNodes(); Assert.assertEquals(1, datanodes.size()); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); - dataString = fixedLengthString(keyString, (1 * blockSize)); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + dataString = + ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); data = dataString.getBytes(); key.write(data); Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); @@ -466,40 +429,28 @@ public class TestCloseContainerHandlingByClient { private OzoneOutputStream createKey(String keyName, ReplicationType type, long size) throws Exception { - ReplicationFactor factor = - type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE : - ReplicationFactor.THREE; - return objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(keyName, size, type, factor); + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); } private void validateData(String keyName, byte[] data) throws Exception { - byte[] readData = new byte[data.length]; - OzoneInputStream is = - objectStore.getVolume(volumeName).getBucket(bucketName) - .readKey(keyName); - is.read(readData); - MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha1.update(data); - MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha2.update(readData); - Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest())); - is.close(); + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); } @Test public void testBlockWriteViaRatis() throws Exception { String keyName = "ratis"; OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) - .setFactor(HddsProtos.ReplicationFactor.THREE) - .setKeyName(keyName).build(); + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); @@ -510,79 +461,10 @@ public class TestCloseContainerHandlingByClient { // The write will fail but exception will be handled and length will be // updated correctly in OzoneManager once the steam is closed key.close(); - // read the key from OM again and match the length.The length will still - // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - //we have written two blocks - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length + (data.length % chunkSize), - keyLocationInfos.get(1).getLength()); - Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); String dataString = new String(data); dataString.concat(dataString); + Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); validateData(keyName, dataString.getBytes()); } - - @Test - public void testRetriesOnBlockNotCommittedException() throws Exception { - String keyName = "blockcommitexceptiontest"; - OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); - GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG); - - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - String dataString = fixedLengthString(keyString, (3 * chunkSize)); - key.write(dataString.getBytes()); - List locationInfos = - groupOutputStream.getLocationInfoList(); - long containerID = locationInfos.get(0).getContainerID(); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); - Pipeline pipeline = - cluster.getStorageContainerManager().getPipelineManager() - .getPipeline(container.getPipelineID()); - List datanodes = pipeline.getNodes(); - Assert.assertEquals(1, datanodes.size()); - // move the container on the datanode to Closing state, this will ensure - // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying - // to fetch the committed length - for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) { - if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) { - datanodeService.getDatanodeStateMachine().getContainer() - .getContainerSet().getContainer(containerID).getContainerData() - .setState(ContainerProtos.ContainerDataProto.State.CLOSED); - } - } - dataString = fixedLengthString(keyString, (chunkSize * 1 / 2)); - key.write(dataString.getBytes()); - try { - key.close(); - Assert.fail("Expected Exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StorageContainerException); - Assert.assertTrue(((StorageContainerException) ioe).getResult() - == ContainerProtos.Result.BLOCK_NOT_COMMITTED); - } - // It should retry only for max retries times - for (int i = 1; i <= maxRetries; i++) { - Assert.assertTrue(logCapturer.getOutput() - .contains("Retrying GetCommittedBlockLength request")); - Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i)); - } - Assert.assertTrue(logCapturer.getOutput() - .contains("GetCommittedBlockLength request failed.")); - Assert.assertTrue(logCapturer.getOutput().contains( - "retries get failed due to exceeded maximum allowed retries number" - + ": " + maxRetries)); - logCapturer.stopCapturing(); - } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 73bff6f0311..c1827c9c176 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container. - common.helpers.StorageContainerException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; @@ -141,15 +139,8 @@ public class TestContainerStateMachineFailures { .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainerPath())); - try { - // flush will throw an exception for the second write as the container - // dir has been deleted. - key.flush(); - Assert.fail("Expected exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe.getCause() instanceof StorageContainerException); - } + key.close(); // Make sure the container is marked unhealthy Assert.assertTrue( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() @@ -157,14 +148,5 @@ public class TestContainerStateMachineFailures { .getContainer(omKeyLocationInfo.getContainerID()) .getContainerState() == ContainerProtos.ContainerDataProto.State.UNHEALTHY); - try { - // subsequent requests will fail with unhealthy container exception - key.close(); - Assert.fail("Expected exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StorageContainerException); - Assert.assertTrue(((StorageContainerException) ioe).getResult() - == ContainerProtos.Result.BLOCK_NOT_COMMITTED); - } } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java new file mode 100644 index 00000000000..dc6747f89e4 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests Close Container Exception handling by Ozone Client. + */ +public class TestFailureHandlingByClient { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + private static int chunkSize; + private static int blockSize; + private static String volumeName; + private static String bucketName; + private static String keyString; + private static int maxRetries; + + /** + * TODO: we will spawn new MiniOzoneCluster every time for each unit test + * invocation. Need to use the same instance for all tests. + */ + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + maxRetries = 100; + chunkSize = (int) OzoneConsts.MB; + blockSize = 4 * chunkSize; + conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, 1); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, 2); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5, + TimeUnit.SECONDS); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setQuietMode(false); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(6).build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "datanodefailurehandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + * Shutdown MiniDFSCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + // TODO: currently, shutting down 2 datanodes in Ratis leads to + // watchForCommit Api in RaftClient to hand=g forever. Once that gets + // fixed, we need to execute the tets with 2 node failures. + + @Test + public void testBlockWritesWithDnFailures() throws Exception { + String keyName = "ratis3"; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + key.write(data); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertTrue(locationInfoList.size() == 1); + long containerId = locationInfoList.get(0).getContainerID(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List datanodes = pipeline.getNodes(); + cluster.shutdownHddsDatanode(datanodes.get(0)); + // cluster.shutdownHddsDatanode(datanodes.get(1)); + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the steam is closed + key.close(); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + Assert.assertEquals(data.length, keyInfo.getDataSize()); + validateData(keyName, data); + cluster.restartHddsDatanode(datanodes.get(0), true); + } + + @Test + public void testMultiBlockWritesWithDnFailures() throws Exception { + String keyName = "ratis3"; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + String data = + ContainerTestHelper + .getFixedLengthString(keyString, blockSize + chunkSize); + key.write(data.getBytes()); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + List locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertTrue(locationInfoList.size() == 2); + long containerId = locationInfoList.get(1).getContainerID(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List datanodes = pipeline.getNodes(); + cluster.shutdownHddsDatanode(datanodes.get(0)); + + // cluster.shutdownHddsDatanode(datanodes.get(1)); + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the steam is closed + key.write(data.getBytes()); + key.close(); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize()); + validateData(keyName, data.concat(data).getBytes()); + cluster.restartHddsDatanode(datanodes.get(0), true); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 7d002c3e5ee..7e9bab5c0da 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -21,10 +21,14 @@ package org.apache.hadoop.ozone.container; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -632,4 +636,34 @@ public final class ContainerTestHelper { return false; } + public static OzoneOutputStream createKey(String keyName, + ReplicationType type, long size, ObjectStore objectStore, + String volumeName, String bucketName) throws Exception { + org.apache.hadoop.hdds.client.ReplicationFactor factor = + type == ReplicationType.STAND_ALONE ? + org.apache.hadoop.hdds.client.ReplicationFactor.ONE : + org.apache.hadoop.hdds.client.ReplicationFactor.THREE; + return objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey(keyName, size, type, factor); + } + + public static void validateData(String keyName, byte[] data, + ObjectStore objectStore, String volumeName, String bucketName) + throws Exception { + byte[] readData = new byte[data.length]; + OzoneInputStream is = + objectStore.getVolume(volumeName).getBucket(bucketName) + .readKey(keyName); + is.read(readData); + MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha1.update(data); + MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha2.update(readData); + Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest())); + is.close(); + } + + public static String getFixedLengthString(String string, int length) { + return String.format("%1$" + length + "s", string); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index f7ba97920a5..3a15b211437 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -468,7 +468,7 @@ public class TestOzoneContainer { client.getPipeline(), blockID, 1024); CompletableFuture - response = client.sendCommandAsync(smallFileRequest); + response = client.sendCommandAsync(smallFileRequest).getResponse(); computeResults.add(response); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java index aa1df4c8f63..ce91dbdb95e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java @@ -119,7 +119,8 @@ public class TestXceiverClientMetrics { smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest( client.getPipeline(), blockID, 1024); CompletableFuture - response = client.sendCommandAsync(smallFileRequest); + response = + client.sendCommandAsync(smallFileRequest).getResponse(); computeResults.add(response); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java index 0f49ade54cc..968024346a9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java @@ -62,7 +62,7 @@ public class TestOzoneRestWithMiniCluster { private static OzoneConfiguration conf; private static ClientProtocol client; private static ReplicationFactor replicationFactor = ReplicationFactor.ONE; - private static ReplicationType replicationType = ReplicationType.STAND_ALONE; + private static ReplicationType replicationType = ReplicationType.RATIS; @Rule public ExpectedException exception = ExpectedException.none(); diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index a8df11440fa..6a433b58dee 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -22,8 +22,7 @@ import com.google.common.base.Strings; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -33,7 +32,6 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts.Versioning; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; @@ -63,6 +61,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * A {@link StorageHandler} implementation that distributes object storage @@ -80,10 +79,10 @@ public final class DistributedStorageHandler implements StorageHandler { private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; private int chunkSize; - private final boolean useRatis; - private final HddsProtos.ReplicationType type; - private final HddsProtos.ReplicationFactor factor; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; /** * Creates a new DistributedStorageHandler. @@ -100,17 +99,6 @@ public final class DistributedStorageHandler implements StorageHandler { this.ozoneManagerClient = ozoneManagerClient; this.storageContainerLocationClient = storageContainerLocation; this.xceiverClientManager = new XceiverClientManager(conf); - this.useRatis = conf.getBoolean( - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - - if(useRatis) { - type = HddsProtos.ReplicationType.RATIS; - factor = HddsProtos.ReplicationFactor.THREE; - } else { - type = HddsProtos.ReplicationType.STAND_ALONE; - factor = HddsProtos.ReplicationFactor.ONE; - } chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT); @@ -118,7 +106,6 @@ public final class DistributedStorageHandler implements StorageHandler { OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT); groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS, OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT); - retryPolicy = OzoneClientUtils.createRetryPolicy(conf); if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { LOG.warn("The chunk size ({}) is not allowed to be more than" + " the maximum size ({})," @@ -126,6 +113,18 @@ public final class DistributedStorageHandler implements StorageHandler { chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; } + streamBufferFlushSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT); + streamBufferMaxSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT); + blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, + OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); + watchTimeout = + conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); } @Override @@ -420,7 +419,10 @@ public final class DistributedStorageHandler implements StorageHandler { .setRequestID(args.getRequestID()) .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) - .setRetryPolicy(retryPolicy) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setBlockSize(blockSize) + .setWatchTimeout(watchTimeout) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java index a2df50d533f..3cf44161cdc 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,6 +45,7 @@ public class TestDataValidate { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); @@ -86,6 +88,8 @@ public class TestDataValidate { randomKeyGenerator.setNumOfKeys(1); randomKeyGenerator.setKeySize(20971520); randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); randomKeyGenerator.call(); Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); @@ -101,6 +105,8 @@ public class TestDataValidate { randomKeyGenerator.setNumOfBuckets(5); randomKeyGenerator.setNumOfKeys(10); randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); randomKeyGenerator.call(); Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index d21d399941c..e5bb8ae80f6 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,6 +45,7 @@ public class TestRandomKeyGenerator { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); } @@ -65,6 +67,8 @@ public class TestRandomKeyGenerator { randomKeyGenerator.setNumOfVolumes(2); randomKeyGenerator.setNumOfBuckets(5); randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); randomKeyGenerator.call(); Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); @@ -81,6 +85,8 @@ public class TestRandomKeyGenerator { randomKeyGenerator.setNumOfKeys(10); randomKeyGenerator.setNumOfThreads(10); randomKeyGenerator.setKeySize(10240); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); randomKeyGenerator.call(); Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());