diff --git a/pom.xml b/pom.xml
index 2dbcd88bf..753adf9ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,9 +81,19 @@
commons-logging
commons-logging
+
+ org.elasticsearch.plugin
+ transport-netty3-client
+
+
+ org.elasticsearch.plugin
+ transport-netty4-client
+ ${elasticsearch}
+
+
org.slf4j
log4j-over-slf4j
@@ -162,14 +172,6 @@
test
-
-
- org.elasticsearch.plugin
- transport-netty4-client
- ${elasticsearch}
-
-
-
org.projectlombok
lombok
diff --git a/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java b/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java
index 2d4ee0f91..515608183 100644
--- a/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java
+++ b/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java
@@ -17,12 +17,29 @@ package org.springframework.data.elasticsearch.client;
import static org.apache.commons.lang.StringUtils.*;
+import io.netty.util.ThreadDeathWatcher;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.join.ParentJoinPlugin;
+import org.elasticsearch.percolator.PercolatorPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.mustache.MustachePlugin;
+import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +47,7 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
/**
* TransportClientFactoryBean
@@ -88,7 +106,7 @@ public class TransportClientFactoryBean implements FactoryBean,
protected void buildClient() throws Exception {
- client = new PreBuiltTransportClient(settings());
+ client = new SpringDataTransportClient(settings());
Assert.hasText(clusterNodes, "[Assertion failed] clusterNodes settings missing.");
for (String clusterNode : split(clusterNodes, COMMA)) {
String hostName = substringBeforeLast(clusterNode, COLON);
@@ -153,4 +171,97 @@ public class TransportClientFactoryBean implements FactoryBean,
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ /**
+ * Pretty exact copy of {@link PreBuiltTransportClient} except that we're inspecting the classpath for Netty
+ * dependencies to only include the ones available. {@link PreBuiltTransportClient} expects both Netty 3 and Netty 4
+ * to be present.
+ *
+ * @author Oliver Gierke
+ * @see https://github.com/elastic/elasticsearch/issues/31240
+ */
+ @SuppressWarnings("unchecked")
+ private static class SpringDataTransportClient extends TransportClient {
+
+ /**
+ * Netty wants to do some unwelcome things like use unsafe and replace a private field, or use a poorly considered
+ * buffer recycler. This method disables these things by default, but can be overridden by setting the corresponding
+ * system properties.
+ */
+ private static void initializeNetty() {
+ /*
+ * We disable three pieces of Netty functionality here:
+ * - we disable Netty from being unsafe
+ * - we disable Netty from replacing the selector key set
+ * - we disable Netty from using the recycler
+ *
+ * While permissions are needed to read and set these, the permissions needed here are innocuous and thus should simply be granted
+ * rather than us handling a security exception here.
+ */
+ setSystemPropertyIfUnset("io.netty.noUnsafe", Boolean.toString(true));
+ setSystemPropertyIfUnset("io.netty.noKeySetOptimization", Boolean.toString(true));
+ setSystemPropertyIfUnset("io.netty.recycler.maxCapacityPerThread", Integer.toString(0));
+ }
+
+ @SuppressForbidden(reason = "set system properties to configure Netty")
+ private static void setSystemPropertyIfUnset(final String key, final String value) {
+ final String currentValue = System.getProperty(key);
+ if (currentValue == null) {
+ System.setProperty(key, value);
+ }
+ }
+
+ private static final List OPTIONAL_DEPENDENCIES = Arrays.asList( //
+ "org.elasticsearch.transport.Netty3Plugin", //
+ "org.elasticsearch.transport.Netty4Plugin");
+
+ private static final Collection> PRE_INSTALLED_PLUGINS;
+
+ static {
+
+ initializeNetty();
+
+ List> plugins = new ArrayList<>();
+ boolean found = false;
+
+ for (String dependency : OPTIONAL_DEPENDENCIES) {
+ try {
+ plugins.add((Class extends Plugin>) ClassUtils.forName(dependency,
+ SpringDataTransportClient.class.getClassLoader()));
+ found = true;
+ } catch (ClassNotFoundException | LinkageError e) {}
+ }
+
+ Assert.state(found, "Neither Netty 3 or Netty 4 plugin found on the classpath. One of them is required to run the transport client!");
+
+ plugins.add(ReindexPlugin.class);
+ plugins.add(PercolatorPlugin.class);
+ plugins.add(MustachePlugin.class);
+ plugins.add(ParentJoinPlugin.class);
+
+ PRE_INSTALLED_PLUGINS = Collections.unmodifiableList(plugins);
+ }
+
+ public SpringDataTransportClient(Settings settings) {
+ super(settings, PRE_INSTALLED_PLUGINS);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings) == false
+ || NetworkModule.TRANSPORT_TYPE_SETTING.get(settings).equals(Netty4Plugin.NETTY_TRANSPORT_NAME)) {
+ try {
+ GlobalEventExecutor.INSTANCE.awaitInactivity(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ try {
+ ThreadDeathWatcher.awaitInactivity(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
}