diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java index 30563c3434c..6d5d4411aaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java @@ -24,13 +24,17 @@ import org.apache.hadoop.cblock.meta.VolumeInfo; import org.apache.hadoop.cblock.proto.CBlockClientProtocol; import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos; import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; -import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB; +import org.apache.hadoop.cblock.protocolPB + .CBlockClientServerProtocolServerSideTranslatorPB; import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.cblock.protocolPB + .CBlockServiceProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.scm.XceiverClientManager; @@ -42,7 +46,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.utils.LevelDBStore; @@ -58,22 +63,34 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CONTAINER_SIZE_GB_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_IPADDRESS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_IPADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_PORT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_PORT_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; /** * The main entry point of CBlock operations, ALL the CBlock operations @@ -118,10 +135,8 @@ public class CBlockManager implements CBlockServiceProtocol, RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class, ProtobufRpcEngine.class); // start service for client command-to-cblock server service - InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr( - conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, - DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + InetSocketAddress serviceRpcAddr = + OzoneClientUtils.getCblockServiceRpcAddr(conf); BlockingService cblockProto = CBlockServiceProtocolProtos .CBlockServiceProtocolService @@ -133,14 +148,15 @@ public class CBlockManager implements CBlockServiceProtocol, DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY, DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); - + InetSocketAddress cblockServiceRpcAddress = + OzoneClientUtils.updateListenAddress(conf, + DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService); LOG.info("CBlock manager listening for client commands on: {}", - serviceRpcAddr); + cblockServiceRpcAddress); // now start service for cblock client-to-cblock server communication - InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr( - conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, - DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + + InetSocketAddress serverRpcAddr = + OzoneClientUtils.getCblockServerRpcAddr(conf); BlockingService serverProto = CBlockClientServerProtocolProtos .CBlockClientServerProtocolService @@ -153,8 +169,11 @@ public class CBlockManager implements CBlockServiceProtocol, DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY, DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); + InetSocketAddress cblockServerRpcAddress = + OzoneClientUtils.updateListenAddress(conf, + DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer); LOG.info("CBlock server listening for client commands on: {}", - serverRpcAddr); + cblockServerRpcAddress); } public void start() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java index c783c67e609..f70e8a4af12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java @@ -22,6 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.security.UserGroupInformation; @@ -30,11 +31,6 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_KEY; - /** * Implementation of client used by CBlock command line tool. */ @@ -45,12 +41,7 @@ public class CBlockVolumeClient { public CBlockVolumeClient(OzoneConfiguration conf) throws IOException { this.conf = conf; long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class); - String serverAddress = conf.get(DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY, - DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT); - int serverPort = conf.getInt(DFS_CBLOCK_SERVICERPC_PORT_KEY, - DFS_CBLOCK_SERVICERPC_PORT_DEFAULT); - InetSocketAddress address = new InetSocketAddress( - serverAddress, serverPort); + InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf); // currently the largest supported volume is about 8TB, which might take // > 20 seconds to finish creating containers. thus set timeout to 30 sec. cblockClient = new CBlockServiceProtocolClientSideTranslatorPB( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java index db3c14f98e1..f38dd7ed0c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java @@ -23,6 +23,7 @@ import com.google.common.net.HostAndPort; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.http.client.config.RequestConfig; @@ -39,6 +40,17 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSI_PORT_DEFAULT; + import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY; import static org.apache.hadoop.ozone.ksm.KSMConfigKeys .OZONE_KSM_BIND_HOST_DEFAULT; @@ -299,6 +311,44 @@ public final class OzoneClientUtils { port.or(OZONE_KSM_PORT_DEFAULT)); } + /** + * Retrieve the socket address that is used by CBlock Service. + * @param conf + * @return Target InetSocketAddress for the CBlock Service endpoint. + */ + public static InetSocketAddress getCblockServiceRpcAddr( + Configuration conf) { + final Optional host = getHostNameFromConfigKeys(conf, + DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + + // If no port number is specified then we'll just try the defaultBindPort. + final Optional port = getPortNumberFromConfigKeys(conf, + DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + + port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT)); + } + + /** + * Retrieve the socket address that is used by CBlock Server. + * @param conf + * @return Target InetSocketAddress for the CBlock Server endpoint. + */ + public static InetSocketAddress getCblockServerRpcAddr( + Configuration conf) { + final Optional host = getHostNameFromConfigKeys(conf, + DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + + // If no port number is specified then we'll just try the defaultBindPort. + final Optional port = getPortNumberFromConfigKeys(conf, + DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + + port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT)); + } + /** * Retrieve the hostname, trying the supplied config keys in order. * Each config value may be absent, or if present in the format @@ -560,6 +610,28 @@ public final class OzoneClientUtils { .DFS_CONTAINER_IPC_PORT_DEFAULT); } + /** + * After starting an RPC server, updates configuration with the actual + * listening address of that server. The listening address may be different + * from the configured address if, for example, the configured address uses + * port 0 to request use of an ephemeral port. + * + * @param conf configuration to update + * @param rpcAddressKey configuration key for RPC server address + * @param addr configured address + * @param rpcServer started RPC server. + */ + public static InetSocketAddress updateListenAddress( + OzoneConfiguration conf, String rpcAddressKey, + InetSocketAddress addr, RPC.Server rpcServer) { + InetSocketAddress listenAddr = rpcServer.getListenerAddress(); + InetSocketAddress updatedAddr = new InetSocketAddress( + addr.getHostString(), listenAddr.getPort()); + conf.set(rpcAddressKey, + listenAddr.getHostString() + ":" + listenAddr.getPort()); + return updatedAddr; + } + /** * Releases a http connection if the request is not null. * @param request diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java index 3cf1fb39b4f..f4da2bb0bfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java @@ -91,7 +91,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol { ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr, KeySpaceManagerProtocolPB.class, ksmService, handlerCount); - ksmRpcAddress = updateListenAddress(conf, + ksmRpcAddress = OzoneClientUtils.updateListenAddress(conf, OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer); metadataManager = new MetadataManagerImpl(conf); volumeManager = new VolumeManagerImpl(metadataManager, conf); @@ -190,27 +190,6 @@ public class KeySpaceManager implements KeySpaceManagerProtocol { String.format("%s not started", description); } - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { - InetSocketAddress listenAddr = rpcServer.getListenerAddress(); - InetSocketAddress updatedAddr = new InetSocketAddress( - addr.getHostString(), listenAddr.getPort()); - conf.set(rpcAddressKey, - listenAddr.getHostString() + ":" + listenAddr.getPort()); - return updatedAddr; - } - /** * Start service. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index f54a6a06b77..3f0f6d30647 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -209,7 +209,7 @@ public class StorageContainerManager datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr, StorageContainerDatanodeProtocolPB.class, dnProtoPbService, handlerCount); - datanodeRpcAddress = updateListenAddress(conf, + datanodeRpcAddress = OzoneClientUtils.updateListenAddress(conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); // SCM Container Service RPC @@ -224,7 +224,7 @@ public class StorageContainerManager clientRpcServer = startRpcServer(conf, scmAddress, StorageContainerLocationProtocolPB.class, storageProtoPbService, handlerCount); - clientRpcAddress = updateListenAddress(conf, + clientRpcAddress = OzoneClientUtils.updateListenAddress(conf, OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer); @@ -240,7 +240,7 @@ public class StorageContainerManager blockRpcServer = startRpcServer(conf, scmBlockAddress, ScmBlockLocationProtocolPB.class, blockProtoPbService, handlerCount); - blockRpcAddress = updateListenAddress(conf, + blockRpcAddress = OzoneClientUtils.updateListenAddress(conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer); registerMXBean(); @@ -302,27 +302,6 @@ public class StorageContainerManager } } - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { - InetSocketAddress listenAddr = rpcServer.getListenerAddress(); - InetSocketAddress updatedAddr = new InetSocketAddress( - addr.getHostString(), listenAddr.getPort()); - conf.set(rpcAddressKey, - listenAddr.getHostString() + ":" + listenAddr.getPort()); - return updatedAddr; - } - /** * Main entry point for starting StorageContainerManager. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java index 90fb5b37c12..ec188ddd63a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java @@ -55,7 +55,7 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; /** - * Tests for Tests for local cache. + * Tests for Local Cache Buffer Manager. */ public class TestBufferManager { private final static long GB = 1024 * 1024 * 1024; @@ -310,6 +310,8 @@ public class TestBufferManager { flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 120); String volumeName = "volume" + RandomStringUtils.randomNumeric(4); String userName = "user" + RandomStringUtils.randomNumeric(4); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java index 46e1f6462ef..ebf00af563b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java @@ -22,17 +22,24 @@ import org.apache.hadoop.cblock.meta.VolumeDescriptor; import org.apache.hadoop.cblock.util.MockStorageClient; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.PrintStream; import java.util.List; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -53,7 +60,16 @@ public class TestCBlockCLI { outContent = new ByteArrayOutputStream(); ScmClient storageClient = new MockStorageClient(); conf = new OzoneConfiguration(); - conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, "/tmp/testCblockCli.dat"); + String path = GenericTestUtils + .getTempPath(TestCBlockCLI.class.getSimpleName()); + File filePath = new File(path); + if (!filePath.exists() && !filePath.mkdirs()) { + throw new IOException("Unable to create test DB dir"); + } + conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( + "/testCblockCli.dat")); cBlockManager = new CBlockManager(conf, storageClient); cBlockManager.start(); testPrintOut = new PrintStream(outContent); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java new file mode 100644 index 00000000000..94fe4cb2cba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.common.primitives.Longs; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; + +/** + * Tests for Cblock read write functionality. + */ +public class TestCBlockReadWrite { + private final static long GB = 1024 * 1024 * 1024; + private final static int KB = 1024; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration config; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + + @BeforeClass + public static void init() throws IOException { + config = new OzoneConfiguration(); + URL p = config.getClass().getResource(""); + String path = p.getPath().concat( + TestOzoneContainer.class.getSimpleName()); + config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + config.setBoolean(DFS_CBLOCK_TRACE_IO, true); + config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + cluster = new MiniOzoneCluster.Builder(config) + .numDataNodes(1) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + storageContainerLocationClient = cluster + .createStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(config); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanup(null, storageContainerLocationClient, cluster); + } + + /** + * getContainerPipelines creates a set of containers and returns the + * Pipelines that define those containers. + * + * @param count - Number of containers to create. + * @return - List of Pipelines. + * @throws IOException throws Exception + */ + private List getContainerPipeline(int count) throws IOException { + List containerPipelines = new LinkedList<>(); + for (int x = 0; x < count; x++) { + String traceID = "trace" + RandomStringUtils.randomNumeric(4); + String containerName = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline = + storageContainerLocationClient.allocateContainer(containerName); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); + ContainerProtocolCalls.createContainer(client, traceID); + // This step is needed since we set private data on pipelines, when we + // read the list from CBlockServer. So we mimic that action here. + pipeline.setData(Longs.toByteArray(x)); + containerPipelines.add(pipeline); + } + return containerPipelines; + } + + /** + * This test creates a cache and performs a simple write / read. + * The operations are done by bypassing the cache. + * + * @throws IOException + */ + @Test + public void testDirectIO() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration cConfig = new OzoneConfiguration(); + cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); + cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + final long blockID = 0; + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + String dataHash = DigestUtils.sha256Hex(data); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(cConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(getContainerPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Assert.assertFalse(cache.isShortCircuitIOEnabled()); + cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(1, metrics.getNumWriteOps()); + // Please note that this read is directly from remote container + LogicalBlock block = cache.get(blockID); + Assert.assertEquals(1, metrics.getNumReadOps()); + Assert.assertEquals(0, metrics.getNumReadCacheHits()); + Assert.assertEquals(1, metrics.getNumReadCacheMiss()); + Assert.assertEquals(0, metrics.getNumReadLostBlocks()); + Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); + + cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(2, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(2, metrics.getNumWriteOps()); + Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); + // Please note that this read is directly from remote container + block = cache.get(blockID + 1); + Assert.assertEquals(2, metrics.getNumReadOps()); + Assert.assertEquals(0, metrics.getNumReadCacheHits()); + Assert.assertEquals(2, metrics.getNumReadCacheMiss()); + Assert.assertEquals(0, metrics.getNumReadLostBlocks()); + String readHash = DigestUtils.sha256Hex(block.getData().array()); + Assert.assertEquals("File content does not match.", dataHash, readHash); + GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); + cache.close(); + } + + /** + * This test writes some block to the cache and then shuts down the cache + * The cache is then restarted with "short.circuit.io" disable to check + * that the blocks are read correctly from the container. + * + * @throws IOException + */ + @Test + public void testContainerWrites() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3); + XceiverClientManager xcm = new XceiverClientManager(flushTestConfig); + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + + int numUniqueBlocks = 4; + String[] data = new String[numUniqueBlocks]; + String[] dataHash = new String[numUniqueBlocks]; + for (int i = 0; i < numUniqueBlocks; i++) { + data[i] = RandomStringUtils.random(4 * KB); + dataHash[i] = DigestUtils.sha256Hex(data[i]); + } + + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xcm, metrics); + List pipelines = getContainerPipeline(10); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xcm) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Thread flushListenerThread = new Thread(flusher); + flushListenerThread.setDaemon(true); + flushListenerThread.start(); + Assert.assertTrue(cache.isShortCircuitIOEnabled()); + // Write data to the cache + for (int i = 0; i < 512; i++) { + cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8)); + } + // Close the cache and flush the data to the containers + cache.close(); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(512, metrics.getNumWriteOps()); + Thread.sleep(3000); + flusher.shutdown(); + Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); + Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); + Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); + // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xcm) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Assert.assertFalse(newCache.isShortCircuitIOEnabled()); + // this read will be from the container, also match the hash + for (int i = 0; i < 512; i++) { + LogicalBlock block = newCache.get(i); + String readHash = DigestUtils.sha256Hex(block.getData().array()); + Assert.assertEquals("File content does not match, for index:" + + i, dataHash[i % numUniqueBlocks], readHash); + } + Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); + Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); + newCache.close(); + newFlusher.shutdown(); + } + + @Test + public void testRetryLog() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3); + + int numblocks = 10; + flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + + List fakeContainerPipelines = new LinkedList<>(); + Pipeline fakePipeline = new Pipeline("fake"); + fakePipeline.setData(Longs.toByteArray(1)); + fakeContainerPipelines.add(fakePipeline); + + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(fakeContainerPipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Thread flushListenerThread = new Thread(flusher); + flushListenerThread.setDaemon(true); + flushListenerThread.start(); + + for (int i = 0; i < numblocks; i++) { + cache.put(i, data.getBytes(StandardCharsets.UTF_8)); + } + Assert.assertEquals(numblocks, metrics.getNumWriteOps()); + Thread.sleep(3000); + + // all the writes to the container will fail because of fake pipelines + Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead()); + Assert.assertTrue( + metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks); + Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); + Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites()); + Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); + cache.close(); + flusher.shutdown(); + + // restart cache with correct pipelines, now blocks should be uploaded + // correctly + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, newMetrics); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(getContainerPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Thread newFlushListenerThread = new Thread(newFlusher); + newFlushListenerThread.setDaemon(true); + newFlushListenerThread.start(); + Thread.sleep(3000); + Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks); + Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks()); + Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks()); + Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java index 1efef3e805b..673330480f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java @@ -29,6 +29,10 @@ import org.junit.Test; import java.util.HashSet; import java.util.List; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -45,6 +49,8 @@ public class TestCBlockServer { public static void setup() throws Exception { ScmClient storageClient = new MockStorageClient(); conf = new OzoneConfiguration(); + conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); cBlockManager = new CBlockManager(conf, storageClient); cBlockManager.start(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java index f4f927f0f4e..7bcaae4e608 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java @@ -28,7 +28,12 @@ import java.io.File; import java.io.IOException; import java.util.List; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; import static org.junit.Assert.assertEquals; /** @@ -63,6 +68,8 @@ public class TestCBlockServerPersistence { "/testCblockPersistence.dat")); try { ScmClient storageClient = new MockStorageClient(); + conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); cBlockManager = new CBlockManager(conf, storageClient); cBlockManager.start(); cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize); @@ -84,6 +91,8 @@ public class TestCBlockServerPersistence { OzoneConfiguration conf1 = new OzoneConfiguration(); conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( "/testCblockPersistence.dat")); + conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); + conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); cBlockManager1 = new CBlockManager(conf1, storageClient1); cBlockManager1.start(); List allVolumes1 = cBlockManager1.getAllVolumes(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index b03fb22ddc5..c54a2140941 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -62,13 +62,9 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; /** - * Tests for Tests for local cache. + * Tests for local cache. */ public class TestLocalBlockCache { private static final Logger LOG = @@ -444,247 +440,4 @@ public class TestLocalBlockCache { 100, 20 * 1000); ozoneStore.close(); } - - /** - * This test creates a cache and performs a simple write / read. - * The operations are done by bypassing the cache. - * - * @throws IOException - */ - @Test - public void testDirectIO() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration cConfig = new OzoneConfiguration(); - cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - String dataHash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(cConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Assert.assertFalse(cache.isShortCircuitIOEnabled()); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - // Please note that this read is directly from remote container - LogicalBlock block = cache.get(blockID); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(1, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - - cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(2, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - // Please note that this read is directly from remote container - block = cache.get(blockID + 1); - Assert.assertEquals(2, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(2, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match.", dataHash, readHash); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - } - - /** - * This test writes some block to the cache and then shuts down the cache - * The cache is then restarted with "short.circuit.io" disable to check - * that the blocks are read correctly from the container. - * - * @throws IOException - */ - @Test - public void testContainerWrites() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - URL p = flushTestConfig.getClass().getResource(""); - String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3); - XceiverClientManager xcm = new XceiverClientManager(flushTestConfig); - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - - int numUniqueBlocks = 4; - String[] data = new String[numUniqueBlocks]; - String[] dataHash = new String[numUniqueBlocks]; - for (int i = 0; i < numUniqueBlocks; i++) { - data[i] = RandomStringUtils.random(4 * KB); - dataHash[i] = DigestUtils.sha256Hex(data[i]); - } - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xcm, metrics); - List pipelines = getContainerPipeline(10); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - Assert.assertTrue(cache.isShortCircuitIOEnabled()); - // Write data to the cache - for (int i = 0; i < 512; i++) { - cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8)); - } - // Close the cache and flush the data to the containers - cache.close(); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(512, metrics.getNumWriteOps()); - Thread.sleep(5000); - flusher.shutdown(); - Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Assert.assertFalse(newCache.isShortCircuitIOEnabled()); - // this read will be from the container, also match the hash - for (int i = 0; i < 512; i++) { - LogicalBlock block = newCache.get(i); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match, for index:" - + i, dataHash[i % numUniqueBlocks], readHash); - } - Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); - newCache.close(); - newFlusher.shutdown(); - } - - @Test - public void testRetryLog() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - URL p = flushTestConfig.getClass().getResource(""); - String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3); - - int numblocks = 10; - flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - - List fakeContainerPipelines = new LinkedList<>(); - Pipeline fakePipeline = new Pipeline("fake"); - fakePipeline.setData(Longs.toByteArray(1)); - fakeContainerPipelines.add(fakePipeline); - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(fakeContainerPipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - - for (int i = 0; i < numblocks; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(numblocks, metrics.getNumWriteOps()); - Thread.sleep(3000); - - // all the writes to the container will fail because of fake pipelines - Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead()); - Assert.assertTrue( - metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - cache.close(); - flusher.shutdown(); - - // restart cache with correct pipelines, now blocks should be uploaded - // correctly - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread newFlushListenerThread = new Thread(newFlusher); - newFlushListenerThread.setDaemon(true); - newFlushListenerThread.start(); - Thread.sleep(3000); - Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks); - Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB()); - } }