SOLR-8515, SOLR-8502: Implement StatementImpl.getConnection

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1725595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2016-01-19 19:14:15 +00:00
parent 58102fa671
commit 1de59cad5d
3 changed files with 44 additions and 38 deletions

View File

@ -42,26 +42,38 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
class ConnectionImpl implements Connection { class ConnectionImpl implements Connection {
private final String url; private final String url;
private SolrClientCache sqlSolrClientCache = new SolrClientCache(); private final SolrClientCache solrClientCache = new SolrClientCache();
private CloudSolrClient client; private final CloudSolrClient client;
private String collection; private final String collection;
Properties props; private final Properties properties;
private boolean closed; private boolean closed;
ConnectionImpl(String url, String zkHost, String collection, Properties props) { ConnectionImpl(String url, String zkHost, String collection, Properties properties) {
this.url = url; this.url = url;
this.client = sqlSolrClientCache.getCloudSolrClient(zkHost); this.client = solrClientCache.getCloudSolrClient(zkHost);
this.collection = collection; this.collection = collection;
this.props = props; this.properties = properties;
} }
String getUrl() { String getUrl() {
return url; return url;
} }
CloudSolrClient getClient() {
return client;
}
Properties getProperties() {
return properties;
}
SolrClientCache getSolrClientCache() {
return this.solrClientCache;
}
@Override @Override
public Statement createStatement() throws SQLException { public Statement createStatement() throws SQLException {
return new StatementImpl(client, this.collection, props, sqlSolrClientCache); return new StatementImpl(this);
} }
@Override @Override
@ -105,7 +117,7 @@ class ConnectionImpl implements Connection {
return; return;
} }
try { try {
this.sqlSolrClientCache.close(); this.solrClientCache.close();
this.closed = true; this.closed = true;
} catch (Exception e) { } catch (Exception e) {
throw new SQLException(e); throw new SQLException(e);

View File

@ -29,12 +29,9 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Properties;
import java.util.Random; import java.util.Random;
import org.apache.solr.client.solrj.io.stream.SolrStream; import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
@ -45,28 +42,21 @@ import org.apache.solr.common.params.CommonParams;
class StatementImpl implements Statement { class StatementImpl implements Statement {
private CloudSolrClient client; private final ConnectionImpl connection;
private SolrClientCache sqlSolrClientCache;
private String collection;
private Properties properties;
private SolrStream solrStream; private SolrStream solrStream;
private boolean closed; private boolean closed;
StatementImpl(CloudSolrClient client, String collection, Properties properties, SolrClientCache sqlSolrClientCache) { StatementImpl(ConnectionImpl connection) {
this.client = client; this.connection = connection;
this.collection = collection;
this.properties = properties;
this.sqlSolrClientCache = sqlSolrClientCache;
} }
@Override @Override
public ResultSet executeQuery(String sql) throws SQLException { public ResultSet executeQuery(String sql) throws SQLException {
try { try {
closed = false; // If closed reopen so Statement can be reused. closed = false; // If closed reopen so Statement can be reused.
this.solrStream = constructStream(sql); this.solrStream = constructStream(sql);
StreamContext context = new StreamContext(); StreamContext context = new StreamContext();
context.setSolrClientCache(sqlSolrClientCache); context.setSolrClientCache(this.connection.getSolrClientCache());
this.solrStream.setStreamContext(context); this.solrStream.setStreamContext(context);
this.solrStream.open(); this.solrStream.open();
return new ResultSetImpl(this.solrStream); return new ResultSetImpl(this.solrStream);
@ -76,19 +66,16 @@ class StatementImpl implements Statement {
} }
protected SolrStream constructStream(String sql) throws IOException { protected SolrStream constructStream(String sql) throws IOException {
try { try {
ZkStateReader zkStateReader = client.getZkStateReader(); ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState(); ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection); Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getCatalog());
if(slices == null) { if(slices == null) {
throw new Exception("Collection not found:"+this.collection); throw new Exception("Collection not found:"+this.connection.getCatalog());
} }
Map params = new HashMap(); List<Replica> shuffler = new ArrayList<>();
List<Replica> shuffler = new ArrayList();
for(Slice slice : slices) { for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas(); Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) { for (Replica replica : replicas) {
@ -98,15 +85,17 @@ class StatementImpl implements Statement {
Collections.shuffle(shuffler, new Random()); Collections.shuffle(shuffler, new Random());
Map<String, String> params = new HashMap<>();
params.put(CommonParams.QT, "/sql"); params.put(CommonParams.QT, "/sql");
params.put("stmt", sql); params.put("stmt", sql);
params.putAll(properties); for(String propertyName : this.connection.getProperties().stringPropertyNames()) {
params.put(propertyName, this.connection.getProperties().getProperty(propertyName));
}
Replica rep = shuffler.get(0); Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep); ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl(); String url = zkProps.getCoreUrl();
return new SolrStream(url, params); return new SolrStream(url, params);
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
@ -119,13 +108,14 @@ class StatementImpl implements Statement {
@Override @Override
public void close() throws SQLException { public void close() throws SQLException {
if(closed) { if(closed) {
return; return;
} }
try { try {
this.solrStream.close(); if(this.solrStream != null) {
this.solrStream.close();
}
this.closed = true; this.closed = true;
} catch (Exception e) { } catch (Exception e) {
throw new SQLException(e); throw new SQLException(e);
@ -254,7 +244,7 @@ class StatementImpl implements Statement {
@Override @Override
public Connection getConnection() throws SQLException { public Connection getConnection() throws SQLException {
throw new UnsupportedOperationException(); return this.connection;
} }
@Override @Override

View File

@ -217,7 +217,7 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
//Test params on the url //Test params on the url
con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1&aggregationMode=map_reduce&numWorkers=2"); con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1&aggregationMode=map_reduce&numWorkers=2");
Properties p = ((ConnectionImpl) con).props; Properties p = ((ConnectionImpl) con).getProperties();
assert(p.getProperty("aggregationMode").equals("map_reduce")); assert(p.getProperty("aggregationMode").equals("map_reduce"));
assert(p.getProperty("numWorkers").equals("2")); assert(p.getProperty("numWorkers").equals("2"));
@ -244,7 +244,7 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
con = DriverManager.getConnection( con = DriverManager.getConnection(
"jdbc:solr://" + zkHost + "?collection=collection1&username=&password=&testKey1=testValue&testKey2"); "jdbc:solr://" + zkHost + "?collection=collection1&username=&password=&testKey1=testValue&testKey2");
p = ((ConnectionImpl) con).props; p = ((ConnectionImpl) con).getProperties();
assert(p.getProperty("username").equals("")); assert(p.getProperty("username").equals(""));
assert(p.getProperty("password").equals("")); assert(p.getProperty("password").equals(""));
assert(p.getProperty("testKey1").equals("testValue")); assert(p.getProperty("testKey1").equals("testValue"));
@ -278,7 +278,7 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
con = DriverManager.getConnection("jdbc:solr://" + zkHost, providedProperties); con = DriverManager.getConnection("jdbc:solr://" + zkHost, providedProperties);
p = ((ConnectionImpl) con).props; p = ((ConnectionImpl) con).getProperties();
assert(p.getProperty("username").equals("")); assert(p.getProperty("username").equals(""));
assert(p.getProperty("password").equals("")); assert(p.getProperty("password").equals(""));
assert(p.getProperty("testKey1").equals("testValue")); assert(p.getProperty("testKey1").equals("testValue"));
@ -317,6 +317,10 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
assertNotNull(databaseMetaData); assertNotNull(databaseMetaData);
assertEquals(connectionString, databaseMetaData.getURL()); assertEquals(connectionString, databaseMetaData.getURL());
try (Statement statement = con.createStatement()) {
assertEquals(con, statement.getConnection());
}
} }
} }
} }