diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java
new file mode 100644
index 00000000000..86f5a66cf4c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+/**
+ * Exception thrown when a block is yet to be committed on the datanode.
+ */
+public class BlockNotCommittedException extends StorageContainerException {
+
+ /**
+ * Constructs a {@code BlockNotCommittedException} with the specified detail message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the {@link #getMessage()} method)
+ */
+ public BlockNotCommittedException(String message) {
+ super(message, ContainerProtos.Result.BLOCK_NOT_COMMITTED);
+ }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1f2fafbc7d0..1d6a89d73aa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.storage;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .BlockNotCommittedException;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
@@ -420,6 +422,9 @@ public final class ContainerProtocolCalls {
) throws StorageContainerException {
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
return;
+ } else if (response.getResult()
+ == ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
+ throw new BlockNotCommittedException(response.getMessage());
}
throw new StorageContainerException(
response.getMessage(), response.getResult());
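With this change, BLOCK_NOT_COMMITTED responses surface as the dedicated subtype, while every other non-success result still raises a plain StorageContainerException. A minimal caller-side sketch (the helper name is illustrative, not part of this patch):

    // Illustrative helper, not part of this patch. Any other
    // StorageContainerException propagates to the caller unchanged.
    static long committedLengthOrZero(XceiverClientSpi client, BlockID blockID,
        String traceID) throws IOException {
      try {
        return ContainerProtocolCalls
            .getCommittedBlockLength(client, blockID, traceID)
            .getBlockLength();
      } catch (BlockNotCommittedException bnce) {
        // Retryable case: the datanode has not committed the block yet.
        return 0;
      }
    }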
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 6ad9085444d..8f53da58b44 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -194,6 +194,14 @@ public final class OzoneConfigKeys {
public static final int
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
+ public static final String OZONE_CLIENT_MAX_RETRIES =
+ "ozone.client.max.retries";
+ public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50;
+
+ public static final String OZONE_CLIENT_RETRY_INTERVAL =
+ "ozone.client.retry.interval";
+ public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms";
+
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6f296c61d07..a9fd10b21ae 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -243,6 +243,22 @@
       Connection timeout for Ozone client in milliseconds.
     </description>
   </property>
+  <property>
+    <name>ozone.client.max.retries</name>
+    <value>50</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Maximum number of retries by Ozone Client on encountering
+      exception while fetching committed block length.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.retry.interval</name>
+    <value>200ms</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Interval between retries by Ozone Client on encountering
+      exception while fetching committed block length.
+    </description>
+  </property>
   <property>
     <name>ozone.client.protocol</name>
     <value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
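For quick experimentation the same two settings can also be overridden programmatically; a short sketch (values are arbitrary, assumes an OzoneConfiguration in hand):

    // Sketch: overriding the new retry settings in code.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 5);
    // setTimeDuration accepts the usual Hadoop duration units (ms, s, m, ...).
    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
        500, TimeUnit.MILLISECONDS);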
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 0aaee31ffb9..5d577532208 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -17,14 +17,23 @@
*/
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
/** A utility class for OzoneClient. */
public final class OzoneClientUtils {
@@ -84,4 +93,23 @@ public final class OzoneClientUtils {
keyInfo.setSize(key.getDataSize());
return keyInfo;
}
+
+ public static RetryPolicy createRetryPolicy(Configuration conf) {
+ int maxRetryCount =
+ conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+ OZONE_CLIENT_MAX_RETRIES_DEFAULT);
+ long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
+ OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
+ OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+ RetryPolicy basePolicy = RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
+ TimeUnit.MILLISECONDS);
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy);
+ RetryPolicy retryPolicy = RetryPolicies
+ .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ exceptionToPolicyMap);
+ return retryPolicy;
+ }
}
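For context on what the composed policy does: RetryPolicies.retryByException consults the exception-to-policy map first, so a BlockNotCommittedException is handled by the fixed-sleep policy while every other exception falls through to TRY_ONCE_THEN_FAIL. A sketch probing the policy directly (shouldRetry is declared to throw Exception, hence the throws clause; not part of this patch):

    // Sketch: probing the composed retry policy.
    static void probePolicy(Configuration conf) throws Exception {
      RetryPolicy policy = OzoneClientUtils.createRetryPolicy(conf);
      // Mapped exception: RETRY until maxRetryCount, sleeping
      // ozone.client.retry.interval between attempts.
      RetryPolicy.RetryAction retry = policy.shouldRetry(
          new BlockNotCommittedException("block not committed"), 0, 0, true);
      assert retry.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
      // Unmapped exception: TRY_ONCE_THEN_FAIL fails immediately.
      RetryPolicy.RetryAction fail = policy.shouldRetry(
          new java.io.IOException("unrelated failure"), 0, 0, true);
      assert fail.action == RetryPolicy.RetryAction.RetryDecision.FAIL;
    }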
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index c632df6a6dd..21406b52c94 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -63,7 +65,7 @@ import java.util.Optional;
*/
public class ChunkGroupOutputStream extends OutputStream {
- private static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupOutputStream.class);
// array list's get(index) is O(1)
@@ -80,6 +82,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final String requestID;
private boolean closed;
private List<OmKeyLocationInfo> locationInfoList;
+ private final RetryPolicy retryPolicy;
/**
* A constructor for testing purpose only.
*/
@@ -95,6 +98,7 @@ public class ChunkGroupOutputStream extends OutputStream {
requestID = null;
closed = false;
locationInfoList = null;
+ retryPolicy = null;
}
/**
@@ -124,7 +128,7 @@ public class ChunkGroupOutputStream extends OutputStream {
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
OzoneManagerProtocolClientSideTranslatorPB omClient,
int chunkSize, String requestId, ReplicationFactor factor,
- ReplicationType type) throws IOException {
+ ReplicationType type, RetryPolicy retryPolicy) throws IOException {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.byteOffset = 0;
@@ -143,6 +147,7 @@ public class ChunkGroupOutputStream extends OutputStream {
this.chunkSize = chunkSize;
this.requestID = requestId;
this.locationInfoList = new ArrayList<>();
+ this.retryPolicy = retryPolicy;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
}
@@ -305,6 +310,62 @@ public class ChunkGroupOutputStream extends OutputStream {
}
}
+ private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
+ throws IOException {
+ long blockLength;
+ ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
+ RetryPolicy.RetryAction action;
+ int numRetries = 0;
+
+ // TODO : At this point of time, we also need to allocate new blocks
+ // from a different container and may need to nullify
+ // all the remaining pre-allocated blocks in case they were
+ // pre-allocated on the same container which got closed now.This needs
+ // caching the closed container list on the client itself.
+ while (true) {
+ try {
+ responseProto = ContainerProtocolCalls
+ .getCommittedBlockLength(streamEntry.xceiverClient,
+ streamEntry.blockID, requestID);
+ blockLength = responseProto.getBlockLength();
+ return blockLength;
+ } catch (StorageContainerException sce) {
+ try {
+ action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
+ } catch (Exception e) {
+ throw e instanceof IOException ? (IOException) e : new IOException(e);
+ }
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ if (action.reason != null) {
+ LOG.error(
+ "GetCommittedBlockLength request failed. " + action.reason,
+ sce);
+ }
+ throw sce;
+ }
+
+ // Throw the exception if the thread is interrupted
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Interrupted while trying for connection");
+ throw sce;
+ }
+ Preconditions.checkArgument(
+ action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(
+ "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+ .initCause(e);
+ }
+ numRetries++;
+ LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
+ + numRetries + " time(s); retry policy is " + retryPolicy);
+ continue;
+ }
+ }
+ }
+
/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -317,15 +378,6 @@ public class ChunkGroupOutputStream extends OutputStream {
*/
private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
int streamIndex) throws IOException {
- // TODO : If the block is still not committed and is in the
- // pending openBlock Map, it will return BLOCK_NOT_COMMITTED
- // exception. We should handle this by retrying the same operation
- // n times and update the OzoneManager with the actual block length
- // written. At this point of time, we also need to allocate new blocks
- // from a different container and may need to nullify
- // all the remaining pre-allocated blocks in case they were
- // pre-allocated on the same container which got closed now.This needs
- // caching the closed container list on the client itself.
long committedLength = 0;
ByteBuffer buffer = streamEntry.getBuffer();
if (buffer == null) {
@@ -342,11 +394,7 @@ public class ChunkGroupOutputStream extends OutputStream {
// for this block associated with the stream here.
if (streamEntry.currentPosition >= chunkSize
|| streamEntry.currentPosition != buffer.position()) {
- ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
- ContainerProtocolCalls
- .getCommittedBlockLength(streamEntry.xceiverClient,
- streamEntry.blockID, requestID);
- committedLength = responseProto.getBlockLength();
+ committedLength = getCommittedBlockLength(streamEntry);
// update the length of the current stream
locationInfoList.get(streamIndex).setLength(committedLength);
}
@@ -481,6 +529,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private String requestID;
private ReplicationType type;
private ReplicationFactor factor;
+ private RetryPolicy retryPolicy;
public Builder setHandler(OpenKeySession handler) {
this.openHandler = handler;
@@ -526,8 +575,14 @@ public class ChunkGroupOutputStream extends OutputStream {
public ChunkGroupOutputStream build() throws IOException {
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
- omClient, chunkSize, requestID, factor, type);
+ omClient, chunkSize, requestID, factor, type, retryPolicy);
}
+
+ public Builder setRetryPolicy(RetryPolicy rPolicy) {
+ this.retryPolicy = rPolicy;
+ return this;
+ }
+
}
private static class ChunkOutputStreamEntry extends OutputStream {
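Stripped of the Ozone specifics, the getCommittedBlockLength() method added above follows the standard Hadoop RetryPolicy loop. A distilled sketch of that idiom, with a Callable standing in for the RPC (helper name and generics are illustrative, not part of this patch):

    // Sketch of the retry idiom used by getCommittedBlockLength().
    // Needs java.util.concurrent.Callable and java.io.InterruptedIOException.
    static <T> T callWithRetry(RetryPolicy retryPolicy, Callable<T> work)
        throws IOException {
      int numRetries = 0;
      while (true) {
        try {
          return work.call();
        } catch (Exception e) {
          RetryPolicy.RetryAction action;
          try {
            action = retryPolicy.shouldRetry(e, numRetries, 0, true);
          } catch (Exception pe) {
            throw pe instanceof IOException ? (IOException) pe
                : new IOException(pe);
          }
          if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
            throw e instanceof IOException ? (IOException) e
                : new IOException(e);
          }
          try {
            Thread.sleep(action.delayMillis); // sleep chosen by the policy
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("interrupted between retries");
          }
          numRetries++;
        }
      }
    }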
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index fc705144532..387f41fe6a8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -97,6 +99,7 @@ public class RpcClient implements ClientProtocol {
private final UserGroupInformation ugi;
private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights;
+ private final RetryPolicy retryPolicy;
/**
* Creates RpcClient instance with the given configuration.
@@ -137,6 +140,7 @@ public class RpcClient implements ClientProtocol {
Client.getRpcTimeout(conf)));
this.xceiverClientManager = new XceiverClientManager(conf);
+ retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
int configuredChunkSize = conf.getInt(
ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -469,6 +473,7 @@ public class RpcClient implements ClientProtocol {
.setRequestID(requestId)
.setType(HddsProtos.ReplicationType.valueOf(type.toString()))
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+ .setRetryPolicy(retryPolicy)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 50d7ec54ddc..9f126333811 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -22,6 +22,10 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+ StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -45,6 +49,7 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.event.Level;
import java.io.IOException;
import java.security.MessageDigest;
@@ -52,6 +57,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.Random;
/**
* Tests Close Container Exception handling by Ozone Client.
@@ -67,6 +73,7 @@ public class TestCloseContainerHandlingByClient {
private static String volumeName;
private static String bucketName;
private static String keyString;
+ private static int maxRetries;
/**
* Create a MiniDFSCluster for testing.
@@ -78,6 +85,9 @@ public class TestCloseContainerHandlingByClient {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ // pick a random number of max retries between 0 and 9
+ maxRetries = new Random().nextInt(10);
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries);
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
@@ -286,17 +296,8 @@ public class TestCloseContainerHandlingByClient {
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) outputStream.getOutputStream();
- long clientId = groupOutputStream.getOpenID();
- OMMetadataManager metadataManager =
- cluster.getOzoneManager().getMetadataManager();
- byte[] openKey =
- metadataManager.getOpenKeyBytes(
- volumeName, bucketName, keyName, clientId);
- byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
- OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
- OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
List<OmKeyLocationInfo> locationInfoList =
- keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ getLocationInfos(groupOutputStream, keyName);
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
@@ -318,7 +319,6 @@ public class TestCloseContainerHandlingByClient {
new CloseContainerCommand(containerID, type, pipeline.getId()));
}
}
-
int index = 0;
for (long containerID : containerIdList) {
Pipeline pipeline = pipelineList.get(index);
@@ -333,7 +333,6 @@ public class TestCloseContainerHandlingByClient {
}
index++;
}
-
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
@@ -345,6 +344,20 @@ public class TestCloseContainerHandlingByClient {
.createKey(keyName, size, type, factor);
}
+ private List<OmKeyLocationInfo> getLocationInfos(
+ ChunkGroupOutputStream groupOutputStream, String keyName)
+ throws IOException {
+ long clientId = groupOutputStream.getOpenID();
+ OMMetadataManager metadataManager =
+ cluster.getOzoneManager().getMetadataManager();
+ byte[] openKey = metadataManager
+ .getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
+ byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
+ OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
+ OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
+ return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ }
+
private void validateData(String keyName, byte[] data) throws Exception {
byte[] readData = new byte[data.length];
OzoneInputStream is =
@@ -399,4 +412,58 @@ public class TestCloseContainerHandlingByClient {
dataString.concat(dataString);
validateData(keyName, dataString.getBytes());
}
+
+ @Test
+ public void testRetriesOnBlockNotCommittedException() throws Exception {
+ String keyName = "blockcommitexceptiontest";
+ OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) key.getOutputStream();
+ GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE);
+ GenericTestUtils.LogCapturer logCapturer =
+ GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG);
+
+ Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+ String dataString = fixedLengthString(keyString, (3 * chunkSize));
+ key.write(dataString.getBytes());
+ List<OmKeyLocationInfo> locationInfos =
+ getLocationInfos(groupOutputStream, keyName);
+ long containerID = locationInfos.get(0).getContainerID();
+ List<DatanodeDetails> datanodes =
+ cluster.getStorageContainerManager().getScmContainerManager()
+ .getContainerWithPipeline(containerID).getPipeline().getMachines();
+ Assert.assertEquals(1, datanodes.size());
+ // move the container on the datanode to Closing state, this will ensure
+ // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying
+ // to fetch the committed length
+ for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
+ if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
+ datanodeService.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID).getContainerData()
+ .setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+ }
+ }
+ dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
+ key.write(dataString.getBytes());
+ try {
+ key.close();
+ Assert.fail("Expected Exception not thrown");
+ } catch (IOException ioe) {
+ Assert.assertTrue(ioe instanceof StorageContainerException);
+ Assert.assertTrue(((StorageContainerException) ioe).getResult()
+ == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
+ }
+ // It should retry only for max retries times
+ for (int i = 1; i <= maxRetries; i++) {
+ Assert.assertTrue(logCapturer.getOutput()
+ .contains("Retrying GetCommittedBlockLength request"));
+ Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i));
+ }
+ Assert.assertTrue(logCapturer.getOutput()
+ .contains("GetCommittedBlockLength request failed."));
+ Assert.assertTrue(logCapturer.getOutput().contains(
+ "retries get failed due to exceeded maximum allowed retries number"
+ + ": " + maxRetries));
+ logCapturer.stopCapturing();
+ }
}
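The integration test above exercises the full write path; the policy construction itself could also be unit-tested in isolation. A sketch (test name and setup are illustrative, not part of this patch):

    // Sketch: unit-testing createRetryPolicy() directly.
    @Test
    public void testRetryPolicyRespectsMaxRetries() throws Exception {
      OzoneConfiguration conf = new OzoneConfiguration();
      conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 2);
      RetryPolicy policy = OzoneClientUtils.createRetryPolicy(conf);
      BlockNotCommittedException bnce =
          new BlockNotCommittedException("block not committed");
      // Attempts 0 and 1 are retried, the next one fails.
      Assert.assertEquals(RetryPolicy.RetryAction.RetryDecision.RETRY,
          policy.shouldRetry(bnce, 0, 0, true).action);
      Assert.assertEquals(RetryPolicy.RetryAction.RetryDecision.RETRY,
          policy.shouldRetry(bnce, 1, 0, true).action);
      Assert.assertEquals(RetryPolicy.RetryAction.RetryDecision.FAIL,
          policy.shouldRetry(bnce, 2, 0, true).action);
    }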
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index ec33990de43..0d62432e3c4 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -85,6 +87,7 @@ public final class DistributedStorageHandler implements StorageHandler {
private final boolean useRatis;
private final HddsProtos.ReplicationType type;
private final HddsProtos.ReplicationFactor factor;
+ private final RetryPolicy retryPolicy;
/**
* Creates a new DistributedStorageHandler.
@@ -119,6 +122,7 @@ public final class DistributedStorageHandler implements StorageHandler {
OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
+ retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
+ " the maximum size ({}),"
@@ -420,6 +424,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.setRequestID(args.getRequestID())
.setType(xceiverClientManager.getType())
.setFactor(xceiverClientManager.getFactor())
+ .setRetryPolicy(retryPolicy)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),