Add tests for _remote/info API (elastic/x-pack-elasticsearch#1009)

Relates to elastic/elasticsearch#23925
Depends on elastic/elasticsearch#23969

Original commit: elastic/x-pack-elasticsearch@d1e8754a57
This commit is contained in:
Simon Willnauer 2017-04-11 11:24:22 +02:00 committed by GitHub
parent d4d505fb97
commit 617c3ead5c
6 changed files with 87 additions and 185 deletions

View File

@ -1,68 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* An action listener that delegates it's results to another listener once
* it has received one or more failures or N results. This allows synchronous
* tasks to be forked off in a loop with the same listener and respond to a higher level listener once all tasks responded.
*/
public final class GroupedActionListener<T> implements ActionListener<T> {
private final CountDown countDown;
private final AtomicInteger pos = new AtomicInteger();
private final AtomicArray<T> roles;
private final ActionListener<Collection<T>> delegate;
private final Collection<T> defaults;
private final AtomicReference<Exception> failure = new AtomicReference<>();
/**
* Creates a new listener
* @param delegate the delegate listener
* @param groupSize the group size
*/
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize, Collection<T> defaults) {
roles = new AtomicArray<>(groupSize);
countDown = new CountDown(groupSize);
this.delegate = delegate;
this.defaults = defaults;
}
@Override
public void onResponse(T element) {
roles.set(pos.incrementAndGet() - 1, element);
if (countDown.countDown()) {
if (failure.get() != null) {
delegate.onFailure(failure.get());
} else {
List<T> collect = this.roles.asList();
collect.addAll(defaults);
delegate.onResponse(Collections.unmodifiableList(collect));
}
}
}
@Override
public void onFailure(Exception e) {
if (failure.compareAndSet(null, e) == false) {
failure.get().addSuppressed(e);
}
if (countDown.countDown()) {
delegate.onFailure(failure.get());
}
}
}

View File

@ -5,18 +5,15 @@
*/
package org.elasticsearch.xpack.security.action.user;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.common.GroupedActionListener;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
import org.elasticsearch.xpack.security.user.SystemUser;
@ -30,8 +27,6 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.common.Strings.arrayToDelimitedString;
public class TransportGetUsersAction extends HandledTransportAction<GetUsersRequest, GetUsersResponse> {
private final NativeUsersStore usersStore;

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.authc.esnative;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
@ -18,7 +19,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.common.GroupedActionListener;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.authc.support.Hasher;
@ -40,8 +40,6 @@ import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
import java.util.ArrayList;
/**
* Performs migration steps for the {@link NativeRealm} and {@link ReservedRealm}.
* When upgrading an Elasticsearch/X-Pack installation from a previous version, this class is responsible for ensuring that user/role

View File

@ -1,107 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class GroupedActionListenerTests extends ESTestCase {
public void testNotifications() throws InterruptedException {
AtomicReference<Collection<Integer>> resRef = new AtomicReference<>();
ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() {
@Override
public void onResponse(Collection<Integer> integers) {
resRef.set(integers);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
};
final int groupSize = randomIntBetween(10, 1000);
AtomicInteger count = new AtomicInteger();
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList();
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize, defaults);
int numThreads = randomIntBetween(2, 5);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError(e);
}
int c = 0;
while((c = count.incrementAndGet()) <= groupSize) {
listener.onResponse(c-1);
}
}
};
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
assertNotNull(resRef.get());
ArrayList<Integer> list = new ArrayList<>(resRef.get());
Collections.sort(list);
int expectedSize = groupSize + defaults.size();
assertEquals(expectedSize, resRef.get().size());
int expectedValue = defaults.isEmpty() ? 0 : -1;
for (int i = 0; i < expectedSize; i++) {
assertEquals(Integer.valueOf(expectedValue++), list.get(i));
}
}
public void testFailed() {
AtomicReference<Collection<Integer>> resRef = new AtomicReference<>();
AtomicReference<Exception> excRef = new AtomicReference<>();
ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() {
@Override
public void onResponse(Collection<Integer> integers) {
resRef.set(integers);
}
@Override
public void onFailure(Exception e) {
excRef.set(e);
}
};
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList();
int size = randomIntBetween(3, 4);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size, defaults);
listener.onResponse(0);
IOException ioException = new IOException();
RuntimeException rtException = new RuntimeException();
listener.onFailure(rtException);
listener.onFailure(ioException);
if (size == 4) {
listener.onResponse(2);
}
assertNotNull(excRef.get());
assertEquals(rtException, excRef.get());
assertEquals(1, excRef.get().getSuppressed().length);
assertEquals(ioException, excRef.get().getSuppressed()[0]);
assertNull(resRef.get());
listener.onResponse(1);
assertNull(resRef.get());
}
}

View File

@ -0,0 +1,84 @@
---
setup:
- skip:
features: headers
- do:
cluster.health:
wait_for_status: yellow
- do:
xpack.security.put_user:
username: "joe"
body: >
{
"password": "s3krit",
"roles" : [ "x_cluster_role" ]
}
- do:
xpack.security.put_role:
name: "x_cluster_role"
body: >
{
"cluster": ["monitor"]
}
---
teardown:
- do:
xpack.security.delete_user:
username: "joe"
ignore: 404
- do:
xpack.security.delete_role:
name: "monitor_role"
ignore: 404
---
"Fetch remote cluster info for existing cluster":
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
remote.info: {}
- match: { my_remote_cluster.connected: true }
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- is_true: my_remote_cluster.http_addresses.0
---
"Add transient remote cluster based on the preset cluster and check remote info":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
remote.info: {}
- set: { my_remote_cluster.http_addresses.0: remote_http }
- match: { test_remote_cluster.http_addresses.0: $remote_http }
- match: { test_remote_cluster.connected: true }
- match: { my_remote_cluster.connected: true }
- match: { test_remote_cluster.seeds.0: $remote_ip }
- match: { my_remote_cluster.seeds.0: $remote_ip }
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { test_remote_cluster.max_connections_per_cluster: 1}
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { test_remote_cluster.num_nodes_connected: 1}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.initial_connect_timeout: "30s" }

View File

@ -19,7 +19,7 @@ setup:
name: "x_cluster_role"
body: >
{
"cluster": ["all"],
"cluster": ["monitor"],
"indices": [
{
"names": "*",