diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 2f118727ccf..9526be38ccc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; @@ -40,6 +41,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.UUID; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -50,9 +54,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private final Pipeline pipeline; private final Configuration config; - private XceiverClientProtocolServiceStub asyncStub; + private Map asyncStubs; private XceiverClientMetrics metrics; - private ManagedChannel channel; + private Map channels; private final Semaphore semaphore; private boolean closed = false; @@ -72,46 +76,62 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.semaphore = new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); this.metrics = XceiverClientManager.getXceiverClientMetrics(); + this.channels = new HashMap<>(); + this.asyncStubs = new HashMap<>(); } @Override public void connect() throws Exception { - DatanodeDetails leader = this.pipeline.getLeader(); + // leader by default is the 1st datanode in the datanode list of pipleline + DatanodeDetails leader = this.pipeline.getLeader(); + // just make a connection to the 1st datanode at the beginning + connectToDatanode(leader); + } + + private void connectToDatanode(DatanodeDetails dn) { // read port from the data node, on failure use default configured // port. - int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); + int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); if (port == 0) { port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); } - LOG.debug("Connecting to server Port : " + leader.getIpAddress()); - channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) - .usePlaintext() - .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) - .build(); - asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); + LOG.debug("Connecting to server Port : " + dn.getIpAddress()); + ManagedChannel channel = + NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext() + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .build(); + XceiverClientProtocolServiceStub asyncStub = + XceiverClientProtocolServiceGrpc.newStub(channel); + asyncStubs.put(dn.getUuid(), asyncStub); + channels.put(dn.getUuid(), channel); } - /** - * Returns if the xceiver client connects to a server. + * Returns if the xceiver client connects to all servers in the pipeline. * * @return True if the connection is alive, false otherwise. */ @VisibleForTesting - public boolean isConnected() { - return !channel.isTerminated() && !channel.isShutdown(); + public boolean isConnected(DatanodeDetails details) { + return isConnected(channels.get(details.getUuid())); + } + + private boolean isConnected(ManagedChannel channel) { + return channel != null && !channel.isTerminated() && !channel.isShutdown(); } @Override public void close() { closed = true; - channel.shutdownNow(); - try { - channel.awaitTermination(60, TimeUnit.MINUTES); - } catch (Exception e) { - LOG.error("Unexpected exception while waiting for channel termination", - e); + for (ManagedChannel channel : channels.values()) { + channel.shutdownNow(); + try { + channel.awaitTermination(60, TimeUnit.MINUTES); + } catch (Exception e) { + LOG.error("Unexpected exception while waiting for channel termination", + e); + } } } @@ -120,6 +140,56 @@ public class XceiverClientGrpc extends XceiverClientSpi { return pipeline; } + @Override + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + return sendCommandWithRetry(request); + } + + public ContainerCommandResponseProto sendCommandWithRetry( + ContainerCommandRequestProto request) throws IOException { + int size = pipeline.getMachines().size(); + ContainerCommandResponseProto responseProto = null; + DatanodeDetails dn = null; + + // In case of an exception or an error, we will try to read from the + // datanodes in the pipeline in a round robin fashion. + + // TODO: cache the correct leader info in here, so that any subsequent calls + // should first go to leader + for (int dnIndex = 0; dnIndex < size; dnIndex++) { + try { + dn = pipeline.getMachines().get(dnIndex); + LOG.debug("Executing command " + request + " on datanode " + dn); + // In case the command gets retried on a 2nd datanode, + // sendCommandAsyncCall will create a new channel and async stub + // in case these don't exist for the specific datanode. + responseProto = sendCommandAsync(request, dn).get(); + if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) { + break; + } + } catch (ExecutionException | InterruptedException e) { + LOG.warn("Failed to execute command " + request + " on datanode " + dn + .getUuidString(), e); + } + } + + if (responseProto != null) { + return responseProto; + } else { + throw new IOException( + "Failed to execute command " + request + " on the pipeline " + + pipeline.getId()); + } + } + + // TODO: for a true async API, once the waitable future while executing + // the command on one channel fails, it should be retried asynchronously + // on the future Task for all the remaining datanodes. + + // Note: this Async api is not used currently used in any active I/O path. + // In case it gets used, the asynchronous retry logic needs to be plugged + // in here. /** * Sends a given command to server gets a waitable future back. * @@ -128,15 +198,25 @@ public class XceiverClientGrpc extends XceiverClientSpi { * @throws IOException */ @Override - public CompletableFuture - sendCommandAsync(ContainerCommandRequestProto request) + public CompletableFuture sendCommandAsync( + ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - if(closed){ + return sendCommandAsync(request, pipeline.getLeader()); + } + + private CompletableFuture sendCommandAsync( + ContainerCommandRequestProto request, DatanodeDetails dn) + throws IOException, ExecutionException, InterruptedException { + if (closed) { throw new IOException("This channel is not connected."); } - if(channel == null || !isConnected()) { - reconnect(); + UUID dnId = dn.getUuid(); + ManagedChannel channel = channels.get(dnId); + // If the channel doesn't exist for this specific datanode or the channel + // is closed, just reconnect + if (!isConnected(channel)) { + reconnect(dn); } final CompletableFuture replyFuture = @@ -145,48 +225,54 @@ public class XceiverClientGrpc extends XceiverClientSpi { long requestTime = Time.monotonicNowNanos(); metrics.incrPendingContainerOpsMetrics(request.getCmdType()); // create a new grpc stream for each non-async call. - final StreamObserver requestObserver = - asyncStub.send(new StreamObserver() { - @Override - public void onNext(ContainerCommandResponseProto value) { - replyFuture.complete(value); - metrics.decrPendingContainerOpsMetrics(request.getCmdType()); - metrics.addContainerOpsLatency(request.getCmdType(), - Time.monotonicNowNanos() - requestTime); - semaphore.release(); - } - @Override - public void onError(Throwable t) { - replyFuture.completeExceptionally(t); - metrics.decrPendingContainerOpsMetrics(request.getCmdType()); - metrics.addContainerOpsLatency(request.getCmdType(), - Time.monotonicNowNanos() - requestTime); - semaphore.release(); - } - @Override - public void onCompleted() { - if (!replyFuture.isDone()) { - replyFuture.completeExceptionally( - new IOException("Stream completed but no reply for request " - + request)); - } - } - }); + // TODO: for async calls, we should reuse StreamObserver resources. + final StreamObserver requestObserver = + asyncStubs.get(dnId) + .send(new StreamObserver() { + @Override + public void onNext(ContainerCommandResponseProto value) { + replyFuture.complete(value); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + + @Override + public void onError(Throwable t) { + replyFuture.completeExceptionally(t); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally(new IOException( + "Stream completed but no reply for request " + request)); + } + } + }); requestObserver.onNext(request); requestObserver.onCompleted(); return replyFuture; } - private void reconnect() throws IOException { + private void reconnect(DatanodeDetails dn) + throws IOException { + ManagedChannel channel; try { - connect(); + connectToDatanode(dn); + channel = channels.get(dn.getUuid()); } catch (Exception e) { LOG.error("Error while connecting: ", e); throw new IOException(e); } - if (channel == null || !isConnected()) { + if (channel == null || !isConnected(channel)) { throw new IOException("This channel is not connected."); } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index d542abc9b2d..83b5a4c0e42 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -27,7 +27,6 @@ import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import java.io.Closeable; import java.io.IOException; @@ -59,7 +58,7 @@ public class XceiverClientManager implements Closeable { //TODO : change this to SCM configuration class private final Configuration conf; - private final Cache clientCache; + private final Cache clientCache; private final boolean useRatis; private static XceiverClientMetrics metrics; @@ -83,10 +82,10 @@ public class XceiverClientManager implements Closeable { .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .maximumSize(maxSize) .removalListener( - new RemovalListener() { + new RemovalListener() { @Override public void onRemoval( - RemovalNotification + RemovalNotification removalNotification) { synchronized (clientCache) { // Mark the entry as evicted @@ -98,7 +97,7 @@ public class XceiverClientManager implements Closeable { } @VisibleForTesting - public Cache getClientCache() { + public Cache getClientCache() { return clientCache; } @@ -140,13 +139,14 @@ public class XceiverClientManager implements Closeable { private XceiverClientSpi getClient(Pipeline pipeline) throws IOException { + HddsProtos.ReplicationType type = pipeline.getType(); try { - return clientCache.get(pipeline.getId(), + return clientCache.get(pipeline.getId().getId().toString() + type, new Callable() { @Override public XceiverClientSpi call() throws Exception { XceiverClientSpi client = null; - switch (pipeline.getType()) { + switch (type) { case RATIS: client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); break; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 45e9d6eda33..d2eb68b1660 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.MultipleIOException; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -209,6 +211,10 @@ public final class XceiverClientRatis extends XceiverClientSpi { getClient().sendAsync(() -> byteString); } + public void watchForCommit(long index, long timeout) throws Exception { + getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED) + .get(timeout, TimeUnit.MILLISECONDS); + } /** * Sends a given command to server gets a waitable future back. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index c36ca1f934f..b0817f70eaa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -299,6 +299,10 @@ public class Pipeline { return b.toString(); } + public void setType(HddsProtos.ReplicationType type) { + this.type = type; + } + /** * Returns a JSON string of this object. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 125784c46c9..6cff6651379 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -276,8 +278,13 @@ public class ChunkGroupInputStream extends InputStream implements Seekable { long containerID = blockID.getContainerID(); ContainerWithPipeline containerWithPipeline = storageContainerLocationClient.getContainerWithPipeline(containerID); + Pipeline pipeline = containerWithPipeline.getPipeline(); + + // irrespective of the container state, we will always read via Standalone + // protocol. + pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE); XceiverClientSpi xceiverClient = xceiverClientManager - .acquireClient(containerWithPipeline.getPipeline()); + .acquireClient(pipeline); boolean success = false; containerKey = omKeyLocationInfo.getLocalID(); try { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 9f46b2d009d..6d13bb2b96c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -116,6 +116,10 @@ public class ChunkGroupOutputStream extends OutputStream { public List getStreamEntries() { return streamEntries; } + @VisibleForTesting + public XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } public List getLocationInfoList() throws IOException { List locationInfoList = new ArrayList<>(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 302ea465f05..bf6a18947a4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -102,7 +102,7 @@ public class TestMiniOzoneCluster { // Verify client is able to connect to the container try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){ client.connect(); - assertTrue(client.isConnected()); + assertTrue(client.isConnected(pipeline.getLeader())); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 881c827432d..d507303ac2b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -25,6 +25,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; + import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.*; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -32,6 +37,7 @@ import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -597,6 +603,108 @@ public class TestOzoneRpcClient { } } + @Test + public void testPutKeyAndGetKeyThreeNodes() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String keyName = UUID.randomUUID().toString(); + + OzoneOutputStream out = bucket + .createKey(keyName, value.getBytes().length, ReplicationType.RATIS, + ReplicationFactor.THREE); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) out.getOutputStream(); + XceiverClientManager manager = groupOutputStream.getXceiverClientManager(); + out.write(value.getBytes()); + out.close(); + // First, confirm the key info from the client matches the info in OM. + OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); + builder.setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName); + OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()). + getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0); + long containerID = keyInfo.getContainerID(); + long localID = keyInfo.getLocalID(); + OzoneKeyDetails keyDetails = bucket.getKey(keyName); + Assert.assertEquals(keyName, keyDetails.getName()); + + List keyLocations = keyDetails.getOzoneKeyLocations(); + Assert.assertEquals(1, keyLocations.size()); + Assert.assertEquals(containerID, keyLocations.get(0).getContainerID()); + Assert.assertEquals(localID, keyLocations.get(0).getLocalID()); + + // Make sure that the data size matched. + Assert + .assertEquals(value.getBytes().length, keyLocations.get(0).getLength()); + + ContainerWithPipeline container = + cluster.getStorageContainerManager().getContainerManager() + .getContainerWithPipeline(new ContainerID(containerID)); + Pipeline pipeline = container.getPipeline(); + List datanodes = pipeline.getMachines(); + + DatanodeDetails datanodeDetails = datanodes.get(0); + Assert.assertNotNull(datanodeDetails); + + XceiverClientSpi clientSpi = manager.acquireClient(pipeline); + Assert.assertTrue(clientSpi instanceof XceiverClientRatis); + XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi; + + ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000); + // shutdown the datanode + cluster.shutdownHddsDatanode(datanodeDetails); + + Assert.assertTrue(container.getContainerInfo().getState() + == HddsProtos.LifeCycleState.OPEN); + // try to read, this shouls be successful + readKey(bucket, keyName, value); + + Assert.assertTrue(container.getContainerInfo().getState() + == HddsProtos.LifeCycleState.OPEN); + // shutdown the second datanode + datanodeDetails = datanodes.get(1); + cluster.shutdownHddsDatanode(datanodeDetails); + Assert.assertTrue(container.getContainerInfo().getState() + == HddsProtos.LifeCycleState.OPEN); + + // the container is open and with loss of 2 nodes we still should be able + // to read via Standalone protocol + // try to read + readKey(bucket, keyName, value); + + // shutdown the 3rd datanode + datanodeDetails = datanodes.get(2); + cluster.shutdownHddsDatanode(datanodeDetails); + try { + // try to read + readKey(bucket, keyName, value); + Assert.fail("Expected exception not thrown"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Failed to execute command")); + Assert.assertTrue( + e.getMessage().contains("on the pipeline " + pipeline.getId())); + } + manager.releaseClient(clientSpi); + } + + private void readKey(OzoneBucket bucket, String keyName, String data) + throws IOException { + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + OzoneInputStream is = bucket.readKey(keyName); + byte[] fileContent = new byte[data.getBytes().length]; + is.read(fileContent); + is.close(); + } + @Test public void testGetKeyDetails() throws IOException, OzoneException { String volumeName = UUID.randomUUID().toString(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index da445bfa9de..8b35bbbb181 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.scm; import com.google.common.cache.Cache; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -107,7 +106,7 @@ public class TestXceiverClientManager { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache cache = + Cache cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -130,8 +129,9 @@ public class TestXceiverClientManager { Assert.assertNotEquals(client1, client2); // least recent container (i.e containerName1) is evicted - XceiverClientSpi nonExistent1 = cache - .getIfPresent(container1.getContainerInfo().getPipelineID()); + XceiverClientSpi nonExistent1 = cache.getIfPresent( + container1.getContainerInfo().getPipelineID().getId().toString() + + container1.getContainerInfo().getReplicationType()); Assert.assertEquals(null, nonExistent1); // However container call should succeed because of refcount on the client. String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); @@ -160,7 +160,7 @@ public class TestXceiverClientManager { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); XceiverClientManager clientManager = new XceiverClientManager(conf); - Cache cache = + Cache cache = clientManager.getClientCache(); ContainerWithPipeline container1 = @@ -183,8 +183,9 @@ public class TestXceiverClientManager { Assert.assertNotEquals(client1, client2); // now client 1 should be evicted - XceiverClientSpi nonExistent = cache - .getIfPresent(container1.getContainerInfo().getPipelineID()); + XceiverClientSpi nonExistent = cache.getIfPresent( + container1.getContainerInfo().getPipelineID().getId().toString() + + container1.getContainerInfo().getReplicationType()); Assert.assertEquals(null, nonExistent); // Any container operation should now fail