Add AssertingTransport that deploys basic BW-Compat serialization checks

This commit is contained in:
Simon Willnauer 2013-10-09 18:25:45 +02:00
parent a15f149dc6
commit 15b4d70298
7 changed files with 274 additions and 32 deletions

View File

@ -35,6 +35,8 @@ import org.elasticsearch.transport.netty.NettyTransportModule;
public class TransportModule extends AbstractModule implements SpawnModules {
private final Settings settings;
public static final String TRANSPORT_TYPE_KEY = "transport.type";
public TransportModule(Settings settings) {
this.settings = settings;
@ -48,7 +50,7 @@ public class TransportModule extends AbstractModule implements SpawnModules {
} else {
defaultTransportModule = NettyTransportModule.class;
}
return ImmutableList.of(Modules.createModule(settings.getAsClass("transport.type", defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
}
@Override

View File

@ -186,7 +186,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return this.threadPool;
}
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
try {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data, false);
@ -244,7 +244,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
protected void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
try {
response.readFrom(buffer);
@ -252,6 +252,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
handleParsedRespone(response, handler);
}
protected void handleParsedRespone(final TransportResponse response, final TransportResponseHandler handler) {
threadPool.executor(handler.executor()).execute(new Runnable() {
@SuppressWarnings({"unchecked"})
@Override

View File

@ -21,10 +21,12 @@ package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.annotations.*;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.junit.listeners.LoggingListener;
@ -35,9 +37,11 @@ import org.junit.BeforeClass;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticSearchThreadFilter.class})
@ -45,7 +49,7 @@ import java.util.concurrent.TimeUnit;
@TimeoutSuite(millis = TimeUnits.HOUR) // timeout the suite after 1h and fail the test.
@Listeners(LoggingListener.class)
public abstract class ElasticSearchTestCase extends AbstractRandomizedTest {
protected final ESLogger logger = Loggers.getLogger(getClass());
public static final String CHILD_VM_ID = System.getProperty("junit4.childvm.id", "" + System.currentTimeMillis());
@ -166,5 +170,47 @@ public abstract class ElasticSearchTestCase extends AbstractRandomizedTest {
public static boolean maybeDocValues() {
return LuceneTestCase.defaultCodecSupportsSortedSet() && randomBoolean();
}
}
private static final List<Version> SORTED_VERSIONS;
static {
Field[] declaredFields = Version.class.getDeclaredFields();
Set<Integer> ids = new HashSet<Integer>();
for (Field field : declaredFields) {
final int mod = field.getModifiers();
if (Modifier.isStatic(mod) && Modifier.isFinal(mod) && Modifier.isPublic(mod)) {
if (field.getType() == Version.class) {
try {
Version object = (Version) field.get(null);
ids.add(object.id);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
List<Integer> idList = new ArrayList<Integer>(ids);
Collections.sort(idList);
Collections.reverse(idList);
ImmutableList.Builder<Version> version = ImmutableList.builder();
for (Integer integer : idList) {
version.add(Version.fromId(integer));
}
SORTED_VERSIONS = version.build();
}
public static Version getPreviousVersion() {
Version version = SORTED_VERSIONS.get(1);
assert version.before(Version.CURRENT);
return version;
}
public static Version randomVersion() {
return randomVersion(getRandom());
}
public static Version randomVersion(Random random) {
return SORTED_VERSIONS.get(random.nextInt(SORTED_VERSIONS.size()));
}
}

View File

@ -52,6 +52,8 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.mock.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.junit.Assert;
@ -117,20 +119,30 @@ public class TestCluster implements Closeable, Iterable<Client> {
sharedNodesSeeds[i] = random.nextLong();
}
logger.info("Setup TestCluster [{}] with seed [{}] using [{}] nodes", clusterName, SeedUtils.formatSeed(clusterSeed), numSharedNodes);
this.defaultSettings = ImmutableSettings.settingsBuilder()
/* use RAM directories in 10% of the runs */
// .put("index.store.type", random.nextInt(10) == 0 ? MockRamIndexStoreModule.class.getName() : MockFSIndexStoreModule.class.getName())
.put("index.store.type", MockFSIndexStoreModule.class.getName()) // no RAM dir for now!
.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName())
.put("cluster.name", clusterName)
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
// default to non gateway
.put("gateway.type", "none")
.build();
Builder builder = ImmutableSettings.settingsBuilder()
/* use RAM directories in 10% of the runs */
// .put("index.store.type", random.nextInt(10) == 0 ? MockRamIndexStoreModule.class.getName() : MockFSIndexStoreModule.class.getName())
.put("index.store.type", MockFSIndexStoreModule.class.getName()) // no RAM dir for now!
.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName())
.put("cluster.name", clusterName)
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
// default to non gateway
.put("gateway.type", "none");
if (isLocalTransportConfigured()) {
builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName());
}
this.defaultSettings = builder.build();
this.nodeSettingsSource = nodeSettingsSource;
}
private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
}
return Boolean.parseBoolean(System.getProperty("es.node.local", "false"));
}
private Settings getSettings(int nodeOrdinal, Settings others) {
Builder builder = ImmutableSettings.settingsBuilder().put(defaultSettings);
Settings settings = nodeSettingsSource.settings(nodeOrdinal);

View File

@ -21,7 +21,9 @@ package org.elasticsearch.test.hamcrest;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
@ -34,10 +36,20 @@ import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ElasticSearchTestCase;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@ -51,14 +63,13 @@ import static org.junit.Assert.fail;
*/
public class ElasticSearchAssertions {
public static void assertAcked(PutMappingRequestBuilder builder) {
assertAcked(builder.get());
}
private static void assertAcked(PutMappingResponse response) {
assertThat("Put Mapping failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
}
public static void assertAcked(DeleteIndexRequestBuilder builder) {
@ -71,15 +82,17 @@ public class ElasticSearchAssertions {
public static void assertAcked(DeleteIndexResponse response) {
assertThat("Delete Index failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
}
public static void assertAcked(CreateIndexResponse response) {
assertThat("Create Index failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
}
public static String formatShardStatus(BroadcastOperationResponse response) {
String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() +
" & " + response.getFailedShards() + " shard failures:";
String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() + " & "
+ response.getFailedShards() + " shard failures:";
for (ShardOperationFailedException failure : response.getShardFailures()) {
msg += "\n " + failure.toString();
}
@ -87,8 +100,8 @@ public class ElasticSearchAssertions {
}
public static String formatShardStatus(SearchResponse response) {
String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() +
" & " + response.getFailedShards() + " shard failures:";
String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() + " & "
+ response.getFailedShards() + " shard failures:";
for (ShardSearchFailure failure : response.getShardFailures()) {
msg += "\n " + failure.toString();
}
@ -100,8 +113,10 @@ public class ElasticSearchAssertions {
*/
public static void assertHitCount(SearchResponse searchResponse, long expectedHitCount) {
if (searchResponse.getHits().totalHits() != expectedHitCount) {
fail("Hit count is " + searchResponse.getHits().totalHits() + " but " + expectedHitCount + " was expected. " + formatShardStatus(searchResponse));
fail("Hit count is " + searchResponse.getHits().totalHits() + " but " + expectedHitCount + " was expected. "
+ formatShardStatus(searchResponse));
}
assertVersionSerializable(searchResponse);
}
public static void assertSearchHits(SearchResponse searchResponse, String... ids) {
@ -110,9 +125,12 @@ public class ElasticSearchAssertions {
Set<String> idsSet = new HashSet<String>(Arrays.asList(ids));
for (SearchHit hit : searchResponse.getHits()) {
assertThat("Expected id: " + hit.getId() + " in the result but wasn't." + shardStatus, idsSet.remove(hit.getId()), equalTo(true));
assertThat("Expected id: " + hit.getId() + " in the result but wasn't." + shardStatus, idsSet.remove(hit.getId()),
equalTo(true));
}
assertThat("Expected ids: " + Arrays.toString(idsSet.toArray(new String[0])) + " in the result - result size differs." + shardStatus, idsSet.size(), equalTo(0));
assertThat("Expected ids: " + Arrays.toString(idsSet.toArray(new String[0])) + " in the result - result size differs."
+ shardStatus, idsSet.size(), equalTo(0));
assertVersionSerializable(searchResponse);
}
public static void assertOrderedSearchHits(SearchResponse searchResponse, String... ids) {
@ -122,13 +140,14 @@ public class ElasticSearchAssertions {
SearchHit hit = searchResponse.getHits().hits()[i];
assertThat("Expected id: " + ids[i] + " at position " + i + " but wasn't." + shardStatus, hit.getId(), equalTo(ids[i]));
}
assertVersionSerializable(searchResponse);
}
public static void assertHitCount(CountResponse countResponse, long expectedHitCount) {
if (countResponse.getCount() != expectedHitCount) {
fail("Count is " + countResponse.getCount() + " but " + expectedHitCount + " was expected. " +
formatShardStatus(countResponse));
fail("Count is " + countResponse.getCount() + " but " + expectedHitCount + " was expected. " + formatShardStatus(countResponse));
}
assertVersionSerializable(countResponse);
}
public static void assertFirstHit(SearchResponse searchResponse, Matcher<SearchHit> matcher) {
@ -148,18 +167,23 @@ public class ElasticSearchAssertions {
assertThat("SearchHit number must be greater than 0", number, greaterThan(0));
assertThat(searchResponse.getHits().totalHits(), greaterThanOrEqualTo((long) number));
assertSearchHit(searchResponse.getHits().getAt(number - 1), matcher);
assertVersionSerializable(searchResponse);
}
public static void assertNoFailures(SearchResponse searchResponse) {
assertThat("Unexpected ShardFailures: " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat("Unexpected ShardFailures: " + Arrays.toString(searchResponse.getShardFailures()),
searchResponse.getShardFailures().length, equalTo(0));
assertVersionSerializable(searchResponse);
}
public static void assertNoFailures(BroadcastOperationResponse response) {
assertThat("Unexpectd ShardFailures: " + Arrays.toString(response.getShardFailures()), response.getFailedShards(), equalTo(0));
assertVersionSerializable(response);
}
public static void assertSearchHit(SearchHit searchHit, Matcher<SearchHit> matcher) {
assertThat(searchHit, matcher);
assertVersionSerializable(searchHit);
}
public static void assertHighlight(SearchResponse resp, int hit, String field, int fragment, Matcher<String> matcher) {
@ -168,6 +192,7 @@ public class ElasticSearchAssertions {
assertThat(resp.getHits().hits()[hit].getHighlightFields().get(field), notNullValue());
assertThat(resp.getHits().hits()[hit].getHighlightFields().get(field).fragments().length, greaterThan(fragment));
assertThat(resp.getHits().hits()[hit].highlightFields().get(field).fragments()[fragment].string(), matcher);
assertVersionSerializable(resp);
}
public static void assertSuggestionSize(Suggest searchSuggest, int entry, int size, String key) {
@ -176,7 +201,7 @@ public class ElasticSearchAssertions {
assertThat(searchSuggest.getSuggestion(key).getName(), equalTo(key));
assertThat(searchSuggest.getSuggestion(key).getEntries().size(), greaterThanOrEqualTo(entry));
assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().size(), equalTo(size));
assertVersionSerializable(searchSuggest);
}
public static void assertSuggestion(Suggest searchSuggest, int entry, int ord, String key, String text) {
@ -186,6 +211,7 @@ public class ElasticSearchAssertions {
assertThat(searchSuggest.getSuggestion(key).getEntries().size(), greaterThanOrEqualTo(entry));
assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().size(), greaterThan(ord));
assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().get(ord).getText().string(), equalTo(text));
assertVersionSerializable(searchSuggest);
}
/**
@ -196,7 +222,8 @@ public class ElasticSearchAssertions {
}
/**
* Assert suggestion returns size suggestions and the first are the provided text.
* Assert suggestion returns size suggestions and the first are the provided
* text.
*/
public static void assertSuggestion(Suggest searchSuggest, int entry, String key, int size, String... text) {
assertSuggestionSize(searchSuggest, entry, size, key);
@ -260,4 +287,53 @@ public class ElasticSearchAssertions {
}
}
private static BytesReference serialize(Version version, Streamable streamable) throws IOException {
BytesStreamOutput output = new BytesStreamOutput();
output.setVersion(version);
streamable.writeTo(output);
output.flush();
return output.bytes();
}
public static void assertVersionSerializable(Streamable streamable) {
assert Version.CURRENT.after(ElasticSearchTestCase.getPreviousVersion());
assertVersionSerializable(ElasticSearchTestCase.randomVersion(), streamable);
}
public static void assertVersionSerializable(Version version, Streamable streamable) {
try {
Streamable newInstance = tryCreateNewInstance(streamable);
if (newInstance == null) {
return; // can't create a new instance - we never modify a
// streamable that comes in.
}
if (streamable instanceof ActionRequest) {
((ActionRequest<?>)streamable).validate();
}
BytesReference orig = serialize(version, streamable);
StreamInput input = new BytesStreamInput(orig);
input.setVersion(version);
newInstance.readFrom(input);
assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0));
assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", serialize(version, streamable), equalTo(orig));
} catch (Throwable ex) {
throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex);
}
}
private static Streamable tryCreateNewInstance(Streamable streamable) throws NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
try {
Class<? extends Streamable> clazz = streamable.getClass();
Constructor<? extends Streamable> constructor = clazz.getDeclaredConstructor();
assertThat(constructor, Matchers.notNullValue());
constructor.setAccessible(true);
Streamable newInstance = constructor.newInstance();
return newInstance;
} catch (Throwable e) {
return null;
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.elasticsearch.test.ElasticSearchTestCase;
import org.elasticsearch.test.hamcrest.ElasticSearchAssertions;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.local.LocalTransport;
import java.io.IOException;
import java.util.Random;
/**
*
*/
public class AssertingLocalTransport extends LocalTransport {
private final Random random;
@Inject
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings, threadPool, version);
final long seed = settings.getAsLong(AbstractIntegrationTest.INDEX_SEED_SETTING, 0l);
random = new Random(seed);
}
@Override
protected void handleParsedRespone(final TransportResponse response, final TransportResponseHandler handler) {
ElasticSearchAssertions.assertVersionSerializable(ElasticSearchTestCase.randomVersion(random), response);
super.handleParsedRespone(response, handler);
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
ElasticSearchAssertions.assertVersionSerializable(ElasticSearchTestCase.randomVersion(random), request);
super.sendRequest(node, requestId, action, request, options);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.Transport;
/**
*
*/
public class AssertingLocalTransportModule extends AbstractModule {
private final Settings settings;
public AssertingLocalTransportModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(AssertingLocalTransport.class).asEagerSingleton();
bind(Transport.class).to(AssertingLocalTransport.class).asEagerSingleton();
}
}