Read full message on free context

Since #5730 we write a boolean in the FreeContextResponse which should be deserialized

Closes #6147
This commit is contained in:
Simon Willnauer 2014-05-13 12:23:47 +02:00
parent 1feddac315
commit d8c02c2599

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
@ -54,26 +53,48 @@ import java.util.concurrent.Callable;
*/ */
public class SearchServiceTransportAction extends AbstractComponent { public class SearchServiceTransportAction extends AbstractComponent {
static final class FreeContextResponseHandler extends EmptyTransportResponseHandler { static final class FreeContextResponseHandler implements TransportResponseHandler<SearchFreeContextResponse> {
private final ESLogger logger; private final ActionListener<Boolean> listener;
FreeContextResponseHandler(ESLogger logger) { FreeContextResponseHandler(final ActionListener<Boolean> listener) {
super(ThreadPool.Names.SAME); this.listener = listener;
this.logger = logger; }
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
@Override
public void handleResponse(SearchFreeContextResponse response) {
listener.onResponse(response.freed);
} }
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.warn("Failed to send release search context", exp); listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
} }
} }
//
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final TransportService transportService; private final TransportService transportService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final SearchService searchService; private final SearchService searchService;
private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger); private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {}
@Override
public void onFailure(Throwable exp) {
logger.warn("Failed to send release search context", exp);
}
});
@Inject @Inject
public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
@ -110,27 +131,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
boolean freed = searchService.freeContext(contextId); boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(freed); actionListener.onResponse(freed);
} else { } else {
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler<SearchFreeContextResponse>() { transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
@Override
public void handleResponse(SearchFreeContextResponse response) {
actionListener.onResponse(response.isFreed());
}
@Override
public void handleException(TransportException exp) {
actionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} }
} }
@ -532,7 +533,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class SearchFreeContextRequest extends TransportRequest { static class SearchFreeContextRequest extends TransportRequest {
private long id; private long id;
@ -561,7 +562,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class SearchFreeContextResponse extends TransportResponse { static class SearchFreeContextResponse extends TransportResponse {
private boolean freed; private boolean freed;
@ -618,7 +619,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class ClearScrollContextsRequest extends TransportRequest { static class ClearScrollContextsRequest extends TransportRequest {
ClearScrollContextsRequest() { ClearScrollContextsRequest() {
} }