HDFS-11822. Block Storage: Fix TestCBlockCLI, failing because of " Address already in use". Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
bf8c168a80
commit
d67542c115
|
@ -24,13 +24,17 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
|
||||||
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
|
||||||
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
|
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
|
||||||
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto
|
||||||
|
.CBlockClientServerProtocolProtos;
|
||||||
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.cblock.protocolPB
|
||||||
|
.CBlockClientServerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
||||||
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.cblock.protocolPB
|
||||||
|
.CBlockServiceProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.scm.XceiverClientManager;
|
import org.apache.hadoop.scm.XceiverClientManager;
|
||||||
|
@ -42,7 +46,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.scm.protocolPB
|
||||||
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
|
@ -58,22 +63,34 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
|
.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_IPADDRESS_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_DEFAULT;
|
.DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SCM_PORT_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT;
|
.DFS_CBLOCK_SCM_IPADDRESS_DEFAULT;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
|
.DFS_CBLOCK_SCM_IPADDRESS_KEY;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
|
.DFS_CBLOCK_SCM_PORT_DEFAULT;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
.DFS_CBLOCK_SCM_PORT_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main entry point of CBlock operations, ALL the CBlock operations
|
* The main entry point of CBlock operations, ALL the CBlock operations
|
||||||
|
@ -118,10 +135,8 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
|
RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
// start service for client command-to-cblock server service
|
// start service for client command-to-cblock server service
|
||||||
InetSocketAddress serviceRpcAddr = NetUtils.createSocketAddr(
|
InetSocketAddress serviceRpcAddr =
|
||||||
conf.getTrimmed(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY,
|
OzoneClientUtils.getCblockServiceRpcAddr(conf);
|
||||||
DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT), -1,
|
|
||||||
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
|
|
||||||
BlockingService cblockProto =
|
BlockingService cblockProto =
|
||||||
CBlockServiceProtocolProtos
|
CBlockServiceProtocolProtos
|
||||||
.CBlockServiceProtocolService
|
.CBlockServiceProtocolService
|
||||||
|
@ -133,14 +148,15 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
|
DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY,
|
||||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
||||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
||||||
|
InetSocketAddress cblockServiceRpcAddress =
|
||||||
|
OzoneClientUtils.updateListenAddress(conf,
|
||||||
|
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService);
|
||||||
LOG.info("CBlock manager listening for client commands on: {}",
|
LOG.info("CBlock manager listening for client commands on: {}",
|
||||||
serviceRpcAddr);
|
cblockServiceRpcAddress);
|
||||||
// now start service for cblock client-to-cblock server communication
|
// now start service for cblock client-to-cblock server communication
|
||||||
InetSocketAddress serverRpcAddr = NetUtils.createSocketAddr(
|
|
||||||
conf.get(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY,
|
InetSocketAddress serverRpcAddr =
|
||||||
DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT), -1,
|
OzoneClientUtils.getCblockServerRpcAddr(conf);
|
||||||
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
|
|
||||||
BlockingService serverProto =
|
BlockingService serverProto =
|
||||||
CBlockClientServerProtocolProtos
|
CBlockClientServerProtocolProtos
|
||||||
.CBlockClientServerProtocolService
|
.CBlockClientServerProtocolService
|
||||||
|
@ -153,8 +169,11 @@ public class CBlockManager implements CBlockServiceProtocol,
|
||||||
DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
|
DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY,
|
||||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
|
||||||
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
|
||||||
|
InetSocketAddress cblockServerRpcAddress =
|
||||||
|
OzoneClientUtils.updateListenAddress(conf,
|
||||||
|
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
|
||||||
LOG.info("CBlock server listening for client commands on: {}",
|
LOG.info("CBlock server listening for client commands on: {}",
|
||||||
serverRpcAddr);
|
cblockServerRpcAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
@ -30,11 +31,6 @@ import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY;
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICERPC_PORT_KEY;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of client used by CBlock command line tool.
|
* Implementation of client used by CBlock command line tool.
|
||||||
*/
|
*/
|
||||||
|
@ -45,12 +41,7 @@ public class CBlockVolumeClient {
|
||||||
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
|
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
|
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
|
||||||
String serverAddress = conf.get(DFS_CBLOCK_SERVICERPC_HOSTNAME_KEY,
|
InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
|
||||||
DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT);
|
|
||||||
int serverPort = conf.getInt(DFS_CBLOCK_SERVICERPC_PORT_KEY,
|
|
||||||
DFS_CBLOCK_SERVICERPC_PORT_DEFAULT);
|
|
||||||
InetSocketAddress address = new InetSocketAddress(
|
|
||||||
serverAddress, serverPort);
|
|
||||||
// currently the largest supported volume is about 8TB, which might take
|
// currently the largest supported volume is about 8TB, which might take
|
||||||
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
|
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
|
||||||
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
|
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
|
||||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.net.HostAndPort;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||||
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.client.config.RequestConfig;
|
||||||
|
@ -39,6 +40,17 @@ import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
.OZONE_KSM_BIND_HOST_DEFAULT;
|
.OZONE_KSM_BIND_HOST_DEFAULT;
|
||||||
|
@ -299,6 +311,44 @@ public final class OzoneClientUtils {
|
||||||
port.or(OZONE_KSM_PORT_DEFAULT));
|
port.or(OZONE_KSM_PORT_DEFAULT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the socket address that is used by CBlock Service.
|
||||||
|
* @param conf
|
||||||
|
* @return Target InetSocketAddress for the CBlock Service endpoint.
|
||||||
|
*/
|
||||||
|
public static InetSocketAddress getCblockServiceRpcAddr(
|
||||||
|
Configuration conf) {
|
||||||
|
final Optional<String> host = getHostNameFromConfigKeys(conf,
|
||||||
|
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
|
||||||
|
|
||||||
|
// If no port number is specified then we'll just try the defaultBindPort.
|
||||||
|
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
|
||||||
|
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
|
||||||
|
|
||||||
|
return NetUtils.createSocketAddr(
|
||||||
|
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
|
||||||
|
port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the socket address that is used by CBlock Server.
|
||||||
|
* @param conf
|
||||||
|
* @return Target InetSocketAddress for the CBlock Server endpoint.
|
||||||
|
*/
|
||||||
|
public static InetSocketAddress getCblockServerRpcAddr(
|
||||||
|
Configuration conf) {
|
||||||
|
final Optional<String> host = getHostNameFromConfigKeys(conf,
|
||||||
|
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
|
||||||
|
|
||||||
|
// If no port number is specified then we'll just try the defaultBindPort.
|
||||||
|
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
|
||||||
|
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
|
||||||
|
|
||||||
|
return NetUtils.createSocketAddr(
|
||||||
|
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
|
||||||
|
port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the hostname, trying the supplied config keys in order.
|
* Retrieve the hostname, trying the supplied config keys in order.
|
||||||
* Each config value may be absent, or if present in the format
|
* Each config value may be absent, or if present in the format
|
||||||
|
@ -560,6 +610,28 @@ public final class OzoneClientUtils {
|
||||||
.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After starting an RPC server, updates configuration with the actual
|
||||||
|
* listening address of that server. The listening address may be different
|
||||||
|
* from the configured address if, for example, the configured address uses
|
||||||
|
* port 0 to request use of an ephemeral port.
|
||||||
|
*
|
||||||
|
* @param conf configuration to update
|
||||||
|
* @param rpcAddressKey configuration key for RPC server address
|
||||||
|
* @param addr configured address
|
||||||
|
* @param rpcServer started RPC server.
|
||||||
|
*/
|
||||||
|
public static InetSocketAddress updateListenAddress(
|
||||||
|
OzoneConfiguration conf, String rpcAddressKey,
|
||||||
|
InetSocketAddress addr, RPC.Server rpcServer) {
|
||||||
|
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
|
||||||
|
InetSocketAddress updatedAddr = new InetSocketAddress(
|
||||||
|
addr.getHostString(), listenAddr.getPort());
|
||||||
|
conf.set(rpcAddressKey,
|
||||||
|
listenAddr.getHostString() + ":" + listenAddr.getPort());
|
||||||
|
return updatedAddr;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Releases a http connection if the request is not null.
|
* Releases a http connection if the request is not null.
|
||||||
* @param request
|
* @param request
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
|
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
|
||||||
KeySpaceManagerProtocolPB.class, ksmService,
|
KeySpaceManagerProtocolPB.class, ksmService,
|
||||||
handlerCount);
|
handlerCount);
|
||||||
ksmRpcAddress = updateListenAddress(conf,
|
ksmRpcAddress = OzoneClientUtils.updateListenAddress(conf,
|
||||||
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
||||||
metadataManager = new MetadataManagerImpl(conf);
|
metadataManager = new MetadataManagerImpl(conf);
|
||||||
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
||||||
|
@ -190,27 +190,6 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
String.format("%s not started", description);
|
String.format("%s not started", description);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* After starting an RPC server, updates configuration with the actual
|
|
||||||
* listening address of that server. The listening address may be different
|
|
||||||
* from the configured address if, for example, the configured address uses
|
|
||||||
* port 0 to request use of an ephemeral port.
|
|
||||||
*
|
|
||||||
* @param conf configuration to update
|
|
||||||
* @param rpcAddressKey configuration key for RPC server address
|
|
||||||
* @param addr configured address
|
|
||||||
* @param rpcServer started RPC server.
|
|
||||||
*/
|
|
||||||
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
|
|
||||||
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
|
|
||||||
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
|
|
||||||
InetSocketAddress updatedAddr = new InetSocketAddress(
|
|
||||||
addr.getHostString(), listenAddr.getPort());
|
|
||||||
conf.set(rpcAddressKey,
|
|
||||||
listenAddr.getHostString() + ":" + listenAddr.getPort());
|
|
||||||
return updatedAddr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start service.
|
* Start service.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -209,7 +209,7 @@ public class StorageContainerManager
|
||||||
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
|
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
|
||||||
StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
|
StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
|
||||||
handlerCount);
|
handlerCount);
|
||||||
datanodeRpcAddress = updateListenAddress(conf,
|
datanodeRpcAddress = OzoneClientUtils.updateListenAddress(conf,
|
||||||
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
|
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
|
||||||
|
|
||||||
// SCM Container Service RPC
|
// SCM Container Service RPC
|
||||||
|
@ -224,7 +224,7 @@ public class StorageContainerManager
|
||||||
clientRpcServer = startRpcServer(conf, scmAddress,
|
clientRpcServer = startRpcServer(conf, scmAddress,
|
||||||
StorageContainerLocationProtocolPB.class, storageProtoPbService,
|
StorageContainerLocationProtocolPB.class, storageProtoPbService,
|
||||||
handlerCount);
|
handlerCount);
|
||||||
clientRpcAddress = updateListenAddress(conf,
|
clientRpcAddress = OzoneClientUtils.updateListenAddress(conf,
|
||||||
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
|
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
|
||||||
|
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ public class StorageContainerManager
|
||||||
blockRpcServer = startRpcServer(conf, scmBlockAddress,
|
blockRpcServer = startRpcServer(conf, scmBlockAddress,
|
||||||
ScmBlockLocationProtocolPB.class, blockProtoPbService,
|
ScmBlockLocationProtocolPB.class, blockProtoPbService,
|
||||||
handlerCount);
|
handlerCount);
|
||||||
blockRpcAddress = updateListenAddress(conf,
|
blockRpcAddress = OzoneClientUtils.updateListenAddress(conf,
|
||||||
OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
|
OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
|
||||||
|
|
||||||
registerMXBean();
|
registerMXBean();
|
||||||
|
@ -302,27 +302,6 @@ public class StorageContainerManager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* After starting an RPC server, updates configuration with the actual
|
|
||||||
* listening address of that server. The listening address may be different
|
|
||||||
* from the configured address if, for example, the configured address uses
|
|
||||||
* port 0 to request use of an ephemeral port.
|
|
||||||
*
|
|
||||||
* @param conf configuration to update
|
|
||||||
* @param rpcAddressKey configuration key for RPC server address
|
|
||||||
* @param addr configured address
|
|
||||||
* @param rpcServer started RPC server.
|
|
||||||
*/
|
|
||||||
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
|
|
||||||
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
|
|
||||||
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
|
|
||||||
InetSocketAddress updatedAddr = new InetSocketAddress(
|
|
||||||
addr.getHostString(), listenAddr.getPort());
|
|
||||||
conf.set(rpcAddressKey,
|
|
||||||
listenAddr.getHostString() + ":" + listenAddr.getPort());
|
|
||||||
return updatedAddr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main entry point for starting StorageContainerManager.
|
* Main entry point for starting StorageContainerManager.
|
||||||
*
|
*
|
||||||
|
|
|
@ -55,7 +55,7 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for Tests for local cache.
|
* Tests for Local Cache Buffer Manager.
|
||||||
*/
|
*/
|
||||||
public class TestBufferManager {
|
public class TestBufferManager {
|
||||||
private final static long GB = 1024 * 1024 * 1024;
|
private final static long GB = 1024 * 1024 * 1024;
|
||||||
|
@ -310,6 +310,8 @@ public class TestBufferManager {
|
||||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
flushTestConfig
|
||||||
|
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 120);
|
||||||
|
|
||||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
|
|
@ -22,17 +22,24 @@ import org.apache.hadoop.cblock.meta.VolumeDescriptor;
|
||||||
import org.apache.hadoop.cblock.util.MockStorageClient;
|
import org.apache.hadoop.cblock.util.MockStorageClient;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.scm.client.ScmClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -53,7 +60,16 @@ public class TestCBlockCLI {
|
||||||
outContent = new ByteArrayOutputStream();
|
outContent = new ByteArrayOutputStream();
|
||||||
ScmClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, "/tmp/testCblockCli.dat");
|
String path = GenericTestUtils
|
||||||
|
.getTempPath(TestCBlockCLI.class.getSimpleName());
|
||||||
|
File filePath = new File(path);
|
||||||
|
if (!filePath.exists() && !filePath.mkdirs()) {
|
||||||
|
throw new IOException("Unable to create test DB dir");
|
||||||
|
}
|
||||||
|
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
|
||||||
|
"/testCblockCli.dat"));
|
||||||
cBlockManager = new CBlockManager(conf, storageClient);
|
cBlockManager = new CBlockManager(conf, storageClient);
|
||||||
cBlockManager.start();
|
cBlockManager.start();
|
||||||
testPrintOut = new PrintStream(outContent);
|
testPrintOut = new PrintStream(outContent);
|
||||||
|
|
|
@ -0,0 +1,366 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.cblock;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
|
||||||
|
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
|
||||||
|
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
|
||||||
|
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
||||||
|
import org.apache.hadoop.scm.XceiverClientManager;
|
||||||
|
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.scm.protocolPB
|
||||||
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_TRACE_IO;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for Cblock read write functionality.
|
||||||
|
*/
|
||||||
|
public class TestCBlockReadWrite {
|
||||||
|
private final static long GB = 1024 * 1024 * 1024;
|
||||||
|
private final static int KB = 1024;
|
||||||
|
private static MiniOzoneCluster cluster;
|
||||||
|
private static OzoneConfiguration config;
|
||||||
|
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
||||||
|
storageContainerLocationClient;
|
||||||
|
private static XceiverClientManager xceiverClientManager;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init() throws IOException {
|
||||||
|
config = new OzoneConfiguration();
|
||||||
|
URL p = config.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(
|
||||||
|
TestOzoneContainer.class.getSimpleName());
|
||||||
|
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
|
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
cluster = new MiniOzoneCluster.Builder(config)
|
||||||
|
.numDataNodes(1)
|
||||||
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||||
|
storageContainerLocationClient = cluster
|
||||||
|
.createStorageContainerLocationClient();
|
||||||
|
xceiverClientManager = new XceiverClientManager(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdown() throws InterruptedException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getContainerPipelines creates a set of containers and returns the
|
||||||
|
* Pipelines that define those containers.
|
||||||
|
*
|
||||||
|
* @param count - Number of containers to create.
|
||||||
|
* @return - List of Pipelines.
|
||||||
|
* @throws IOException throws Exception
|
||||||
|
*/
|
||||||
|
private List<Pipeline> getContainerPipeline(int count) throws IOException {
|
||||||
|
List<Pipeline> containerPipelines = new LinkedList<>();
|
||||||
|
for (int x = 0; x < count; x++) {
|
||||||
|
String traceID = "trace" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String containerName = "container" + RandomStringUtils.randomNumeric(10);
|
||||||
|
Pipeline pipeline =
|
||||||
|
storageContainerLocationClient.allocateContainer(containerName);
|
||||||
|
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
|
||||||
|
ContainerProtocolCalls.createContainer(client, traceID);
|
||||||
|
// This step is needed since we set private data on pipelines, when we
|
||||||
|
// read the list from CBlockServer. So we mimic that action here.
|
||||||
|
pipeline.setData(Longs.toByteArray(x));
|
||||||
|
containerPipelines.add(pipeline);
|
||||||
|
}
|
||||||
|
return containerPipelines;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test creates a cache and performs a simple write / read.
|
||||||
|
* The operations are done by bypassing the cache.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDirectIO() throws IOException,
|
||||||
|
InterruptedException, TimeoutException {
|
||||||
|
OzoneConfiguration cConfig = new OzoneConfiguration();
|
||||||
|
cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
|
||||||
|
cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
final long blockID = 0;
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String data = RandomStringUtils.random(4 * KB);
|
||||||
|
String dataHash = DigestUtils.sha256Hex(data);
|
||||||
|
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
|
||||||
|
xceiverClientManager, metrics);
|
||||||
|
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(cConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(getContainerPipeline(10))
|
||||||
|
.setClientManager(xceiverClientManager)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(flusher)
|
||||||
|
.setCBlockTargetMetrics(metrics)
|
||||||
|
.build();
|
||||||
|
cache.start();
|
||||||
|
Assert.assertFalse(cache.isShortCircuitIOEnabled());
|
||||||
|
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
|
||||||
|
Assert.assertEquals(1, metrics.getNumWriteOps());
|
||||||
|
// Please note that this read is directly from remote container
|
||||||
|
LogicalBlock block = cache.get(blockID);
|
||||||
|
Assert.assertEquals(1, metrics.getNumReadOps());
|
||||||
|
Assert.assertEquals(0, metrics.getNumReadCacheHits());
|
||||||
|
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
|
||||||
|
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
|
||||||
|
|
||||||
|
cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
|
||||||
|
Assert.assertEquals(2, metrics.getNumWriteOps());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
|
||||||
|
// Please note that this read is directly from remote container
|
||||||
|
block = cache.get(blockID + 1);
|
||||||
|
Assert.assertEquals(2, metrics.getNumReadOps());
|
||||||
|
Assert.assertEquals(0, metrics.getNumReadCacheHits());
|
||||||
|
Assert.assertEquals(2, metrics.getNumReadCacheMiss());
|
||||||
|
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
|
||||||
|
String readHash = DigestUtils.sha256Hex(block.getData().array());
|
||||||
|
Assert.assertEquals("File content does not match.", dataHash, readHash);
|
||||||
|
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||||
|
cache.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test writes some block to the cache and then shuts down the cache
|
||||||
|
* The cache is then restarted with "short.circuit.io" disable to check
|
||||||
|
* that the blocks are read correctly from the container.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testContainerWrites() throws IOException,
|
||||||
|
InterruptedException, TimeoutException {
|
||||||
|
// Create a new config so that this tests write metafile to new location
|
||||||
|
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||||
|
URL p = flushTestConfig.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||||
|
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
|
||||||
|
XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
|
||||||
|
int numUniqueBlocks = 4;
|
||||||
|
String[] data = new String[numUniqueBlocks];
|
||||||
|
String[] dataHash = new String[numUniqueBlocks];
|
||||||
|
for (int i = 0; i < numUniqueBlocks; i++) {
|
||||||
|
data[i] = RandomStringUtils.random(4 * KB);
|
||||||
|
dataHash[i] = DigestUtils.sha256Hex(data[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xcm, metrics);
|
||||||
|
List<Pipeline> pipelines = getContainerPipeline(10);
|
||||||
|
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(pipelines)
|
||||||
|
.setClientManager(xcm)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(flusher)
|
||||||
|
.setCBlockTargetMetrics(metrics)
|
||||||
|
.build();
|
||||||
|
cache.start();
|
||||||
|
Thread flushListenerThread = new Thread(flusher);
|
||||||
|
flushListenerThread.setDaemon(true);
|
||||||
|
flushListenerThread.start();
|
||||||
|
Assert.assertTrue(cache.isShortCircuitIOEnabled());
|
||||||
|
// Write data to the cache
|
||||||
|
for (int i = 0; i < 512; i++) {
|
||||||
|
cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
// Close the cache and flush the data to the containers
|
||||||
|
cache.close();
|
||||||
|
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||||
|
Assert.assertEquals(512, metrics.getNumWriteOps());
|
||||||
|
Thread.sleep(3000);
|
||||||
|
flusher.shutdown();
|
||||||
|
Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
|
||||||
|
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||||
|
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
|
||||||
|
// Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
|
||||||
|
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher newFlusher =
|
||||||
|
new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
|
||||||
|
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(pipelines)
|
||||||
|
.setClientManager(xcm)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(newFlusher)
|
||||||
|
.setCBlockTargetMetrics(newMetrics)
|
||||||
|
.build();
|
||||||
|
newCache.start();
|
||||||
|
Assert.assertFalse(newCache.isShortCircuitIOEnabled());
|
||||||
|
// this read will be from the container, also match the hash
|
||||||
|
for (int i = 0; i < 512; i++) {
|
||||||
|
LogicalBlock block = newCache.get(i);
|
||||||
|
String readHash = DigestUtils.sha256Hex(block.getData().array());
|
||||||
|
Assert.assertEquals("File content does not match, for index:"
|
||||||
|
+ i, dataHash[i % numUniqueBlocks], readHash);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
|
||||||
|
newCache.close();
|
||||||
|
newFlusher.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryLog() throws IOException,
|
||||||
|
InterruptedException, TimeoutException {
|
||||||
|
// Create a new config so that this tests write metafile to new location
|
||||||
|
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||||
|
URL p = flushTestConfig.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||||
|
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
|
||||||
|
|
||||||
|
int numblocks = 10;
|
||||||
|
flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
|
||||||
|
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String data = RandomStringUtils.random(4 * KB);
|
||||||
|
|
||||||
|
List<Pipeline> fakeContainerPipelines = new LinkedList<>();
|
||||||
|
Pipeline fakePipeline = new Pipeline("fake");
|
||||||
|
fakePipeline.setData(Longs.toByteArray(1));
|
||||||
|
fakeContainerPipelines.add(fakePipeline);
|
||||||
|
|
||||||
|
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xceiverClientManager, metrics);
|
||||||
|
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(fakeContainerPipelines)
|
||||||
|
.setClientManager(xceiverClientManager)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(flusher)
|
||||||
|
.setCBlockTargetMetrics(metrics)
|
||||||
|
.build();
|
||||||
|
cache.start();
|
||||||
|
Thread flushListenerThread = new Thread(flusher);
|
||||||
|
flushListenerThread.setDaemon(true);
|
||||||
|
flushListenerThread.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < numblocks; i++) {
|
||||||
|
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
Assert.assertEquals(numblocks, metrics.getNumWriteOps());
|
||||||
|
Thread.sleep(3000);
|
||||||
|
|
||||||
|
// all the writes to the container will fail because of fake pipelines
|
||||||
|
Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
|
||||||
|
Assert.assertTrue(
|
||||||
|
metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
|
||||||
|
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
|
||||||
|
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
|
||||||
|
cache.close();
|
||||||
|
flusher.shutdown();
|
||||||
|
|
||||||
|
// restart cache with correct pipelines, now blocks should be uploaded
|
||||||
|
// correctly
|
||||||
|
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher newFlusher =
|
||||||
|
new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xceiverClientManager, newMetrics);
|
||||||
|
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(getContainerPipeline(10))
|
||||||
|
.setClientManager(xceiverClientManager)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(newFlusher)
|
||||||
|
.setCBlockTargetMetrics(newMetrics)
|
||||||
|
.build();
|
||||||
|
newCache.start();
|
||||||
|
Thread newFlushListenerThread = new Thread(newFlusher);
|
||||||
|
newFlushListenerThread.setDaemon(true);
|
||||||
|
newFlushListenerThread.start();
|
||||||
|
Thread.sleep(3000);
|
||||||
|
Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,6 +29,10 @@ import org.junit.Test;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -45,6 +49,8 @@ public class TestCBlockServer {
|
||||||
public static void setup() throws Exception {
|
public static void setup() throws Exception {
|
||||||
ScmClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
|
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
cBlockManager = new CBlockManager(conf, storageClient);
|
cBlockManager = new CBlockManager(conf, storageClient);
|
||||||
cBlockManager.start();
|
cBlockManager.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,12 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||||
|
.DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -63,6 +68,8 @@ public class TestCBlockServerPersistence {
|
||||||
"/testCblockPersistence.dat"));
|
"/testCblockPersistence.dat"));
|
||||||
try {
|
try {
|
||||||
ScmClient storageClient = new MockStorageClient();
|
ScmClient storageClient = new MockStorageClient();
|
||||||
|
conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
cBlockManager = new CBlockManager(conf, storageClient);
|
cBlockManager = new CBlockManager(conf, storageClient);
|
||||||
cBlockManager.start();
|
cBlockManager.start();
|
||||||
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
|
cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize);
|
||||||
|
@ -84,6 +91,8 @@ public class TestCBlockServerPersistence {
|
||||||
OzoneConfiguration conf1 = new OzoneConfiguration();
|
OzoneConfiguration conf1 = new OzoneConfiguration();
|
||||||
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
|
conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat(
|
||||||
"/testCblockPersistence.dat"));
|
"/testCblockPersistence.dat"));
|
||||||
|
conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
|
conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||||
cBlockManager1 = new CBlockManager(conf1, storageClient1);
|
cBlockManager1 = new CBlockManager(conf1, storageClient1);
|
||||||
cBlockManager1.start();
|
cBlockManager1.start();
|
||||||
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();
|
List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes();
|
||||||
|
|
|
@ -62,13 +62,9 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||||
DFS_CBLOCK_TRACE_IO;
|
DFS_CBLOCK_TRACE_IO;
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
|
||||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
|
||||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
|
||||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for Tests for local cache.
|
* Tests for local cache.
|
||||||
*/
|
*/
|
||||||
public class TestLocalBlockCache {
|
public class TestLocalBlockCache {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
|
@ -444,247 +440,4 @@ public class TestLocalBlockCache {
|
||||||
100, 20 * 1000);
|
100, 20 * 1000);
|
||||||
ozoneStore.close();
|
ozoneStore.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This test creates a cache and performs a simple write / read.
|
|
||||||
* The operations are done by bypassing the cache.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testDirectIO() throws IOException,
|
|
||||||
InterruptedException, TimeoutException {
|
|
||||||
OzoneConfiguration cConfig = new OzoneConfiguration();
|
|
||||||
cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
|
|
||||||
cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
|
||||||
final long blockID = 0;
|
|
||||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
|
||||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
|
||||||
String data = RandomStringUtils.random(4 * KB);
|
|
||||||
String dataHash = DigestUtils.sha256Hex(data);
|
|
||||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
|
||||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
|
|
||||||
xceiverClientManager, metrics);
|
|
||||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
|
||||||
.setConfiguration(cConfig)
|
|
||||||
.setVolumeName(volumeName)
|
|
||||||
.setUserName(userName)
|
|
||||||
.setPipelines(getContainerPipeline(10))
|
|
||||||
.setClientManager(xceiverClientManager)
|
|
||||||
.setBlockSize(4 * KB)
|
|
||||||
.setVolumeSize(50 * GB)
|
|
||||||
.setFlusher(flusher)
|
|
||||||
.setCBlockTargetMetrics(metrics)
|
|
||||||
.build();
|
|
||||||
cache.start();
|
|
||||||
Assert.assertFalse(cache.isShortCircuitIOEnabled());
|
|
||||||
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
|
|
||||||
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
|
|
||||||
Assert.assertEquals(1, metrics.getNumWriteOps());
|
|
||||||
// Please note that this read is directly from remote container
|
|
||||||
LogicalBlock block = cache.get(blockID);
|
|
||||||
Assert.assertEquals(1, metrics.getNumReadOps());
|
|
||||||
Assert.assertEquals(0, metrics.getNumReadCacheHits());
|
|
||||||
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
|
|
||||||
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
|
|
||||||
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
|
|
||||||
|
|
||||||
cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
|
|
||||||
Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
|
|
||||||
Assert.assertEquals(2, metrics.getNumWriteOps());
|
|
||||||
Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
|
|
||||||
// Please note that this read is directly from remote container
|
|
||||||
block = cache.get(blockID + 1);
|
|
||||||
Assert.assertEquals(2, metrics.getNumReadOps());
|
|
||||||
Assert.assertEquals(0, metrics.getNumReadCacheHits());
|
|
||||||
Assert.assertEquals(2, metrics.getNumReadCacheMiss());
|
|
||||||
Assert.assertEquals(0, metrics.getNumReadLostBlocks());
|
|
||||||
String readHash = DigestUtils.sha256Hex(block.getData().array());
|
|
||||||
Assert.assertEquals("File content does not match.", dataHash, readHash);
|
|
||||||
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
|
||||||
cache.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This test writes some block to the cache and then shuts down the cache
|
|
||||||
* The cache is then restarted with "short.circuit.io" disable to check
|
|
||||||
* that the blocks are read correctly from the container.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testContainerWrites() throws IOException,
|
|
||||||
InterruptedException, TimeoutException {
|
|
||||||
// Create a new config so that this tests write metafile to new location
|
|
||||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
|
||||||
URL p = flushTestConfig.getClass().getResource("");
|
|
||||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
|
||||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
|
||||||
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
|
|
||||||
XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
|
|
||||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
|
||||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
|
||||||
|
|
||||||
int numUniqueBlocks = 4;
|
|
||||||
String[] data = new String[numUniqueBlocks];
|
|
||||||
String[] dataHash = new String[numUniqueBlocks];
|
|
||||||
for (int i = 0; i < numUniqueBlocks; i++) {
|
|
||||||
data[i] = RandomStringUtils.random(4 * KB);
|
|
||||||
dataHash[i] = DigestUtils.sha256Hex(data[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
|
||||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
|
||||||
xcm, metrics);
|
|
||||||
List<Pipeline> pipelines = getContainerPipeline(10);
|
|
||||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
|
||||||
.setConfiguration(flushTestConfig)
|
|
||||||
.setVolumeName(volumeName)
|
|
||||||
.setUserName(userName)
|
|
||||||
.setPipelines(pipelines)
|
|
||||||
.setClientManager(xcm)
|
|
||||||
.setBlockSize(4 * KB)
|
|
||||||
.setVolumeSize(50 * GB)
|
|
||||||
.setFlusher(flusher)
|
|
||||||
.setCBlockTargetMetrics(metrics)
|
|
||||||
.build();
|
|
||||||
cache.start();
|
|
||||||
Thread flushListenerThread = new Thread(flusher);
|
|
||||||
flushListenerThread.setDaemon(true);
|
|
||||||
flushListenerThread.start();
|
|
||||||
Assert.assertTrue(cache.isShortCircuitIOEnabled());
|
|
||||||
// Write data to the cache
|
|
||||||
for (int i = 0; i < 512; i++) {
|
|
||||||
cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
|
||||||
// Close the cache and flush the data to the containers
|
|
||||||
cache.close();
|
|
||||||
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
|
||||||
Assert.assertEquals(512, metrics.getNumWriteOps());
|
|
||||||
Thread.sleep(5000);
|
|
||||||
flusher.shutdown();
|
|
||||||
Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
|
|
||||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
|
||||||
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
|
|
||||||
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
|
|
||||||
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
|
|
||||||
// Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
|
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
|
|
||||||
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
|
||||||
ContainerCacheFlusher newFlusher =
|
|
||||||
new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
|
|
||||||
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
|
||||||
.setConfiguration(flushTestConfig)
|
|
||||||
.setVolumeName(volumeName)
|
|
||||||
.setUserName(userName)
|
|
||||||
.setPipelines(pipelines)
|
|
||||||
.setClientManager(xcm)
|
|
||||||
.setBlockSize(4 * KB)
|
|
||||||
.setVolumeSize(50 * GB)
|
|
||||||
.setFlusher(newFlusher)
|
|
||||||
.setCBlockTargetMetrics(newMetrics)
|
|
||||||
.build();
|
|
||||||
newCache.start();
|
|
||||||
Assert.assertFalse(newCache.isShortCircuitIOEnabled());
|
|
||||||
// this read will be from the container, also match the hash
|
|
||||||
for (int i = 0; i < 512; i++) {
|
|
||||||
LogicalBlock block = newCache.get(i);
|
|
||||||
String readHash = DigestUtils.sha256Hex(block.getData().array());
|
|
||||||
Assert.assertEquals("File content does not match, for index:"
|
|
||||||
+ i, dataHash[i % numUniqueBlocks], readHash);
|
|
||||||
}
|
|
||||||
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
|
|
||||||
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
|
|
||||||
newCache.close();
|
|
||||||
newFlusher.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRetryLog() throws IOException,
|
|
||||||
InterruptedException, TimeoutException {
|
|
||||||
// Create a new config so that this tests write metafile to new location
|
|
||||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
|
||||||
URL p = flushTestConfig.getClass().getResource("");
|
|
||||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
|
||||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
|
||||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
|
||||||
flushTestConfig.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 3);
|
|
||||||
|
|
||||||
int numblocks = 10;
|
|
||||||
flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);
|
|
||||||
|
|
||||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
|
||||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
|
||||||
String data = RandomStringUtils.random(4 * KB);
|
|
||||||
|
|
||||||
List<Pipeline> fakeContainerPipelines = new LinkedList<>();
|
|
||||||
Pipeline fakePipeline = new Pipeline("fake");
|
|
||||||
fakePipeline.setData(Longs.toByteArray(1));
|
|
||||||
fakeContainerPipelines.add(fakePipeline);
|
|
||||||
|
|
||||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
|
||||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
|
||||||
xceiverClientManager, metrics);
|
|
||||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
|
||||||
.setConfiguration(flushTestConfig)
|
|
||||||
.setVolumeName(volumeName)
|
|
||||||
.setUserName(userName)
|
|
||||||
.setPipelines(fakeContainerPipelines)
|
|
||||||
.setClientManager(xceiverClientManager)
|
|
||||||
.setBlockSize(4 * KB)
|
|
||||||
.setVolumeSize(50 * GB)
|
|
||||||
.setFlusher(flusher)
|
|
||||||
.setCBlockTargetMetrics(metrics)
|
|
||||||
.build();
|
|
||||||
cache.start();
|
|
||||||
Thread flushListenerThread = new Thread(flusher);
|
|
||||||
flushListenerThread.setDaemon(true);
|
|
||||||
flushListenerThread.start();
|
|
||||||
|
|
||||||
for (int i = 0; i < numblocks; i++) {
|
|
||||||
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
|
||||||
Assert.assertEquals(numblocks, metrics.getNumWriteOps());
|
|
||||||
Thread.sleep(3000);
|
|
||||||
|
|
||||||
// all the writes to the container will fail because of fake pipelines
|
|
||||||
Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
|
|
||||||
Assert.assertTrue(
|
|
||||||
metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
|
|
||||||
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
|
|
||||||
Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
|
|
||||||
Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
|
|
||||||
cache.close();
|
|
||||||
flusher.shutdown();
|
|
||||||
|
|
||||||
// restart cache with correct pipelines, now blocks should be uploaded
|
|
||||||
// correctly
|
|
||||||
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
|
||||||
ContainerCacheFlusher newFlusher =
|
|
||||||
new ContainerCacheFlusher(flushTestConfig,
|
|
||||||
xceiverClientManager, newMetrics);
|
|
||||||
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
|
||||||
.setConfiguration(flushTestConfig)
|
|
||||||
.setVolumeName(volumeName)
|
|
||||||
.setUserName(userName)
|
|
||||||
.setPipelines(getContainerPipeline(10))
|
|
||||||
.setClientManager(xceiverClientManager)
|
|
||||||
.setBlockSize(4 * KB)
|
|
||||||
.setVolumeSize(50 * GB)
|
|
||||||
.setFlusher(newFlusher)
|
|
||||||
.setCBlockTargetMetrics(newMetrics)
|
|
||||||
.build();
|
|
||||||
newCache.start();
|
|
||||||
Thread newFlushListenerThread = new Thread(newFlusher);
|
|
||||||
newFlushListenerThread.setDaemon(true);
|
|
||||||
newFlushListenerThread.start();
|
|
||||||
Thread.sleep(3000);
|
|
||||||
Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
|
|
||||||
Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
|
|
||||||
Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
|
|
||||||
Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue