HDDS-263. Add retries in Ozone Client to handle BlockNotCommitted Exception. Contributed by Shashikant Banerjee.

Mukul Kumar Singh 2018-09-03 12:26:34 +05:30
parent ff036e49ff
commit 873ef8ae81
9 changed files with 254 additions and 29 deletions

BlockNotCommittedException.java (new file)

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.common.helpers;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
/**
* Exception thrown when a block is yet to be committed on the datanode.
*/
public class BlockNotCommittedException extends StorageContainerException {
/**
* Constructs a {@code BlockNotCommittedException} with the specified detail message.
*
* @param message The detail message (which is saved for later retrieval by
* the {@link #getMessage()} method)
*/
public BlockNotCommittedException(String message) {
super(message, ContainerProtos.Result.BLOCK_NOT_COMMITTED);
}
}
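Because the new type passes ContainerProtos.Result.BLOCK_NOT_COMMITTED up to its StorageContainerException parent, callers can either catch the subclass by type or keep dispatching on the inherited result code. A minimal sketch (the variable names are illustrative, not part of this patch):

StorageContainerException sce =
    new BlockNotCommittedException("block not yet committed");
// The result code set by the constructor is retrievable through the
// parent class, so existing result-based handling keeps working.
boolean retryable =
    sce.getResult() == ContainerProtos.Result.BLOCK_NOT_COMMITTED; // true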

ContainerProtocolCalls.java

@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
@@ -420,6 +422,9 @@ public final class ContainerProtocolCalls {
) throws StorageContainerException {
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
return;
} else if (response.getResult()
== ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
throw new BlockNotCommittedException(response.getMessage());
}
throw new StorageContainerException(
response.getMessage(), response.getResult());
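Since validateContainerResponse now throws the more specific type first, callers can separate the retryable case from other container failures purely by catch ordering; a hedged sketch of caller-side handling (identifiers are illustrative):

try {
  ContainerProtocolCalls.getCommittedBlockLength(
      xceiverClient, blockID, traceID);
} catch (BlockNotCommittedException bnce) {
  // Retryable: the datanode has not committed the block yet.
} catch (StorageContainerException sce) {
  // Any other result code is surfaced unchanged.
}

The subclass clause has to come before StorageContainerException, otherwise javac rejects it as unreachable.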

OzoneConfigKeys.java

@@ -194,6 +194,14 @@ public final class OzoneConfigKeys {
public static final int
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
public static final String OZONE_CLIENT_MAX_RETRIES =
"ozone.client.max.retries";
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50;
public static final String OZONE_CLIENT_RETRY_INTERVAL =
"ozone.client.retry.interval";
public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms";
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
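The interval default is a duration string, so Hadoop's Configuration.getTimeDuration accepts unit suffixes such as "ms" and "s". A small sketch of tuning both keys programmatically; the chosen values are arbitrary examples:

OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 10);
conf.set(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, "500ms");
long intervalMillis = conf.getTimeDuration(
    OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
    OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT,
    TimeUnit.MILLISECONDS); // 500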

ozone-default.xml

@@ -243,6 +243,22 @@
<description>Connection timeout for Ozone client in milliseconds.
</description>
</property>
<property>
<name>ozone.client.max.retries</name>
<value>50</value>
<tag>OZONE, CLIENT</tag>
<description>Maximum number of times the Ozone client retries on
encountering an exception while fetching the committed block length.
</description>
</property>
<property>
<name>ozone.client.retry.interval</name>
<value>200ms</value>
<tag>OZONE, CLIENT</tag>
<description>Interval between successive retries by the Ozone client on
encountering an exception while fetching the committed block length.
</description>
</property>
<property>
<name>ozone.client.protocol</name>
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
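These defaults can be overridden per deployment in ozone-site.xml; a sketch with arbitrary example values, not recommendations:

<property>
<name>ozone.client.max.retries</name>
<value>100</value>
</property>
<property>
<name>ozone.client.retry.interval</name>
<value>1s</value>
</property>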

OzoneClientUtils.java

@@ -17,14 +17,23 @@
*/
package org.apache.hadoop.ozone.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/** A utility class for OzoneClient. */
public final class OzoneClientUtils {
@@ -84,4 +93,23 @@ public final class OzoneClientUtils {
keyInfo.setSize(key.getDataSize());
return keyInfo;
}
public static RetryPolicy createRetryPolicy(Configuration conf) {
int maxRetryCount =
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
RetryPolicy basePolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy);
RetryPolicy retryPolicy = RetryPolicies
.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap);
return retryPolicy;
}
}
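The composed policy retries only BlockNotCommittedException, up to maxRetryCount times with a fixed sleep, while every other exception type falls through to TRY_ONCE_THEN_FAIL. A hedged usage sketch (shouldRetry declares throws Exception, so call it from a context that propagates it):

RetryPolicy policy = OzoneClientUtils.createRetryPolicy(conf);
RetryPolicy.RetryAction action = policy.shouldRetry(
    new BlockNotCommittedException("not committed"), 0, 0, true);
// action.action is RETRY until the retry count reaches
// ozone.client.max.retries, after which it becomes FAIL; an unmapped
// exception type fails on the very first call.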

ChunkGroupOutputStream.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -63,7 +65,7 @@ import java.util.Optional;
*/
public class ChunkGroupOutputStream extends OutputStream {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupOutputStream.class);
// ArrayList.get(index) is O(1)
@@ -80,6 +82,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final String requestID;
private boolean closed;
private List<OmKeyLocationInfo> locationInfoList;
private final RetryPolicy retryPolicy;
/**
* A constructor for testing purposes only.
*/
@@ -95,6 +98,7 @@
requestID = null;
closed = false;
locationInfoList = null;
retryPolicy = null;
}
/**
@@ -124,7 +128,7 @@
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
OzoneManagerProtocolClientSideTranslatorPB omClient,
int chunkSize, String requestId, ReplicationFactor factor,
ReplicationType type) throws IOException {
ReplicationType type, RetryPolicy retryPolicy) throws IOException {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.byteOffset = 0;
@@ -143,6 +147,7 @@
this.chunkSize = chunkSize;
this.requestID = requestId;
this.locationInfoList = new ArrayList<>();
this.retryPolicy = retryPolicy;
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
}
@@ -305,6 +310,62 @@
}
}
private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
throws IOException {
long blockLength;
ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
RetryPolicy.RetryAction action;
int numRetries = 0;
// TODO : At this point of time, we also need to allocate new blocks
// from a different container and may need to nullify
// all the remaining pre-allocated blocks in case they were
// pre-allocated on the same container which got closed now. This needs
// caching the closed container list on the client itself.
while (true) {
try {
responseProto = ContainerProtocolCalls
.getCommittedBlockLength(streamEntry.xceiverClient,
streamEntry.blockID, requestID);
blockLength = responseProto.getBlockLength();
return blockLength;
} catch (StorageContainerException sce) {
try {
action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
} catch (Exception e) {
throw e instanceof IOException ? (IOException) e : new IOException(e);
}
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.error(
"GetCommittedBlockLength request failed. " + action.reason,
sce);
}
throw sce;
}
// Throw the exception if the thread is interrupted
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupted while trying for connection");
throw sce;
}
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException(
"Interrupted: action=" + action + ", retry policy=" + retryPolicy)
.initCause(e);
}
numRetries++;
LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
+ numRetries + " time(s); retry policy is " + retryPolicy);
continue;
}
}
}
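With the shipped defaults this loop adds a bounded delay before giving up: 50 retries × 200 ms of fixed sleep = 10 s of sleep time in the worst case, plus the cost of the 51 GetCommittedBlockLength round trips themselves, after which the final BlockNotCommittedException propagates to the caller.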
/**
* It performs the following actions:
* a. Updates the committed length at datanode for the current stream in
@@ -317,15 +378,6 @@
*/
private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
int streamIndex) throws IOException {
// TODO : If the block is still not committed and is in the
// pending openBlock Map, it will return BLOCK_NOT_COMMITTED
// exception. We should handle this by retrying the same operation
// n times and update the OzoneManager with the actual block length
// written. At this point of time, we also need to allocate new blocks
// from a different container and may need to nullify
// all the remaining pre-allocated blocks in case they were
// pre-allocated on the same container which got closed now.This needs
// caching the closed container list on the client itself.
long committedLength = 0;
ByteBuffer buffer = streamEntry.getBuffer();
if (buffer == null) {
@@ -342,11 +394,7 @@
// for this block associated with the stream here.
if (streamEntry.currentPosition >= chunkSize
|| streamEntry.currentPosition != buffer.position()) {
ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
ContainerProtocolCalls
.getCommittedBlockLength(streamEntry.xceiverClient,
streamEntry.blockID, requestID);
committedLength = responseProto.getBlockLength();
committedLength = getCommittedBlockLength(streamEntry);
// update the length of the current stream
locationInfoList.get(streamIndex).setLength(committedLength);
}
@@ -481,6 +529,7 @@
private String requestID;
private ReplicationType type;
private ReplicationFactor factor;
private RetryPolicy retryPolicy;
public Builder setHandler(OpenKeySession handler) {
this.openHandler = handler;
@@ -526,8 +575,14 @@
public ChunkGroupOutputStream build() throws IOException {
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
omClient, chunkSize, requestID, factor, type);
omClient, chunkSize, requestID, factor, type, retryPolicy);
}
public Builder setRetryPolicy(RetryPolicy rPolicy) {
this.retryPolicy = rPolicy;
return this;
}
}
private static class ChunkOutputStreamEntry extends OutputStream {

RpcClient.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -97,6 +99,7 @@ public class RpcClient implements ClientProtocol {
private final UserGroupInformation ugi;
private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights;
private final RetryPolicy retryPolicy;
/**
* Creates RpcClient instance with the given configuration.
@@ -137,6 +140,7 @@
Client.getRpcTimeout(conf)));
this.xceiverClientManager = new XceiverClientManager(conf);
retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
int configuredChunkSize = conf.getInt(
ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -469,6 +473,7 @@
.setRequestID(requestId)
.setType(HddsProtos.ReplicationType.valueOf(type.toString()))
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
.setRetryPolicy(retryPolicy)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),

TestCloseContainerHandlingByClient.java

@@ -22,6 +22,10 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -45,6 +49,7 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import java.io.IOException;
import java.security.MessageDigest;
@@ -52,6 +57,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.Random;
/**
* Tests Close Container Exception handling by Ozone Client.
@@ -67,6 +73,7 @@ public class TestCloseContainerHandlingByClient {
private static String volumeName;
private static String bucketName;
private static String keyString;
private static int maxRetries;
/**
* Create a MiniDFSCluster for testing.
@@ -78,6 +85,9 @@
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
// pick a random retry count between 0 and 9
maxRetries = new Random().nextInt(10);
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries);
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
@@ -286,17 +296,8 @@
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) outputStream.getOutputStream();
long clientId = groupOutputStream.getOpenID();
OMMetadataManager metadataManager =
cluster.getOzoneManager().getMetadataManager();
byte[] openKey =
metadataManager.getOpenKeyBytes(
volumeName, bucketName, keyName, clientId);
byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
List<OmKeyLocationInfo> locationInfoList =
keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
getLocationInfos(groupOutputStream, keyName);
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
@@ -318,7 +319,6 @@
new CloseContainerCommand(containerID, type, pipeline.getId()));
}
}
int index = 0;
for (long containerID : containerIdList) {
Pipeline pipeline = pipelineList.get(index);
@@ -333,7 +333,6 @@
}
index++;
}
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
@@ -345,6 +344,20 @@
.createKey(keyName, size, type, factor);
}
private List<OmKeyLocationInfo> getLocationInfos(
ChunkGroupOutputStream groupOutputStream, String keyName)
throws IOException {
long clientId = groupOutputStream.getOpenID();
OMMetadataManager metadataManager =
cluster.getOzoneManager().getMetadataManager();
byte[] openKey = metadataManager
.getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
}
private void validateData(String keyName, byte[] data) throws Exception {
byte[] readData = new byte[data.length];
OzoneInputStream is =
@@ -399,4 +412,58 @@
dataString.concat(dataString);
validateData(keyName, dataString.getBytes());
}
@Test
public void testRetriesOnBlockNotCommittedException() throws Exception {
String keyName = "blockcommitexceptiontest";
OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG);
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
String dataString = fixedLengthString(keyString, (3 * chunkSize));
key.write(dataString.getBytes());
List<OmKeyLocationInfo> locationInfos =
getLocationInfos(groupOutputStream, keyName);
long containerID = locationInfos.get(0).getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getScmContainerManager()
.getContainerWithPipeline(containerID).getPipeline().getMachines();
Assert.assertEquals(1, datanodes.size());
// Move the container on the datanode to the CLOSING state; closing the
// key will then hit BLOCK_NOT_COMMITTED while trying to fetch the
// committed block length.
for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
datanodeService.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID).getContainerData()
.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
}
}
dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
key.write(dataString.getBytes());
try {
key.close();
Assert.fail("Expected Exception not thrown");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof StorageContainerException);
Assert.assertTrue(((StorageContainerException) ioe).getResult()
== ContainerProtos.Result.BLOCK_NOT_COMMITTED);
}
// The client should have retried exactly maxRetries times.
for (int i = 1; i <= maxRetries; i++) {
Assert.assertTrue(logCapturer.getOutput()
.contains("Retrying GetCommittedBlockLength request"));
Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i));
}
Assert.assertTrue(logCapturer.getOutput()
.contains("GetCommittedBlockLength request failed."));
Assert.assertTrue(logCapturer.getOutput().contains(
"retries get failed due to exceeded maximum allowed retries number"
+ ": " + maxRetries));
logCapturer.stopCapturing();
}
}

DistributedStorageHandler.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -85,6 +87,7 @@ public final class DistributedStorageHandler implements StorageHandler {
private final boolean useRatis;
private final HddsProtos.ReplicationType type;
private final HddsProtos.ReplicationFactor factor;
private final RetryPolicy retryPolicy;
/**
* Creates a new DistributedStorageHandler.
@@ -119,6 +122,7 @@
OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
+ " the maximum size ({}),"
@@ -420,6 +424,7 @@
.setRequestID(args.getRequestID())
.setType(xceiverClientManager.getType())
.setFactor(xceiverClientManager.getFactor())
.setRetryPolicy(retryPolicy)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),