Fix startup

Original commit: elastic/x-pack-elasticsearch@20e5a15316
This commit is contained in:
Nik Everett 2017-06-26 14:24:02 -04:00
parent bf84de2fc4
commit 4d99a3c30e
15 changed files with 94 additions and 86 deletions

View File

@ -46,7 +46,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
@ -97,21 +96,17 @@ import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.SecurityFeatureSet;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.sql.plugin.SqlPlugin;
import org.elasticsearch.xpack.ssl.SSLConfigurationReloader;
import org.elasticsearch.xpack.ssl.SSLService;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
import javax.security.auth.DestroyFailedException;
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.GeneralSecurityException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
@ -126,6 +121,8 @@ import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.security.auth.DestroyFailedException;
public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin {
public static final String NAME = "x-pack";
@ -198,6 +195,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
protected Graph graph;
protected MachineLearning machineLearning;
protected Logstash logstash;
protected SqlPlugin sql;
public XPackPlugin(Settings settings) throws IOException, DestroyFailedException, OperatorCreationException, GeneralSecurityException {
this.settings = settings;
@ -213,6 +211,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
this.graph = new Graph(settings);
this.machineLearning = new MachineLearning(settings, env, licenseState);
this.logstash = new Logstash(settings);
this.sql = new SqlPlugin();
// Check if the node is a transport client.
if (transportClientMode == false) {
this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions());
@ -293,6 +292,9 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
components.addAll(logstash.createComponents(internalClient, clusterService));
components.addAll(
sql.createComponents(internalClient, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry));
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them
new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService);
return components;
@ -416,6 +418,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
actions.addAll(watcher.getActions());
actions.addAll(graph.getActions());
actions.addAll(machineLearning.getActions());
actions.addAll(sql.getActions());
return actions;
}
@ -449,6 +452,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
indexNameExpressionResolver, nodesInCluster));
handlers.addAll(machineLearning.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter,
indexNameExpressionResolver, nodesInCluster));
handlers.addAll(sql.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter,
indexNameExpressionResolver, nodesInCluster));
return handlers;
}

View File

