Cleaned up TransportMessage and added transient context to it
- The context enables setting arbitrary transient data on the message (this data is not serialized with the request) - Changed header accessors/mutators so header manipulation will be done directly on the request (to void NPE with transport message headers when dealing with maps that can potentially be null)
This commit is contained in:
parent
e1a2d76626
commit
1f9bceb5c5
|
@ -19,36 +19,58 @@
|
||||||
|
|
||||||
package org.elasticsearch.transport;
|
package org.elasticsearch.transport;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {
|
public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {
|
||||||
|
|
||||||
|
// a transient (not serialized with the request) key/value registry
|
||||||
|
private final ConcurrentMap<Object, Object> context;
|
||||||
|
|
||||||
private Map<String, Object> headers;
|
private Map<String, Object> headers;
|
||||||
|
|
||||||
private TransportAddress remoteAddress;
|
private TransportAddress remoteAddress;
|
||||||
|
|
||||||
protected TransportMessage() {
|
protected TransportMessage() {
|
||||||
|
context = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TransportMessage(TM message) {
|
protected TransportMessage(TM message) {
|
||||||
// create a new copy of the headers, since we are creating a new request which might have
|
// create a new copy of the headers/context, since we are creating a new request
|
||||||
// its headers changed in the context of that specific request
|
// which might have its headers/context changed in the context of that specific request
|
||||||
if (message.getHeaders() != null) {
|
|
||||||
this.headers = new HashMap<>(message.getHeaders());
|
if (((TransportMessage<?>) message).headers != null) {
|
||||||
|
this.headers = new HashMap<>(((TransportMessage<?>) message).headers);
|
||||||
}
|
}
|
||||||
|
this.context = new ConcurrentHashMap<>(((TransportMessage<?>) message).context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The request context enables attaching transient data with the request - data
|
||||||
|
* that is not serialized along with the request.
|
||||||
|
*
|
||||||
|
* There are many use cases such data is required, for example, when processing the
|
||||||
|
* request headers and building other constructs from them, one could "cache" the
|
||||||
|
* already built construct to avoid reprocessing the header over and over again.
|
||||||
|
*
|
||||||
|
* @return The request context
|
||||||
|
*/
|
||||||
|
public ConcurrentMap<Object, Object> context() {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
public void remoteAddress(TransportAddress remoteAddress) {
|
public void remoteAddress(TransportAddress remoteAddress) {
|
||||||
this.remoteAddress = remoteAddress;
|
this.remoteAddress = remoteAddress;
|
||||||
|
@ -61,7 +83,7 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public final TM putHeader(String key, Object value) {
|
public final TM putHeader(String key, Object value) {
|
||||||
if (headers == null) {
|
if (headers == null) {
|
||||||
headers = Maps.newHashMap();
|
headers = new HashMap<>();
|
||||||
}
|
}
|
||||||
headers.put(key, value);
|
headers.put(key, value);
|
||||||
return (TM) this;
|
return (TM) this;
|
||||||
|
@ -69,22 +91,20 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public final <V> V getHeader(String key) {
|
public final <V> V getHeader(String key) {
|
||||||
if (headers == null) {
|
return headers != null ? (V) headers.get(key) : null;
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return (V) headers.get(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getHeaders() {
|
public final boolean hasHeader(String key) {
|
||||||
return this.headers;
|
return headers != null && headers.containsKey(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<String> getHeaders() {
|
||||||
|
return headers != null ? headers.keySet() : Collections.<String>emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
if (in.readBoolean()) {
|
headers = in.readBoolean() ? in.readMap() : null;
|
||||||
headers = in.readMap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -96,4 +116,5 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
|
||||||
out.writeMap(headers);
|
out.writeMap(headers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,10 +53,12 @@ import org.elasticsearch.client.support.Headers;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
|
import org.elasticsearch.transport.TransportMessage;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
@ -135,9 +137,12 @@ public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase {
|
||||||
private final String action;
|
private final String action;
|
||||||
private final Map<String, Object> headers;
|
private final Map<String, Object> headers;
|
||||||
|
|
||||||
public InternalException(String action, Map<String, Object> headers) {
|
public InternalException(String action, TransportMessage<?> message) {
|
||||||
this.action = action;
|
this.action = action;
|
||||||
this.headers = headers;
|
this.headers = new HashMap<>();
|
||||||
|
for (String key : message.getHeaders()) {
|
||||||
|
headers.put(key, message.getHeader(key));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(ActionRequest request, ActionListener listener) {
|
protected void doExecute(ActionRequest request, ActionListener listener) {
|
||||||
listener.onFailure(new InternalException(actionName, request.getHeaders()));
|
listener.onFailure(new InternalException(actionName, request));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests {
|
||||||
((TransportResponseHandler<NodesInfoResponse>) handler).handleResponse(new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[0]));
|
((TransportResponseHandler<NodesInfoResponse>) handler).handleResponse(new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[0]));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
handler.handleException(new TransportException("", new InternalException(action, request.getHeaders())));
|
handler.handleException(new TransportException("", new InternalException(action, request)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -325,8 +325,8 @@ public class HeadersCopyClientTests extends ElasticsearchTestCase {
|
||||||
} else {
|
} else {
|
||||||
assertThat(request.getHeaders(), notNullValue());
|
assertThat(request.getHeaders(), notNullValue());
|
||||||
assertThat(request.getHeaders().size(), equalTo(headers.size()));
|
assertThat(request.getHeaders().size(), equalTo(headers.size()));
|
||||||
for (Map.Entry<String, Object> entry : request.getHeaders().entrySet()) {
|
for (String key : request.getHeaders()) {
|
||||||
assertThat(headers.get(entry.getKey()), equalTo(entry.getValue()));
|
assertThat(headers.get(key), equalTo(request.getHeader(key)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,14 @@
|
||||||
package org.elasticsearch.test;
|
package org.elasticsearch.test;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
||||||
import com.carrotsearch.randomizedtesting.annotations.*;
|
import com.carrotsearch.randomizedtesting.annotations.Listeners;
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.lucene.search.FieldCache;
|
import org.apache.lucene.search.FieldCache;
|
||||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||||
import org.apache.lucene.util.AbstractRandomizedTest;
|
import org.apache.lucene.util.AbstractRandomizedTest;
|
||||||
|
|
|
@ -95,8 +95,8 @@ public class ConfigurableErrorNettyTransportModule extends AbstractModule {
|
||||||
final TransportRequest request = handler.newInstance();
|
final TransportRequest request = handler.newInstance();
|
||||||
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||||
request.readFrom(buffer);
|
request.readFrom(buffer);
|
||||||
if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) {
|
if (request.hasHeader("ERROR")) {
|
||||||
throw new ElasticsearchException((String) request.getHeaders().get("ERROR"));
|
throw new ElasticsearchException((String) request.getHeader("ERROR"));
|
||||||
}
|
}
|
||||||
if (handler.executor() == ThreadPool.Names.SAME) {
|
if (handler.executor() == ThreadPool.Names.SAME) {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.transport;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TransportMessageTests extends ElasticsearchTestCase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransientContext() throws Exception {
|
||||||
|
Message message = new Message();
|
||||||
|
message.putHeader("key1", "value1");
|
||||||
|
message.putHeader("key2", "value2");
|
||||||
|
message.context().put("key3", "value3");
|
||||||
|
|
||||||
|
BytesStreamOutput out = new BytesStreamOutput();
|
||||||
|
out.setVersion(Version.CURRENT);
|
||||||
|
message.writeTo(out);
|
||||||
|
BytesStreamInput in = new BytesStreamInput(out.bytes());
|
||||||
|
in.setVersion(Version.CURRENT);
|
||||||
|
message = new Message();
|
||||||
|
message.readFrom(in);
|
||||||
|
assertThat(message.getHeaders().size(), is(2));
|
||||||
|
assertThat((String) message.getHeader("key1"), equalTo("value1"));
|
||||||
|
assertThat((String) message.getHeader("key2"), equalTo("value2"));
|
||||||
|
assertThat(message.context().isEmpty(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyHeadersAndContext() throws Exception {
|
||||||
|
Message m1 = new Message();
|
||||||
|
m1.putHeader("key1", "value1");
|
||||||
|
m1.putHeader("key2", "value2");
|
||||||
|
m1.context().put("key3", "value3");
|
||||||
|
|
||||||
|
Message m2 = new Message(m1);
|
||||||
|
|
||||||
|
assertThat(m2.getHeaders().size(), is(2));
|
||||||
|
assertThat((String) m2.getHeader("key1"), equalTo("value1"));
|
||||||
|
assertThat((String) m2.getHeader("key2"), equalTo("value2"));
|
||||||
|
assertThat((String) m2.context().get("key3"), equalTo("value3"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Message extends TransportMessage<Message> {
|
||||||
|
|
||||||
|
private Message() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private Message(Message message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue