parent
31d45b3c95
commit
34adfd9611
|
@ -138,7 +138,8 @@ public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin
|
|||
Map<String, Exporter.Factory> exporterFactories = new HashMap<>();
|
||||
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext()));
|
||||
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, cleanerService));
|
||||
exporters = new Exporters(settings, exporterFactories, clusterService, getLicenseState(), threadPool.getThreadContext());
|
||||
exporters = new Exporters(settings, exporterFactories, clusterService, getLicenseState(), threadPool.getThreadContext(),
|
||||
dynamicSSLService);
|
||||
|
||||
Set<Collector> collectors = new HashSet<>();
|
||||
collectors.add(new IndexStatsCollector(clusterService, getLicenseState(), client));
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter;
|
||||
|
@ -51,7 +52,7 @@ public class Exporters extends AbstractLifecycleComponent {
|
|||
|
||||
public Exporters(Settings settings, Map<String, Exporter.Factory> factories,
|
||||
ClusterService clusterService, XPackLicenseState licenseState,
|
||||
ThreadContext threadContext) {
|
||||
ThreadContext threadContext, SSLService sslService) {
|
||||
this.settings = settings;
|
||||
this.factories = factories;
|
||||
this.exporters = new AtomicReference<>(emptyMap());
|
||||
|
@ -62,7 +63,7 @@ public class Exporters extends AbstractLifecycleComponent {
|
|||
final List<Setting.AffixSetting<?>> dynamicSettings =
|
||||
getSettings().stream().filter(Setting::isDynamic).collect(Collectors.toList());
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(this::setExportersSetting, dynamicSettings);
|
||||
HttpExporter.registerSettingValidators(clusterService);
|
||||
HttpExporter.registerSettingValidators(clusterService, sslService);
|
||||
// this ensures that logging is happening by adding an empty consumer per affix setting
|
||||
for (Setting.AffixSetting<?> affixSetting : dynamicSettings) {
|
||||
clusterService.getClusterSettings().addAffixUpdateConsumer(affixSetting, (s, o) -> {}, (s, o) -> {});
|
||||
|
|
|
@ -307,6 +307,7 @@ public class HttpExporter extends Exporter {
|
|||
"ssl",
|
||||
(key) -> Setting.groupSetting(key + ".", Property.Dynamic, Property.NodeScope, Property.Filtered),
|
||||
TYPE_DEPENDENCY);
|
||||
|
||||
/**
|
||||
* Proxy setting to allow users to send requests to a remote cluster that requires a proxy base path.
|
||||
*/
|
||||
|
@ -482,24 +483,36 @@ public class HttpExporter extends Exporter {
|
|||
* Because it is not possible to re-read the secure settings during a dynamic update, we cannot rebuild the {@link SSLIOSessionStrategy}
|
||||
* (see {@link #configureSecurity(RestClientBuilder, Config, SSLService)} if this exporter has been configured with secure settings
|
||||
*/
|
||||
public static void registerSettingValidators(ClusterService clusterService) {
|
||||
public static void registerSettingValidators(ClusterService clusterService, SSLService sslService) {
|
||||
clusterService.getClusterSettings().addAffixUpdateConsumer(SSL_SETTING,
|
||||
(ignoreKey, ignoreSettings) -> {
|
||||
// no-op update. We only care about the validator
|
||||
},
|
||||
(namespace, settings) -> {
|
||||
final List<String> secureSettings = SSLConfigurationSettings.withoutPrefix()
|
||||
.getSecureSettingsInUse(settings)
|
||||
.stream()
|
||||
.map(Setting::getKey)
|
||||
.collect(Collectors.toList());
|
||||
if (secureSettings.isEmpty() == false) {
|
||||
throw new IllegalStateException("Cannot dynamically update SSL settings for the exporter [" + namespace
|
||||
+ "] as it depends on the secure setting(s) [" + Strings.collectionToCommaDelimitedString(secureSettings) + "]");
|
||||
}
|
||||
(key, settings) -> {
|
||||
validateSslSettings(key, settings);
|
||||
configureSslStrategy(settings, null, sslService);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that secure settings are not being used to rebuild the {@link SSLIOSessionStrategy}.
|
||||
*
|
||||
* @param exporter Name of the exporter to validate
|
||||
* @param settings Settings for the exporter
|
||||
* @throws IllegalStateException if any secure settings are used in the SSL configuration
|
||||
*/
|
||||
private static void validateSslSettings(String exporter, Settings settings) {
|
||||
final List<String> secureSettings = SSLConfigurationSettings.withoutPrefix()
|
||||
.getSecureSettingsInUse(settings)
|
||||
.stream()
|
||||
.map(Setting::getKey)
|
||||
.collect(Collectors.toList());
|
||||
if (secureSettings.isEmpty() == false) {
|
||||
throw new IllegalStateException("Cannot dynamically update SSL settings for the exporter [" + exporter
|
||||
+ "] as it depends on the secure setting(s) [" + Strings.collectionToCommaDelimitedString(secureSettings) + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link RestClientBuilder} from the HTTP Exporter's {@code config}.
|
||||
*
|
||||
|
@ -667,6 +680,30 @@ public class HttpExporter extends Exporter {
|
|||
private static void configureSecurity(final RestClientBuilder builder, final Config config, final SSLService sslService) {
|
||||
final Setting<Settings> concreteSetting = SSL_SETTING.getConcreteSettingForNamespace(config.name());
|
||||
final Settings sslSettings = concreteSetting.get(config.settings());
|
||||
final SSLIOSessionStrategy sslStrategy = configureSslStrategy(sslSettings, concreteSetting, sslService);
|
||||
final CredentialsProvider credentialsProvider = createCredentialsProvider(config);
|
||||
List<String> hostList = HOST_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings());
|
||||
// sending credentials in plaintext!
|
||||
if (credentialsProvider != null && hostList.stream().findFirst().orElse("").startsWith("https") == false) {
|
||||
logger.warn("exporter [{}] is not using https, but using user authentication with plaintext " +
|
||||
"username/password!", config.name());
|
||||
}
|
||||
|
||||
if (sslStrategy != null) {
|
||||
builder.setHttpClientConfigCallback(new SecurityHttpClientConfigCallback(sslStrategy, credentialsProvider));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the {@link SSLIOSessionStrategy} to use. Relies on {@link #registerSettingValidators(ClusterService, SSLService)}
|
||||
* to prevent invalid usage of secure settings in the SSL strategy.
|
||||
* @param sslSettings The exporter's SSL settings
|
||||
* @param concreteSetting Settings to use for {@link SSLConfiguration} if secure settings are used
|
||||
* @param sslService The SSL Service used to create the SSL Context necessary for TLS / SSL communication
|
||||
* @return Appropriately configured instance of {@link SSLIOSessionStrategy}
|
||||
*/
|
||||
private static SSLIOSessionStrategy configureSslStrategy(final Settings sslSettings, final Setting<Settings> concreteSetting,
|
||||
final SSLService sslService) {
|
||||
final SSLIOSessionStrategy sslStrategy;
|
||||
if (SSLConfigurationSettings.withoutPrefix().getSecureSettingsInUse(sslSettings).isEmpty()) {
|
||||
// This configuration does not use secure settings, so it is possible that is has been dynamically updated.
|
||||
|
@ -679,17 +716,7 @@ public class HttpExporter extends Exporter {
|
|||
final SSLConfiguration sslConfiguration = sslService.getSSLConfiguration(concreteSetting.getKey());
|
||||
sslStrategy = sslService.sslIOSessionStrategy(sslConfiguration);
|
||||
}
|
||||
final CredentialsProvider credentialsProvider = createCredentialsProvider(config);
|
||||
List<String> hostList = HOST_SETTING.getConcreteSettingForNamespace(config.name()).get(config.settings());
|
||||
// sending credentials in plaintext!
|
||||
if (credentialsProvider != null && hostList.stream().findFirst().orElse("").startsWith("https") == false) {
|
||||
logger.warn("exporter [{}] is not using https, but using user authentication with plaintext " +
|
||||
"username/password!", config.name());
|
||||
}
|
||||
|
||||
if (sslStrategy != null) {
|
||||
builder.setHttpClientConfigCallback(new SecurityHttpClientConfigCallback(sslStrategy, credentialsProvider));
|
||||
}
|
||||
return sslStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.license.XPackLicenseState;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
||||
import org.junit.After;
|
||||
|
@ -39,6 +40,7 @@ public class MonitoringServiceTests extends ESTestCase {
|
|||
private XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
private ClusterService clusterService;
|
||||
private ClusterSettings clusterSettings;
|
||||
private SSLService sslService;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -59,6 +61,7 @@ public class MonitoringServiceTests extends ESTestCase {
|
|||
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||
when(clusterService.state()).thenReturn(mock(ClusterState.class));
|
||||
when(clusterService.lifecycleState()).thenReturn(Lifecycle.State.STARTED);
|
||||
sslService = mock(SSLService.class);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -144,7 +147,7 @@ public class MonitoringServiceTests extends ESTestCase {
|
|||
private final AtomicInteger exports = new AtomicInteger(0);
|
||||
|
||||
CountingExporter() {
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService, licenseState, threadPool.getThreadContext());
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService, licenseState, threadPool.getThreadContext(), sslService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringService;
|
||||
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter;
|
||||
|
@ -68,6 +69,7 @@ public class ExportersTests extends ESTestCase {
|
|||
private Map<String, Exporter.Factory> factories;
|
||||
private ClusterService clusterService;
|
||||
private ClusterState state;
|
||||
private SSLService sslService;
|
||||
private final ClusterBlocks blocks = mock(ClusterBlocks.class);
|
||||
private final MetaData metadata = mock(MetaData.class);
|
||||
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
|
@ -93,11 +95,12 @@ public class ExportersTests extends ESTestCase {
|
|||
when(clusterService.state()).thenReturn(state);
|
||||
when(state.blocks()).thenReturn(blocks);
|
||||
when(state.metaData()).thenReturn(metadata);
|
||||
sslService = mock(SSLService.class);
|
||||
|
||||
// we always need to have the local exporter as it serves as the default one
|
||||
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, mock(CleanerService.class)));
|
||||
|
||||
exporters = new Exporters(Settings.EMPTY, factories, clusterService, licenseState, threadContext);
|
||||
exporters = new Exporters(Settings.EMPTY, factories, clusterService, licenseState, threadContext, sslService);
|
||||
}
|
||||
|
||||
public void testHostsMustBeSetIfTypeIsHttp() {
|
||||
|
@ -229,7 +232,7 @@ public class ExportersTests extends ESTestCase {
|
|||
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Exporters.getSettings()));
|
||||
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||
|
||||
exporters = new Exporters(nodeSettings, factories, clusterService, licenseState, threadContext) {
|
||||
exporters = new Exporters(nodeSettings, factories, clusterService, licenseState, threadContext, sslService) {
|
||||
@Override
|
||||
Map<String, Exporter> initExporters(Settings settings) {
|
||||
settingsHolder.set(settings);
|
||||
|
@ -275,7 +278,7 @@ public class ExportersTests extends ESTestCase {
|
|||
settings.put("xpack.monitoring.exporters._name" + String.valueOf(i) + ".type", "record");
|
||||
}
|
||||
|
||||
final Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext);
|
||||
final Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext, sslService);
|
||||
|
||||
// synchronously checks the cluster state
|
||||
exporters.wrapExportBulk(ActionListener.wrap(
|
||||
|
@ -295,7 +298,7 @@ public class ExportersTests extends ESTestCase {
|
|||
.put("xpack.monitoring.exporters.explicitly_disabled.type", "local")
|
||||
.put("xpack.monitoring.exporters.explicitly_disabled.enabled", false);
|
||||
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext);
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext, sslService);
|
||||
exporters.start();
|
||||
|
||||
assertThat(exporters.getEnabledExporters(), empty());
|
||||
|
@ -319,7 +322,7 @@ public class ExportersTests extends ESTestCase {
|
|||
|
||||
factories.put("record", (s) -> new CountingExporter(s, threadContext));
|
||||
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext);
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService, licenseState, threadContext, sslService);
|
||||
exporters.start();
|
||||
|
||||
assertThat(exporters.getEnabledExporters(), hasSize(nbExporters));
|
||||
|
|
Loading…
Reference in New Issue