HDFS-11822. Block Storage: Fix TestCBlockCLI, failing because of " Address already in use". Contributed by Mukul Kumar Singh.

This commit is contained in:
Chen Liang 2017-06-05 10:33:00 -07:00
parent d50c8de46c
commit 7f50a36749
11 changed files with 532 additions and 340 deletions

View File

@ -24,13 +24,17 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
import org.apache.hadoop.cblock.protocol.proto
.CBlockClientServerProtocolProtos;
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.protocolPB
.CBlockClientServerProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.cblock.protocolPB
.CBlockServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientManager;
@ -42,7 +46,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.utils.LevelDBStore;
@ -58,22 +63,34 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SCM_IPADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SCM_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SCM_PORT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
/**
* The main entry point of CBlock operations, ALL the CBlock operations
@ -118,10 +135,8 @@ public class CBlockManager implements CBlockServiceProtocol,
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
ProtobufRpcEngine.class);
// start service for client command-to-cblock server service
InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr(
conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
InetSocketAddress serviceRpcAddr =
OzoneClientUtils.getCblockServiceRpcAddr(conf);
BlockingService cblockProto =
CBlockServiceProtocolProtos
.CBlockServiceProtocolService
@ -133,14 +148,15 @@ public class CBlockManager implements CBlockServiceProtocol,
DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
InetSocketAddress cblockServiceRpcAddress =
OzoneClientUtils.updateListenAddress(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService);
LOG.info("CBlock manager listening for client commands on: {}",
serviceRpcAddr);
cblockServiceRpcAddress);
// now start service for cblock client-to-cblock server communication
InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr(
conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY,
DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
InetSocketAddress serverRpcAddr =
OzoneClientUtils.getCblockServerRpcAddr(conf);
BlockingService serverProto =
CBlockClientServerProtocolProtos
.CBlockClientServerProtocolService
@ -153,8 +169,11 @@ public class CBlockManager implements CBlockServiceProtocol,
DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
InetSocketAddress cblockServerRpcAddress =
OzoneClientUtils.updateListenAddress(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
LOG.info("CBlock server listening for client commands on: {}",
serverRpcAddr);
cblockServerRpcAddress);
}
public void start() {

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
@ -30,11 +31,6 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_KEY;
/**
* Implementation of client used by CBlock command line tool.
*/
@ -45,12 +41,7 @@ public class CBlockVolumeClient {
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
this.conf = conf;
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
String serverAddress = conf.get(DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY,
DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT);
int serverPort = conf.getInt(DFS_CBLOCK_SERVICERPC_PORT_KEY,
DFS_CBLOCK_SERVICERPC_PORT_DEFAULT);
InetSocketAddress address = new InetSocketAddress(
serverAddress, serverPort);
InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
// currently the largest supported volume is about 8TB, which might take
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(

View File

@ -23,6 +23,7 @@ import com.google.common.net.HostAndPort;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.http.client.config.RequestConfig;
@ -39,6 +40,17 @@ import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_BIND_HOST_DEFAULT;
@ -299,6 +311,44 @@ public final class OzoneClientUtils {
port.or(OZONE_KSM_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by CBlock Service.
* @param conf
* @return Target InetSocketAddress for the CBlock Service endpoint.
*/
public static InetSocketAddress getCblockServiceRpcAddr(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by CBlock Server.
* @param conf
* @return Target InetSocketAddress for the CBlock Server endpoint.
*/
public static InetSocketAddress getCblockServerRpcAddr(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
}
/**
* Retrieve the hostname, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
@ -560,6 +610,28 @@ public final class OzoneClientUtils {
.DFS_CONTAINER_IPC_PORT_DEFAULT);
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
public static InetSocketAddress updateListenAddress(
OzoneConfiguration conf, String rpcAddressKey,
InetSocketAddress addr, RPC.Server rpcServer) {
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
listenAddr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}
/**
* Releases a http connection if the request is not null.
* @param request

View File

@ -91,7 +91,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
KeySpaceManagerProtocolPB.class, ksmService,
handlerCount);
ksmRpcAddress = updateListenAddress(conf,
ksmRpcAddress = OzoneClientUtils.updateListenAddress(conf,
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
metadataManager = new MetadataManagerImpl(conf);
volumeManager = new VolumeManagerImpl(metadataManager, conf);
@ -190,27 +190,6 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
String.format("%s not started", description);
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
listenAddr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}
/**
* Start service.
*/

View File

@ -209,7 +209,7 @@ public class StorageContainerManager
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
handlerCount);
datanodeRpcAddress = updateListenAddress(conf,
datanodeRpcAddress = OzoneClientUtils.updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
// SCM Container Service RPC
@ -224,7 +224,7 @@ public class StorageContainerManager
clientRpcServer = startRpcServer(conf, scmAddress,
StorageContainerLocationProtocolPB.class, storageProtoPbService,
handlerCount);
clientRpcAddress = updateListenAddress(conf,
clientRpcAddress = OzoneClientUtils.updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
@ -240,7 +240,7 @@ public class StorageContainerManager
blockRpcServer = startRpcServer(conf, scmBlockAddress,
ScmBlockLocationProtocolPB.class, blockProtoPbService,
handlerCount);
blockRpcAddress = updateListenAddress(conf,
blockRpcAddress = OzoneClientUtils.updateListenAddress(conf,
OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
registerMXBean();
@ -302,27 +302,6 @@ public class StorageContainerManager
}
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
listenAddr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}
/**
* Main entry point for starting StorageContainerManager.
*

View File

@ -55,7 +55,7 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
/**
* Tests for Tests for local cache.
* Tests for Local Cache Buffer Manager.
*/
public class TestBufferManager {
private final static long GB = 1024 * 1024 * 1024;
@ -310,6 +310,8 @@ public class TestBufferManager {
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 120);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);

View File

@ -22,17 +22,24 @@ import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.cblock.util.MockStorageClient;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -53,7 +60,16 @@ public class TestCBlockCLI {
outContent = new ByteArrayOutputStream();
ScmClient storageClient = new MockStorageClient();
conf = new OzoneConfiguration();
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, "/tmp/testCblockCli.dat");
String path = GenericTestUtils
.getTempPath(TestCBlockCLI.class.getSimpleName());
File filePath = new File(path);
if (!filePath.exists() && !filePath.mkdirs()) {
throw new IOException("Unable to create test DB dir");
}
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
"/testCblockCli.dat"));
cBlockManager = new CBlockManager(conf, storageClient);
cBlockManager.start();
testPrintOut = new PrintStream(outContent);

View File

@ -0,0 +1,366 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import com.google.common.primitives.Longs;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_TRACE_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
/**
* Tests for Cblock read write functionality.
*/
public class TestCBlockReadWrite {
private final static long GB = 1024 * 1024 * 1024;
private final static int KB = 1024;
private static MiniOzoneCluster cluster;
private static OzoneConfiguration config;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static XceiverClientManager xceiverClientManager;
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
URL p = config.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
cluster = new MiniOzoneCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
xceiverClientManager = new XceiverClientManager(config);
}
@AfterClass
public static void shutdown() throws InterruptedException {
if (cluster != null) {
cluster.shutdown();
}
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
/**
* getContainerPipelines creates a set of containers and returns the
* Pipelines that define those containers.
*
* @param count - Number of containers to create.
* @return - List of Pipelines.
* @throws IOException throws Exception
*/
private List<Pipeline> getContainerPipeline(int count) throws IOException {
List<Pipeline> containerPipelines = new LinkedList<>();
for (int x = 0; x < count; x++) {
String traceID = "trace" + RandomStringUtils.randomNumeric(4);
String containerName = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
// This step is needed since we set private data on pipelines, when we
// read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline);
}
return containerPipelines;
}
/**
* This test creates a cache and performs a simple write / read.
* The operations are done by bypassing the cache.
*
* @throws IOException
*/
@Test
public void testDirectIO() throws IOException,
InterruptedException, TimeoutException {
OzoneConfiguration cConfig = new OzoneConfiguration();
cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
final long blockID = 0;
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
String dataHash = DigestUtils.sha256Hex(data);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(cConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(getContainerPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Assert.assertFalse(cache.isShortCircuitIOEnabled());
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
Assert.assertEquals(1, metrics.getNumWriteOps());
// Please note that this read is directly from remote container
LogicalBlock block = cache.get(blockID);
Assert.assertEquals(1, metrics.getNumReadOps());
Assert.assertEquals(0, metrics.getNumReadCacheHits());
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
Assert.assertEquals(2, metrics.getNumWriteOps());
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
// Please note that this read is directly from remote container
block = cache.get(blockID + 1);
Assert.assertEquals(2, metrics.getNumReadOps());
Assert.assertEquals(0, metrics.getNumReadCacheHits());
Assert.assertEquals(2, metrics.getNumReadCacheMiss());
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
String readHash = DigestUtils.sha256Hex(block.getData().array());
Assert.assertEquals("File content does not match.", dataHash, readHash);
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
}
/**
* This test writes some block to the cache and then shuts down the cache
* The cache is then restarted with "short.circuit.io" disable to check
* that the blocks are read correctly from the container.
*
* @throws IOException
*/
@Test
public void testContainerWrites() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
int numUniqueBlocks = 4;
String[] data = new String[numUniqueBlocks];
String[] dataHash = new String[numUniqueBlocks];
for (int i = 0; i < numUniqueBlocks; i++) {
data[i] = RandomStringUtils.random(4 * KB);
dataHash[i] = DigestUtils.sha256Hex(data[i]);
}
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xcm, metrics);
List<Pipeline> pipelines = getContainerPipeline(10);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread flushListenerThread = new Thread(flusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
Assert.assertTrue(cache.isShortCircuitIOEnabled());
// Write data to the cache
for (int i = 0; i < 512; i++) {
cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
}
// Close the cache and flush the data to the containers
cache.close();
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(512, metrics.getNumWriteOps());
Thread.sleep(3000);
flusher.shutdown();
Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
// Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Assert.assertFalse(newCache.isShortCircuitIOEnabled());
// this read will be from the container, also match the hash
for (int i = 0; i < 512; i++) {
LogicalBlock block = newCache.get(i);
String readHash = DigestUtils.sha256Hex(block.getData().array());
Assert.assertEquals("File content does not match, for index:"
+ i, dataHash[i % numUniqueBlocks], readHash);
}
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
newCache.close();
newFlusher.shutdown();
}
@Test
public void testRetryLog() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
int numblocks = 10;
flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
List<Pipeline> fakeContainerPipelines = new LinkedList<>();
Pipeline fakePipeline = new Pipeline("fake");
fakePipeline.setData(Longs.toByteArray(1));
fakeContainerPipelines.add(fakePipeline);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(fakeContainerPipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread flushListenerThread = new Thread(flusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
for (int i = 0; i < numblocks; i++) {
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
}
Assert.assertEquals(numblocks, metrics.getNumWriteOps());
Thread.sleep(3000);
// all the writes to the container will fail because of fake pipelines
Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
Assert.assertTrue(
metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
cache.close();
flusher.shutdown();
// restart cache with correct pipelines, now blocks should be uploaded
// correctly
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(getContainerPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Thread newFlushListenerThread = new Thread(newFlusher);
newFlushListenerThread.setDaemon(true);
newFlushListenerThread.start();
Thread.sleep(3000);
Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
}
}

View File

@ -29,6 +29,10 @@ import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -45,6 +49,8 @@ public class TestCBlockServer {
public static void setup() throws Exception {
ScmClient storageClient = new MockStorageClient();
conf = new OzoneConfiguration();
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
cBlockManager = new CBlockManager(conf, storageClient);
cBlockManager.start();
}

View File

@ -28,7 +28,12 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
import static org.junit.Assert.assertEquals;
/**
@ -63,6 +68,8 @@ public class TestCBlockServerPersistence {
"/testCblockPersistence.dat"));
try {
ScmClient storageClient = new MockStorageClient();
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
cBlockManager = new CBlockManager(conf, storageClient);
cBlockManager.start();
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
@ -84,6 +91,8 @@ public class TestCBlockServerPersistence {
OzoneConfiguration conf1 = new OzoneConfiguration();
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
"/testCblockPersistence.dat"));
conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
cBlockManager1 = new CBlockManager(conf1, storageClient1);
cBlockManager1.start();
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();

View File

@ -62,13 +62,9 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_TRACE_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
import static org.apache.hadoop.cblock.CBlockConfigKeys.
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
/**
* Tests for Tests for local cache.
* Tests for local cache.
*/
public class TestLocalBlockCache {
private static final Logger LOG =
@ -444,247 +440,4 @@ public class TestLocalBlockCache {
100, 20 * 1000);
ozoneStore.close();
}
/**
* This test creates a cache and performs a simple write / read.
* The operations are done by bypassing the cache.
*
* @throws IOException
*/
@Test
public void testDirectIO() throws IOException,
InterruptedException, TimeoutException {
OzoneConfiguration cConfig = new OzoneConfiguration();
cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
final long blockID = 0;
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
String dataHash = DigestUtils.sha256Hex(data);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(cConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(getContainerPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Assert.assertFalse(cache.isShortCircuitIOEnabled());
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
Assert.assertEquals(1, metrics.getNumWriteOps());
// Please note that this read is directly from remote container
LogicalBlock block = cache.get(blockID);
Assert.assertEquals(1, metrics.getNumReadOps());
Assert.assertEquals(0, metrics.getNumReadCacheHits());
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
Assert.assertEquals(2, metrics.getNumWriteOps());
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
// Please note that this read is directly from remote container
block = cache.get(blockID + 1);
Assert.assertEquals(2, metrics.getNumReadOps());
Assert.assertEquals(0, metrics.getNumReadCacheHits());
Assert.assertEquals(2, metrics.getNumReadCacheMiss());
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
String readHash = DigestUtils.sha256Hex(block.getData().array());
Assert.assertEquals("File content does not match.", dataHash, readHash);
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
}
/**
* This test writes some block to the cache and then shuts down the cache
* The cache is then restarted with "short.circuit.io" disable to check
* that the blocks are read correctly from the container.
*
* @throws IOException
*/
@Test
public void testContainerWrites() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
int numUniqueBlocks = 4;
String[] data = new String[numUniqueBlocks];
String[] dataHash = new String[numUniqueBlocks];
for (int i = 0; i < numUniqueBlocks; i++) {
data[i] = RandomStringUtils.random(4 * KB);
dataHash[i] = DigestUtils.sha256Hex(data[i]);
}
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xcm, metrics);
List<Pipeline> pipelines = getContainerPipeline(10);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread flushListenerThread = new Thread(flusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
Assert.assertTrue(cache.isShortCircuitIOEnabled());
// Write data to the cache
for (int i = 0; i < 512; i++) {
cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
}
// Close the cache and flush the data to the containers
cache.close();
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(512, metrics.getNumWriteOps());
Thread.sleep(5000);
flusher.shutdown();
Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
// Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(pipelines)
.setClientManager(xcm)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Assert.assertFalse(newCache.isShortCircuitIOEnabled());
// this read will be from the container, also match the hash
for (int i = 0; i < 512; i++) {
LogicalBlock block = newCache.get(i);
String readHash = DigestUtils.sha256Hex(block.getData().array());
Assert.assertEquals("File content does not match, for index:"
+ i, dataHash[i % numUniqueBlocks], readHash);
}
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
newCache.close();
newFlusher.shutdown();
}
@Test
public void testRetryLog() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
int numblocks = 10;
flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
List<Pipeline> fakeContainerPipelines = new LinkedList<>();
Pipeline fakePipeline = new Pipeline("fake");
fakePipeline.setData(Longs.toByteArray(1));
fakeContainerPipelines.add(fakePipeline);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(fakeContainerPipelines)
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Thread flushListenerThread = new Thread(flusher);
flushListenerThread.setDaemon(true);
flushListenerThread.start();
for (int i = 0; i < numblocks; i++) {
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
}
Assert.assertEquals(numblocks, metrics.getNumWriteOps());
Thread.sleep(3000);
// all the writes to the container will fail because of fake pipelines
Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
Assert.assertTrue(
metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
cache.close();
flusher.shutdown();
// restart cache with correct pipelines, now blocks should be uploaded
// correctly
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics);
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(getContainerPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(newFlusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
Thread newFlushListenerThread = new Thread(newFlusher);
newFlushListenerThread.setDaemon(true);
newFlushListenerThread.start();
Thread.sleep(3000);
Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
}
}