parent
b6334381df
commit
e952ecf807
|
@ -25,6 +25,10 @@ import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.RemovalListener;
|
import com.google.common.cache.RemovalListener;
|
||||||
import com.google.common.cache.RemovalNotification;
|
import com.google.common.cache.RemovalNotification;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdds.conf.Config;
|
||||||
|
import org.apache.hadoop.hdds.conf.ConfigGroup;
|
||||||
|
import org.apache.hadoop.hdds.conf.ConfigType;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
@ -38,14 +42,9 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
|
||||||
.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
|
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
|
||||||
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
|
||||||
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* XceiverClientManager is responsible for the lifecycle of XceiverClient
|
* XceiverClientManager is responsible for the lifecycle of XceiverClient
|
||||||
|
@ -76,20 +75,21 @@ public class XceiverClientManager implements Closeable {
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
*/
|
*/
|
||||||
public XceiverClientManager(Configuration conf) {
|
public XceiverClientManager(Configuration conf) {
|
||||||
|
this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
public XceiverClientManager(Configuration conf, ScmClientConfig clientConf) {
|
||||||
|
Preconditions.checkNotNull(clientConf);
|
||||||
Preconditions.checkNotNull(conf);
|
Preconditions.checkNotNull(conf);
|
||||||
int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
|
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
|
||||||
SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
|
|
||||||
long staleThresholdMs = conf.getTimeDuration(
|
|
||||||
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
|
|
||||||
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
|
|
||||||
this.useRatis = conf.getBoolean(
|
this.useRatis = conf.getBoolean(
|
||||||
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
|
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
|
||||||
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
|
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
|
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
|
||||||
this.clientCache = CacheBuilder.newBuilder()
|
this.clientCache = CacheBuilder.newBuilder()
|
||||||
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
|
.expireAfterAccess(staleThresholdMs, MILLISECONDS)
|
||||||
.maximumSize(maxSize)
|
.maximumSize(clientConf.getMaxSize())
|
||||||
.removalListener(
|
.removalListener(
|
||||||
new RemovalListener<String, XceiverClientSpi>() {
|
new RemovalListener<String, XceiverClientSpi>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -299,4 +299,65 @@ public class XceiverClientManager implements Closeable {
|
||||||
|
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration for HDDS client.
|
||||||
|
*/
|
||||||
|
@ConfigGroup(prefix = "scm.container.client")
|
||||||
|
public static class ScmClientConfig {
|
||||||
|
|
||||||
|
private int maxSize;
|
||||||
|
private long staleThreshold;
|
||||||
|
private int maxOutstandingRequests;
|
||||||
|
|
||||||
|
public long getStaleThreshold(TimeUnit unit) {
|
||||||
|
return unit.convert(staleThreshold, MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Config(key = "idle.threshold",
|
||||||
|
type = ConfigType.TIME, timeUnit = MILLISECONDS,
|
||||||
|
defaultValue = "10s",
|
||||||
|
tags = { OZONE, PERFORMANCE },
|
||||||
|
description =
|
||||||
|
"In the standalone pipelines, the SCM clients use netty to "
|
||||||
|
+ " communicate with the container. It also uses connection pooling"
|
||||||
|
+ " to reduce client side overheads. This allows a connection to"
|
||||||
|
+ " stay idle for a while before the connection is closed."
|
||||||
|
)
|
||||||
|
public void setStaleThreshold(long staleThreshold) {
|
||||||
|
this.staleThreshold = staleThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxSize() {
|
||||||
|
return maxSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Config(key = "max.size",
|
||||||
|
defaultValue = "256",
|
||||||
|
tags = { OZONE, PERFORMANCE },
|
||||||
|
description =
|
||||||
|
"Controls the maximum number of connections that are cached via"
|
||||||
|
+ " client connection pooling. If the number of connections"
|
||||||
|
+ " exceed this count, then the oldest idle connection is evicted."
|
||||||
|
)
|
||||||
|
public void setMaxSize(int maxSize) {
|
||||||
|
this.maxSize = maxSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxOutstandingRequests() {
|
||||||
|
return maxOutstandingRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Config(key = "max.outstanding.requests",
|
||||||
|
defaultValue = "100",
|
||||||
|
tags = { OZONE, PERFORMANCE },
|
||||||
|
description =
|
||||||
|
"Controls the maximum number of outstanding async requests that can"
|
||||||
|
+ " be handled by the Standalone as well as Ratis client."
|
||||||
|
)
|
||||||
|
public void setMaxOutstandingRequests(int maxOutstandingRequests) {
|
||||||
|
this.maxOutstandingRequests = maxOutstandingRequests;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
|
||||||
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
|
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
|
||||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||||
|
@ -285,10 +285,9 @@ public final class HddsClientUtils {
|
||||||
* Standalone and Ratis client.
|
* Standalone and Ratis client.
|
||||||
*/
|
*/
|
||||||
public static int getMaxOutstandingRequests(Configuration config) {
|
public static int getMaxOutstandingRequests(Configuration config) {
|
||||||
return config
|
return OzoneConfiguration.of(config)
|
||||||
.getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
|
.getObject(ScmClientConfig.class)
|
||||||
ScmConfigKeys
|
.getMaxOutstandingRequests();
|
||||||
.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
@ -46,6 +47,14 @@ public class OzoneConfiguration extends Configuration {
|
||||||
activate();
|
activate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static OzoneConfiguration of(Configuration conf) {
|
||||||
|
Preconditions.checkNotNull(conf);
|
||||||
|
|
||||||
|
return conf instanceof OzoneConfiguration
|
||||||
|
? (OzoneConfiguration) conf
|
||||||
|
: new OzoneConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
public OzoneConfiguration() {
|
public OzoneConfiguration() {
|
||||||
OzoneConfiguration.activate();
|
OzoneConfiguration.activate();
|
||||||
loadDefaults();
|
loadDefaults();
|
||||||
|
|
|
@ -36,21 +36,6 @@ public final class ScmConfigKeys {
|
||||||
// performance.
|
// performance.
|
||||||
public static final String OZONE_SCM_DB_DIRS = "ozone.scm.db.dirs";
|
public static final String OZONE_SCM_DB_DIRS = "ozone.scm.db.dirs";
|
||||||
|
|
||||||
public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY =
|
|
||||||
"scm.container.client.idle.threshold";
|
|
||||||
public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
|
|
||||||
"10s";
|
|
||||||
|
|
||||||
public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY =
|
|
||||||
"scm.container.client.max.size";
|
|
||||||
public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
|
|
||||||
256;
|
|
||||||
|
|
||||||
public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS =
|
|
||||||
"scm.container.client.max.outstanding.requests";
|
|
||||||
public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT
|
|
||||||
= 100;
|
|
||||||
|
|
||||||
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
|
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
|
||||||
= "dfs.container.ratis.enabled";
|
= "dfs.container.ratis.enabled";
|
||||||
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
|
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
|
||||||
|
|
|
@ -1061,39 +1061,6 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!--Client Settings-->
|
|
||||||
<property>
|
|
||||||
<name>scm.container.client.idle.threshold</name>
|
|
||||||
<value>10s</value>
|
|
||||||
<tag>OZONE, PERFORMANCE</tag>
|
|
||||||
<description>
|
|
||||||
In the standalone pipelines, the SCM clients use netty to
|
|
||||||
communicate with the container. It also uses connection pooling to
|
|
||||||
reduce client side overheads. This allows a connection to stay idle for
|
|
||||||
a while before the connection is closed.
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>scm.container.client.max.size</name>
|
|
||||||
<value>256</value>
|
|
||||||
<tag>OZONE, PERFORMANCE</tag>
|
|
||||||
<description>
|
|
||||||
Controls the maximum number of connections that we cached via
|
|
||||||
clientconnection pooling. If the number of connection
|
|
||||||
exceed this count then the oldest idle connection is evicted.
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>scm.container.client.max.outstanding.requests</name>
|
|
||||||
<value>100</value>
|
|
||||||
<tag>OZONE, PERFORMANCE</tag>
|
|
||||||
<description>
|
|
||||||
Controls the maximum number of outstanding async requests that can be
|
|
||||||
handled by the Standalone as well as Ratis client.
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>ozone.scm.container.creation.lease.timeout</name>
|
<name>ozone.scm.container.creation.lease.timeout</name>
|
||||||
<value>60s</value>
|
<value>60s</value>
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.ozone.scm;
|
package org.apache.hadoop.ozone.scm;
|
||||||
|
|
||||||
import com.google.common.cache.Cache;
|
import com.google.common.cache.Cache;
|
||||||
|
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
|
@ -39,8 +40,6 @@ import java.io.IOException;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
|
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
|
||||||
import static org.apache.hadoop.hdds.scm
|
|
||||||
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for XceiverClientManager caching and eviction.
|
* Test for XceiverClientManager caching and eviction.
|
||||||
|
@ -110,11 +109,13 @@ public class TestXceiverClientManager {
|
||||||
@Test
|
@Test
|
||||||
public void testFreeByReference() throws IOException {
|
public void testFreeByReference() throws IOException {
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
|
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
|
||||||
|
clientConfig.setMaxSize(1);
|
||||||
String metaDir = GenericTestUtils.getTempPath(
|
String metaDir = GenericTestUtils.getTempPath(
|
||||||
TestXceiverClientManager.class.getName() + UUID.randomUUID());
|
TestXceiverClientManager.class.getName() + UUID.randomUUID());
|
||||||
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
|
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
XceiverClientManager clientManager =
|
||||||
|
new XceiverClientManager(conf, clientConfig);
|
||||||
Cache<String, XceiverClientSpi> cache =
|
Cache<String, XceiverClientSpi> cache =
|
||||||
clientManager.getClientCache();
|
clientManager.getClientCache();
|
||||||
|
|
||||||
|
@ -166,11 +167,13 @@ public class TestXceiverClientManager {
|
||||||
@Test
|
@Test
|
||||||
public void testFreeByEviction() throws IOException {
|
public void testFreeByEviction() throws IOException {
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
|
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
|
||||||
|
clientConfig.setMaxSize(1);
|
||||||
String metaDir = GenericTestUtils.getTempPath(
|
String metaDir = GenericTestUtils.getTempPath(
|
||||||
TestXceiverClientManager.class.getName() + UUID.randomUUID());
|
TestXceiverClientManager.class.getName() + UUID.randomUUID());
|
||||||
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
|
conf.set(HDDS_METADATA_DIR_NAME, metaDir);
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
XceiverClientManager clientManager =
|
||||||
|
new XceiverClientManager(conf, clientConfig);
|
||||||
Cache<String, XceiverClientSpi> cache =
|
Cache<String, XceiverClientSpi> cache =
|
||||||
clientManager.getClientCache();
|
clientManager.getClientCache();
|
||||||
|
|
||||||
|
@ -216,8 +219,10 @@ public class TestXceiverClientManager {
|
||||||
@Test
|
@Test
|
||||||
public void testFreeByRetryFailure() throws IOException {
|
public void testFreeByRetryFailure() throws IOException {
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
|
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
|
||||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
clientConfig.setMaxSize(1);
|
||||||
|
XceiverClientManager clientManager =
|
||||||
|
new XceiverClientManager(conf, clientConfig);
|
||||||
Cache<String, XceiverClientSpi> cache =
|
Cache<String, XceiverClientSpi> cache =
|
||||||
clientManager.getClientCache();
|
clientManager.getClientCache();
|
||||||
|
|
||||||
|
|
|
@ -109,12 +109,7 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
|
||||||
ClassLoader contextClassLoader =
|
ClassLoader contextClassLoader =
|
||||||
Thread.currentThread().getContextClassLoader();
|
Thread.currentThread().getContextClassLoader();
|
||||||
Thread.currentThread().setContextClassLoader(null);
|
Thread.currentThread().setContextClassLoader(null);
|
||||||
OzoneConfiguration conf;
|
OzoneConfiguration conf = OzoneConfiguration.of(hadoopConf);
|
||||||
if (hadoopConf instanceof OzoneConfiguration) {
|
|
||||||
conf = (OzoneConfiguration) hadoopConf;
|
|
||||||
} else {
|
|
||||||
conf = new OzoneConfiguration(hadoopConf);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
|
if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
|
||||||
// When the host name or service id isn't given
|
// When the host name or service id isn't given
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.ozone.OmUtils;
|
|
||||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
Loading…
Reference in New Issue