Merge remote-tracking branch 'upstream/master'
Original commit: elastic/x-pack-elasticsearch@8a529e7890
@@ -8,18 +8,25 @@ package org.elasticsearch.xpack.core.security.authz.accesscontrol;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.FilteredDocIdSetIterator;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 
 /**
  * A reader that only exposes documents via {@link #getLiveDocs()} that matches with the provided role query.
@@ -31,6 +38,74 @@ public final class DocumentSubsetReader extends FilterLeafReader {
         return new DocumentSubsetDirectoryReader(in, bitsetFilterCache, roleQuery);
     }
 
+    /**
+     * Cache of the number of live docs for a given (segment, role query) pair.
+     * This is useful because numDocs() is called eagerly by BaseCompositeReader so computing
+     * numDocs() lazily doesn't help. Plus it helps reuse the result of the computation either
+     * between refreshes, or across refreshes if no more documents were deleted in the
+     * considered segment. The size of the top-level map is bounded by the number of segments
+     * on the node.
+     */
+    static final Map<IndexReader.CacheKey, Cache<Query, Integer>> NUM_DOCS_CACHE = new ConcurrentHashMap<>();
+
+    /**
+     * Compute the number of live documents. This method is SLOW.
+     */
+    private static int computeNumDocs(LeafReader reader, Query roleQuery, BitSet roleQueryBits) {
+        final Bits liveDocs = reader.getLiveDocs();
+        if (roleQueryBits == null) {
+            return 0;
+        } else if (liveDocs == null) {
+            // slow
+            return roleQueryBits.cardinality();
+        } else {
+            // very slow, but necessary in order to be correct
+            int numDocs = 0;
+            DocIdSetIterator it = new BitSetIterator(roleQueryBits, 0L); // we don't use the cost
+            try {
+                for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
+                    if (liveDocs.get(doc)) {
+                        numDocs++;
+                    }
+                }
+                return numDocs;
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    /**
+     * Like {@link #computeNumDocs} but caches results.
+     */
+    private static int getNumDocs(LeafReader reader, Query roleQuery, BitSet roleQueryBits) throws IOException, ExecutionException {
+        IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper(); // this one takes deletes into account
+        if (cacheHelper == null) {
+            throw new IllegalStateException("Reader " + reader + " does not support caching");
+        }
+        final boolean[] added = new boolean[] { false };
+        Cache<Query, Integer> perReaderCache = NUM_DOCS_CACHE.computeIfAbsent(cacheHelper.getKey(),
+                key -> {
+                    added[0] = true;
+                    return CacheBuilder.<Query, Integer>builder()
+                            // Not configurable, this limit only exists so that if a role query is updated
+                            // then we won't risk OOME because of old role queries that are not used anymore
+                            .setMaximumWeight(1000)
+                            .weigher((k, v) -> 1) // just count
+                            .build();
+                });
+        if (added[0]) {
+            IndexReader.ClosedListener closedListener = NUM_DOCS_CACHE::remove;
+            try {
+                cacheHelper.addClosedListener(closedListener);
+            } catch (AlreadyClosedException e) {
+                closedListener.onClose(cacheHelper.getKey());
+                throw e;
+            }
+        }
+        return perReaderCache.computeIfAbsent(roleQuery, q -> computeNumDocs(reader, roleQuery, roleQueryBits));
+    }
+
     public static final class DocumentSubsetDirectoryReader extends FilterDirectoryReader {
 
         private final Query roleQuery;
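Editor's note: getNumDocs() above builds a two-level cache, an outer ConcurrentHashMap keyed per reader (evicted through a closed listener) and a bounded inner cache keyed per role query. A minimal JDK-only sketch of that shape follows; the class and method names are illustrative and not part of the X-Pack code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal sketch of the caching shape used by getNumDocs(): the outer map
// is keyed by a per-reader identity and must be cleaned up when the reader
// closes, otherwise entries for long-gone segments accumulate forever.
final class PerReaderCache<K, Q, V> {
    private final Map<K, Map<Q, V>> cache = new ConcurrentHashMap<>();

    V get(K readerKey, Q query, Function<Q, V> compute) {
        // the inner map stands in for the bounded org.elasticsearch Cache
        return cache.computeIfAbsent(readerKey, k -> new ConcurrentHashMap<>())
                    .computeIfAbsent(query, compute);
    }

    // Wire this to IndexReader.CacheHelper#addClosedListener, as the real
    // code does; registering it only on first insertion avoids duplicate
    // listeners (that is what the added[0] flag guards above).
    void onReaderClosed(K readerKey) {
        cache.remove(readerKey);
    }
}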
@@ -78,11 +153,12 @@ public final class DocumentSubsetReader extends FilterLeafReader {
     }
 
     private final BitSet roleQueryBits;
-    private volatile int numDocs = -1;
+    private final int numDocs;
 
     private DocumentSubsetReader(final LeafReader in, BitsetFilterCache bitsetFilterCache, final Query roleQuery) throws Exception {
         super(in);
         this.roleQueryBits = bitsetFilterCache.getBitSetProducer(roleQuery).getBitSet(in.getContext());
+        this.numDocs = getNumDocs(in, roleQuery, roleQueryBits);
     }
 
     @Override
@@ -113,36 +189,6 @@ public final class DocumentSubsetReader extends FilterLeafReader {
 
     @Override
     public int numDocs() {
-        // The reason the implement this method is that numDocs should be equal to the number of set bits in liveDocs. (would be weird
-        // otherwise)
-        // for the security DSL use case this get invoked in the QueryPhase class (in core ES) if match_all query is used as main query
-        // and this is also invoked in tests.
-        if (numDocs == -1) {
-            final Bits liveDocs = in.getLiveDocs();
-            if (roleQueryBits == null) {
-                numDocs = 0;
-            } else if (liveDocs == null) {
-                numDocs = roleQueryBits.cardinality();
-            } else {
-                // this is slow, but necessary in order to be correct:
-                try {
-                    DocIdSetIterator iterator = new FilteredDocIdSetIterator(new BitSetIterator(roleQueryBits, roleQueryBits
-                            .approximateCardinality())) {
-                        @Override
-                        protected boolean match(int doc) {
-                            return liveDocs.get(doc);
-                        }
-                    };
-                    int counter = 0;
-                    for (int docId = iterator.nextDoc(); docId < DocIdSetIterator.NO_MORE_DOCS; docId = iterator.nextDoc()) {
-                        counter++;
-                    }
-                    numDocs = counter;
-                } catch (IOException e) {
-                    throw ExceptionsHelper.convertToElastic(e);
-                }
-            }
-        }
         return numDocs;
     }
 
@@ -152,8 +198,6 @@ public final class DocumentSubsetReader extends FilterLeafReader {
         return true;
     }
 
-    // Don't delegate getCombinedCoreAndDeletesKey(), because we change the live docs here.
-
     @Override
     public CacheHelper getCoreCacheHelper() {
         return in.getCoreCacheHelper();
@@ -161,7 +205,8 @@ public final class DocumentSubsetReader extends FilterLeafReader {
 
     @Override
     public CacheHelper getReaderCacheHelper() {
-        return in.getReaderCacheHelper();
+        // Not delegated since we change the live docs
+        return null;
     }
 
     BitSet getRoleQueryBits() {
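Editor's note: for orientation, a hedged usage sketch of the reader. It assumes the factory shown at the top of the first hunk is the usual DocumentSubsetReader.wrap(DirectoryReader, BitsetFilterCache, Query); how the arguments are obtained is not part of this diff:

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.Query;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;

// Hedged usage sketch: wrap a reader so only documents matching the role
// query stay visible. The wrap(...) entry point is assumed from the diff.
final class DocumentSubsetUsage {
    static int visibleDocCount(DirectoryReader reader, BitsetFilterCache cache, Query roleQuery) throws Exception {
        DirectoryReader filtered = DocumentSubsetReader.wrap(reader, cache, roleQuery);
        // After this change, numDocs() is computed eagerly per segment in
        // the constructor and memoized in NUM_DOCS_CACHE, so repeated
        // calls (e.g. from BaseCompositeReader) are cheap.
        return filtered.numDocs();
    }
}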
@@ -49,6 +49,11 @@ public class DocumentSubsetReaderTests extends ESTestCase {
 
     @Before
     public void setUpDirectory() {
+        // We check it is empty at the end of the test, so make sure it is empty in the
+        // beginning as well so that we can easily distinguish from garbage added by
+        // this test and garbage not cleaned up by other tests.
+        assertTrue(DocumentSubsetReader.NUM_DOCS_CACHE.toString(),
+                DocumentSubsetReader.NUM_DOCS_CACHE.isEmpty());
         directory = newDirectory();
         IndexSettings settings = IndexSettingsModule.newIndexSettings("_index", Settings.EMPTY);
         bitsetFilterCache = new BitsetFilterCache(settings, new BitsetFilterCache.Listener() {
@@ -69,6 +74,8 @@ public class DocumentSubsetReaderTests extends ESTestCase {
         if (directoryReader != null) {
            directoryReader.close();
         }
+        assertTrue(DocumentSubsetReader.NUM_DOCS_CACHE.toString(),
+                DocumentSubsetReader.NUM_DOCS_CACHE.isEmpty());
         directory.close();
         bitsetFilterCache.close();
     }
@@ -225,6 +232,9 @@ public class DocumentSubsetReaderTests extends ESTestCase {
         assertEquals(1, ir2.leaves().size());
         assertSame(ir.leaves().get(0).reader().getCoreCacheHelper().getKey(),
                 ir2.leaves().get(0).reader().getCoreCacheHelper().getKey());
+        // However we don't support caching on the reader cache key since we override deletes
+        assertNull(ir.leaves().get(0).reader().getReaderCacheHelper());
+        assertNull(ir2.leaves().get(0).reader().getReaderCacheHelper());
 
         TestUtil.checkReader(ir);
         IOUtils.close(ir, ir2, iw, dir);
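Editor's note: the assertions above hinge on Lucene's two per-segment cache identities. A minimal sketch of the distinction; the helper class is illustrative and not part of the test:

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;

// Lucene exposes two cache identities per segment. The core key
// identifies the immutable segment data (terms, postings) and survives
// deletes, so DocumentSubsetReader can safely delegate it. The reader
// key also covers live docs; since DocumentSubsetReader rewrites live
// docs per role query, it returns null instead of delegating, opting
// out of reader-level caching entirely.
final class CacheKeys {
    static IndexReader.CacheKey coreKey(LeafReader reader) {
        IndexReader.CacheHelper helper = reader.getCoreCacheHelper();
        return helper == null ? null : helper.getKey();
    }

    static IndexReader.CacheKey readerKeyOrNull(LeafReader reader) {
        IndexReader.CacheHelper helper = reader.getReaderCacheHelper();
        return helper == null ? null : helper.getKey(); // null for DocumentSubsetReader
    }
}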
@@ -78,7 +78,6 @@
   - do:
       search:
         index: .monitoring-data-*
-        type: kibana
 
   - match: { hits.total: 0 }
@@ -122,7 +121,6 @@
   - do:
       search:
         index: .monitoring-data-*
-        type: kibana
 
   - match: { hits.total: 0 }
@@ -74,12 +74,10 @@ teardown:
       headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
       search:
         index: ".security"
-        type: "doc"
 
   - do:
       headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
       search:
         index: ".secu*rity"
-        type: "doc"
   - match: { hits.total: 0 }
@@ -74,12 +74,10 @@ teardown:
       headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
       search:
         index: ".security-6"
-        type: "doc"
 
   - do:
       headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
       search:
         index: ".security*6"
-        type: "doc"
   - match: { hits.total: 0 }
@@ -167,9 +167,7 @@ subprojects {
           setting 'xpack.security.system_key.required', 'true'
         }
         if (version.onOrAfter('6.0.0')) {
-          setupCommand 'create-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'create'
-          setupCommand 'add-key-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'add-file', 'xpack.watcher.encryption_key',
-                  "${mainProject.projectDir}/src/test/resources/system_key"
+          keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
         } else {
           extraConfigFile 'x-pack/system_key', "${mainProject.projectDir}/src/test/resources/system_key"
         }
@@ -212,9 +210,7 @@ subprojects {
         extraConfigFile 'testnode.jks', new File(outputDir + '/testnode.jks')
         if (withSystemKey) {
           setting 'xpack.watcher.encrypt_sensitive_data', 'true'
-          setupCommand 'create-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'create'
-          setupCommand 'add-key-elasticsearch-keystore',
-                  'bin/elasticsearch-keystore', 'add-file', 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
+          keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
         }
       }
 
@@ -228,7 +228,7 @@ public class MlJobIT extends ESRestTestCase {
         responseAsString = responseEntityToString(response);
         assertThat(responseAsString, containsString("\"count\":2"));
 
-        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/doc/_search");
+        response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/_search");
         assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
         responseAsString = responseEntityToString(response);
         assertThat(responseAsString, containsString("\"total\":2"));
@@ -150,9 +150,7 @@ subprojects {
           setting 'xpack.security.system_key.required', 'true'
         }
         if (version.onOrAfter('6.0.0')) {
-          setupCommand 'create-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'create'
-          setupCommand 'add-key-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'add-file', 'xpack.watcher.encryption_key',
-                  "${mainProject.projectDir}/src/test/resources/system_key"
+          keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
         } else {
           extraConfigFile 'x-pack/system_key', "${mainProject.projectDir}/src/test/resources/system_key"
         }
@@ -194,9 +192,7 @@ subprojects {
         extraConfigFile 'testnode.jks', new File(outputDir + '/testnode.jks')
         if (withSystemKey) {
           setting 'xpack.watcher.encrypt_sensitive_data', 'true'
-          setupCommand 'create-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'create'
-          setupCommand 'add-key-elasticsearch-keystore',
-                  'bin/elasticsearch-keystore', 'add-file', 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
+          keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
         }
       }
 
@@ -234,9 +230,7 @@ subprojects {
         extraConfigFile 'testnode.jks', new File(outputDir + '/testnode.jks')
         if (withSystemKey) {
           setting 'xpack.watcher.encrypt_sensitive_data', 'true'
-          setupCommand 'create-elasticsearch-keystore', 'bin/elasticsearch-keystore', 'create'
-          setupCommand 'add-key-elasticsearch-keystore',
-                  'bin/elasticsearch-keystore', 'add-file', 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
+          keystoreFile 'xpack.watcher.encryption_key', "${mainProject.projectDir}/src/test/resources/system_key"
         }
       }
 
@@ -7,6 +7,7 @@ package org.elasticsearch.smoketest;
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.SecureString;
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
@@ -50,20 +52,37 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase {
 
         assertBusy(() -> {
             try {
-                adminClient().performRequest("POST", "_xpack/watcher/_start");
-
-                for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
-                    assertOK(adminClient().performRequest("HEAD", "_template/" + template));
-                }
-
                 Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats");
                 ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse);
                 String state = objectPath.evaluate("stats.0.watcher_state");
-                assertThat(state, is("started"));
+
+                switch (state) {
+                case "stopped":
+                    Response startResponse = adminClient().performRequest("POST", "_xpack/watcher/_start");
+                    assertOK(startResponse);
+                    String body = EntityUtils.toString(startResponse.getEntity());
+                    assertThat(body, containsString("\"acknowledged\":true"));
+                    break;
+                case "stopping":
+                    throw new AssertionError("waiting until stopping state reached stopped state to start again");
+                case "starting":
+                    throw new AssertionError("waiting until starting state reached started state");
+                case "started":
+                    // all good here, we are done
+                    break;
+                default:
+                    throw new AssertionError("unknown state[" + state + "]");
+                }
             } catch (IOException e) {
                 throw new AssertionError(e);
             }
         });
+
+        assertBusy(() -> {
+            for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
+                assertOK(adminClient().performRequest("HEAD", "_template/" + template));
+            }
+        });
     }
 
     @After
@@ -73,11 +92,27 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase {
 
         assertBusy(() -> {
             try {
-                adminClient().performRequest("POST", "_xpack/watcher/_stop", Collections.emptyMap());
                 Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats");
                 ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse);
                 String state = objectPath.evaluate("stats.0.watcher_state");
-                assertThat(state, is("stopped"));
+
+                switch (state) {
+                case "stopped":
+                    // all good here, we are done
+                    break;
+                case "stopping":
+                    throw new AssertionError("waiting until stopping state reached stopped state");
+                case "starting":
+                    throw new AssertionError("waiting until starting state reached started state to stop");
+                case "started":
+                    Response stopResponse = adminClient().performRequest("POST", "_xpack/watcher/_stop", Collections.emptyMap());
+                    assertOK(stopResponse);
+                    String body = EntityUtils.toString(stopResponse.getEntity());
+                    assertThat(body, containsString("\"acknowledged\":true"));
+                    break;
+                default:
+                    throw new AssertionError("unknown state[" + state + "]");
+                }
             } catch (IOException e) {
                 throw new AssertionError(e);
             }
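Editor's note: both switch blocks above lean on assertBusy retrying its body whenever an AssertionError escapes, so transient states ("starting", "stopping") are polled until a terminal state is reached. A minimal stand-in for that contract; the real ESTestCase#assertBusy differs in signature and backoff details:

import java.util.concurrent.TimeUnit;

// Minimal stand-in for ESTestCase#assertBusy: retry the check until it
// stops throwing AssertionError or the timeout elapses. Transient watcher
// states throw on purpose so the check is polled again; terminal states
// either pass or take an action and pass on a later poll.
final class BusyAssert {
    static void assertBusy(Runnable check, long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 1;
        while (true) {
            try {
                check.run();
                return; // check passed
            } catch (AssertionError e) {
                if (System.nanoTime() > deadline) {
                    throw e; // give up, surface the last failure
                }
                Thread.sleep(sleepMillis);
                sleepMillis = Math.min(1000, sleepMillis * 2); // back off
            }
        }
    }
}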