mirror of https://github.com/apache/lucene.git
SOLR-8819: Implement DatabaseMetaDataImpl getTables() and fix getSchemas(). (Trey Cahill, Joel Bernstein, Kevin Risden)
This commit is contained in:
parent
a0609e9cfb
commit
dbee659174
|
@ -492,6 +492,9 @@ Other Changes
|
|||
* SOLR-8740: docValues are now enabled by default for most non-text (string, date, and numeric) fields
|
||||
in the schema templates. (yonik)
|
||||
|
||||
* SOLR-8819: Implement DatabaseMetaDataImpl getTables() and fix getSchemas(). (Trey Cahill,
|
||||
Joel Bernstein, Kevin Risden)
|
||||
|
||||
================== 5.5.1 ==================
|
||||
|
||||
Bug Fixes
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.solr.client.solrj.io.stream.UniqueStream;
|
|||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.*;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
@ -173,17 +174,11 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
|
|||
TupleStream sqlStream = null;
|
||||
|
||||
if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_CATALOGS_")) {
|
||||
if (!sqlVistor.fields.contains("TABLE_CAT")) {
|
||||
throw new IOException("When querying _CATALOGS_, fields must contain column TABLE_CAT");
|
||||
}
|
||||
|
||||
sqlStream = new CatalogsStream(defaultZkhost);
|
||||
sqlStream = new SelectStream(new CatalogsStream(defaultZkhost), sqlVistor.columnAliases);
|
||||
} else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_SCHEMAS_")) {
|
||||
if (!sqlVistor.fields.contains("TABLE_SCHEM") || !sqlVistor.fields.contains("TABLE_CATALOG")) {
|
||||
throw new IOException("When querying _SCHEMAS_, fields must contain both TABLE_SCHEM and TABLE_CATALOG");
|
||||
}
|
||||
|
||||
sqlStream = new SchemasStream(defaultZkhost);
|
||||
sqlStream = new SelectStream(new SchemasStream(defaultZkhost), sqlVistor.columnAliases);
|
||||
} else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_TABLES_")) {
|
||||
sqlStream = new SelectStream(new TableStream(defaultZkhost), sqlVistor.columnAliases);
|
||||
} else if(sqlVistor.groupByQuery) {
|
||||
if(aggregationMode == AggregationMode.FACET) {
|
||||
sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
|
||||
|
@ -1378,7 +1373,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
|
|||
private int currentIndex = 0;
|
||||
private List<String> catalogs;
|
||||
|
||||
public CatalogsStream(String zkHost) {
|
||||
CatalogsStream(String zkHost) {
|
||||
this.zkHost = zkHost;
|
||||
}
|
||||
|
||||
|
@ -1392,7 +1387,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
|
|||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Map fields = new HashMap<>();
|
||||
Map<String, String> fields = new HashMap<>();
|
||||
if (this.currentIndex < this.catalogs.size()) {
|
||||
fields.put("TABLE_CAT", this.catalogs.get(this.currentIndex));
|
||||
this.currentIndex += 1;
|
||||
|
@ -1418,10 +1413,8 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
|
|||
private static class SchemasStream extends TupleStream {
|
||||
private final String zkHost;
|
||||
private StreamContext context;
|
||||
private int currentIndex = 0;
|
||||
private List<String> schemas;
|
||||
|
||||
public SchemasStream(String zkHost) {
|
||||
SchemasStream(String zkHost) {
|
||||
this.zkHost = zkHost;
|
||||
}
|
||||
|
||||
|
@ -1430,18 +1423,62 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
|
|||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
this.schemas = new ArrayList<>();
|
||||
|
||||
CloudSolrClient cloudSolrClient = this.context.getSolrClientCache().getCloudSolrClient(this.zkHost);
|
||||
this.schemas.addAll(cloudSolrClient.getZkStateReader().getClusterState().getCollections());
|
||||
Collections.sort(this.schemas);
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Map fields = new HashMap<>();
|
||||
if (this.currentIndex < this.schemas.size()) {
|
||||
fields.put("TABLE_SCHEM", this.schemas.get(this.currentIndex));
|
||||
fields.put("TABLE_CATALOG", this.zkHost);
|
||||
Map<String, String> fields = new HashMap<>();
|
||||
fields.put("EOF", "true");
|
||||
return new Tuple(fields);
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TableStream extends TupleStream {
|
||||
private final String zkHost;
|
||||
private StreamContext context;
|
||||
private int currentIndex = 0;
|
||||
private List<String> tables;
|
||||
|
||||
TableStream(String zkHost) {
|
||||
this.zkHost = zkHost;
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
this.tables = new ArrayList<>();
|
||||
|
||||
CloudSolrClient cloudSolrClient = this.context.getSolrClientCache().getCloudSolrClient(this.zkHost);
|
||||
cloudSolrClient.connect();
|
||||
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
|
||||
if (zkStateReader.getClusterState().getCollections().size() != 0) {
|
||||
this.tables.addAll(zkStateReader.getClusterState().getCollections());
|
||||
}
|
||||
Collections.sort(this.tables);
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Map<String, String> fields = new HashMap<>();
|
||||
if (this.currentIndex < this.tables.size()) {
|
||||
fields.put("TABLE_CAT", this.zkHost);
|
||||
fields.put("TABLE_SCHEM", null);
|
||||
fields.put("TABLE_NAME", this.tables.get(this.currentIndex));
|
||||
fields.put("TABLE_TYPE", "TABLE");
|
||||
fields.put("REMARKS", null);
|
||||
this.currentIndex += 1;
|
||||
} else {
|
||||
fields.put("EOF", "true");
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.handler;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -33,7 +34,6 @@ import org.apache.solr.common.params.CommonParams;
|
|||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSQLHandler extends AbstractFullDistribZkTestBase {
|
||||
|
@ -99,6 +99,9 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
|
|||
testParallelBasicGrouping();
|
||||
testParallelSelectDistinct();
|
||||
testParallelTimeSeriesGrouping();
|
||||
testCatalogStream();
|
||||
testSchemasStream();
|
||||
testTablesStream();
|
||||
}
|
||||
|
||||
private void testPredicate() throws Exception {
|
||||
|
@ -2418,6 +2421,74 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
private void testCatalogStream() throws Exception {
|
||||
CloudJettyRunner jetty = this.cloudJettys.get(0);
|
||||
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(CommonParams.QT, "/sql");
|
||||
params.put("numWorkers", 2);
|
||||
params.put("stmt", "select TABLE_CAT from _CATALOGS_");
|
||||
|
||||
SolrStream solrStream = new SolrStream(jetty.url, params);
|
||||
List<Tuple> tuples = getTuples(solrStream);
|
||||
|
||||
assertEquals(tuples.size(), 1);
|
||||
assertEquals(tuples.get(0).getString("TABLE_CAT"), zkServer.getZkAddress());
|
||||
}
|
||||
|
||||
private void testSchemasStream() throws Exception {
|
||||
CloudJettyRunner jetty = this.cloudJettys.get(0);
|
||||
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(CommonParams.QT, "/sql");
|
||||
params.put("numWorkers", 2);
|
||||
params.put("stmt", "select TABLE_SCHEM, TABLE_CATALOG from _SCHEMAS_");
|
||||
|
||||
SolrStream solrStream = new SolrStream(jetty.url, params);
|
||||
List<Tuple> tuples = getTuples(solrStream);
|
||||
|
||||
assertEquals(tuples.size(), 0);
|
||||
}
|
||||
|
||||
private void testTablesStream() throws Exception {
|
||||
CloudJettyRunner jetty = this.cloudJettys.get(0);
|
||||
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(CommonParams.QT, "/sql");
|
||||
params.put("numWorkers", 2);
|
||||
params.put("stmt", "select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS from _TABLES_");
|
||||
|
||||
SolrStream solrStream = new SolrStream(jetty.url, params);
|
||||
List<Tuple> tuples = getTuples(solrStream);
|
||||
|
||||
assertEquals(2, tuples.size());
|
||||
|
||||
List<String> collections = new ArrayList<>();
|
||||
collections.addAll(cloudClient.getZkStateReader().getClusterState().getCollections());
|
||||
Collections.sort(collections);
|
||||
for (Tuple tuple : tuples) {
|
||||
assertEquals(zkServer.getZkAddress(), tuple.getString("TABLE_CAT"));
|
||||
assertNull(tuple.get("TABLE_SCHEM"));
|
||||
assertTrue(collections.contains(tuple.getString("TABLE_NAME")));
|
||||
assertEquals("TABLE", tuple.getString("TABLE_TYPE"));
|
||||
assertNull(tuple.get("REMARKS"));
|
||||
}
|
||||
|
||||
tuples = getTuples(solrStream);
|
||||
assertEquals(2, tuples.size());
|
||||
|
||||
collections = new ArrayList<>();
|
||||
collections.addAll(cloudClient.getZkStateReader().getClusterState().getCollections());
|
||||
Collections.sort(collections);
|
||||
for (Tuple tuple : tuples) {
|
||||
assertEquals(zkServer.getZkAddress(), tuple.getString("TABLE_CAT"));
|
||||
assertNull(tuple.get("TABLE_SCHEM"));
|
||||
assertTrue(collections.contains(tuple.getString("TABLE_NAME")));
|
||||
assertEquals("TABLE", tuple.getString("TABLE_TYPE"));
|
||||
assertNull(tuple.get("REMARKS"));
|
||||
}
|
||||
}
|
||||
|
||||
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
|
||||
tupleStream.open();
|
||||
List<Tuple> tuples = new ArrayList();
|
||||
|
|
|
@ -55,7 +55,7 @@ class ConnectionImpl implements Connection {
|
|||
ConnectionImpl(String url, String zkHost, String collection, Properties properties) throws SQLException {
|
||||
this.url = url;
|
||||
this.client = this.solrClientCache.getCloudSolrClient(zkHost);
|
||||
this.setSchema(collection);
|
||||
this.collection = collection;
|
||||
this.properties = properties;
|
||||
this.connectionStatement = createStatement();
|
||||
this.databaseMetaData = new DatabaseMetaDataImpl(this, this.connectionStatement);
|
||||
|
@ -69,6 +69,10 @@ class ConnectionImpl implements Connection {
|
|||
return client;
|
||||
}
|
||||
|
||||
String getCollection() {
|
||||
return collection;
|
||||
}
|
||||
|
||||
Properties getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
@ -345,12 +349,12 @@ class ConnectionImpl implements Connection {
|
|||
|
||||
@Override
|
||||
public void setSchema(String schema) throws SQLException {
|
||||
this.collection = schema;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSchema() throws SQLException {
|
||||
return this.collection;
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -705,7 +705,7 @@ class DatabaseMetaDataImpl implements DatabaseMetaData {
|
|||
|
||||
@Override
|
||||
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException {
|
||||
return null;
|
||||
return this.connectionStatement.executeQuery("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS from _TABLES_");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -76,10 +76,10 @@ class StatementImpl implements Statement {
|
|||
try {
|
||||
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getSchema());
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getCollection());
|
||||
|
||||
if(slices == null) {
|
||||
throw new Exception("Collection not found:"+this.connection.getSchema());
|
||||
throw new Exception("Collection not found:"+this.connection.getCollection());
|
||||
}
|
||||
|
||||
List<Replica> shuffler = new ArrayList<>();
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
|
@ -36,7 +35,6 @@ import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
|||
import org.apache.solr.cloud.AbstractZkTestCase;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -81,7 +79,6 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
|
|||
super.distribSetUp();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
|
@ -444,9 +441,9 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
|
|||
con.setCatalog(zkServer.getZkAddress());
|
||||
assertEquals(zkServer.getZkAddress(), con.getCatalog());
|
||||
|
||||
assertEquals(collection, con.getSchema());
|
||||
con.setSchema(collection);
|
||||
assertEquals(collection, con.getSchema());
|
||||
assertEquals(null, con.getSchema());
|
||||
con.setSchema("myschema");
|
||||
assertEquals(null, con.getSchema());
|
||||
|
||||
DatabaseMetaData databaseMetaData = con.getMetaData();
|
||||
assertNotNull(databaseMetaData);
|
||||
|
@ -478,11 +475,19 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
|
|||
List<String> collections = new ArrayList<>();
|
||||
collections.addAll(cloudClient.getZkStateReader().getClusterState().getCollections());
|
||||
Collections.sort(collections);
|
||||
|
||||
try(ResultSet rs = databaseMetaData.getSchemas()) {
|
||||
assertFalse(rs.next());
|
||||
}
|
||||
|
||||
try(ResultSet rs = databaseMetaData.getTables(zkServer.getZkAddress(), null, "%", null)) {
|
||||
for(String acollection : collections) {
|
||||
assertTrue(rs.next());
|
||||
assertEquals(acollection, rs.getString("TABLE_SCHEM"));
|
||||
assertEquals(zkServer.getZkAddress(), rs.getString("TABLE_CATALOG"));
|
||||
assertEquals(zkServer.getZkAddress(), rs.getString("TABLE_CAT"));
|
||||
assertNull(rs.getString("TABLE_SCHEM"));
|
||||
assertEquals(acollection, rs.getString("TABLE_NAME"));
|
||||
assertEquals("TABLE", rs.getString("TABLE_TYPE"));
|
||||
assertNull(rs.getString("REMARKS"));
|
||||
}
|
||||
assertFalse(rs.next());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue