add profile name to TransportChannel

Today, only the NettyTransportChannel implements the getProfileName method
and the other channel implementations do not. The profile name is useful for some
plugins to perform custom actions based on the name. Rather than checking the
type of the channel, it makes sense to always expose the profile name.

For DirectResponseChannels we use a name that cannot be used in the settings
to define another profile with that name. For LocalTransportChannel we use the
same name as the default profile.

Closes #10483
This commit is contained in:
jaymode 2015-05-20 12:41:32 -04:00
parent 6b3918a97c
commit 8060cd0794
7 changed files with 53 additions and 9 deletions

View File

@ -28,6 +28,8 @@ public interface TransportChannel {
String action(); String action();
String getProfileName();
void sendResponse(TransportResponse response) throws IOException; void sendResponse(TransportResponse response) throws IOException;
void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException; void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException;

View File

@ -20,7 +20,6 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings; import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.cluster.settings.DynamicSettings;
@ -56,6 +55,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
*/ */
public class TransportService extends AbstractLifecycleComponent<TransportService> { public class TransportService extends AbstractLifecycleComponent<TransportService> {
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
protected final Transport transport; protected final Transport transport;
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
@ -722,6 +723,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return action; return action;
} }
@Override
public String getProfileName() {
return DIRECT_RESPONSE_PROFILE;
}
@Override @Override
public void sendResponse(TransportResponse response) throws IOException { public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY); sendResponse(response, TransportResponseOptions.EMPTY);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.local;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus; import org.elasticsearch.transport.support.TransportStatus;
@ -34,6 +33,8 @@ import java.io.NotSerializableException;
*/ */
public class LocalTransportChannel implements TransportChannel { public class LocalTransportChannel implements TransportChannel {
private static final String LOCAL_TRANSPORT_PROFILE = "default";
private final LocalTransport sourceTransport; private final LocalTransport sourceTransport;
private final TransportServiceAdapter sourceTransportServiceAdapter; private final TransportServiceAdapter sourceTransportServiceAdapter;
// the transport we will *send to* // the transport we will *send to*
@ -56,6 +57,11 @@ public class LocalTransportChannel implements TransportChannel {
return action; return action;
} }
@Override
public String getProfileName() {
return LOCAL_TRANSPORT_PROFILE;
}
@Override @Override
public void sendResponse(TransportResponse response) throws IOException { public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY); sendResponse(response, TransportResponseOptions.EMPTY);

View File

@ -252,22 +252,23 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Settings fallbackSettings = createFallbackSettings(); Settings fallbackSettings = createFallbackSettings();
Settings defaultSettings = profiles.get(DEFAULT_PROFILE); Settings defaultSettings = profiles.get(DEFAULT_PROFILE);
// loop through all profiles and strart them app, special handling for default one // loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : profiles.entrySet()) { for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
Settings profileSettings = entry.getValue(); Settings profileSettings = entry.getValue();
String name = entry.getKey(); String name = entry.getKey();
if (DEFAULT_PROFILE.equals(name)) { if (!Strings.hasLength(name)) {
logger.info("transport profile configured without a name. skipping profile with settings [{}]", profileSettings.toDelimitedString(','));
continue;
} else if (DEFAULT_PROFILE.equals(name)) {
profileSettings = settingsBuilder() profileSettings = settingsBuilder()
.put(profileSettings) .put(profileSettings)
.put("port", profileSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE))) .put("port", profileSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)))
.build(); .build();
} else { } else if (profileSettings.get("port") == null) {
// if profile does not have a port, skip it // if profile does not have a port, skip it
if (profileSettings.get("port") == null) { logger.info("No port configured for profile [{}], not binding", name);
logger.info("No port configured for profile [{}], not binding", name); continue;
continue;
}
} }
// merge fallback settings with default settings with profile settings so we have complete settings with default values // merge fallback settings with default settings with profile settings so we have complete settings with default values

View File

@ -61,6 +61,7 @@ public class NettyTransportChannel implements TransportChannel {
this.profileName = profileName; this.profileName = profileName;
} }
@Override
public String getProfileName() { public String getProfileName() {
return profileName; return profileName;
} }

View File

@ -838,6 +838,11 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
return null; return null;
} }
@Override
public String getProfileName() {
return "";
}
@Override @Override
public void sendResponse(TransportResponse response) throws IOException { public void sendResponse(TransportResponse response) throws IOException {
} }

View File

@ -34,6 +34,7 @@ import org.elasticsearch.test.junit.rule.RepeatOnExceptionRule;
import org.elasticsearch.test.cache.recycler.MockBigArrays; import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -155,6 +156,28 @@ public class NettyTransportMultiPortTests extends ElasticsearchTestCase {
} }
} }
@Test
public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
// mimics someone trying to define a profile for .local which is the profile for a node request to itself
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", ports[1])
.put("transport.profiles..port", ports[2])
.build();
ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) {
assertPortIsBound(ports[0]);
assertConnectionRefused(ports[1]);
assertConnectionRefused(ports[2]);
} finally {
terminate(threadPool);
}
}
private int[] getRandomPorts(int numberOfPorts) { private int[] getRandomPorts(int numberOfPorts) {
IntHashSet ports = new IntHashSet(); IntHashSet ports = new IntHashSet();