Add defense against broken scrolls (elastic/x-pack-elasticsearch#1327)
Adds defenseagainst broken scrolls to the fetching roles and users. While Elasticsearch *shouldn't* send broken scrolls it has done so in the past and when it does this causes security to consume the entire heap and crash. This changes it so we instead fail the request with a message about the scroll being broken. Relates to elastic/x-pack-elasticsearch#1299 Original commit: elastic/x-pack-elasticsearch@dfef87e757
This commit is contained in:
parent
0f3f22deb2
commit
387944b95a
|
@ -100,7 +100,8 @@ public class InternalClient extends FilterClient {
|
||||||
}
|
}
|
||||||
final Consumer<SearchResponse> clearScroll = (response) -> {
|
final Consumer<SearchResponse> clearScroll = (response) -> {
|
||||||
if (response != null && response.getScrollId() != null) {
|
if (response != null && response.getScrollId() != null) {
|
||||||
ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(response.getScrollId()).request();
|
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
|
||||||
|
clearScrollRequest.addScrollId(response.getScrollId());
|
||||||
client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {}));
|
client.clearScroll(clearScrollRequest, ActionListener.wrap((r) -> {}, (e) -> {}));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -120,10 +121,17 @@ public class InternalClient extends FilterClient {
|
||||||
results.add(oneResult);
|
results.add(oneResult);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SearchScrollRequest scrollRequest = client.prepareSearchScroll(resp.getScrollId())
|
|
||||||
.setScroll(request.scroll().keepAlive()).request();
|
|
||||||
client.searchScroll(scrollRequest, this);
|
|
||||||
|
|
||||||
|
if (results.size() > resp.getHits().getTotalHits()) {
|
||||||
|
clearScroll.accept(lastResponse);
|
||||||
|
listener.onFailure(new IllegalStateException("scrolling returned more hits [" + results.size()
|
||||||
|
+ "] than expected [" + resp.getHits().getTotalHits() + "] so bailing out to prevent unbounded "
|
||||||
|
+ "memory consumption."));
|
||||||
|
} else {
|
||||||
|
SearchScrollRequest scrollRequest = new SearchScrollRequest(resp.getScrollId());
|
||||||
|
scrollRequest.scroll(request.scroll().keepAlive());
|
||||||
|
client.searchScroll(scrollRequest, this);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
clearScroll.accept(resp);
|
clearScroll.accept(resp);
|
||||||
// Finally, return the list of users
|
// Finally, return the list of users
|
||||||
|
|
|
@ -6,17 +6,32 @@
|
||||||
package org.elasticsearch.xpack.security;
|
package org.elasticsearch.xpack.security;
|
||||||
|
|
||||||
import org.apache.lucene.util.CollectionUtil;
|
import org.apache.lucene.util.CollectionUtil;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
import org.elasticsearch.search.SearchHits;
|
||||||
|
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.anyObject;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
|
||||||
public class InternalClientIntegTests extends ESSingleNodeTestCase {
|
public class InternalClientIntegTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
|
@ -45,4 +60,50 @@ public class InternalClientIntegTests extends ESSingleNodeTestCase {
|
||||||
assertEquals(list.get(i).intValue(), i);
|
assertEquals(list.get(i).intValue(), i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that
|
||||||
|
* {@link InternalClient#fetchAllByEntity(Client, SearchRequest, org.elasticsearch.action.ActionListener, java.util.function.Function)}
|
||||||
|
* defends against scrolls broken in such a way that the remote Elasticsearch returns infinite results. While Elasticsearch
|
||||||
|
* <strong>shouldn't</strong> do this it has in the past and it is <strong>very</strong> when it does. It takes out the whole node. So
|
||||||
|
* this makes sure we defend against it properly.
|
||||||
|
*/
|
||||||
|
public void testFetchAllByEntityWithBrokenScroll() {
|
||||||
|
Client client = mock(Client.class);
|
||||||
|
SearchRequest request = new SearchRequest();
|
||||||
|
|
||||||
|
String scrollId = randomAlphaOfLength(5);
|
||||||
|
SearchHit[] hits = new SearchHit[] {new SearchHit(1)};
|
||||||
|
InternalSearchResponse internalResponse = new InternalSearchResponse(new SearchHits(hits, 1, 1), null, null, null, false, false, 1);
|
||||||
|
SearchResponse response = new SearchResponse(internalResponse, scrollId, 1, 1, 0, ShardSearchFailure.EMPTY_ARRAY);
|
||||||
|
|
||||||
|
Answer<?> returnResponse = invocation -> {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[1];
|
||||||
|
listener.onResponse(response);
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
doAnswer(returnResponse).when(client).search(eq(request), anyObject());
|
||||||
|
/* The line below simulates the evil cluster. A working cluster would return
|
||||||
|
* a response with 0 hits. Our simulated broken cluster returns the same
|
||||||
|
* response over and over again. */
|
||||||
|
doAnswer(returnResponse).when(client).searchScroll(anyObject(), anyObject());
|
||||||
|
|
||||||
|
AtomicReference<Exception> failure = new AtomicReference<>();
|
||||||
|
InternalClient.fetchAllByEntity(client, request, new ActionListener<Collection<SearchHit>>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(Collection<SearchHit> response) {
|
||||||
|
fail("This shouldn't succeed.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
failure.set(e);
|
||||||
|
}
|
||||||
|
}, Function.identity());
|
||||||
|
|
||||||
|
assertNotNull("onFailure wasn't called", failure.get());
|
||||||
|
assertEquals("scrolling returned more hits [2] than expected [1] so bailing out to prevent unbounded memory consumption.",
|
||||||
|
failure.get().getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue