HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
ba50a36a3e
commit
155ab6d5d8
|
@ -47,6 +47,7 @@ import org.apache.ratis.protocol.RaftClientReply;
|
|||
import org.apache.ratis.rpc.RpcType;
|
||||
import org.apache.ratis.rpc.SupportedRpcType;
|
||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||
import org.apache.ratis.util.TimeDuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -74,6 +75,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
final String rpcType = ozoneConf
|
||||
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
|
||||
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
|
||||
final TimeDuration clientRequestTimeout =
|
||||
RatisHelper.getClientRequestTimeout(ozoneConf);
|
||||
final int maxOutstandingRequests =
|
||||
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
|
||||
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
|
||||
|
@ -81,7 +84,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
SecurityConfig(ozoneConf));
|
||||
return new XceiverClientRatis(pipeline,
|
||||
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
|
||||
retryPolicy, tlsConfig);
|
||||
retryPolicy, tlsConfig, clientRequestTimeout);
|
||||
}
|
||||
|
||||
private final Pipeline pipeline;
|
||||
|
@ -90,6 +93,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
private final int maxOutstandingRequests;
|
||||
private final RetryPolicy retryPolicy;
|
||||
private final GrpcTlsConfig tlsConfig;
|
||||
private final TimeDuration clientRequestTimeout;
|
||||
|
||||
// Map to track commit index at every server
|
||||
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
|
||||
|
@ -102,7 +106,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
*/
|
||||
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
|
||||
int maxOutStandingChunks, RetryPolicy retryPolicy,
|
||||
GrpcTlsConfig tlsConfig) {
|
||||
GrpcTlsConfig tlsConfig, TimeDuration timeout) {
|
||||
super();
|
||||
this.pipeline = pipeline;
|
||||
this.rpcType = rpcType;
|
||||
|
@ -111,6 +115,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
commitInfoMap = new ConcurrentHashMap<>();
|
||||
watchClient = null;
|
||||
this.tlsConfig = tlsConfig;
|
||||
this.clientRequestTimeout = timeout;
|
||||
}
|
||||
|
||||
private void updateCommitInfosMap(
|
||||
|
@ -160,7 +165,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
// requests to be handled by raft client
|
||||
if (!client.compareAndSet(null,
|
||||
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
||||
maxOutstandingRequests, tlsConfig))) {
|
||||
maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
|
||||
throw new IllegalStateException("Client is already connected.");
|
||||
}
|
||||
}
|
||||
|
@ -243,7 +248,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
if (watchClient == null) {
|
||||
watchClient =
|
||||
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
||||
maxOutstandingRequests, tlsConfig);
|
||||
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
|
||||
}
|
||||
CompletableFuture<RaftClientReply> replyFuture = watchClient
|
||||
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
|
||||
|
@ -260,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
|||
// TODO : need to remove the code to create the new RaftClient instance
|
||||
// here once the watch request bypassing sliding window in Raft Client
|
||||
// gets fixed.
|
||||
watchClient = RatisHelper
|
||||
.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
||||
maxOutstandingRequests, tlsConfig);
|
||||
watchClient =
|
||||
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
|
||||
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
|
||||
reply = watchClient
|
||||
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
|
||||
.get(timeout, TimeUnit.MILLISECONDS);
|
||||
|
|
|
@ -121,12 +121,12 @@ public final class ScmConfigKeys {
|
|||
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
|
||||
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
|
||||
"dfs.ratis.client.request.max.retries";
|
||||
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
|
||||
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
|
||||
public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
|
||||
"dfs.ratis.client.request.retry.interval";
|
||||
public static final TimeDuration
|
||||
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
|
||||
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
|
||||
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
|
||||
public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
|
||||
"dfs.ratis.server.retry-cache.timeout.duration";
|
||||
public static final TimeDuration
|
||||
|
|
|
@ -133,6 +133,11 @@ public final class OzoneConfigKeys {
|
|||
public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
|
||||
"30s";
|
||||
|
||||
public static final String OZONE_CLIENT_MAX_RETRIES =
|
||||
"ozone.client.max.retries";
|
||||
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;
|
||||
|
||||
|
||||
// This defines the overall connection limit for the connection pool used in
|
||||
// RestClient.
|
||||
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
|
||||
import org.apache.ratis.client.RaftClient;
|
||||
import org.apache.ratis.client.RaftClientConfigKeys;
|
||||
import org.apache.ratis.conf.RaftProperties;
|
||||
import org.apache.ratis.grpc.GrpcConfigKeys;
|
||||
import org.apache.ratis.grpc.GrpcFactory;
|
||||
|
@ -134,33 +135,51 @@ public interface RatisHelper {
|
|||
|
||||
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
|
||||
RetryPolicy retryPolicy, int maxOutStandingRequest,
|
||||
GrpcTlsConfig tlsConfig) throws IOException {
|
||||
GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
|
||||
return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
|
||||
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
|
||||
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
|
||||
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
|
||||
timeout);
|
||||
}
|
||||
|
||||
static TimeDuration getClientRequestTimeout(Configuration conf) {
|
||||
// Set the client requestTimeout
|
||||
final TimeUnit timeUnit =
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
|
||||
.getUnit();
|
||||
final long duration = conf.getTimeDuration(
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
|
||||
.getDuration(), timeUnit);
|
||||
final TimeDuration clientRequestTimeout =
|
||||
TimeDuration.valueOf(duration, timeUnit);
|
||||
return clientRequestTimeout;
|
||||
}
|
||||
|
||||
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
|
||||
RetryPolicy retryPolicy, int maxOutstandingRequests,
|
||||
GrpcTlsConfig tlsConfig) {
|
||||
GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
|
||||
return newRaftClient(rpcType, leader.getId(),
|
||||
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
|
||||
maxOutstandingRequests, tlsConfig);
|
||||
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
|
||||
}
|
||||
|
||||
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
|
||||
RetryPolicy retryPolicy, int maxOutstandingRequests) {
|
||||
RetryPolicy retryPolicy, int maxOutstandingRequests,
|
||||
TimeDuration clientRequestTimeout) {
|
||||
return newRaftClient(rpcType, leader.getId(),
|
||||
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
|
||||
maxOutstandingRequests, null);
|
||||
maxOutstandingRequests, null, clientRequestTimeout);
|
||||
}
|
||||
|
||||
static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
|
||||
RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
|
||||
GrpcTlsConfig tlsConfig) {
|
||||
GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
|
||||
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
|
||||
final RaftProperties properties = new RaftProperties();
|
||||
RaftConfigKeys.Rpc.setType(properties, rpcType);
|
||||
RaftClientConfigKeys.Rpc
|
||||
.setRequestTimeout(properties, clientRequestTimeout);
|
||||
|
||||
GrpcConfigKeys.setMessageSizeMax(properties,
|
||||
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
|
||||
|
|
|
@ -231,17 +231,19 @@
|
|||
<name>dfs.ratis.client.request.timeout.duration</name>
|
||||
<value>3s</value>
|
||||
<tag>OZONE, RATIS, MANAGEMENT</tag>
|
||||
<description>The timeout duration for ratis client request.</description>
|
||||
<description>The timeout duration for ratis client request.It should be
|
||||
set greater than leader election timeout in Ratis.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dfs.ratis.client.request.max.retries</name>
|
||||
<value>180</value>
|
||||
<value>20</value>
|
||||
<tag>OZONE, RATIS, MANAGEMENT</tag>
|
||||
<description>Number of retries for ratis client request.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dfs.ratis.client.request.retry.interval</name>
|
||||
<value>100ms</value>
|
||||
<value>500ms</value>
|
||||
<tag>OZONE, RATIS, MANAGEMENT</tag>
|
||||
<description>Interval between successive retries for a ratis client request.
|
||||
</description>
|
||||
|
@ -417,6 +419,14 @@
|
|||
a particular request getting replayed to all servers.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.client.max.retries</name>
|
||||
<value>5</value>
|
||||
<tag>OZONE, CLIENT</tag>
|
||||
<description>Maximum number of retries by Ozone Client on encountering
|
||||
exception while writing a key.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.client.protocol</name>
|
||||
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
|||
import io.opentracing.Scope;
|
||||
import org.apache.ratis.RaftConfigKeys;
|
||||
import org.apache.ratis.RatisHelper;
|
||||
import org.apache.ratis.client.RaftClientConfigKeys;
|
||||
import org.apache.ratis.conf.RaftProperties;
|
||||
import org.apache.ratis.grpc.GrpcConfigKeys;
|
||||
import org.apache.ratis.grpc.GrpcFactory;
|
||||
|
@ -176,7 +175,7 @@ public final class XceiverServerRatis extends XceiverServer {
|
|||
setRaftSegmentPreallocatedSize(conf, properties);
|
||||
|
||||
// Set max write buffer size, which is the scm chunk size
|
||||
final int maxChunkSize = setMaxWriteBuffer(conf, properties);
|
||||
final int maxChunkSize = setMaxWriteBuffer(properties);
|
||||
TimeUnit timeUnit;
|
||||
long duration;
|
||||
|
||||
|
@ -329,23 +328,10 @@ public final class XceiverServerRatis extends XceiverServer {
|
|||
.setRequestTimeout(properties, serverRequestTimeout);
|
||||
}
|
||||
|
||||
private int setMaxWriteBuffer(Configuration conf, RaftProperties properties) {
|
||||
private int setMaxWriteBuffer(RaftProperties properties) {
|
||||
final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE;
|
||||
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
|
||||
SizeInBytes.valueOf(maxChunkSize));
|
||||
|
||||
// Set the client requestTimeout
|
||||
TimeUnit timeUnit =
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
|
||||
.getUnit();
|
||||
long duration = conf.getTimeDuration(
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
|
||||
.getDuration(), timeUnit);
|
||||
final TimeDuration clientRequestTimeout =
|
||||
TimeDuration.valueOf(duration, timeUnit);
|
||||
RaftClientConfigKeys.Rpc
|
||||
.setRequestTimeout(properties, clientRequestTimeout);
|
||||
return maxChunkSize;
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftGroupId;
|
|||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.retry.RetryPolicy;
|
||||
import org.apache.ratis.rpc.SupportedRpcType;
|
||||
import org.apache.ratis.util.TimeDuration;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -49,6 +50,7 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Test cases to verify CloseContainerCommandHandler in datanode.
|
||||
|
@ -289,8 +291,10 @@ public class TestCloseContainerCommandHandler {
|
|||
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
|
||||
Collections.singleton(datanodeDetails));
|
||||
final int maxOutstandingRequests = 100;
|
||||
final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
|
||||
peer, retryPolicy, maxOutstandingRequests, null);
|
||||
final RaftClient client = RatisHelper
|
||||
.newRaftClient(SupportedRpcType.GRPC, peer, retryPolicy,
|
||||
maxOutstandingRequests,
|
||||
TimeDuration.valueOf(3, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
|
||||
Thread.sleep(2000);
|
||||
final ContainerID containerId = ContainerID.valueof(
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftGroupId;
|
|||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.retry.RetryPolicy;
|
||||
import org.apache.ratis.rpc.SupportedRpcType;
|
||||
import org.apache.ratis.util.TimeDuration;
|
||||
import org.apache.ratis.util.function.CheckedBiConsumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -116,9 +117,11 @@ final class RatisPipelineUtils {
|
|||
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
|
||||
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
|
||||
new SecurityConfig(ozoneConf));
|
||||
final TimeDuration requestTimeout =
|
||||
RatisHelper.getClientRequestTimeout(ozoneConf);
|
||||
RaftClient client = RatisHelper
|
||||
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
|
||||
retryPolicy, maxOutstandingRequests, tlsConfig);
|
||||
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout);
|
||||
client
|
||||
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
|
||||
}
|
||||
|
@ -141,12 +144,13 @@ final class RatisPipelineUtils {
|
|||
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
|
||||
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
|
||||
SecurityConfig(ozoneConf));
|
||||
|
||||
final TimeDuration requestTimeout =
|
||||
RatisHelper.getClientRequestTimeout(ozoneConf);
|
||||
datanodes.parallelStream().forEach(d -> {
|
||||
final RaftPeer p = RatisHelper.toRaftPeer(d);
|
||||
try (RaftClient client = RatisHelper
|
||||
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
|
||||
retryPolicy, maxOutstandingRequests, tlsConfig)) {
|
||||
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
|
||||
rpc.accept(client, p);
|
||||
} catch (IOException ioe) {
|
||||
String errMsg =
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client;
|
|||
import org.apache.hadoop.hdds.client.OzoneQuota;
|
||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.client.rest.response.*;
|
||||
import org.apache.ratis.protocol.AlreadyClosedException;
|
||||
|
@ -27,6 +29,7 @@ import org.apache.ratis.protocol.RaftRetryFailureException;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/** A utility class for OzoneClient. */
|
||||
|
@ -122,6 +125,14 @@ public final class OzoneClientUtils {
|
|||
return keyInfo;
|
||||
}
|
||||
|
||||
public static RetryPolicy createRetryPolicy(int maxRetryCount) {
|
||||
// just retry without sleep
|
||||
RetryPolicy retryPolicy = RetryPolicies
|
||||
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0,
|
||||
TimeUnit.MILLISECONDS);
|
||||
return retryPolicy;
|
||||
}
|
||||
|
||||
public static List<Class<? extends Exception>> getExceptionList() {
|
||||
return EXCEPTION_LIST;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.storage.BufferPool;
|
|||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.om.helpers.*;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
|
@ -45,6 +47,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -87,6 +90,8 @@ public class KeyOutputStream extends OutputStream {
|
|||
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
|
||||
private FileEncryptionInfo feInfo;
|
||||
private ExcludeList excludeList;
|
||||
private final RetryPolicy retryPolicy;
|
||||
private int retryCount;
|
||||
/**
|
||||
* A constructor for testing purpose only.
|
||||
*/
|
||||
|
@ -111,6 +116,8 @@ public class KeyOutputStream extends OutputStream {
|
|||
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||
this.bytesPerChecksum = OzoneConfigKeys
|
||||
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
|
||||
this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
retryCount = 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -147,7 +154,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
String requestId, ReplicationFactor factor, ReplicationType type,
|
||||
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
|
||||
ChecksumType checksumType, int bytesPerChecksum,
|
||||
String uploadID, int partNumber, boolean isMultipart) {
|
||||
String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) {
|
||||
this.streamEntries = new ArrayList<>();
|
||||
this.currentStreamIndex = 0;
|
||||
this.omClient = omClient;
|
||||
|
@ -183,6 +190,8 @@ public class KeyOutputStream extends OutputStream {
|
|||
this.bufferPool =
|
||||
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
|
||||
this.excludeList = new ExcludeList();
|
||||
this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount);
|
||||
this.retryCount = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -308,23 +317,18 @@ public class KeyOutputStream extends OutputStream {
|
|||
current.write(b, off, writeLen);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
Throwable t = checkForException(ioe);
|
||||
if (t != null) {
|
||||
// for the current iteration, totalDataWritten - currentPos gives the
|
||||
// amount of data already written to the buffer
|
||||
// for the current iteration, totalDataWritten - currentPos gives the
|
||||
// amount of data already written to the buffer
|
||||
|
||||
// In the retryPath, the total data to be written will always be equal
|
||||
// to or less than the max length of the buffer allocated.
|
||||
// The len specified here is the combined sum of the data length of
|
||||
// the buffers
|
||||
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
|
||||
writeLen = retry ? (int) len :
|
||||
(int) (current.getWrittenDataLength() - currentPos);
|
||||
LOG.debug("writeLen {}, total len {}", writeLen, len);
|
||||
handleException(current, currentStreamIndex, t);
|
||||
} else {
|
||||
throw ioe;
|
||||
}
|
||||
// In the retryPath, the total data to be written will always be equal
|
||||
// to or less than the max length of the buffer allocated.
|
||||
// The len specified here is the combined sum of the data length of
|
||||
// the buffers
|
||||
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
|
||||
writeLen = retry ? (int) len :
|
||||
(int) (current.getWrittenDataLength() - currentPos);
|
||||
LOG.debug("writeLen {}, total len {}", writeLen, len);
|
||||
handleException(current, currentStreamIndex, ioe);
|
||||
}
|
||||
if (current.getRemaining() <= 0) {
|
||||
// since the current block is already written close the stream.
|
||||
|
@ -390,7 +394,8 @@ public class KeyOutputStream extends OutputStream {
|
|||
* @throws IOException Throws IOException if Write fails
|
||||
*/
|
||||
private void handleException(BlockOutputStreamEntry streamEntry,
|
||||
int streamIndex, Throwable exception) throws IOException {
|
||||
int streamIndex, IOException exception) throws IOException {
|
||||
Throwable t = checkForException(exception);
|
||||
boolean retryFailure = checkForRetryFailure(exception);
|
||||
boolean closedContainerException = false;
|
||||
if (!retryFailure) {
|
||||
|
@ -413,9 +418,9 @@ public class KeyOutputStream extends OutputStream {
|
|||
if (!failedServers.isEmpty()) {
|
||||
excludeList.addDatanodes(failedServers);
|
||||
}
|
||||
if (checkIfContainerIsClosed(exception)) {
|
||||
if (checkIfContainerIsClosed(t)) {
|
||||
excludeList.addConatinerId(ContainerID.valueof(containerId));
|
||||
} else if (retryFailure || exception instanceof TimeoutException) {
|
||||
} else if (retryFailure || t instanceof TimeoutException) {
|
||||
pipelineId = streamEntry.getPipeline().getId();
|
||||
excludeList.addPipeline(pipelineId);
|
||||
}
|
||||
|
@ -425,7 +430,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
// If the data is still cached in the underlying stream, we need to
|
||||
// allocate new block and write this data in the datanode.
|
||||
currentStreamIndex += 1;
|
||||
handleWrite(null, 0, bufferedDataLen, true);
|
||||
handleRetry(exception, bufferedDataLen);
|
||||
}
|
||||
if (totalSuccessfulFlushedData == 0) {
|
||||
streamEntries.remove(streamIndex);
|
||||
|
@ -448,6 +453,43 @@ public class KeyOutputStream extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
private void handleRetry(IOException exception, long len) throws IOException {
|
||||
RetryPolicy.RetryAction action;
|
||||
try {
|
||||
action = retryPolicy
|
||||
.shouldRetry(exception, retryCount, 0, true);
|
||||
} catch (Exception e) {
|
||||
throw e instanceof IOException ? (IOException) e : new IOException(e);
|
||||
}
|
||||
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
|
||||
if (action.reason != null) {
|
||||
LOG.error("Retry request failed. " + action.reason,
|
||||
exception);
|
||||
}
|
||||
throw exception;
|
||||
}
|
||||
|
||||
// Throw the exception if the thread is interrupted
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
LOG.warn("Interrupted while trying for retry");
|
||||
throw exception;
|
||||
}
|
||||
Preconditions.checkArgument(
|
||||
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
|
||||
if (action.delayMillis > 0) {
|
||||
try {
|
||||
Thread.sleep(action.delayMillis);
|
||||
} catch (InterruptedException e) {
|
||||
throw (IOException) new InterruptedIOException(
|
||||
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
|
||||
.initCause(e);
|
||||
}
|
||||
}
|
||||
retryCount++;
|
||||
LOG.trace("Retrying Write request. Already tried "
|
||||
+ retryCount + " time(s); retry policy is " + retryPolicy);
|
||||
handleWrite(null, 0, len, true);
|
||||
}
|
||||
/**
|
||||
* Checks if the provided exception signifies retry failure in ratis client.
|
||||
* In case of retry failure, ratis client throws RaftRetryFailureException
|
||||
|
@ -462,7 +504,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
return t instanceof ContainerNotOpenException;
|
||||
}
|
||||
|
||||
private Throwable checkForException(IOException ioe) {
|
||||
private Throwable checkForException(IOException ioe) throws IOException {
|
||||
Throwable t = ioe.getCause();
|
||||
while (t != null) {
|
||||
for (Class<? extends Exception> cls : OzoneClientUtils
|
||||
|
@ -473,7 +515,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
}
|
||||
t = t.getCause();
|
||||
}
|
||||
return null;
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
private long getKeyLength() {
|
||||
|
@ -512,36 +554,30 @@ public class KeyOutputStream extends OutputStream {
|
|||
if (streamEntries.size() == 0) {
|
||||
return;
|
||||
}
|
||||
int size = streamEntries.size();
|
||||
int streamIndex =
|
||||
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
|
||||
BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
|
||||
if (entry != null) {
|
||||
try {
|
||||
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
|
||||
|
||||
// failed servers can be null in case there is no data written in the
|
||||
// stream
|
||||
if (failedServers != null && !failedServers.isEmpty()) {
|
||||
excludeList.addDatanodes(failedServers);
|
||||
}
|
||||
if (close) {
|
||||
entry.close();
|
||||
} else {
|
||||
entry.flush();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
Throwable t = checkForException(ioe);
|
||||
if (t != null) {
|
||||
// This call will allocate a new streamEntry and write the Data.
|
||||
// Close needs to be retried on the newly allocated streamEntry as
|
||||
// as well.
|
||||
handleException(entry, streamIndex, t);
|
||||
handleFlushOrClose(close);
|
||||
} else {
|
||||
throw ioe;
|
||||
while (true) {
|
||||
int size = streamEntries.size();
|
||||
int streamIndex =
|
||||
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
|
||||
BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
|
||||
if (entry != null) {
|
||||
try {
|
||||
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
|
||||
// failed servers can be null in case there is no data written in the
|
||||
// stream
|
||||
if (failedServers != null && !failedServers.isEmpty()) {
|
||||
excludeList.addDatanodes(failedServers);
|
||||
}
|
||||
if (close) {
|
||||
entry.close();
|
||||
} else {
|
||||
entry.flush();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
handleException(entry, streamIndex, ioe);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -616,6 +652,7 @@ public class KeyOutputStream extends OutputStream {
|
|||
private String multipartUploadID;
|
||||
private int multipartNumber;
|
||||
private boolean isMultipartKey;
|
||||
private int maxRetryCount;
|
||||
|
||||
|
||||
public Builder setMultipartUploadID(String uploadID) {
|
||||
|
@ -704,11 +741,17 @@ public class KeyOutputStream extends OutputStream {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxRetryCount(int maxCount) {
|
||||
this.maxRetryCount = maxCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public KeyOutputStream build() throws IOException {
|
||||
return new KeyOutputStream(openHandler, xceiverManager, scmClient,
|
||||
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
||||
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
|
||||
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
|
||||
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
|
||||
maxRetryCount);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -123,6 +123,7 @@ public class RpcClient implements ClientProtocol {
|
|||
private final long blockSize;
|
||||
private final long watchTimeout;
|
||||
private final ClientId clientId = ClientId.randomId();
|
||||
private final int maxRetryCount;
|
||||
|
||||
/**
|
||||
* Creates RpcClient instance with the given configuration.
|
||||
|
@ -205,6 +206,9 @@ public class RpcClient implements ClientProtocol {
|
|||
this.verifyChecksum =
|
||||
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
|
||||
maxRetryCount =
|
||||
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
||||
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
||||
}
|
||||
|
||||
private InetSocketAddress getScmAddressForClient() throws IOException {
|
||||
|
@ -594,7 +598,7 @@ public class RpcClient implements ClientProtocol {
|
|||
.build();
|
||||
|
||||
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
|
||||
KeyOutputStream groupOutputStream =
|
||||
KeyOutputStream keyOutputStream =
|
||||
new KeyOutputStream.Builder()
|
||||
.setHandler(openKey)
|
||||
.setXceiverClientManager(xceiverClientManager)
|
||||
|
@ -610,20 +614,21 @@ public class RpcClient implements ClientProtocol {
|
|||
.setBlockSize(blockSize)
|
||||
.setChecksumType(checksumType)
|
||||
.setBytesPerChecksum(bytesPerChecksum)
|
||||
.setMaxRetryCount(maxRetryCount)
|
||||
.build();
|
||||
groupOutputStream.addPreallocateBlocks(
|
||||
keyOutputStream.addPreallocateBlocks(
|
||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||
openKey.getOpenVersion());
|
||||
final FileEncryptionInfo feInfo = groupOutputStream
|
||||
final FileEncryptionInfo feInfo = keyOutputStream
|
||||
.getFileEncryptionInfo();
|
||||
if (feInfo != null) {
|
||||
KeyProvider.KeyVersion decrypted = getDEK(feInfo);
|
||||
final CryptoOutputStream cryptoOut = new CryptoOutputStream(
|
||||
groupOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
|
||||
keyOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
|
||||
decrypted.getMaterial(), feInfo.getIV());
|
||||
return new OzoneOutputStream(cryptoOut);
|
||||
} else {
|
||||
return new OzoneOutputStream(groupOutputStream);
|
||||
return new OzoneOutputStream(keyOutputStream);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -856,7 +861,7 @@ public class RpcClient implements ClientProtocol {
|
|||
.build();
|
||||
|
||||
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
|
||||
KeyOutputStream groupOutputStream =
|
||||
KeyOutputStream keyOutputStream =
|
||||
new KeyOutputStream.Builder()
|
||||
.setHandler(openKey)
|
||||
.setXceiverClientManager(xceiverClientManager)
|
||||
|
@ -875,11 +880,12 @@ public class RpcClient implements ClientProtocol {
|
|||
.setMultipartNumber(partNumber)
|
||||
.setMultipartUploadID(uploadID)
|
||||
.setIsMultipartKey(true)
|
||||
.setMaxRetryCount(maxRetryCount)
|
||||
.build();
|
||||
groupOutputStream.addPreallocateBlocks(
|
||||
keyOutputStream.addPreallocateBlocks(
|
||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||
openKey.getOpenVersion());
|
||||
return new OzoneOutputStream(groupOutputStream);
|
||||
return new OzoneOutputStream(keyOutputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.ratis.client.RaftClient;
|
|||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.rpc.RpcType;
|
||||
import org.apache.ratis.rpc.SupportedRpcType;
|
||||
import org.apache.ratis.util.TimeDuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -126,9 +127,11 @@ public interface RatisTestHelper {
|
|||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final int maxOutstandingRequests =
|
||||
HddsClientUtils.getMaxOutstandingRequests(conf);
|
||||
final TimeDuration requestTimeout =
|
||||
RatisHelper.getClientRequestTimeout(conf);
|
||||
final RaftClient client =
|
||||
newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
|
||||
maxOutstandingRequests);
|
||||
maxOutstandingRequests, requestTimeout);
|
||||
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
private final ChecksumType checksumType;
|
||||
private final int bytesPerChecksum;
|
||||
private final boolean verifyChecksum;
|
||||
private final int maxRetryCount;
|
||||
|
||||
/**
|
||||
* Creates a new DistributedStorageHandler.
|
||||
|
@ -154,6 +155,9 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
this.verifyChecksum =
|
||||
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
|
||||
this.maxRetryCount =
|
||||
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
||||
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -438,7 +442,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.build();
|
||||
// contact OM to allocate a block for key.
|
||||
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
|
||||
KeyOutputStream groupOutputStream =
|
||||
KeyOutputStream keyOutputStream =
|
||||
new KeyOutputStream.Builder()
|
||||
.setHandler(openKey)
|
||||
.setXceiverClientManager(xceiverClientManager)
|
||||
|
@ -454,11 +458,12 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.setWatchTimeout(watchTimeout)
|
||||
.setChecksumType(checksumType)
|
||||
.setBytesPerChecksum(bytesPerChecksum)
|
||||
.setMaxRetryCount(maxRetryCount)
|
||||
.build();
|
||||
groupOutputStream.addPreallocateBlocks(
|
||||
keyOutputStream.addPreallocateBlocks(
|
||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||
openKey.getOpenVersion());
|
||||
return new OzoneOutputStream(groupOutputStream);
|
||||
return new OzoneOutputStream(keyOutputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue