HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.

Shashikant Banerjee 2019-02-10 10:53:16 +05:30
parent e50dc7ee59
commit 965d26c9c7
8 changed files with 266 additions and 71 deletions
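The change in a nutshell: read paths now carry a list of datanode UUIDs to exclude, and a chunk whose checksum or length fails verification adds the replying datanode to that list before the read is retried on another replica. Below is a minimal, self-contained sketch of that pattern, with a hypothetical Replica interface standing in for the Ozone client and pipeline types; it illustrates the idea only and is not the patch itself.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified model of the failover read this patch introduces: try each
// replica in turn, exclude any replica whose data fails verification, and
// surface the last failure only once every replica has been excluded.
final class FailoverReadSketch {

  // Hypothetical stand-in for a datanode replica; readChunk() is assumed
  // to throw IOException on a checksum or length mismatch.
  interface Replica {
    UUID id();
    byte[] readChunk() throws IOException;
  }

  static byte[] readWithFailover(List<Replica> replicas) throws IOException {
    List<UUID> excludeDns = new ArrayList<>();  // mirrors the patch's excludeDns
    IOException lastFailure = null;
    for (Replica replica : replicas) {
      if (excludeDns.contains(replica.id())) {
        continue;
      }
      try {
        return replica.readChunk();             // verified data: done
      } catch (IOException e) {
        excludeDns.add(replica.id());           // corrupt replica: try the next one
        lastFailure = e;
      }
    }
    throw lastFailure != null ? lastFailure
        : new IOException("No replicas available");
  }
}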

XceiverClientGrpc.java

@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * A Client for the storageContainer protocol.
@@ -198,11 +199,27 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   @Override
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
-    return sendCommandWithRetry(request);
+    try {
+      XceiverClientReply reply;
+      reply = sendCommandWithRetry(request, null);
+      ContainerCommandResponseProto responseProto = reply.getResponse().get();
+      return responseProto;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Failed to execute command " + request, e);
+    }
   }
 
-  public ContainerCommandResponseProto sendCommandWithRetry(
-      ContainerCommandRequestProto request) throws IOException {
+  @Override
+  public XceiverClientReply sendCommand(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
+    Preconditions.checkState(HddsUtils.isReadOnly(request));
+    return sendCommandWithRetry(request, excludeDns);
+  }
+
+  private XceiverClientReply sendCommandWithRetry(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
     ContainerCommandResponseProto responseProto = null;
 
     // In case of an exception or an error, we will try to read from the
@@ -211,13 +228,24 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     List<DatanodeDetails> dns = pipeline.getNodes();
-    for (DatanodeDetails dn : dns) {
+    DatanodeDetails datanode = null;
+    List<DatanodeDetails> healthyDns =
+        excludeDns != null ? dns.stream().filter(dnId -> {
+          for (UUID excludeId : excludeDns) {
+            if (dnId.getUuid().equals(excludeId)) {
+              return false;
+            }
+          }
+          return true;
+        }).collect(Collectors.toList()) : dns;
+    for (DatanodeDetails dn : healthyDns) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
         responseProto = sendCommandAsync(request, dn).getResponse().get();
+        datanode = dn;
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -226,14 +254,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             .getUuidString(), e);
         if (Status.fromThrowable(e.getCause()).getCode()
             == Status.UNAUTHENTICATED.getCode()) {
-          throw new SCMSecurityException("Failed to authenticate with " +
-              "GRPC XceiverServer with Ozone block token.");
+          throw new SCMSecurityException("Failed to authenticate with "
+              + "GRPC XceiverServer with Ozone block token.");
         }
       }
     }
 
     if (responseProto != null) {
-      return responseProto;
+      return new XceiverClientReply(
+          CompletableFuture.completedFuture(responseProto), datanode.getUuid());
     } else {
       throw new IOException(
           "Failed to execute command " + request + " on the pipeline "
@@ -256,10 +285,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public XceiverClientAsyncReply sendCommandAsync(
+  public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    XceiverClientAsyncReply asyncReply =
+    XceiverClientReply asyncReply =
         sendCommandAsync(request, pipeline.getFirstNode());
 
     // TODO : for now make this API sync in nature as async requests are
@@ -272,7 +301,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return asyncReply;
   }
 
-  private XceiverClientAsyncReply sendCommandAsync(
+  private XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request, DatanodeDetails dn)
       throws IOException, ExecutionException, InterruptedException {
     if (closed) {
@@ -327,7 +356,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
         });
     requestObserver.onNext(request);
    requestObserver.onCompleted();
-    return new XceiverClientAsyncReply(replyFuture);
+    return new XceiverClientReply(replyFuture);
   }
 
   private void reconnect(DatanodeDetails dn, String encodedToken)
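A note on the healthyDns filter above: it checks every datanode against every excluded UUID with a nested loop, which is fine for the usual three-replica pipeline. For larger exclude lists, copying the list into a Set makes the membership test constant-time. A generic sketch of the same filtering under that assumption (names hypothetical, not part of the patch):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ExcludeFilterSketch {
  // Generic version of the healthyDns filter: keep only the nodes whose
  // UUID is not in the exclude list. The Set makes the membership test
  // O(1) per node instead of a nested scan.
  static <T> List<T> healthy(List<T> nodes, List<UUID> excludeDns,
      Function<T, UUID> uuidOf) {
    if (excludeDns == null) {
      return nodes;
    }
    Set<UUID> exclude = new HashSet<>(excludeDns);
    return nodes.stream()
        .filter(node -> !exclude.contains(uuidOf.apply(node)))
        .collect(Collectors.toList());
  }
}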

XceiverClientRatis.java

@@ -204,7 +204,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return minIndex.isPresent() ? minIndex.getAsLong() : 0;
   }
 
-
   @Override
   public long watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
@@ -254,7 +253,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       commitInfoMap.remove(address);
       LOG.info(
           "Could not commit " + index + " to all the nodes. Server " + address
-              + " has failed" + "Committed by majority.");
+              + " has failed." + " Committed by majority.");
     }
     return index;
   }
@@ -266,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * @return Response to the command
    */
   @Override
-  public XceiverClientAsyncReply sendCommandAsync(
+  public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request) {
-    XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
+    XceiverClientReply asyncReply = new XceiverClientReply(null);
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
@@ -291,6 +290,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
           if (response.getResult() == ContainerProtos.Result.SUCCESS) {
             updateCommitInfosMap(reply.getCommitInfos());
             asyncReply.setLogIndex(reply.getLogIndex());
+            asyncReply.setDatanode(
+                RatisHelper.toDatanodeId(reply.getReplierId()));
           }
           return response;
         } catch (InvalidProtocolBufferException e) {

BlockInputStream.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -35,8 +38,11 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
@@ -204,27 +210,57 @@ public class BlockInputStream extends InputStream implements Seekable {
     // On every chunk read chunkIndex should be increased so as to read the
     // next chunk
     chunkIndex += 1;
-    final ReadChunkResponseProto readChunkResponse;
+    XceiverClientReply reply;
+    ReadChunkResponseProto readChunkResponse = null;
     final ChunkInfo chunkInfo = chunks.get(chunkIndex);
+    List<UUID> excludeDns = null;
+    ByteString byteString;
+    List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+    while (true) {
       try {
-      readChunkResponse = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID);
+        reply = ContainerProtocolCalls
+            .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+        ContainerProtos.ContainerCommandResponseProto response;
+        response = reply.getResponse().get();
+        ContainerProtocolCalls.validateContainerResponse(response);
+        readChunkResponse = response.getReadChunk();
       } catch (IOException e) {
         if (e instanceof StorageContainerException) {
           throw e;
         }
         throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new IOException(
+            "Failed to execute ReadChunk command for chunk " + chunkInfo
+                .getChunkName(), e);
       }
-    ByteString byteString = readChunkResponse.getData();
+      byteString = readChunkResponse.getData();
+      try {
         if (byteString.size() != chunkInfo.getLen()) {
           // Bytes read from chunk should be equal to chunk size.
           throw new IOException(String
               .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-              chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
+                  chunkInfo.getChunkName(), chunkInfo.getLen(),
+                  byteString.size()));
         }
-    ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-        chunkInfo.getChecksumData());
+        ChecksumData checksumData =
+            ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
         Checksum.verifyChecksum(byteString, checksumData);
+        break;
+      } catch (IOException ioe) {
+        // We end up here only if the checksum or the length of the chunk
+        // mismatches. In that case, the read should be retried on a
+        // different replica.
+        // TODO: Inform SCM of a possible corrupt container replica here
+        if (excludeDns == null) {
+          excludeDns = new ArrayList<>();
+        }
+        excludeDns.add(reply.getDatanode());
+        if (excludeDns.size() == dnList.size()) {
+          throw ioe;
+        }
+      }
+    }
+
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
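Note the retry budget in the loop above: every verification failure adds the replying datanode (reply.getDatanode()) to excludeDns, and the last IOException is rethrown once excludeDns covers the whole pipeline (excludeDns.size() == dnList.size()), so the loop terminates after at most one verification failure per replica.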

BlockOutputStream.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -379,7 +379,7 @@ public class BlockOutputStream extends OutputStream {
     CompletableFuture<ContainerProtos.
         ContainerCommandResponseProto> flushFuture;
     try {
-      XceiverClientAsyncReply asyncReply =
+      XceiverClientReply asyncReply =
           putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
@@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream {
         traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
             .getChunkName();
     try {
-      XceiverClientAsyncReply asyncReply =
+      XceiverClientReply asyncReply =
           writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();

XceiverClientAsyncReply.java renamed to XceiverClientReply.java

@@ -21,20 +21,29 @@ package org.apache.hadoop.hdds.scm;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * This class represents the Async reply from XceiverClient.
  */
-public class XceiverClientAsyncReply {
+public class XceiverClientReply {
 
   private CompletableFuture<ContainerCommandResponseProto> response;
   private Long logIndex;
+  private UUID dnId;
 
-  public XceiverClientAsyncReply(
+  public XceiverClientReply(
       CompletableFuture<ContainerCommandResponseProto> response) {
+    this(response, null);
+  }
+
+  public XceiverClientReply(
+      CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) {
     this.logIndex = (long) 0;
     this.response = response;
+    this.dnId = dnId;
   }
 
   public CompletableFuture<ContainerCommandResponseProto> getResponse() {
@@ -49,6 +58,14 @@ public class XceiverClientAsyncReply {
     this.logIndex = logIndex;
   }
 
+  public UUID getDatanode() {
+    return dnId;
+  }
+
+  public void setDatanode(UUID datanodeId) {
+    this.dnId = datanodeId;
+  }
+
   public void setResponse(
       CompletableFuture<ContainerCommandResponseProto> response) {
     this.response = response;
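XceiverClientReply subsumes the old XceiverClientAsyncReply and additionally records which datanode produced the response: dnId stays null for the single-argument constructor used on the async write path, and is filled in by XceiverClientGrpc.sendCommandWithRetry and XceiverClientRatis.sendCommandAsync. That UUID is what lets BlockInputStream exclude a corrupt replica on retry.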

XceiverClientSpi.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,7 +106,7 @@ public abstract class XceiverClientSpi implements Closeable {
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
     try {
-      XceiverClientAsyncReply reply;
+      XceiverClientReply reply;
       reply = sendCommandAsync(request);
       ContainerCommandResponseProto responseProto = reply.getResponse().get();
       return responseProto;
@@ -113,6 +115,27 @@ public abstract class XceiverClientSpi implements Closeable {
     }
   }
 
+  /**
+   * Sends a given command to the server and gets the reply back, along with
+   * the server-associated info.
+   * @param request Request
+   * @param excludeDns list of datanodes the command should not be sent to
+   * @return Response to the command
+   * @throws IOException
+   */
+  public XceiverClientReply sendCommand(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
+    try {
+      XceiverClientReply reply;
+      reply = sendCommandAsync(request);
+      reply.getResponse().get();
+      return reply;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Failed to execute command " + request, e);
+    }
+  }
+
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -120,7 +143,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return Response to the command
    * @throws IOException
    */
-  public abstract XceiverClientAsyncReply
+  public abstract XceiverClientReply
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException;
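Note that this base-class overload ignores excludeDns: it simply awaits the plain sendCommandAsync(request). XceiverClientGrpc overrides it to filter the pipeline by the exclude list, and its override also asserts (via HddsUtils.isReadOnly) that the request is read-only, since failing over between replicas is only safe for reads.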

ContainerProtocolCalls.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.hdds.scm.storage;
 
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -57,8 +57,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -72,6 +70,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
 import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -199,7 +199,7 @@ public final class ContainerProtocolCalls {
    * @throws InterruptedException
    * @throws ExecutionException
    */
-  public static XceiverClientAsyncReply putBlockAsync(
+  public static XceiverClientReply putBlockAsync(
       XceiverClientSpi xceiverClient, BlockData containerBlockData,
       String traceID)
       throws IOException, InterruptedException, ExecutionException {
@@ -217,7 +217,6 @@ public final class ContainerProtocolCalls {
       builder.setEncodedToken(encodedToken);
     }
     ContainerCommandRequestProto request = builder.build();
-    xceiverClient.sendCommand(request);
     return xceiverClient.sendCommandAsync(request);
   }
 
@@ -228,11 +227,13 @@ public final class ContainerProtocolCalls {
    * @param chunk information about chunk to read
    * @param blockID ID of the block
    * @param traceID container protocol call args
+   * @param excludeDns datanodes to exclude while executing the command
    * @return container protocol read chunk response
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
-      ChunkInfo chunk, BlockID blockID, String traceID) throws IOException {
+  public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
+      ChunkInfo chunk, BlockID blockID, String traceID, List<UUID> excludeDns)
+      throws IOException {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
@@ -251,9 +252,9 @@ public final class ContainerProtocolCalls {
       builder.setEncodedToken(encodedToken);
     }
     ContainerCommandRequestProto request = builder.build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-    return response.getReadChunk();
+    XceiverClientReply reply =
+        xceiverClient.sendCommand(request, excludeDns);
+    return reply;
   }
 
@@ -302,7 +303,7 @@ public final class ContainerProtocolCalls {
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static XceiverClientAsyncReply writeChunkAsync(
+  public static XceiverClientReply writeChunkAsync(
       XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
       ByteString data, String traceID)
       throws IOException, ExecutionException, InterruptedException {

TestOzoneRpcClientAbstract.java

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
@@ -879,7 +880,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // Write data into a key
     OzoneOutputStream out = bucket.createKey(keyName,
-        value.getBytes().length, ReplicationType.STAND_ALONE,
+        value.getBytes().length, ReplicationType.RATIS,
         ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
@@ -889,8 +890,6 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneKey key = bucket.getKey(keyName);
     long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
         .getContainerID();
-    long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
-        .getLocalID();
 
     // Get the container by traversing the datanodes. Atleast one of the
     // datanode must have this container.
@@ -903,15 +902,114 @@ public abstract class TestOzoneRpcClientAbstract {
       }
     }
     Assert.assertNotNull("Container not found", container);
+    corruptData(container, key);
+    // Try reading the key. Since the chunk file is corrupted, it should
+    // throw a checksum mismatch exception.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      is.read(new byte[100]);
+      fail("Reading corrupted data should fail.");
+    } catch (OzoneChecksumException e) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+    }
+  }
+
+  /**
+   * Tests that reading a corrupted chunk file throws a checksum exception.
+   * @throws IOException
+   */
+  @Test
+  public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String value = "sample value";
+    byte[] data = value.getBytes();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    // Write data into a key
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes().length, ReplicationType.RATIS,
+        ReplicationFactor.THREE, new HashMap<>());
+    out.write(value.getBytes());
+    out.close();
+
+    // We need to find the location of the chunk file corresponding to the
+    // data we just wrote.
+    OzoneKey key = bucket.getKey(keyName);
+    List<OzoneKeyLocation> keyLocation =
+        ((OzoneKeyDetails) key).getOzoneKeyLocations();
+    Assert.assertTrue("Key location not found in OM", !keyLocation.isEmpty());
+    long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getContainerID();
+
+    // Get the container by traversing the datanodes.
+    List<Container> containerList = new ArrayList<>();
+    Container container;
+    for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
+      container = hddsDatanode.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID);
+      if (container != null) {
+        containerList.add(container);
+        if (containerList.size() == 3) {
+          break;
+        }
+      }
+    }
+    Assert.assertTrue("Container not found", !containerList.isEmpty());
+    corruptData(containerList.get(0), key);
+    // Try reading the key. The read will fail on the first replica and
+    // eventually fail over to the next one.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, data));
+    } catch (OzoneChecksumException e) {
+      fail("Reading corrupted data should not fail.");
+    }
+    corruptData(containerList.get(1), key);
+    // Try reading the key again. The read will fail on the first two
+    // replicas and eventually fail over to the last one.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, data));
+    } catch (OzoneChecksumException e) {
+      fail("Reading corrupted data should not fail.");
+    }
+    corruptData(containerList.get(2), key);
+    // Try reading the key. The read will fail here as all the replicas
+    // are corrupted.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      fail("Reading corrupted data should fail.");
+    } catch (OzoneChecksumException e) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+    }
+  }
+
+  private void corruptData(Container container, OzoneKey key)
+      throws IOException {
+    long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getContainerID();
+    long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getLocalID();
 
     // From the containerData, get the block iterator for all the blocks in
     // the container.
     KeyValueContainerData containerData =
         (KeyValueContainerData) container.getContainerData();
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
+    String containerPath =
+        new File(containerData.getMetadataPath()).getParent();
+    KeyValueBlockIterator keyValueBlockIterator =
+        new KeyValueBlockIterator(containerID, new File(containerPath));
 
     // Find the block corresponding to the key we put. We use the localID of
     // the BlockData to identify out key.
@@ -926,8 +1024,8 @@ public abstract class TestOzoneRpcClientAbstract {
     // Get the location of the chunk file
     String chunkName = blockData.getChunks().get(0).getChunkName();
-    String containreBaseDir = container.getContainerData().getVolume()
-        .getHddsRootDir().getPath();
+    String containreBaseDir =
+        container.getContainerData().getVolume().getHddsRootDir().getPath();
     File chunksLocationPath = KeyValueContainerLocationUtil
         .getChunksLocationPath(containreBaseDir, SCM_ID, containerID);
     File chunkFile = new File(chunksLocationPath, chunkName);
@@ -935,16 +1033,6 @@ public abstract class TestOzoneRpcClientAbstract {
     // Corrupt the contents of the chunk file
     String newData = new String("corrupted data");
     FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
-
-    // Try reading the key. Since the chunk file is corrupted, it should
-    // throw a checksum mismatch exception.
-    try {
-      OzoneInputStream is = bucket.readKey(keyName);
-      is.read(new byte[100]);
-      fail("Reading corrupted data should fail.");
-    } catch (OzoneChecksumException e) {
-      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
-    }
   }
 
   @Test