From 965d26c9c758bb0211cb918e95fa661194e771d3 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Sun, 10 Feb 2019 10:53:16 +0530 Subject: [PATCH] HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 51 +++++-- .../hadoop/hdds/scm/XceiverClientRatis.java | 9 +- .../hdds/scm/storage/BlockInputStream.java | 72 +++++++--- .../hdds/scm/storage/BlockOutputStream.java | 6 +- ...syncReply.java => XceiverClientReply.java} | 23 +++- .../hadoop/hdds/scm/XceiverClientSpi.java | 27 +++- .../scm/storage/ContainerProtocolCalls.java | 23 ++-- .../rpc/TestOzoneRpcClientAbstract.java | 126 +++++++++++++++--- 8 files changed, 266 insertions(+), 71 deletions(-) rename hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/{XceiverClientAsyncReply.java => XceiverClientReply.java} (79%) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 5c8ca26049d..6fa54a5827d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * A Client for the storageContainer protocol. @@ -198,11 +199,27 @@ public class XceiverClientGrpc extends XceiverClientSpi { @Override public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { - return sendCommandWithRetry(request); + try { + XceiverClientReply reply; + reply = sendCommandWithRetry(request, null); + ContainerCommandResponseProto responseProto = reply.getResponse().get(); + return responseProto; + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to execute command " + request, e); + } } - public ContainerCommandResponseProto sendCommandWithRetry( - ContainerCommandRequestProto request) throws IOException { + @Override + public XceiverClientReply sendCommand( + ContainerCommandRequestProto request, List excludeDns) + throws IOException { + Preconditions.checkState(HddsUtils.isReadOnly(request)); + return sendCommandWithRetry(request, excludeDns); + } + + private XceiverClientReply sendCommandWithRetry( + ContainerCommandRequestProto request, List excludeDns) + throws IOException { ContainerCommandResponseProto responseProto = null; // In case of an exception or an error, we will try to read from the @@ -211,13 +228,24 @@ public class XceiverClientGrpc extends XceiverClientSpi { // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader List dns = pipeline.getNodes(); - for (DatanodeDetails dn : dns) { + DatanodeDetails datanode = null; + List healthyDns = + excludeDns != null ? dns.stream().filter(dnId -> { + for (UUID excludeId : excludeDns) { + if (dnId.getUuid().equals(excludeId)) { + return false; + } + } + return true; + }).collect(Collectors.toList()) : dns; + for (DatanodeDetails dn : healthyDns) { try { LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, // sendCommandAsyncCall will create a new channel and async stub // in case these don't exist for the specific datanode. 
responseProto = sendCommandAsync(request, dn).getResponse().get(); + datanode = dn; if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { break; } @@ -226,14 +254,15 @@ public class XceiverClientGrpc extends XceiverClientSpi { .getUuidString(), e); if (Status.fromThrowable(e.getCause()).getCode() == Status.UNAUTHENTICATED.getCode()) { - throw new SCMSecurityException("Failed to authenticate with " + - "GRPC XceiverServer with Ozone block token."); + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); } } } if (responseProto != null) { - return responseProto; + return new XceiverClientReply( + CompletableFuture.completedFuture(responseProto), datanode.getUuid()); } else { throw new IOException( "Failed to execute command " + request + " on the pipeline " @@ -256,10 +285,10 @@ public class XceiverClientGrpc extends XceiverClientSpi { * @throws IOException */ @Override - public XceiverClientAsyncReply sendCommandAsync( + public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - XceiverClientAsyncReply asyncReply = + XceiverClientReply asyncReply = sendCommandAsync(request, pipeline.getFirstNode()); // TODO : for now make this API sync in nature as async requests are @@ -272,7 +301,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { return asyncReply; } - private XceiverClientAsyncReply sendCommandAsync( + private XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) throws IOException, ExecutionException, InterruptedException { if (closed) { @@ -327,7 +356,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { }); requestObserver.onNext(request); requestObserver.onCompleted(); - return new XceiverClientAsyncReply(replyFuture); + return new XceiverClientReply(replyFuture); } private void reconnect(DatanodeDetails dn, String encodedToken) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 338a198ac37..c697b098aab 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -204,7 +204,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { return minIndex.isPresent() ? minIndex.getAsLong() : 0; } - @Override public long watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, @@ -254,7 +253,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { commitInfoMap.remove(address); LOG.info( "Could not commit " + index + " to all the nodes. Server " + address - + " has failed" + "Committed by majority."); + + " has failed." 
+ " Committed by majority."); } return index; } @@ -266,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi { * @return Response to the command */ @Override - public XceiverClientAsyncReply sendCommandAsync( + public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request) { - XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null); + XceiverClientReply asyncReply = new XceiverClientReply(null); CompletableFuture raftClientReply = sendRequestAsync(request); CompletableFuture containerCommandResponse = @@ -291,6 +290,8 @@ public final class XceiverClientRatis extends XceiverClientSpi { if (response.getResult() == ContainerProtos.Result.SUCCESS) { updateCommitInfosMap(reply.getCommitInfos()); asyncReply.setLogIndex(reply.getLogIndex()); + asyncReply.setDatanode( + RatisHelper.toDatanodeId(reply.getReplierId())); } return response; } catch (InvalidProtocolBufferException e) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 5303efd2aa2..001fedab1dd 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.common.Checksum; @@ -35,8 +38,11 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; /** * An {@link InputStream} used by the REST service in combination with the @@ -204,27 +210,57 @@ public class BlockInputStream extends InputStream implements Seekable { // On every chunk read chunkIndex should be increased so as to read the // next chunk chunkIndex += 1; - final ReadChunkResponseProto readChunkResponse; + XceiverClientReply reply; + ReadChunkResponseProto readChunkResponse = null; final ChunkInfo chunkInfo = chunks.get(chunkIndex); - try { - readChunkResponse = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID); - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; + List excludeDns = null; + ByteString byteString; + List dnList = xceiverClient.getPipeline().getNodes(); + while (true) { + try { + reply = ContainerProtocolCalls + .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns); + ContainerProtos.ContainerCommandResponseProto response; + response = reply.getResponse().get(); + ContainerProtocolCalls.validateContainerResponse(response); + readChunkResponse = response.getReadChunk(); + } catch (IOException e) { + if (e instanceof StorageContainerException) { + throw e; + } + throw new IOException("Unexpected OzoneException: " + e.toString(), e); + } catch (ExecutionException | InterruptedException e) { + throw new IOException( + "Failed to execute ReadChunk command for chunk " + chunkInfo + .getChunkName(), e); + } + byteString = readChunkResponse.getData(); + try { + if (byteString.size() != 
chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new IOException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), + byteString.size())); + } + ChecksumData checksumData = + ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); + Checksum.verifyChecksum(byteString, checksumData); + break; + } catch (IOException ioe) { + // we will end up in this situation only if the checksum mismatch + // happens or the length of the chunk mismatches. + // In this case, read should be retried on a different replica. + // TODO: Inform SCM of a possible corrupt container replica here + if (excludeDns == null) { + excludeDns = new ArrayList<>(); + } + excludeDns.add(reply.getDatanode()); + if (excludeDns.size() == dnList.size()) { + throw ioe; + } } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); } - ByteString byteString = readChunkResponse.getData(); - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); - } - ChecksumData checksumData = ChecksumData.getFromProtoBuf( - chunkInfo.getChecksumData()); - Checksum.verifyChecksum(byteString, checksumData); buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 6cc0b54e7fa..4c6cd7b1c8c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.storage; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; +import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; @@ -379,7 +379,7 @@ public class BlockOutputStream extends OutputStream { CompletableFuture flushFuture; try { - XceiverClientAsyncReply asyncReply = + XceiverClientReply asyncReply = putBlockAsync(xceiverClient, containerBlockData.build(), requestId); CompletableFuture future = asyncReply.getResponse(); @@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream { traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo .getChunkName(); try { - XceiverClientAsyncReply asyncReply = + XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId); CompletableFuture future = asyncReply.getResponse(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java similarity index 79% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java index 40d97bf9cb2..56785556083 100644 --- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java @@ -21,20 +21,29 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; + +import java.util.UUID; import java.util.concurrent.CompletableFuture; /** * This class represents the Async reply from XceiverClient. */ -public class XceiverClientAsyncReply { +public class XceiverClientReply { private CompletableFuture<ContainerCommandResponseProto> response; private Long logIndex; + private UUID dnId; - public XceiverClientAsyncReply( + public XceiverClientReply( CompletableFuture<ContainerCommandResponseProto> response) { - this.logIndex = (long)0; + this(response, null); + } + + public XceiverClientReply( + CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) { + this.logIndex = (long) 0; this.response = response; + this.dnId = dnId; } public CompletableFuture<ContainerCommandResponseProto> getResponse() { @@ -49,6 +58,14 @@ public class XceiverClientAsyncReply { this.logIndex = logIndex; } + public UUID getDatanode() { + return dnId; + } + + public void setDatanode(UUID datanodeId) { + this.dnId = datanodeId; + } + public void setResponse( CompletableFuture<ContainerCommandResponseProto> response) { this.response = response; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 9da74afefbf..0d4b3a4e9b9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.Closeable; import java.io.IOException; +import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -104,7 +106,7 @@ public abstract class XceiverClientSpi implements Closeable { public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { try { - XceiverClientAsyncReply reply; + XceiverClientReply reply; reply = sendCommandAsync(request); ContainerCommandResponseProto responseProto = reply.getResponse().get(); return responseProto; @@ -113,6 +115,27 @@ } } + /** + * Sends a given command to the server and gets the reply back along with + * the associated server info. + * @param request Request + * @param excludeDns list of datanodes the command should not be sent to. + * @return Response to the command + * @throws IOException + */ + public XceiverClientReply sendCommand( + ContainerCommandRequestProto request, List<UUID> excludeDns) + throws IOException { + try { + XceiverClientReply reply; + reply = sendCommandAsync(request); + reply.getResponse().get(); + return reply; + } catch (ExecutionException | InterruptedException e) { + throw new IOException("Failed to execute command " + request, e); + } + } + /** * Sends a given command to server gets a waitable future back.
* @@ -120,7 +143,7 @@ public abstract class XceiverClientSpi implements Closeable { * @return Response to the command * @throws IOException */ - public abstract XceiverClientAsyncReply + public abstract XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 114b6e62750..62968314dfc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; -import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; +import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; @@ -57,8 +57,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .PutSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadChunkRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ReadContainerRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -72,6 +70,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; +import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; /** @@ -199,7 +199,7 @@ public final class ContainerProtocolCalls { * @throws InterruptedException * @throws ExecutionException */ - public static XceiverClientAsyncReply putBlockAsync( + public static XceiverClientReply putBlockAsync( XceiverClientSpi xceiverClient, BlockData containerBlockData, String traceID) throws IOException, InterruptedException, ExecutionException { @@ -217,7 +217,6 @@ public final class ContainerProtocolCalls { builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - xceiverClient.sendCommand(request); return xceiverClient.sendCommandAsync(request); } @@ -228,11 +227,13 @@ public final class ContainerProtocolCalls { * @param chunk information about chunk to read * @param blockID ID of the block * @param traceID container protocol call args + * @param excludeDns datamode to exclude while executing the command * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, - ChunkInfo chunk, BlockID blockID, String traceID) throws IOException { + public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient, + ChunkInfo chunk, BlockID blockID, String traceID, List excludeDns) + throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -251,9 +252,9 @@ public final class ContainerProtocolCalls { builder.setEncodedToken(encodedToken); } ContainerCommandRequestProto request = builder.build(); - ContainerCommandResponseProto response = 
xceiverClient.sendCommand(request); - validateContainerResponse(response); - return response.getReadChunk(); + XceiverClientReply reply = + xceiverClient.sendCommand(request, excludeDns); + return reply; } /** @@ -302,7 +303,7 @@ public final class ContainerProtocolCalls { * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static XceiverClientAsyncReply writeChunkAsync( + public static XceiverClientReply writeChunkAsync( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, ByteString data, String traceID) throws IOException, ExecutionException, InterruptedException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index c9c1c1cde89..fcf94656a0c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; + import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; @@ -879,7 +880,7 @@ public abstract class TestOzoneRpcClientAbstract { // Write data into a key OzoneOutputStream out = bucket.createKey(keyName, - value.getBytes().length, ReplicationType.STAND_ALONE, + value.getBytes().length, ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>()); out.write(value.getBytes()); out.close(); @@ -889,8 +890,6 @@ public abstract class TestOzoneRpcClientAbstract { OzoneKey key = bucket.getKey(keyName); long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) .getContainerID(); - long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) - .getLocalID(); // Get the container by traversing the datanodes. Atleast one of the // datanode must have this container. @@ -903,15 +902,114 @@ public abstract class TestOzoneRpcClientAbstract { } } Assert.assertNotNull("Container not found", container); + corruptData(container, key); + // Try reading the key. Since the chunk file is corrupted, it should + // throw a checksum mismatch exception. + try { + OzoneInputStream is = bucket.readKey(keyName); + is.read(new byte[100]); + fail("Reading corrupted data should fail."); + } catch (OzoneChecksumException e) { + GenericTestUtils.assertExceptionContains("Checksum mismatch", e); + } + } + + /** + * Tests reading a corrputed chunk file throws checksum exception. 
+ * @throws IOException + */ + @Test + public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + byte[] data = value.getBytes(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // Write data into a key + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + out.write(value.getBytes()); + out.close(); + + // We need to find the location of the chunk file corresponding to the + // data we just wrote. + OzoneKey key = bucket.getKey(keyName); + List keyLocation = + ((OzoneKeyDetails) key).getOzoneKeyLocations(); + Assert.assertTrue("Key location not found in OM", !keyLocation.isEmpty()); + long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) + .getContainerID(); + + // Get the container by traversing the datanodes. + List containerList = new ArrayList<>(); + Container container; + for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) { + container = hddsDatanode.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID); + if (container != null) { + containerList.add(container); + if (containerList.size() == 3) { + break; + } + } + } + Assert.assertTrue("Container not found", !containerList.isEmpty()); + corruptData(containerList.get(0), key); + // Try reading the key. Read will fail on the first node and will eventually + // failover to next replica + try { + OzoneInputStream is = bucket.readKey(keyName); + byte[] b = new byte[data.length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, data)); + } catch (OzoneChecksumException e) { + fail("Reading corrupted data should not fail."); + } + corruptData(containerList.get(1), key); + // Try reading the key. Read will fail on the first node and will eventually + // failover to next replica + try { + OzoneInputStream is = bucket.readKey(keyName); + byte[] b = new byte[data.length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, data)); + } catch (OzoneChecksumException e) { + fail("Reading corrupted data should not fail."); + } + corruptData(containerList.get(2), key); + // Try reading the key. Read will fail here as all the replica are corrupt + try { + OzoneInputStream is = bucket.readKey(keyName); + byte[] b = new byte[data.length]; + is.read(b); + fail("Reading corrupted data should fail."); + } catch (OzoneChecksumException e) { + GenericTestUtils.assertExceptionContains("Checksum mismatch", e); + } + } + + private void corruptData(Container container, OzoneKey key) + throws IOException { + long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) + .getContainerID(); + long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) + .getLocalID(); // From the containerData, get the block iterator for all the blocks in // the container. 
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); - String containerPath = new File(containerData.getMetadataPath()) .getParent(); - KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( containerID, new File(containerPath)); + String containerPath = + new File(containerData.getMetadataPath()).getParent(); + KeyValueBlockIterator keyValueBlockIterator = + new KeyValueBlockIterator(containerID, new File(containerPath)); // Find the block corresponding to the key we put. We use the localID of // the BlockData to identify our key. @@ -926,8 +1024,8 @@ public abstract class TestOzoneRpcClientAbstract { // Get the location of the chunk file String chunkName = blockData.getChunks().get(0).getChunkName(); - String containreBaseDir = container.getContainerData().getVolume() .getHddsRootDir().getPath(); + String containreBaseDir = + container.getContainerData().getVolume().getHddsRootDir().getPath(); File chunksLocationPath = KeyValueContainerLocationUtil .getChunksLocationPath(containreBaseDir, SCM_ID, containerID); File chunkFile = new File(chunksLocationPath, chunkName); @@ -935,16 +1033,6 @@ // Corrupt the contents of the chunk file String newData = new String("corrupted data"); FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes()); - - // Try reading the key. Since the chunk file is corrupted, it should - // throw a checksum mismatch exception. - try { - OzoneInputStream is = bucket.readKey(keyName); - is.read(new byte[100]); - fail("Reading corrupted data should fail."); - } catch (OzoneChecksumException e) { - GenericTestUtils.assertExceptionContains("Checksum mismatch", e); - } } @Test
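For context on the mechanism this patch introduces: when a chunk read fails checksum or length validation in BlockInputStream, the datanode that served the read is added to an exclude list and the read is retried against another replica, until either a replica returns valid data or every replica has been excluded, at which point the failure is surfaced to the caller. Below is a minimal, self-contained sketch of that retry pattern in plain Java; ReadResult, ChunkReader, ChecksumVerifier and readWithFailover are illustrative stand-ins for this note, not actual Ozone APIs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Illustrative sketch of read failover across replicas (not the Ozone API). */
public final class ReadFailoverSketch {

  /** Result of a single read attempt: the replying datanode and the bytes. */
  static final class ReadResult {
    final UUID datanode;
    final byte[] data;
    ReadResult(UUID datanode, byte[] data) {
      this.datanode = datanode;
      this.data = data;
    }
  }

  /** Reads the chunk from any replica that is not in excludedNodes. */
  interface ChunkReader {
    ReadResult read(List<UUID> excludedNodes) throws IOException;
  }

  /** Throws IOException on a checksum or length mismatch. */
  interface ChecksumVerifier {
    void verify(byte[] data) throws IOException;
  }

  /**
   * Retries the read on a different replica until the data verifies or all
   * replicas have been excluded, mirroring the loop this patch adds to
   * BlockInputStream.
   */
  static byte[] readWithFailover(ChunkReader reader, ChecksumVerifier verifier,
      int replicaCount) throws IOException {
    List<UUID> excluded = new ArrayList<>();
    while (true) {
      ReadResult result = reader.read(excluded);
      try {
        verifier.verify(result.data);
        return result.data;
      } catch (IOException e) {
        // Corrupt or short read: exclude this replica and try the next one.
        excluded.add(result.datanode);
        if (excluded.size() == replicaCount) {
          throw e; // every replica returned bad data
        }
      }
    }
  }
}

The patch itself threads the exclude list through XceiverClientGrpc.sendCommandWithRetry and ContainerProtocolCalls.readChunk, so the retry can skip the datanode recorded in XceiverClientReply.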