diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index e5ca1744bbd..7396dbb4fc9 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -39,6 +39,7 @@
nanos
newcount
ngram
+ nospawn
param
params
pluggable
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java
index e0336f8941d..13581b8ed75 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java
@@ -171,6 +171,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
if (docIdsToLoad.isEmpty()) {
+ releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim();
}
@@ -218,6 +219,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
}
+
+ releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, Node node) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java
index f63e61219b0..adcc6f5b5b1 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java
@@ -44,7 +44,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
@@ -85,6 +85,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
final Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
if (docIdsToLoad.isEmpty()) {
+ releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
finishHim();
}
@@ -133,6 +134,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
}
}
+
+ releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, Node node) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java
index d94af64e3fa..3f15f2c1faa 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java
@@ -36,16 +36,19 @@ import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.internal.InternalSearchRequest;
+import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.settings.Settings;
+import org.elasticsearch.util.trove.ExtTIntArrayList;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public abstract class TransportSearchTypeAction extends BaseAction {
@@ -271,6 +274,18 @@ public abstract class TransportSearchTypeAction extends BaseAction queryResults,
+ Map docIdsToLoad) {
+ for (Map.Entry entry : queryResults.entrySet()) {
+ if (!docIdsToLoad.containsKey(entry.getKey())) {
+ searchService.sendFreeContext(nodes.get(entry.getKey().nodeId()), entry.getValue().id());
+ }
+ }
+ }
+
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
index b50c02e9dca..687af98b459 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
@@ -42,10 +42,10 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java
index e46d6c487c2..deb15e97a51 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java
@@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java
index 83f790721b6..e6a1a686b90 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java
@@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java
index 2d6df8f2def..82c0618416d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java
@@ -29,10 +29,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index d4a0b8d24d2..be28fc628bb 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -36,10 +36,10 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.component.AbstractComponent;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java
index 6f3116e9df2..2bf4380ce8f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java
@@ -43,10 +43,10 @@ import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.StopWatch;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.CloseableComponent;
-import org.elasticsearch.util.io.VoidStreamable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java
index 7f09ba1e8c1..15d60e413b0 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/action/GetJmxServiceUrlAction.java
@@ -29,8 +29,8 @@ import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.component.AbstractComponent;
-import org.elasticsearch.util.io.StringStreamable;
-import org.elasticsearch.util.io.VoidStreamable;
+import org.elasticsearch.util.io.stream.StringStreamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
/**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java
index 0679307fb9a..88a23754af2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java
@@ -27,8 +27,10 @@ import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.service.IndexService;
+import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.dfs.CachedDfSource;
@@ -125,25 +127,56 @@ public class SearchService extends AbstractLifecycleComponent {
@Override protected void doClose() throws ElasticSearchException {
}
+ public void releaseContextsForIndex(Index index) {
+ for (SearchContext context : activeContexts.values()) {
+ if (context.shardTarget().index().equals(index.name())) {
+ freeContext(context);
+ }
+ }
+ }
+
+ public void releaseContextsForShard(ShardId shardId) {
+ for (SearchContext context : activeContexts.values()) {
+ if (context.shardTarget().index().equals(shardId.index().name()) && context.shardTarget().shardId() == shardId.id()) {
+ freeContext(context);
+ }
+ }
+ }
+
public DfsSearchResult executeDfsPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
- dfsPhase.execute(context);
- return context.dfsResult();
+ try {
+ dfsPhase.execute(context);
+ return context.dfsResult();
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
+ }
}
public QuerySearchResult executeQueryPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
- queryPhase.execute(context);
- return context.queryResult();
+ try {
+ queryPhase.execute(context);
+ return context.queryResult();
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
+ }
}
public QuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
- processScroll(request, context);
- queryPhase.execute(context);
- return context.queryResult();
+ try {
+ processScroll(request, context);
+ queryPhase.execute(context);
+ return context.queryResult();
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
+ }
}
public QuerySearchResult executeQueryPhase(QuerySearchRequest request) throws ElasticSearchException {
@@ -151,21 +184,32 @@ public class SearchService extends AbstractLifecycleComponent {
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
+ freeContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
- queryPhase.execute(context);
- return context.queryResult();
+ try {
+ queryPhase.execute(context);
+ return context.queryResult();
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
+ }
}
public QueryFetchSearchResult executeFetchPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
- queryPhase.execute(context);
- shortcutDocIdsToLoad(context);
- fetchPhase.execute(context);
- if (context.scroll() != null) {
- activeContexts.put(context.id(), context);
+ try {
+ queryPhase.execute(context);
+ shortcutDocIdsToLoad(context);
+ fetchPhase.execute(context);
+ if (context.scroll() != null) {
+ activeContexts.put(context.id(), context);
+ }
+ return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
}
- return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) throws ElasticSearchException {
@@ -173,37 +217,53 @@ public class SearchService extends AbstractLifecycleComponent {
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
+ freeContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
- queryPhase.execute(context);
- shortcutDocIdsToLoad(context);
- fetchPhase.execute(context);
- if (context.scroll() != null) {
- activeContexts.put(context.id(), context);
+ try {
+ queryPhase.execute(context);
+ shortcutDocIdsToLoad(context);
+ fetchPhase.execute(context);
+ if (context.scroll() != null) {
+ activeContexts.put(context.id(), context);
+ }
+ return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
}
- return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public QueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
- processScroll(request, context);
- queryPhase.execute(context);
- shortcutDocIdsToLoad(context);
- fetchPhase.execute(context);
- if (context.scroll() == null) {
- freeContext(request.id());
+ try {
+ processScroll(request, context);
+ queryPhase.execute(context);
+ shortcutDocIdsToLoad(context);
+ fetchPhase.execute(context);
+ if (context.scroll() == null) {
+ freeContext(request.id());
+ }
+ return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
}
- return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
- context.docIdsToLoad(request.docIds());
- fetchPhase.execute(context);
- if (context.scroll() == null) {
- freeContext(request.id());
+ try {
+ context.docIdsToLoad(request.docIds());
+ fetchPhase.execute(context);
+ if (context.scroll() == null) {
+ freeContext(request.id());
+ }
+ return context.fetchResult();
+ } catch (RuntimeException e) {
+ freeContext(context);
+ throw e;
}
- return context.fetchResult();
}
private SearchContext findContext(long id) throws SearchContextMissingException {
@@ -255,7 +315,7 @@ public class SearchService extends AbstractLifecycleComponent {
return context;
}
- private void freeContext(long id) {
+ public void freeContext(long id) {
SearchContext context = activeContexts.remove(id);
if (context == null) {
return;
@@ -264,6 +324,7 @@ public class SearchService extends AbstractLifecycleComponent {
}
private void freeContext(SearchContext context) {
+ activeContexts.remove(context.id());
context.release();
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java
index 4dfa069f100..5d869050ba4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java
@@ -32,6 +32,8 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.*;
+import org.elasticsearch.util.io.stream.LongStreamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@@ -52,6 +54,7 @@ public class SearchServiceTransportAction {
this.clusterService = clusterService;
this.searchService = searchService;
+ transportService.registerHandler(SearchFreeContextTransportHandler.ACTION, new SearchFreeContextTransportHandler());
transportService.registerHandler(SearchDfsTransportHandler.ACTION, new SearchDfsTransportHandler());
transportService.registerHandler(SearchQueryTransportHandler.ACTION, new SearchQueryTransportHandler());
transportService.registerHandler(SearchQueryByIdTransportHandler.ACTION, new SearchQueryByIdTransportHandler());
@@ -62,6 +65,14 @@ public class SearchServiceTransportAction {
transportService.registerHandler(SearchFetchByIdTransportHandler.ACTION, new SearchFetchByIdTransportHandler());
}
+ public void sendFreeContext(Node node, final long contextId) {
+ if (clusterService.state().nodes().localNodeId().equals(node.id())) {
+ searchService.freeContext(contextId);
+ } else {
+ transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
+ }
+ }
+
public void sendExecuteDfs(Node node, final InternalSearchRequest request, final SearchServiceListener listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
@@ -302,6 +313,20 @@ public class SearchServiceTransportAction {
}
}
+ private class SearchFreeContextTransportHandler extends BaseTransportRequestHandler {
+
+ static final String ACTION = "search/freeContext";
+
+ @Override public LongStreamable newInstance() {
+ return new LongStreamable();
+ }
+
+ @Override public void messageReceived(LongStreamable request, TransportChannel channel) throws Exception {
+ searchService.freeContext(request.get());
+ channel.sendResponse(VoidStreamable.INSTANCE);
+ }
+ }
+
private class SearchDfsTransportHandler extends BaseTransportRequestHandler {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java
index 78be81aa807..ef16e6c6369 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java
@@ -21,6 +21,7 @@ package org.elasticsearch.search.internal;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
+import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.engine.Engine;
@@ -122,6 +123,8 @@ public class SearchContext implements Releasable {
searcher.close();
} catch (IOException e) {
// ignore this exception
+ } catch (AlreadyClosedException e) {
+ // ignore this as well
}
engineSearcher.release();
if (!keepAliveTimeout.isCancelled()) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java
index 04930cb8b1f..ed06bd767a1 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/VoidTransportResponseHandler.java
@@ -19,7 +19,7 @@
package org.elasticsearch.transport;
-import org.elasticsearch.util.io.VoidStreamable;
+import org.elasticsearch.util.io.stream.VoidStreamable;
/**
* @author kimchy (Shay Banon)
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/LongStreamable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/LongStreamable.java
new file mode 100644
index 00000000000..cf0343c4a49
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/LongStreamable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.util.io.stream;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class LongStreamable implements Streamable {
+
+ private long value;
+
+ public LongStreamable() {
+ }
+
+ public LongStreamable(long value) {
+ this.value = value;
+ }
+
+ public void set(long newValue) {
+ value = newValue;
+ }
+
+ public long get() {
+ return this.value;
+ }
+
+ @Override public void readFrom(StreamInput in) throws IOException {
+ value = in.readLong();
+ }
+
+ @Override public void writeTo(StreamOutput out) throws IOException {
+ out.writeLong(value);
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringStreamable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StringStreamable.java
similarity index 86%
rename from modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringStreamable.java
rename to modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StringStreamable.java
index fa3ec324f6a..b2bee187fca 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringStreamable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StringStreamable.java
@@ -17,16 +17,12 @@
* under the License.
*/
-package org.elasticsearch.util.io;
-
-import org.elasticsearch.util.io.stream.StreamInput;
-import org.elasticsearch.util.io.stream.StreamOutput;
-import org.elasticsearch.util.io.stream.Streamable;
+package org.elasticsearch.util.io.stream;
import java.io.IOException;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public class StringStreamable implements Streamable {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/VoidStreamable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/VoidStreamable.java
similarity index 83%
rename from modules/elasticsearch/src/main/java/org/elasticsearch/util/io/VoidStreamable.java
rename to modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/VoidStreamable.java
index 85a047d0656..f6ba90778be 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/VoidStreamable.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/VoidStreamable.java
@@ -17,16 +17,12 @@
* under the License.
*/
-package org.elasticsearch.util.io;
-
-import org.elasticsearch.util.io.stream.StreamInput;
-import org.elasticsearch.util.io.stream.StreamOutput;
-import org.elasticsearch.util.io.stream.Streamable;
+package org.elasticsearch.util.io.stream;
import java.io.IOException;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public class VoidStreamable implements Streamable {