mirror of https://github.com/apache/lucene.git
SOLR-8508: Implement DatabaseMetaDataImpl.getCatalogs()
This commit is contained in:
parent
ce0b931da8
commit
edf665988d
|
@ -164,7 +164,13 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
|
||||
TupleStream sqlStream = null;
|
||||
|
||||
if(sqlVistor.groupByQuery) {
|
||||
if(sqlVistor.table.toUpperCase(Locale.getDefault()).contains("_CATALOGS_")) {
|
||||
if (!sqlVistor.fields.contains("TABLE_CAT")) {
|
||||
throw new IOException("When querying _CATALOGS_, fields must contain column TABLE_CAT");
|
||||
}
|
||||
|
||||
sqlStream = new CatalogsStream(defaultZkhost);
|
||||
} else if(sqlVistor.groupByQuery) {
|
||||
if(aggregationMode == AggregationMode.FACET) {
|
||||
sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
|
||||
} else {
|
||||
|
@ -549,6 +555,11 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
throw new IOException("Select columns must be specified.");
|
||||
}
|
||||
|
||||
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
|
||||
|
||||
String zkHost = tableSpec.zkHost;
|
||||
String collection = tableSpec.collection;
|
||||
|
||||
boolean score = false;
|
||||
|
||||
for (String field : fields) {
|
||||
|
@ -594,7 +605,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
}
|
||||
} else {
|
||||
if(sqlVisitor.limit < 0) {
|
||||
throw new IOException("order by is required for unlimited select statements.");
|
||||
throw new IOException("order by is required for unlimited select statements.");
|
||||
} else {
|
||||
siBuf.append("score desc");
|
||||
if(!score) {
|
||||
|
@ -603,12 +614,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
}
|
||||
}
|
||||
|
||||
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
|
||||
|
||||
String zkHost = tableSpec.zkHost;
|
||||
String collection = tableSpec.collection;
|
||||
Map<String, String> params = new HashMap();
|
||||
|
||||
params.put("fl", fl.toString());
|
||||
params.put("q", sqlVisitor.query);
|
||||
|
||||
|
@ -616,7 +622,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
params.put("sort", siBuf.toString());
|
||||
}
|
||||
|
||||
TupleStream tupleStream = null;
|
||||
TupleStream tupleStream;
|
||||
|
||||
if(sqlVisitor.limit > -1) {
|
||||
params.put("rows", Integer.toString(sqlVisitor.limit));
|
||||
|
@ -1355,6 +1361,49 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
}
|
||||
}
|
||||
|
||||
private static class CatalogsStream extends TupleStream {
|
||||
private final String zkHost;
|
||||
private StreamContext context;
|
||||
private int currentIndex = 0;
|
||||
private List<String> catalogs;
|
||||
|
||||
public CatalogsStream(String zkHost) {
|
||||
this.zkHost = zkHost;
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
this.catalogs = new ArrayList<>();
|
||||
this.catalogs.add(this.zkHost);
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Map fields = new HashMap<>();
|
||||
if (this.currentIndex < this.catalogs.size()) {
|
||||
this.currentIndex += 1;
|
||||
fields.put("TABLE_CAT", this.zkHost);
|
||||
} else {
|
||||
fields.put("EOF", "true");
|
||||
}
|
||||
return new Tuple(fields);
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MetadataStream extends TupleStream {
|
||||
|
||||
private final TupleStream stream;
|
||||
|
|
|
@ -48,14 +48,18 @@ class ConnectionImpl implements Connection {
|
|||
private final CloudSolrClient client;
|
||||
private final String collection;
|
||||
private final Properties properties;
|
||||
private final DatabaseMetaData databaseMetaData;
|
||||
private final Statement connectionStatement;
|
||||
private boolean closed;
|
||||
private SQLWarning currentWarning;
|
||||
|
||||
ConnectionImpl(String url, String zkHost, String collection, Properties properties) {
|
||||
ConnectionImpl(String url, String zkHost, String collection, Properties properties) throws SQLException {
|
||||
this.url = url;
|
||||
this.client = solrClientCache.getCloudSolrClient(zkHost);
|
||||
this.collection = collection;
|
||||
this.properties = properties;
|
||||
this.connectionStatement = createStatement();
|
||||
this.databaseMetaData = new DatabaseMetaDataImpl(this, this.connectionStatement);
|
||||
}
|
||||
|
||||
String getUrl() {
|
||||
|
@ -119,11 +123,17 @@ class ConnectionImpl implements Connection {
|
|||
if(closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.closed = true;
|
||||
|
||||
try {
|
||||
this.solrClientCache.close();
|
||||
this.closed = true;
|
||||
} catch (Exception e) {
|
||||
throw new SQLException(e);
|
||||
if(this.connectionStatement != null) {
|
||||
this.connectionStatement.close();
|
||||
}
|
||||
} finally {
|
||||
if (this.solrClientCache != null) {
|
||||
this.solrClientCache.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -134,7 +144,7 @@ class ConnectionImpl implements Connection {
|
|||
|
||||
@Override
|
||||
public DatabaseMetaData getMetaData() throws SQLException {
|
||||
return new DatabaseMetaDataImpl(this);
|
||||
return this.databaseMetaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,7 +164,7 @@ class ConnectionImpl implements Connection {
|
|||
|
||||
@Override
|
||||
public String getCatalog() throws SQLException {
|
||||
return this.collection;
|
||||
return this.client.getZkHost();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -170,7 +180,7 @@ class ConnectionImpl implements Connection {
|
|||
@Override
|
||||
public SQLWarning getWarnings() throws SQLException {
|
||||
if(isClosed()) {
|
||||
throw new SQLException("Statement is closed.");
|
||||
throw new SQLException("Connection is closed.");
|
||||
}
|
||||
|
||||
return this.currentWarning;
|
||||
|
@ -179,7 +189,7 @@ class ConnectionImpl implements Connection {
|
|||
@Override
|
||||
public void clearWarnings() throws SQLException {
|
||||
if(isClosed()) {
|
||||
throw new SQLException("Statement is closed.");
|
||||
throw new SQLException("Connection is closed.");
|
||||
}
|
||||
|
||||
this.currentWarning = null;
|
||||
|
@ -341,7 +351,7 @@ class ConnectionImpl implements Connection {
|
|||
|
||||
@Override
|
||||
public String getSchema() throws SQLException {
|
||||
throw new UnsupportedOperationException();
|
||||
return this.collection;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,12 +22,15 @@ import java.sql.DatabaseMetaData;
|
|||
import java.sql.ResultSet;
|
||||
import java.sql.RowIdLifetime;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
class DatabaseMetaDataImpl implements DatabaseMetaData {
|
||||
private final ConnectionImpl connection;
|
||||
private final Statement connectionStatement;
|
||||
|
||||
DatabaseMetaDataImpl(ConnectionImpl connection) {
|
||||
public DatabaseMetaDataImpl(ConnectionImpl connection, Statement connectionStatement) {
|
||||
this.connection = connection;
|
||||
this.connectionStatement = connectionStatement;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -642,7 +645,7 @@ class DatabaseMetaDataImpl implements DatabaseMetaData {
|
|||
|
||||
@Override
|
||||
public ResultSet getCatalogs() throws SQLException {
|
||||
return null;
|
||||
return this.connectionStatement.executeQuery("select TABLE_CAT from _CATALOGS_");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,6 +42,8 @@ import java.util.Map;
|
|||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.PushBackStream;
|
||||
import org.apache.solr.client.solrj.io.stream.SolrStream;
|
||||
import org.apache.solr.client.solrj.io.stream.StreamContext;
|
||||
|
||||
class ResultSetImpl implements ResultSet {
|
||||
private final StatementImpl statement;
|
||||
|
@ -55,12 +57,18 @@ class ResultSetImpl implements ResultSet {
|
|||
private SQLWarning currentWarning;
|
||||
private boolean wasLastValueNull;
|
||||
|
||||
ResultSetImpl(StatementImpl statement) {
|
||||
ResultSetImpl(StatementImpl statement, SolrStream solrStream) throws SQLException {
|
||||
this.statement = statement;
|
||||
this.solrStream = new PushBackStream(statement.getSolrStream());
|
||||
|
||||
// Read the first tuple so that metadata can be gathered
|
||||
try {
|
||||
this.solrStream = new PushBackStream(solrStream);
|
||||
|
||||
StreamContext context = new StreamContext();
|
||||
context.setSolrClientCache(((ConnectionImpl)this.statement.getConnection()).getSolrClientCache());
|
||||
this.solrStream.setStreamContext(context);
|
||||
|
||||
this.solrStream.open();
|
||||
|
||||
this.metadataTuple = this.solrStream.read();
|
||||
|
||||
Object isMetadata = this.metadataTuple.get("isMetadata");
|
||||
|
@ -71,7 +79,7 @@ class ResultSetImpl implements ResultSet {
|
|||
this.firstTuple = this.solrStream.read();
|
||||
this.solrStream.pushBack(firstTuple);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Couldn't read first tuple", e);
|
||||
throw new SQLException("Couldn't read first tuple", e);
|
||||
}
|
||||
|
||||
this.resultSetMetaData = new ResultSetMetaDataImpl(this);
|
||||
|
@ -115,6 +123,12 @@ class ResultSetImpl implements ResultSet {
|
|||
@Override
|
||||
public void close() throws SQLException {
|
||||
this.done = this.closed = true;
|
||||
|
||||
try {
|
||||
this.solrStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new SQLException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.HashMap;
|
|||
import java.util.Random;
|
||||
|
||||
import org.apache.solr.client.solrj.io.stream.SolrStream;
|
||||
import org.apache.solr.client.solrj.io.stream.StreamContext;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
@ -43,7 +42,6 @@ import org.apache.solr.common.params.CommonParams;
|
|||
class StatementImpl implements Statement {
|
||||
|
||||
private final ConnectionImpl connection;
|
||||
private SolrStream solrStream;
|
||||
private boolean closed;
|
||||
private String currentSQL;
|
||||
private ResultSetImpl currentResultSet;
|
||||
|
@ -53,26 +51,16 @@ class StatementImpl implements Statement {
|
|||
this.connection = connection;
|
||||
}
|
||||
|
||||
public SolrStream getSolrStream() {
|
||||
return this.solrStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultSet executeQuery(String sql) throws SQLException {
|
||||
try {
|
||||
if(this.currentResultSet != null) {
|
||||
this.currentResultSet.close();
|
||||
this.currentResultSet = null;
|
||||
this.solrStream.close();
|
||||
}
|
||||
|
||||
closed = false; // If closed reopen so Statement can be reused.
|
||||
this.solrStream = constructStream(sql);
|
||||
StreamContext context = new StreamContext();
|
||||
context.setSolrClientCache(this.connection.getSolrClientCache());
|
||||
this.solrStream.setStreamContext(context);
|
||||
this.solrStream.open();
|
||||
this.currentResultSet = new ResultSetImpl(this);
|
||||
this.currentResultSet = new ResultSetImpl(this, constructStream(sql));
|
||||
return this.currentResultSet;
|
||||
} catch(Exception e) {
|
||||
throw new SQLException(e);
|
||||
|
@ -83,10 +71,10 @@ class StatementImpl implements Statement {
|
|||
try {
|
||||
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getCatalog());
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getSchema());
|
||||
|
||||
if(slices == null) {
|
||||
throw new Exception("Collection not found:"+this.connection.getCatalog());
|
||||
throw new Exception("Collection not found:"+this.connection.getSchema());
|
||||
}
|
||||
|
||||
List<Replica> shuffler = new ArrayList<>();
|
||||
|
@ -126,13 +114,10 @@ class StatementImpl implements Statement {
|
|||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if(this.solrStream != null) {
|
||||
this.solrStream.close();
|
||||
}
|
||||
this.closed = true;
|
||||
} catch (Exception e) {
|
||||
throw new SQLException(e);
|
||||
this.closed = true;
|
||||
|
||||
if(this.currentResultSet != null) {
|
||||
this.currentResultSet.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -378,7 +378,8 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
|
|||
private void testJDBCMethods(String collection, String connectionString, Properties properties, String sql) throws Exception {
|
||||
try (Connection con = DriverManager.getConnection(connectionString, properties)) {
|
||||
assertTrue(con.isValid(DEFAULT_CONNECTION_TIMEOUT));
|
||||
assertEquals(collection, con.getCatalog());
|
||||
assertEquals(zkServer.getZkAddress(), con.getCatalog());
|
||||
assertEquals(collection, con.getSchema());
|
||||
|
||||
DatabaseMetaData databaseMetaData = con.getMetaData();
|
||||
assertNotNull(databaseMetaData);
|
||||
|
@ -386,6 +387,12 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
|
|||
assertEquals(con, databaseMetaData.getConnection());
|
||||
assertEquals(connectionString, databaseMetaData.getURL());
|
||||
|
||||
try(ResultSet rs = databaseMetaData.getCatalogs()) {
|
||||
assertTrue(rs.next());
|
||||
assertEquals(zkServer.getZkAddress(), rs.getString("TABLE_CAT"));
|
||||
assertFalse(rs.next());
|
||||
}
|
||||
|
||||
assertNull(con.getWarnings());
|
||||
con.clearWarnings();
|
||||
assertNull(con.getWarnings());
|
||||
|
|
Loading…
Reference in New Issue