diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 673a82bbd82..65241bfa1ac 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -47,6 +47,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,8 @@ public final class XceiverClientRatis extends XceiverClientSpi { final String rpcType = ozoneConf .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final TimeDuration clientRequestTimeout = + RatisHelper.getClientRequestTimeout(ozoneConf); final int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); @@ -81,7 +84,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { SecurityConfig(ozoneConf)); return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, - retryPolicy, tlsConfig); + retryPolicy, tlsConfig, clientRequestTimeout); } private final Pipeline pipeline; @@ -90,6 +93,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { private final int maxOutstandingRequests; private final RetryPolicy retryPolicy; private final GrpcTlsConfig tlsConfig; + private final TimeDuration clientRequestTimeout; // Map to track commit index at every server private final ConcurrentHashMap commitInfoMap; @@ -102,7 +106,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { */ private XceiverClientRatis(Pipeline 
pipeline, RpcType rpcType, int maxOutStandingChunks, RetryPolicy retryPolicy, - GrpcTlsConfig tlsConfig) { + GrpcTlsConfig tlsConfig, TimeDuration timeout) { super(); this.pipeline = pipeline; this.rpcType = rpcType; @@ -111,6 +115,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { commitInfoMap = new ConcurrentHashMap<>(); watchClient = null; this.tlsConfig = tlsConfig; + this.clientRequestTimeout = timeout; } private void updateCommitInfosMap( @@ -160,7 +165,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { // requests to be handled by raft client if (!client.compareAndSet(null, RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, - maxOutstandingRequests, tlsConfig))) { + maxOutstandingRequests, tlsConfig, clientRequestTimeout))) { throw new IllegalStateException("Client is already connected."); } } @@ -243,7 +248,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { if (watchClient == null) { watchClient = RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, - maxOutstandingRequests, tlsConfig); + maxOutstandingRequests, tlsConfig, clientRequestTimeout); } CompletableFuture replyFuture = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); @@ -260,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi { // TODO : need to remove the code to create the new RaftClient instance // here once the watch request bypassing sliding window in Raft Client // gets fixed. 
- watchClient = RatisHelper - .newRaftClient(rpcType, getPipeline(), retryPolicy, - maxOutstandingRequests, tlsConfig); + watchClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, + maxOutstandingRequests, tlsConfig, clientRequestTimeout); reply = watchClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 1f84ebe7e8d..4c67eb314c3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -121,12 +121,12 @@ public final class ScmConfigKeys { TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = "dfs.ratis.client.request.max.retries"; - public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180; + public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20; public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY = "dfs.ratis.client.request.retry.interval"; public static final TimeDuration DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT = - TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY = "dfs.ratis.server.retry-cache.timeout.duration"; public static final TimeDuration diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index dc3ebe52d5d..1388d00d039 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -133,6 
+133,11 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT = "30s"; + public static final String OZONE_CLIENT_MAX_RETRIES = + "ozone.client.max.retries"; + public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5; + + // This defines the overall connection limit for the connection pool used in // RestClient. public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX = diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 3713d7af58c..a63d18a726b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcFactory; @@ -134,33 +135,51 @@ public interface RatisHelper { static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, RetryPolicy retryPolicy, int maxOutStandingRequest, - GrpcTlsConfig tlsConfig) throws IOException { + GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException { return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()), newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), - pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig); + pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig, + timeout); + } + + static TimeDuration getClientRequestTimeout(Configuration conf) { + // Set the client requestTimeout + final TimeUnit timeUnit = + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT + .getUnit(); + final long duration = conf.getTimeDuration( + 
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT + .getDuration(), timeUnit); + final TimeDuration clientRequestTimeout = + TimeDuration.valueOf(duration, timeUnit); + return clientRequestTimeout; } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, RetryPolicy retryPolicy, int maxOutstandingRequests, - GrpcTlsConfig tlsConfig) { + GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) { return newRaftClient(rpcType, leader.getId(), newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, - maxOutstandingRequests, tlsConfig); + maxOutstandingRequests, tlsConfig, clientRequestTimeout); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RetryPolicy retryPolicy, int maxOutstandingRequests) { + RetryPolicy retryPolicy, int maxOutstandingRequests, + TimeDuration clientRequestTimeout) { return newRaftClient(rpcType, leader.getId(), newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy, - maxOutstandingRequests, null); + maxOutstandingRequests, null, clientRequestTimeout); } static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest, - GrpcTlsConfig tlsConfig) { + GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); final RaftProperties properties = new RaftProperties(); RaftConfigKeys.Rpc.setType(properties, rpcType); + RaftClientConfigKeys.Rpc + .setRequestTimeout(properties, clientRequestTimeout); GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)); diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 9918b8ba518..e68e05e23b0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -231,17 +231,19 @@ dfs.ratis.client.request.timeout.duration 3s OZONE, RATIS, MANAGEMENT - The timeout duration for ratis client request. + The timeout duration for ratis client request. It should be + set greater than leader election timeout in Ratis. + dfs.ratis.client.request.max.retries - 180 + 20 OZONE, RATIS, MANAGEMENT Number of retries for ratis client request. dfs.ratis.client.request.retry.interval - 100ms + 500ms OZONE, RATIS, MANAGEMENT Interval between successive retries for a ratis client request. @@ -417,6 +419,14 @@ a particular request getting replayed to all servers. + + ozone.client.max.retries + 5 + OZONE, CLIENT + Maximum number of retries by Ozone Client on encountering + exception while writing a key. + + ozone.client.protocol org.apache.hadoop.ozone.client.rpc.RpcClient diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index b96e00a89fd..19e43b9948b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import io.opentracing.Scope; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcFactory; @@ -176,7 +175,7 @@ public final class XceiverServerRatis extends XceiverServer { setRaftSegmentPreallocatedSize(conf, properties); // Set max write 
buffer size, which is the scm chunk size - final int maxChunkSize = setMaxWriteBuffer(conf, properties); + final int maxChunkSize = setMaxWriteBuffer(properties); TimeUnit timeUnit; long duration; @@ -329,23 +328,10 @@ public final class XceiverServerRatis extends XceiverServer { .setRequestTimeout(properties, serverRequestTimeout); } - private int setMaxWriteBuffer(Configuration conf, RaftProperties properties) { + private int setMaxWriteBuffer(RaftProperties properties) { final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE; RaftServerConfigKeys.Log.setWriteBufferSize(properties, SizeInBytes.valueOf(maxChunkSize)); - - // Set the client requestTimeout - TimeUnit timeUnit = - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT - .getUnit(); - long duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT - .getDuration(), timeUnit); - final TimeDuration clientRequestTimeout = - TimeDuration.valueOf(duration, timeUnit); - RaftClientConfigKeys.Rpc - .setRequestTimeout(properties, clientRequestTimeout); return maxChunkSize; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index 7d8b3d61a42..16e0e9d7c80 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.retry.RetryPolicy; import 
org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; @@ -49,6 +50,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeUnit; /** * Test cases to verify CloseContainerCommandHandler in datanode. @@ -289,8 +291,10 @@ public class TestCloseContainerCommandHandler { final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, Collections.singleton(datanodeDetails)); final int maxOutstandingRequests = 100; - final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC, - peer, retryPolicy, maxOutstandingRequests, null); + final RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.GRPC, peer, retryPolicy, + maxOutstandingRequests, + TimeDuration.valueOf(3, TimeUnit.SECONDS)); Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess()); Thread.sleep(2000); final ContainerID containerId = ContainerID.valueof( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 3b36add0ed9..0af34fb8563 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,9 +117,11 @@ final class RatisPipelineUtils { HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final GrpcTlsConfig tlsConfig = 
RatisHelper.createTlsClientConfig( new SecurityConfig(ozoneConf)); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(ozoneConf); RaftClient client = RatisHelper .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig); + retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout); client .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId()); } @@ -141,12 +144,13 @@ final class RatisPipelineUtils { HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new SecurityConfig(ozoneConf)); - + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(ozoneConf); datanodes.parallelStream().forEach(d -> { final RaftPeer p = RatisHelper.toRaftPeer(d); try (RaftClient client = RatisHelper .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig)) { + retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) { rpc.accept(client, p); } catch (IOException ioe) { String errMsg = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 96515188874..012a2256b2d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.*; import 
org.apache.ratis.protocol.AlreadyClosedException; @@ -27,6 +29,7 @@ import org.apache.ratis.protocol.RaftRetryFailureException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** A utility class for OzoneClient. */ @@ -122,6 +125,14 @@ public final class OzoneClientUtils { return keyInfo; } + public static RetryPolicy createRetryPolicy(int maxRetryCount) { + // just retry without sleep + RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0, + TimeUnit.MILLISECONDS); + return retryPolicy; + } + public static List> getExceptionList() { return EXCEPTION_LIST; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index d1acbe1cefd..a379889e5b9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -45,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -87,6 +90,8 @@ public class KeyOutputStream extends OutputStream { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private 
FileEncryptionInfo feInfo; private ExcludeList excludeList; + private final RetryPolicy retryPolicy; + private int retryCount; /** * A constructor for testing purpose only. */ @@ -111,6 +116,8 @@ public class KeyOutputStream extends OutputStream { OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); this.bytesPerChecksum = OzoneConfigKeys .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB + this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + retryCount = 0; } @VisibleForTesting @@ -147,7 +154,7 @@ public class KeyOutputStream extends OutputStream { String requestId, ReplicationFactor factor, ReplicationType type, long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout, ChecksumType checksumType, int bytesPerChecksum, - String uploadID, int partNumber, boolean isMultipart) { + String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.omClient = omClient; @@ -183,6 +190,8 @@ public class KeyOutputStream extends OutputStream { this.bufferPool = new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize); this.excludeList = new ExcludeList(); + this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount); + this.retryCount = 0; } /** @@ -308,23 +317,18 @@ public class KeyOutputStream extends OutputStream { current.write(b, off, writeLen); } } catch (IOException ioe) { - Throwable t = checkForException(ioe); - if (t != null) { - // for the current iteration, totalDataWritten - currentPos gives the - // amount of data already written to the buffer + // for the current iteration, totalDataWritten - currentPos gives the + // amount of data already written to the buffer - // In the retryPath, the total data to be written will always be equal - // to or less than the max length of the buffer allocated. 
- // The len specified here is the combined sum of the data length of - // the buffers - Preconditions.checkState(!retry || len <= streamBufferMaxSize); - writeLen = retry ? (int) len : - (int) (current.getWrittenDataLength() - currentPos); - LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex, t); - } else { - throw ioe; - } + // In the retryPath, the total data to be written will always be equal + // to or less than the max length of the buffer allocated. + // The len specified here is the combined sum of the data length of + // the buffers + Preconditions.checkState(!retry || len <= streamBufferMaxSize); + writeLen = retry ? (int) len : + (int) (current.getWrittenDataLength() - currentPos); + LOG.debug("writeLen {}, total len {}", writeLen, len); + handleException(current, currentStreamIndex, ioe); } if (current.getRemaining() <= 0) { // since the current block is already written close the stream. @@ -390,7 +394,8 @@ public class KeyOutputStream extends OutputStream { * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - int streamIndex, Throwable exception) throws IOException { + int streamIndex, IOException exception) throws IOException { + Throwable t = checkForException(exception); boolean retryFailure = checkForRetryFailure(exception); boolean closedContainerException = false; if (!retryFailure) { @@ -413,9 +418,9 @@ public class KeyOutputStream extends OutputStream { if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } - if (checkIfContainerIsClosed(exception)) { + if (checkIfContainerIsClosed(t)) { excludeList.addConatinerId(ContainerID.valueof(containerId)); - } else if (retryFailure || exception instanceof TimeoutException) { + } else if (retryFailure || t instanceof TimeoutException) { pipelineId = streamEntry.getPipeline().getId(); excludeList.addPipeline(pipelineId); } @@ -425,7 +430,7 @@ public class 
KeyOutputStream extends OutputStream { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. currentStreamIndex += 1; - handleWrite(null, 0, bufferedDataLen, true); + handleRetry(exception, bufferedDataLen); } if (totalSuccessfulFlushedData == 0) { streamEntries.remove(streamIndex); @@ -448,6 +453,43 @@ public class KeyOutputStream extends OutputStream { } } + private void handleRetry(IOException exception, long len) throws IOException { + RetryPolicy.RetryAction action; + try { + action = retryPolicy + .shouldRetry(exception, retryCount, 0, true); + } catch (Exception e) { + throw e instanceof IOException ? (IOException) e : new IOException(e); + } + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + if (action.reason != null) { + LOG.error("Retry request failed. " + action.reason, + exception); + } + throw exception; + } + + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Interrupted while trying for retry"); + throw exception; + } + Preconditions.checkArgument( + action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); + if (action.delayMillis > 0) { + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException( + "Interrupted: action=" + action + ", retry policy=" + retryPolicy) + .initCause(e); + } + } + retryCount++; + LOG.trace("Retrying Write request. Already tried " + + retryCount + " time(s); retry policy is " + retryPolicy); + handleWrite(null, 0, len, true); + } /** * Checks if the provided exception signifies retry failure in ratis client. 
* In case of retry failure, ratis client throws RaftRetryFailureException @@ -462,7 +504,7 @@ public class KeyOutputStream extends OutputStream { return t instanceof ContainerNotOpenException; } - private Throwable checkForException(IOException ioe) { + private Throwable checkForException(IOException ioe) throws IOException { Throwable t = ioe.getCause(); while (t != null) { for (Class cls : OzoneClientUtils @@ -473,7 +515,7 @@ public class KeyOutputStream extends OutputStream { } t = t.getCause(); } - return null; + throw ioe; } private long getKeyLength() { @@ -512,36 +554,30 @@ public class KeyOutputStream extends OutputStream { if (streamEntries.size() == 0) { return; } - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - BlockOutputStreamEntry entry = streamEntries.get(streamIndex); - if (entry != null) { - try { - Collection failedServers = entry.getFailedServers(); - - // failed servers can be null in case there is no data written in the - // stream - if (failedServers != null && !failedServers.isEmpty()) { - excludeList.addDatanodes(failedServers); - } - if (close) { - entry.close(); - } else { - entry.flush(); - } - } catch (IOException ioe) { - Throwable t = checkForException(ioe); - if (t != null) { - // This call will allocate a new streamEntry and write the Data. - // Close needs to be retried on the newly allocated streamEntry as - // as well. - handleException(entry, streamIndex, t); - handleFlushOrClose(close); - } else { - throw ioe; + while (true) { + int size = streamEntries.size(); + int streamIndex = + currentStreamIndex >= size ? 
size - 1 : currentStreamIndex; + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); + if (entry != null) { + try { + Collection failedServers = entry.getFailedServers(); + // failed servers can be null in case there is no data written in the + // stream + if (failedServers != null && !failedServers.isEmpty()) { + excludeList.addDatanodes(failedServers); + } + if (close) { + entry.close(); + } else { + entry.flush(); + } + } catch (IOException ioe) { + handleException(entry, streamIndex, ioe); + continue; } } + break; } } @@ -616,6 +652,7 @@ public class KeyOutputStream extends OutputStream { private String multipartUploadID; private int multipartNumber; private boolean isMultipartKey; + private int maxRetryCount; public Builder setMultipartUploadID(String uploadID) { @@ -704,11 +741,17 @@ public class KeyOutputStream extends OutputStream { return this; } + public Builder setMaxRetryCount(int maxCount) { + this.maxRetryCount = maxCount; + return this; + } + public KeyOutputStream build() throws IOException { return new KeyOutputStream(openHandler, xceiverManager, scmClient, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksumType, - bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey); + bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, + maxRetryCount); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 7fab65041f3..d0595822a5c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -123,6 +123,7 @@ public class RpcClient implements ClientProtocol { private final long blockSize; private final long watchTimeout; private final ClientId clientId = ClientId.randomId(); + private 
final int maxRetryCount; /** * Creates RpcClient instance with the given configuration. @@ -205,6 +206,9 @@ public class RpcClient implements ClientProtocol { this.verifyChecksum = conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT); + maxRetryCount = + conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. + OZONE_CLIENT_MAX_RETRIES_DEFAULT); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -594,7 +598,7 @@ public class RpcClient implements ClientProtocol { .build(); OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - KeyOutputStream groupOutputStream = + KeyOutputStream keyOutputStream = new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) @@ -610,20 +614,21 @@ public class RpcClient implements ClientProtocol { .setBlockSize(blockSize) .setChecksumType(checksumType) .setBytesPerChecksum(bytesPerChecksum) + .setMaxRetryCount(maxRetryCount) .build(); - groupOutputStream.addPreallocateBlocks( + keyOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), openKey.getOpenVersion()); - final FileEncryptionInfo feInfo = groupOutputStream + final FileEncryptionInfo feInfo = keyOutputStream .getFileEncryptionInfo(); if (feInfo != null) { KeyProvider.KeyVersion decrypted = getDEK(feInfo); final CryptoOutputStream cryptoOut = new CryptoOutputStream( - groupOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo), + keyOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo), decrypted.getMaterial(), feInfo.getIV()); return new OzoneOutputStream(cryptoOut); } else { - return new OzoneOutputStream(groupOutputStream); + return new OzoneOutputStream(keyOutputStream); } } @@ -856,7 +861,7 @@ public class RpcClient implements ClientProtocol { .build(); OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - KeyOutputStream groupOutputStream = + KeyOutputStream keyOutputStream = new 
KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) @@ -875,11 +880,12 @@ public class RpcClient implements ClientProtocol { .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) + .setMaxRetryCount(maxRetryCount) .build(); - groupOutputStream.addPreallocateBlocks( + keyOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), openKey.getOpenVersion()); - return new OzoneOutputStream(groupOutputStream); + return new OzoneOutputStream(keyOutputStream); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 322339b7ea3..da06c59d190 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -31,6 +31,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,9 +127,11 @@ public interface RatisTestHelper { final OzoneConfiguration conf = new OzoneConfiguration(); final int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests(conf); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(conf); final RaftClient client = newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), - maxOutstandingRequests); + maxOutstandingRequests, requestTimeout); client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId()); } } diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java 
b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index f0f8a60470c..6fff3e4fd08 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -88,6 +88,7 @@ public final class DistributedStorageHandler implements StorageHandler { private final ChecksumType checksumType; private final int bytesPerChecksum; private final boolean verifyChecksum; + private final int maxRetryCount; /** * Creates a new DistributedStorageHandler. @@ -154,6 +155,9 @@ public final class DistributedStorageHandler implements StorageHandler { this.verifyChecksum = conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT); + this.maxRetryCount = + conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. + OZONE_CLIENT_MAX_RETRIES_DEFAULT); } @Override @@ -438,7 +442,7 @@ public final class DistributedStorageHandler implements StorageHandler { .build(); // contact OM to allocate a block for key. OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - KeyOutputStream groupOutputStream = + KeyOutputStream keyOutputStream = new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) @@ -454,11 +458,12 @@ public final class DistributedStorageHandler implements StorageHandler { .setWatchTimeout(watchTimeout) .setChecksumType(checksumType) .setBytesPerChecksum(bytesPerChecksum) + .setMaxRetryCount(maxRetryCount) .build(); - groupOutputStream.addPreallocateBlocks( + keyOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), openKey.getOpenVersion()); - return new OzoneOutputStream(groupOutputStream); + return new OzoneOutputStream(keyOutputStream); } @Override