@ -5,13 +5,6 @@
*/
package org.elasticsearch.xpack.security.authz;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.CompositeIndicesRequest;
@ -64,6 +57,16 @@ import org.elasticsearch.xpack.security.user.AnonymousUser;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.sql.plugin.cli.action.CliAction;
import org.elasticsearch.xpack.sql.plugin.jdbc.action.JdbcAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.support.Exceptions.authorizationError;
@ -382,7 +385,10 @@ public class AuthorizationService extends AbstractComponent {
action.equals("indices:data/read/mpercolate") ||
action.equals("indices:data/read/msearch/template") ||
action.equals("indices:data/read/search/template") ||
action.equals("indices:data/write/reindex");
action.equals("indices:data/write/reindex") ||
action.equals(SqlAction.NAME) || // NOCOMMIT verify that SQL requests are properly composite
action.equals(JdbcAction.NAME) ||
action.equals(CliAction.NAME);
}
private static boolean isTranslatedToBulkAction(String action) {

View File

@ -5,11 +5,6 @@
*/
package org.elasticsearch.xpack.sql.plugin;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
@ -18,7 +13,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -40,7 +34,11 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.TransportSqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.rest.RestSqlAction;
import static java.util.Collections.emptyList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import static java.util.Collections.singleton;
public class SqlPlugin extends Plugin implements ActionPlugin {
@ -52,12 +50,6 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
return singleton(new PlanExecutor(client, () -> clusterService.state()));
}
// TODO: hook defaults for things like number of groups or size of inner hits in here
@Override
public List<Setting<?>> getSettings() {
return emptyList();
}
@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,

View File

@ -11,7 +11,7 @@ import org.elasticsearch.client.ElasticsearchClient;
public class CliAction extends Action<CliRequest, CliResponse, CliRequestBuilder> {
public static final CliAction INSTANCE = new CliAction();
public static final String NAME = "indices:data/read/sql/jdbc";
public static final String NAME = "indices:data/read/sql/cli";
private CliAction() {
super(NAME);

View File

@ -9,11 +9,12 @@ import java.util.Objects;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.Request;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class CliRequest extends ActionRequest {
public class CliRequest extends ActionRequest implements CompositeIndicesRequest {
private Request request;

View File

@ -23,25 +23,21 @@ import org.elasticsearch.xpack.sql.plugin.cli.server.CliServer;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
public class TransportCliAction extends HandledTransportAction<CliRequest, CliResponse> {
private final PlanExecutor planExecutor;
private final ClusterService clusterService;
private final CliServer cliServer;
@Inject
public TransportCliAction(Settings settings, String actionName, ThreadPool threadPool,
public TransportCliAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
super(settings, CliAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, CliRequest::new);
this.clusterService = clusterService;
this.planExecutor = planExecutor;
// lazy init of the resolver
// NOCOMMIT indexNameExpressionResolver should be available some other way
((EsCatalog) planExecutor.catalog()).setIndexNameExpressionResolver(indexNameExpressionResolver);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
this.cliServer = new CliServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(),
Version.CURRENT, Build.CURRENT);
}
@Override

View File

@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.sql.plugin.cli.server;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.elasticsearch.Build;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
@ -19,25 +16,31 @@ import org.elasticsearch.xpack.sql.cli.net.protocol.ErrorResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.ExceptionResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.cli.net.protocol.Request;
import org.elasticsearch.xpack.sql.cli.net.protocol.Response;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.execution.search.SearchHitRowSetCursor;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Supplier;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.sql.net.client.util.StringUtils.EMPTY;
public class CliServer {
private final PlanExecutor executor;
private final InfoResponse infoResponse;
private final Supplier<InfoResponse> infoResponse;
public CliServer(PlanExecutor executor, String clusterName, String nodeName, Version version, Build build) {
public CliServer(PlanExecutor executor, String clusterName, Supplier<String> nodeName, Version version, Build build) {
this.executor = executor;
this.infoResponse = new InfoResponse(nodeName, clusterName, version.major, version.minor, version.toString(), build.shortHash(), build.date());
// Delay building the response until runtime because the node name is not available at startup
this.infoResponse = () -> new InfoResponse(nodeName.get(), clusterName, version.major, version.minor, version.toString(),
build.shortHash(), build.date());
}
public void handle(Request req, ActionListener<Response> listener) {
@ -54,7 +57,7 @@ public class CliServer {
}
public InfoResponse info(InfoRequest req) {
return infoResponse;
return infoResponse.get();
}
public void command(CommandRequest req, ActionListener<Response> listener) {

View File

@ -9,11 +9,12 @@ import java.util.Objects;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Request;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class JdbcRequest extends ActionRequest {
public class JdbcRequest extends ActionRequest implements CompositeIndicesRequest {
private Request request;

View File

@ -23,25 +23,21 @@ import org.elasticsearch.xpack.sql.plugin.jdbc.server.JdbcServer;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
public class TransportJdbcAction extends HandledTransportAction<JdbcRequest, JdbcResponse> {
private final PlanExecutor planExecutor;
private final ClusterService clusterService;
private final JdbcServer jdbcServer;
@Inject
public TransportJdbcAction(Settings settings, String actionName, ThreadPool threadPool,
public TransportJdbcAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, JdbcRequest::new);
super(settings, JdbcAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, JdbcRequest::new);
this.clusterService = clusterService;
this.planExecutor = planExecutor;
// lazy init of the resolver
((EsCatalog) planExecutor.catalog()).setIndexNameExpressionResolver(indexNameExpressionResolver);
this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(), clusterService.localNode().getName(), Version.CURRENT, Build.CURRENT);
// NOCOMMIT indexNameExpressionResolver should be available some other way
this.jdbcServer = new JdbcServer(planExecutor, clusterService.getClusterName().value(), () -> clusterService.localNode().getName(),
Version.CURRENT, Build.CURRENT);
}
@Override

View File

@ -5,15 +5,6 @@
*/
package org.elasticsearch.xpack.sql.plugin.jdbc.server;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.elasticsearch.Build;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
@ -32,30 +23,41 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.SqlExceptionType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Request;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Response;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.Action;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.SqlExceptionType;
import org.elasticsearch.xpack.sql.net.client.util.StringUtils;
import org.elasticsearch.xpack.sql.parser.ParsingException;
import org.elasticsearch.xpack.sql.type.DataType;
import static java.util.stream.Collectors.toList;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.sql.net.client.util.StringUtils.EMPTY;
public class JdbcServer {
private final PlanExecutor executor;
private final InfoResponse infoResponse;
private final Supplier<InfoResponse> infoResponse;
public JdbcServer(PlanExecutor executor, String clusterName, String nodeName, Version version, Build build) {
public JdbcServer(PlanExecutor executor, String clusterName, Supplier<String> nodeName, Version version, Build build) {
this.executor = executor;
this.infoResponse = new InfoResponse(nodeName, clusterName, version.major, version.minor, version.toString(), build.shortHash(), build.date());
// Delay building the response until runtime because the node name is not available at startup
this.infoResponse = () -> new InfoResponse(nodeName.get(), clusterName, version.major, version.minor, version.toString(),
build.shortHash(), build.date());
}
public void handle(Request req, ActionListener<Response> listener) {
@ -78,7 +80,7 @@ public class JdbcServer {
}
public InfoResponse info(InfoRequest req) {
return infoResponse;
return infoResponse.get();
}
public MetaTableResponse metaTable(MetaTableRequest req) {

View File

@ -10,13 +10,14 @@ import java.util.Objects;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SqlRequest extends ActionRequest {
public class SqlRequest extends ActionRequest implements CompositeIndicesRequest {
// initialized on the first request
private String query;

View File

@ -24,6 +24,8 @@ import org.elasticsearch.xpack.sql.analysis.catalog.EsCatalog;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.sql.util.ActionUtils.chain;
public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlResponse> {
@ -35,22 +37,22 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
.setExpireAfterWrite(TimeValue.timeValueMinutes(10))
.build();
private final String ephemeralId;
private final Supplier<String> ephemeralId;
private final PlanExecutor planExecutor;
@Inject
public TransportSqlAction(Settings settings, String actionName, ThreadPool threadPool,
public TransportSqlAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
PlanExecutor planExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, SqlRequest::new);
super(settings, SqlAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SqlRequest::new);
this.planExecutor = planExecutor;
// lazy init of the resolver
((EsCatalog) planExecutor.catalog()).setIndexNameExpressionResolver(indexNameExpressionResolver);
ephemeralId = transportService.getLocalNode().getEphemeralId();
ephemeralId = () -> transportService.getLocalNode().getEphemeralId();
}
@Override
@ -64,7 +66,8 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
listener.onFailure(new SqlIllegalArgumentException("No query is given and request not part of a session"));
return;
}
// NOCOMMIT move session information somewhere - like into scroll or something. We should be able to reuse something.
// generate the plan and once its done, generate the session id, store it and send back the response
planExecutor.sql(query, chain(listener, c -> {
String id = generateId();
@ -89,6 +92,6 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
}
private String generateId() {
return ephemeralId + "-" + UUIDs.base64UUID();
return ephemeralId.get() + "-" + UUIDs.base64UUID();
}
}

View File

@ -27,7 +27,8 @@ class CliProtoHandler extends ProtoHandler<Response> {
CliProtoHandler(Client client) {
super(client, ProtoUtils::readHeader, CliServerProtoUtils::write);
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, info.getNode().getName(), info.getVersion() , info.getBuild());
this.server = new CliServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
}
@Override

View File

@ -27,7 +27,8 @@ class SqlProtoHandler extends ProtoHandler<Response> {
SqlProtoHandler(Client client) {
super(client, ProtoUtils::readHeader, JdbcServerProtoUtils::write);
this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName, info.getNode().getName(), info.getVersion() , info.getBuild());
this.server = new JdbcServer(TestUtils.planExecutor(client), clusterName, () -> info.getNode().getName(), info.getVersion(),
info.getBuild());
}
@Override

View File

@ -5,9 +5,8 @@
*/
package org.elasticsearch.xpack.sql.test.server;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
@ -18,8 +17,9 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
public abstract class ProtoHandler<R> implements HttpHandler, AutoCloseable {