HDDS-866. Handle RaftRetryFailureException in OzoneClient. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
176bb3f812
commit
ee44b069c6
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.HddsUtils;
|
import org.apache.hadoop.hdds.HddsUtils;
|
||||||
import org.apache.ratis.proto.RaftProtos;
|
import org.apache.ratis.proto.RaftProtos;
|
||||||
|
import org.apache.ratis.protocol.RaftRetryFailureException;
|
||||||
import org.apache.ratis.retry.RetryPolicy;
|
import org.apache.ratis.retry.RetryPolicy;
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf
|
import org.apache.ratis.thirdparty.com.google.protobuf
|
||||||
.InvalidProtocolBufferException;
|
.InvalidProtocolBufferException;
|
||||||
|
@ -196,10 +197,16 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
||||||
new ArrayList<>();
|
new ArrayList<>();
|
||||||
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
|
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
|
||||||
raftClientReply.whenComplete((reply, e) -> LOG
|
raftClientReply.whenComplete((reply, e) -> LOG
|
||||||
.debug("received reply {} for request: {} exception: {}", request,
|
.info("received reply {} for request: {} exception: {}", request,
|
||||||
reply, e))
|
reply, e))
|
||||||
.thenApply(reply -> {
|
.thenApply(reply -> {
|
||||||
try {
|
try {
|
||||||
|
// we need to handle RaftRetryFailure Exception
|
||||||
|
RaftRetryFailureException raftRetryFailureException =
|
||||||
|
reply.getRetryFailureException();
|
||||||
|
if (raftRetryFailureException != null) {
|
||||||
|
throw new CompletionException(raftRetryFailureException);
|
||||||
|
}
|
||||||
ContainerCommandResponseProto response =
|
ContainerCommandResponseProto response =
|
||||||
ContainerCommandResponseProto
|
ContainerCommandResponseProto
|
||||||
.parseFrom(reply.getMessage().getContent());
|
.parseFrom(reply.getMessage().getContent());
|
||||||
|
|
|
@ -284,7 +284,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
|
||||||
@Override
|
@Override
|
||||||
public void validateContainerCommand(
|
public void validateContainerCommand(
|
||||||
ContainerCommandRequestProto msg) throws StorageContainerException {
|
ContainerCommandRequestProto msg) throws StorageContainerException {
|
||||||
ContainerType containerType = msg.getCreateContainer().getContainerType();
|
long containerID = msg.getContainerID();
|
||||||
|
Container container = getContainer(containerID);
|
||||||
|
if (container == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ContainerType containerType = container.getContainerType();
|
||||||
ContainerProtos.Type cmdType = msg.getCmdType();
|
ContainerProtos.Type cmdType = msg.getCmdType();
|
||||||
AuditAction action =
|
AuditAction action =
|
||||||
ContainerCommandRequestPBHelper.getAuditAction(cmdType);
|
ContainerCommandRequestPBHelper.getAuditAction(cmdType);
|
||||||
|
@ -299,11 +304,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
|
||||||
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
|
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
long containerID = msg.getContainerID();
|
|
||||||
Container container;
|
|
||||||
container = getContainer(containerID);
|
|
||||||
|
|
||||||
if (container != null) {
|
|
||||||
State containerState = container.getContainerState();
|
State containerState = container.getContainerState();
|
||||||
if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
|
if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
|
||||||
switch (cmdType) {
|
switch (cmdType) {
|
||||||
|
@ -329,7 +330,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
|
||||||
throw iex;
|
throw iex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the container usage reaches the close threshold or the container is
|
* If the container usage reaches the close threshold or the container is
|
||||||
|
|
|
@ -464,17 +464,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isExist(HddsProtos.PipelineID pipelineId) {
|
public boolean isExist(HddsProtos.PipelineID pipelineId) {
|
||||||
try {
|
|
||||||
for (RaftGroupId groupId : server.getGroupIds()) {
|
for (RaftGroupId groupId : server.getGroupIds()) {
|
||||||
if (PipelineID.valueOf(
|
if (PipelineID.valueOf(groupId.getUuid()).getProtobuf()
|
||||||
groupId.getUuid()).getProtobuf().equals(pipelineId)) {
|
.equals(pipelineId)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
} catch (IOException e) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -313,6 +313,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
|
||||||
try {
|
try {
|
||||||
MetadataStore db = BlockUtils.getDB(containerData, config);
|
MetadataStore db = BlockUtils.getDB(containerData, config);
|
||||||
db.compactDB();
|
db.compactDB();
|
||||||
|
LOG.info("Container {} is closed with bcsId {}.",
|
||||||
|
containerData.getContainerID(),
|
||||||
|
containerData.getBlockCommitSequenceId());
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
|
|
|
@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
|
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
|
||||||
|
|
||||||
<!-- Apache Ratis version -->
|
<!-- Apache Ratis version -->
|
||||||
<ratis.version>0.3.0-6f3419a-SNAPSHOT</ratis.version>
|
<ratis.version>0.4.0-b600fc2-SNAPSHOT</ratis.version>
|
||||||
|
|
||||||
<bouncycastle.version>1.60</bouncycastle.version>
|
<bouncycastle.version>1.60</bouncycastle.version>
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
import org.apache.hadoop.hdds.scm.protocolPB
|
import org.apache.hadoop.hdds.scm.protocolPB
|
||||||
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
|
import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
|
||||||
|
import org.apache.ratis.protocol.RaftRetryFailureException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -393,7 +394,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
|
|
||||||
private boolean checkIfContainerIsClosed(IOException ioe) {
|
private boolean checkIfContainerIsClosed(IOException ioe) {
|
||||||
if (ioe.getCause() != null) {
|
if (ioe.getCause() != null) {
|
||||||
return checkIfContainerNotOpenException(ioe) || Optional
|
return checkIfContainerNotOpenOrRaftRetryFailureException(ioe) || Optional
|
||||||
.of(ioe.getCause())
|
.of(ioe.getCause())
|
||||||
.filter(e -> e instanceof StorageContainerException)
|
.filter(e -> e instanceof StorageContainerException)
|
||||||
.map(e -> (StorageContainerException) e)
|
.map(e -> (StorageContainerException) e)
|
||||||
|
@ -403,10 +404,12 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean checkIfContainerNotOpenException(IOException ioe) {
|
private boolean checkIfContainerNotOpenOrRaftRetryFailureException(
|
||||||
|
IOException ioe) {
|
||||||
Throwable t = ioe.getCause();
|
Throwable t = ioe.getCause();
|
||||||
while (t != null) {
|
while (t != null) {
|
||||||
if (t instanceof ContainerNotOpenException) {
|
if (t instanceof ContainerNotOpenException
|
||||||
|
|| t instanceof RaftRetryFailureException) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
t = t.getCause();
|
t = t.getCause();
|
||||||
|
|
|
@ -110,10 +110,6 @@ public class TestFailureHandlingByClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: currently, shutting down 2 datanodes in Ratis leads to
|
|
||||||
// watchForCommit Api in RaftClient to hang forever. Once that gets
|
|
||||||
// fixed, we need to execute the tests with 2 node failures.
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockWritesWithDnFailures() throws Exception {
|
public void testBlockWritesWithDnFailures() throws Exception {
|
||||||
String keyName = "ratis3";
|
String keyName = "ratis3";
|
||||||
|
@ -139,7 +135,7 @@ public class TestFailureHandlingByClient {
|
||||||
.getPipeline(container.getPipelineID());
|
.getPipeline(container.getPipelineID());
|
||||||
List<DatanodeDetails> datanodes = pipeline.getNodes();
|
List<DatanodeDetails> datanodes = pipeline.getNodes();
|
||||||
cluster.shutdownHddsDatanode(datanodes.get(0));
|
cluster.shutdownHddsDatanode(datanodes.get(0));
|
||||||
// cluster.shutdownHddsDatanode(datanodes.get(1));
|
cluster.shutdownHddsDatanode(datanodes.get(1));
|
||||||
// The write will fail but exception will be handled and length will be
|
// The write will fail but exception will be handled and length will be
|
||||||
// updated correctly in OzoneManager once the stream is closed
|
// updated correctly in OzoneManager once the stream is closed
|
||||||
key.close();
|
key.close();
|
||||||
|
@ -151,7 +147,6 @@ public class TestFailureHandlingByClient {
|
||||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||||
Assert.assertEquals(data.length, keyInfo.getDataSize());
|
Assert.assertEquals(data.length, keyInfo.getDataSize());
|
||||||
validateData(keyName, data);
|
validateData(keyName, data);
|
||||||
cluster.restartHddsDatanode(datanodes.get(0), true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -179,8 +174,8 @@ public class TestFailureHandlingByClient {
|
||||||
.getPipeline(container.getPipelineID());
|
.getPipeline(container.getPipelineID());
|
||||||
List<DatanodeDetails> datanodes = pipeline.getNodes();
|
List<DatanodeDetails> datanodes = pipeline.getNodes();
|
||||||
cluster.shutdownHddsDatanode(datanodes.get(0));
|
cluster.shutdownHddsDatanode(datanodes.get(0));
|
||||||
|
cluster.shutdownHddsDatanode(datanodes.get(1));
|
||||||
|
|
||||||
// cluster.shutdownHddsDatanode(datanodes.get(1));
|
|
||||||
// The write will fail but exception will be handled and length will be
|
// The write will fail but exception will be handled and length will be
|
||||||
// updated correctly in OzoneManager once the stream is closed
|
// updated correctly in OzoneManager once the stream is closed
|
||||||
key.write(data.getBytes());
|
key.write(data.getBytes());
|
||||||
|
@ -192,7 +187,6 @@ public class TestFailureHandlingByClient {
|
||||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||||
Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
|
Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
|
||||||
validateData(keyName, data.concat(data).getBytes());
|
validateData(keyName, data.concat(data).getBytes());
|
||||||
cluster.restartHddsDatanode(datanodes.get(0), true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
private OzoneOutputStream createKey(String keyName, ReplicationType type,
|
||||||
|
|
|
@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
|
<hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
|
||||||
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
|
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
|
||||||
<ozone.version>0.4.0-SNAPSHOT</ozone.version>
|
<ozone.version>0.4.0-SNAPSHOT</ozone.version>
|
||||||
<ratis.version>0.3.0-6f3419a-SNAPSHOT</ratis.version>
|
<ratis.version>0.4.0-b600fc2-SNAPSHOT</ratis.version>
|
||||||
<bouncycastle.version>1.60</bouncycastle.version>
|
<bouncycastle.version>1.60</bouncycastle.version>
|
||||||
<ozone.release>Badlands</ozone.release>
|
<ozone.release>Badlands</ozone.release>
|
||||||
<declared.ozone.version>${ozone.version}</declared.ozone.version>
|
<declared.ozone.version>${ozone.version}</declared.ozone.version>
|
||||||
|
|
Loading…
Reference in New Issue