Make SQLHandlerStream extend JDBCStream

This commit is contained in:
Kevin Risden 2016-11-03 10:22:20 -05:00
parent 5f3b4237e0
commit 0188345e84
2 changed files with 24 additions and 105 deletions

View File

@ -18,14 +18,9 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -37,11 +32,7 @@ import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.JDBCStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreContainer;
@ -55,7 +46,7 @@ import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , PermissionNameProvider {
public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -118,15 +109,8 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String driverClass = CalciteSolrDriver.class.getCanonicalName();
// JDBC driver requires metadata from the SQLHandler. Default to false since this adds a new Metadata stream.
if(params.getBool("includeMetadata", false)) {
/*
* Would be great to replace this with the JDBCStream. Can't do that currently since need to have metadata
* added to the stream for the JDBC driver. This could be fixed by using the Calcite Avatica server and client.
*/
tupleStream = new SqlHandlerStream(url, sql, properties, driverClass);
} else {
tupleStream = new JDBCStream(url, sql, null, properties, driverClass);
}
boolean includeMetadata = params.getBool("includeMetadata", false);
tupleStream = new SqlHandlerStream(url, sql, null, properties, driverClass, includeMetadata);
tupleStream = new StreamHandler.TimerStream(new ExceptionStream(tupleStream));
@ -152,68 +136,32 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
/*
* Only necessary for SolrJ JDBC driver since metadata has to be passed back
*/
private class SqlHandlerStream extends TupleStream {
private final String url;
private final String sql;
private final Properties properties;
private final String driverClass;
private class SqlHandlerStream extends JDBCStream {
private final boolean includeMetadata;
private boolean firstTuple = true;
private Connection connection;
private Statement statement;
private ResultSet resultSet;
private ResultSetMetaData resultSetMetaData;
private int numColumns;
SqlHandlerStream(String url, String sql, Properties properties, String driverClass) {
this.url = url;
this.sql = sql;
this.properties = properties;
this.driverClass = driverClass;
}
SqlHandlerStream(String connectionUrl, String sqlQuery, StreamComparator definedSort,
Properties connectionProperties, String driverClassName, boolean includeMetadata)
throws IOException {
super(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName);
public List<TupleStream> children() {
return Collections.emptyList();
}
public void open() throws IOException {
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
try {
connection = DriverManager.getConnection(url, properties);
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
resultSetMetaData = this.resultSet.getMetaData();
numColumns = resultSetMetaData.getColumnCount();
} catch (SQLException e) {
this.close();
throw new IOException(e);
}
this.includeMetadata = includeMetadata;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("SQL Handler")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(Explanation.ExpressionType.STREAM_DECORATOR);
}
// Return a metadata tuple as the first tuple and then pass through to the underlying stream.
public Tuple read() throws IOException {
try {
Map<String, Object> fields = new HashMap<>();
if(firstTuple) {
// Return a metadata tuple as the first tuple and then pass through to the JDBCStream.
if(includeMetadata && firstTuple) {
try {
Map<String, Object> fields = new HashMap<>();
firstTuple = false;
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<String> metadataFields = new ArrayList<>();
Map<String, String> metadataAliases = new HashMap<>();
for(int i = 1; i <= numColumns; i++) {
for(int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
String columnName = resultSetMetaData.getColumnName(i);
String columnLabel = resultSetMetaData.getColumnLabel(i);
metadataFields.add(columnName);
@ -223,42 +171,13 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
fields.put("isMetadata", true);
fields.put("fields", metadataFields);
fields.put("aliases", metadataAliases);
} else {
if(this.resultSet.next()){
for(int i = 1; i <= numColumns; i++) {
fields.put(resultSetMetaData.getColumnLabel(i), this.resultSet.getObject(i));
}
} else {
fields.put("EOF", true);
}
return new Tuple(fields);
} catch (SQLException e) {
throw new IOException(e);
}
return new Tuple(fields);
} catch (SQLException e) {
throw new IOException(e);
} else {
return super.read();
}
}
public StreamComparator getStreamSort() {
return null;
}
private void closeQuietly(AutoCloseable closeable) {
if(closeable != null) {
try {
closeable.close();
} catch (Exception ignore) {
}
}
}
public void close() throws IOException {
this.closeQuietly(this.resultSet);
this.closeQuietly(this.statement);
this.closeQuietly(this.connection);
}
public void setStreamContext(StreamContext context) {
}
}
}

View File

@ -85,8 +85,8 @@ public class JDBCStream extends TupleStream implements Expressible {
private Connection connection;
private Properties connectionProperties;
private Statement statement;
private ResultSet resultSet;
private ResultSetValueSelector[] valueSelectors;
protected ResultSet resultSet;
protected transient StreamContext streamContext;
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort) throws IOException {