mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
This commit fixes an issue in synchronization in Exporters class. The export() method is synchronized and when used with LocalExport can provoke a deadlock. LocalExporter exports data locally using bulk requests that can trigger cluster state updates for mapping updates. If a exporters settings update sneaks in, the settings update waits for the export to terminate but the export waits for the settings to be updated... and boom. This commit removes the synchronized and refactor Exporters/LocalExporter to use state and dedicated instance of LocalBulk for each export so that synchronizing methods is not necessary anymore. It also lower down some random settings in MonitoringBulkTests because the previous settings almost always fill the bulk thread pool. closes elastic/elasticsearch#1769 Original commit: elastic/x-pack-elasticsearch@f50c916f8b
360 lines
12 KiB
Groovy
360 lines
12 KiB
Groovy
import org.elasticsearch.gradle.LoggedExec
|
|
import org.elasticsearch.gradle.MavenFilteringHack
|
|
|
|
apply plugin: 'elasticsearch.rest-test'
|
|
|
|
dependencies {
|
|
testCompile project(path: ':x-plugins:elasticsearch:x-pack', configuration: 'runtime')
|
|
}
|
|
|
|
// needed to be consistent with ssl host checking
|
|
Object san = new SanEvaluator()
|
|
|
|
// location of generated keystores and certificates
|
|
File keystoreDir = new File(project.buildDir, 'keystore')
|
|
|
|
// Generate the node's keystore
|
|
File nodeKeystore = new File(keystoreDir, 'test-node.jks')
|
|
task createNodeKeyStore(type: LoggedExec) {
|
|
doFirst {
|
|
if (nodeKeystore.parentFile.exists() == false) {
|
|
nodeKeystore.parentFile.mkdirs()
|
|
}
|
|
if (nodeKeystore.exists()) {
|
|
delete nodeKeystore
|
|
}
|
|
}
|
|
executable = 'keytool'
|
|
standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8'))
|
|
args '-genkey',
|
|
'-alias', 'test-node',
|
|
'-keystore', nodeKeystore,
|
|
'-keyalg', 'RSA',
|
|
'-keysize', '2048',
|
|
'-validity', '712',
|
|
'-dname', 'CN=smoke-test-plugins-ssl',
|
|
'-keypass', 'keypass',
|
|
'-storepass', 'keypass',
|
|
'-ext', san
|
|
}
|
|
|
|
// Generate the client's keystore
|
|
File clientKeyStore = new File(keystoreDir, 'test-client.jks')
|
|
task createClientKeyStore(type: LoggedExec) {
|
|
doFirst {
|
|
if (clientKeyStore.parentFile.exists() == false) {
|
|
clientKeyStore.parentFile.mkdirs()
|
|
}
|
|
if (clientKeyStore.exists()) {
|
|
delete clientKeyStore
|
|
}
|
|
}
|
|
executable = 'keytool'
|
|
standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8'))
|
|
args '-genkey',
|
|
'-alias', 'test-client',
|
|
'-keystore', clientKeyStore,
|
|
'-keyalg', 'RSA',
|
|
'-keysize', '2048',
|
|
'-validity', '712',
|
|
'-dname', 'CN=smoke-test-plugins-ssl',
|
|
'-keypass', 'keypass',
|
|
'-storepass', 'keypass',
|
|
'-ext', san
|
|
}
|
|
|
|
// Export the node's certificate
|
|
File nodeCertificate = new File(keystoreDir, 'test-node.cert')
|
|
task exportNodeCertificate(type: LoggedExec) {
|
|
doFirst {
|
|
if (nodeCertificate.parentFile.exists() == false) {
|
|
nodeCertificate.parentFile.mkdirs()
|
|
}
|
|
if (nodeCertificate.exists()) {
|
|
delete nodeCertificate
|
|
}
|
|
}
|
|
executable = 'keytool'
|
|
args '-export',
|
|
'-alias', 'test-node',
|
|
'-keystore', nodeKeystore,
|
|
'-storepass', 'keypass',
|
|
'-file', nodeCertificate
|
|
}
|
|
|
|
// Import the node certificate in the client's keystore
|
|
task importNodeCertificateInClientKeyStore(type: LoggedExec) {
|
|
dependsOn exportNodeCertificate
|
|
executable = 'keytool'
|
|
args '-import',
|
|
'-alias', 'test-node',
|
|
'-keystore', clientKeyStore,
|
|
'-storepass', 'keypass',
|
|
'-file', nodeCertificate,
|
|
'-noprompt'
|
|
}
|
|
|
|
// Export the client's certificate
|
|
File clientCertificate = new File(keystoreDir, 'test-client.cert')
|
|
task exportClientCertificate(type: LoggedExec) {
|
|
doFirst {
|
|
if (clientCertificate.parentFile.exists() == false) {
|
|
clientCertificate.parentFile.mkdirs()
|
|
}
|
|
if (clientCertificate.exists()) {
|
|
delete clientCertificate
|
|
}
|
|
}
|
|
executable = 'keytool'
|
|
args '-export',
|
|
'-alias', 'test-client',
|
|
'-keystore', clientKeyStore,
|
|
'-storepass', 'keypass',
|
|
'-file', clientCertificate
|
|
}
|
|
|
|
// Import the client certificate in the node's keystore
|
|
task importClientCertificateInNodeKeyStore(type: LoggedExec) {
|
|
dependsOn exportClientCertificate
|
|
executable = 'keytool'
|
|
args '-import',
|
|
'-alias', 'test-client',
|
|
'-keystore', nodeKeystore,
|
|
'-storepass', 'keypass',
|
|
'-file', clientCertificate,
|
|
'-noprompt'
|
|
}
|
|
|
|
forbiddenPatterns {
|
|
exclude '**/*.cert'
|
|
}
|
|
|
|
// Add keystores to test classpath: it expects it there
|
|
sourceSets.test.resources.srcDir(keystoreDir)
|
|
processTestResources.dependsOn(
|
|
createNodeKeyStore, createClientKeyStore,
|
|
importNodeCertificateInClientKeyStore, importClientCertificateInNodeKeyStore
|
|
)
|
|
|
|
ext.pluginsCount = 1 // we install xpack explicitly
|
|
project.rootProject.subprojects.findAll { it.path.startsWith(':plugins:') }.each { subproj ->
|
|
// need to get a non-decorated project object, so must re-lookup the project by path
|
|
integTest.cluster.plugin(subproj.name, project(subproj.path))
|
|
pluginsCount += 1
|
|
}
|
|
|
|
integTest {
|
|
cluster {
|
|
setting 'xpack.monitoring.agent.interval', '3s'
|
|
setting 'xpack.monitoring.agent.exporters._http.type', 'http'
|
|
setting 'xpack.monitoring.agent.exporters._http.enabled', 'false'
|
|
setting 'xpack.monitoring.agent.exporters._http.ssl.truststore.path', clientKeyStore.name
|
|
setting 'xpack.monitoring.agent.exporters._http.ssl.truststore.password', 'keypass'
|
|
setting 'xpack.monitoring.agent.exporters._http.auth.username', 'monitoring_agent'
|
|
setting 'xpack.monitoring.agent.exporters._http.auth.password', 'changeme'
|
|
|
|
setting 'shield.transport.ssl', 'true'
|
|
setting 'shield.http.ssl', 'true'
|
|
setting 'shield.ssl.keystore.path', nodeKeystore.name
|
|
setting 'shield.ssl.keystore.password', 'keypass'
|
|
|
|
plugin 'x-pack', project(':x-plugins:elasticsearch:x-pack')
|
|
|
|
// copy keystores into config/
|
|
extraConfigFile nodeKeystore.name, nodeKeystore
|
|
extraConfigFile clientKeyStore.name, clientKeyStore
|
|
|
|
setupCommand 'setupTestUser',
|
|
'bin/x-pack/users', 'useradd', 'test_user', '-p', 'changeme', '-r', 'admin'
|
|
setupCommand 'setupMarvelUser',
|
|
'bin/x-pack/users', 'useradd', 'monitoring_agent', '-p', 'changeme', '-r', 'remote_monitoring_agent'
|
|
|
|
waitCondition = { node, ant ->
|
|
// HTTPS check is tricky to do, so we wait for the log file to indicate that the node is started
|
|
String waitForNodeStartProp = "waitForNodeStart${name}"
|
|
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '100', checkeveryunit: 'millisecond',
|
|
timeoutproperty: waitForNodeStartProp) {
|
|
and {
|
|
resourcecontains(resource: "${node.startLog.toString()}", substring: 'started')
|
|
resourcecontains(resource: "${node.startLog.toString()}", substring: 'monitoring service started')
|
|
}
|
|
}
|
|
|
|
if (ant.project.getProperty(waitForNodeStartProp)) {
|
|
println "Timed out when looking for node startup in log file ${node.startLog.toString()}"
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
ext.expansions = [
|
|
'expected.plugins.count': pluginsCount
|
|
]
|
|
|
|
processTestResources {
|
|
from(sourceSets.test.resources.srcDirs) {
|
|
include '**/*.yaml'
|
|
inputs.properties(expansions)
|
|
MavenFilteringHack.filter(it, expansions)
|
|
}
|
|
}
|
|
|
|
/** A lazy evaluator to find the san to use for certificate generation. */
|
|
class SanEvaluator {
|
|
|
|
private static String san = null
|
|
|
|
String toString() {
|
|
synchronized (SanEvaluator.class) {
|
|
if (san == null) {
|
|
san = getSubjectAlternativeNameString()
|
|
}
|
|
}
|
|
return san
|
|
}
|
|
|
|
// Code stolen from NetworkUtils/InetAddresses/NetworkAddress to support SAN
|
|
/** Return all interfaces (and subinterfaces) on the system */
|
|
private static List<NetworkInterface> getInterfaces() throws SocketException {
|
|
List<NetworkInterface> all = new ArrayList<>();
|
|
addAllInterfaces(all, Collections.list(NetworkInterface.getNetworkInterfaces()));
|
|
Collections.sort(all, new Comparator<NetworkInterface>() {
|
|
@Override
|
|
public int compare(NetworkInterface left, NetworkInterface right) {
|
|
return Integer.compare(left.getIndex(), right.getIndex());
|
|
}
|
|
});
|
|
return all;
|
|
}
|
|
|
|
/** Helper for getInterfaces, recursively adds subinterfaces to {@code target} */
|
|
private static void addAllInterfaces(List<NetworkInterface> target, List<NetworkInterface> level) {
|
|
if (!level.isEmpty()) {
|
|
target.addAll(level);
|
|
for (NetworkInterface intf : level) {
|
|
addAllInterfaces(target, Collections.list(intf.getSubInterfaces()));
|
|
}
|
|
}
|
|
}
|
|
|
|
private static String getSubjectAlternativeNameString() {
|
|
List<InetAddress> list = new ArrayList<>();
|
|
for (NetworkInterface intf : getInterfaces()) {
|
|
if (intf.isUp()) {
|
|
// NOTE: some operating systems (e.g. BSD stack) assign a link local address to the loopback interface
|
|
// while technically not a loopback address, some of these treat them as one (e.g. OS X "localhost") so we must too,
|
|
// otherwise things just won't work out of box. So we include all addresses from loopback interfaces.
|
|
for (InetAddress address : Collections.list(intf.getInetAddresses())) {
|
|
if (intf.isLoopback() || address.isLoopbackAddress()) {
|
|
list.add(address);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (list.isEmpty()) {
|
|
throw new IllegalArgumentException("no up-and-running loopback addresses found, got " + getInterfaces());
|
|
}
|
|
|
|
StringBuilder builder = new StringBuilder("san=");
|
|
for (int i = 0; i < list.size(); i++) {
|
|
InetAddress address = list.get(i);
|
|
String hostAddress;
|
|
if (address instanceof Inet6Address) {
|
|
hostAddress = compressedIPV6Address((Inet6Address)address);
|
|
} else {
|
|
hostAddress = address.getHostAddress();
|
|
}
|
|
builder.append("ip:").append(hostAddress);
|
|
String hostname = address.getHostName();
|
|
if (hostname.equals(address.getHostAddress()) == false) {
|
|
builder.append(",dns:").append(hostname);
|
|
}
|
|
|
|
if (i != (list.size() - 1)) {
|
|
builder.append(",");
|
|
}
|
|
}
|
|
|
|
return builder.toString();
|
|
}
|
|
|
|
private static String compressedIPV6Address(Inet6Address inet6Address) {
|
|
byte[] bytes = inet6Address.getAddress();
|
|
int[] hextets = new int[8];
|
|
for (int i = 0; i < hextets.length; i++) {
|
|
hextets[i] = (bytes[2 * i] & 255) << 8 | bytes[2 * i + 1] & 255;
|
|
}
|
|
compressLongestRunOfZeroes(hextets);
|
|
return hextetsToIPv6String(hextets);
|
|
}
|
|
|
|
/**
|
|
* Identify and mark the longest run of zeroes in an IPv6 address.
|
|
*
|
|
* <p>Only runs of two or more hextets are considered. In case of a tie, the
|
|
* leftmost run wins. If a qualifying run is found, its hextets are replaced
|
|
* by the sentinel value -1.
|
|
*
|
|
* @param hextets {@code int[]} mutable array of eight 16-bit hextets
|
|
*/
|
|
private static void compressLongestRunOfZeroes(int[] hextets) {
|
|
int bestRunStart = -1;
|
|
int bestRunLength = -1;
|
|
int runStart = -1;
|
|
for (int i = 0; i < hextets.length + 1; i++) {
|
|
if (i < hextets.length && hextets[i] == 0) {
|
|
if (runStart < 0) {
|
|
runStart = i;
|
|
}
|
|
} else if (runStart >= 0) {
|
|
int runLength = i - runStart;
|
|
if (runLength > bestRunLength) {
|
|
bestRunStart = runStart;
|
|
bestRunLength = runLength;
|
|
}
|
|
runStart = -1;
|
|
}
|
|
}
|
|
if (bestRunLength >= 2) {
|
|
Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Convert a list of hextets into a human-readable IPv6 address.
|
|
*
|
|
* <p>In order for "::" compression to work, the input should contain negative
|
|
* sentinel values in place of the elided zeroes.
|
|
*
|
|
* @param hextets {@code int[]} array of eight 16-bit hextets, or -1s
|
|
*/
|
|
private static String hextetsToIPv6String(int[] hextets) {
|
|
/*
|
|
* While scanning the array, handle these state transitions:
|
|
* start->num => "num" start->gap => "::"
|
|
* num->num => ":num" num->gap => "::"
|
|
* gap->num => "num" gap->gap => ""
|
|
*/
|
|
StringBuilder buf = new StringBuilder(39);
|
|
boolean lastWasNumber = false;
|
|
for (int i = 0; i < hextets.length; i++) {
|
|
boolean thisIsNumber = hextets[i] >= 0;
|
|
if (thisIsNumber) {
|
|
if (lastWasNumber) {
|
|
buf.append(':');
|
|
}
|
|
buf.append(Integer.toHexString(hextets[i]));
|
|
} else {
|
|
if (i == 0 || lastWasNumber) {
|
|
buf.append("::");
|
|
}
|
|
}
|
|
lastWasNumber = thisIsNumber;
|
|
}
|
|
return buf.toString();
|
|
}
|
|
}
|
|
|