SOLR-9065: Migrate SolrJ tests to SolrCloudTestCase

Alan Woodward 2016-04-30 09:22:28 +01:00
parent e15bab37a1
commit 630a8c950d
14 changed files with 1617 additions and 2334 deletions
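The migration applies the same pattern to every test class in this commit: drop the AbstractFullDistribZkTestBase / AbstractZkTestCase scaffolding (SOLR_HOME, getCloudSolrConfig(), slice counts) and stand up a shared MiniSolrCloudCluster once per class. A condensed sketch of that pattern, assembled from the hunks below (class name and node count are illustrative):

import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.Before;
import org.junit.BeforeClass;

public class ExampleCloudTest extends SolrCloudTestCase {

  private static final String COLLECTION = "collection1";
  private static final int TIMEOUT = 30;

  @BeforeClass
  public static void setupCluster() throws Exception {
    // start a small MiniSolrCloudCluster and upload the shared configset
    configureCluster(2)
        .addConfig("conf", getFile("solrj").toPath()
            .resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
        .configure();
    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
        .process(cluster.getSolrClient());
    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION,
        cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
  }

  @Before
  public void cleanIndex() throws Exception {
    // every test starts from an empty index
    new UpdateRequest()
        .deleteByQuery("*:*")
        .commit(cluster.getSolrClient(), COLLECTION);
  }
}

Per-test state is then reduced to the @Before that clears the index, which is what lets the old monolithic test() methods split into independent @Test methods in the diffs that follow.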

View File

@ -245,6 +245,8 @@ Other Changes
* SOLR-9066 Make CountMetric return long instead of double (Kevin Risden)
* SOLR-9065: Migrate SolrJ distributed tests to SolrCloudTestCase. (Alan Woodward)
================== 6.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -215,7 +215,7 @@ public class SolrStream extends TupleStream {
throw new IOException("--> "+this.baseUrl+":"+e.getMessage());
} catch (Exception e) {
//The Stream source did not provide an exception in a format that the SolrStream could propagate.
throw new IOException("--> "+this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
throw new IOException("--> "+this.baseUrl+": An exception has occurred on the server, refer to server log for details.", e);
}
}

View File

@ -1042,6 +1042,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return new RequestStatus(requestId);
}
public static void waitForAsyncRequest(String requestId, SolrClient client, long timeout) throws SolrServerException, InterruptedException, IOException {
requestStatus(requestId).waitFor(client, timeout);
}
// REQUESTSTATUS request
public static class RequestStatus extends CollectionAdminRequest<RequestStatusResponse> {
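The new helper simply delegates to requestStatus(requestId).waitFor(client, timeout), giving callers a one-line way to block on an async collection API call. Usage as exercised by the migrated CloudSolrClientTest below (collection names and the TIMEOUT constant come from that test):

String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
    .processAsync(client);
// block until the async CREATE completes (or the timeout elapses)
CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);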

View File

@ -30,13 +30,17 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.XML;
@ -103,6 +107,10 @@ public class UpdateRequest extends AbstractUpdateRequest {
return this;
}
public UpdateRequest add(String... fields) {
return add(new SolrInputDocument(fields));
}
/**
* Add a SolrInputDocument to this request
* @param doc the document
@ -209,6 +217,13 @@ public class UpdateRequest extends AbstractUpdateRequest {
deleteQuery.add(q);
return this;
}
public UpdateResponse commit(SolrClient client, String collection) throws IOException, SolrServerException {
if (params == null)
params = new ModifiableSolrParams();
params.set(UpdateParams.COMMIT, "true");
return process(client, collection);
}
/**
* @param router to route updates with
@ -383,7 +398,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
/**
* @since solr 1.4
*/
public void writeXML(Writer writer) throws IOException {
public UpdateRequest writeXML(Writer writer) throws IOException {
List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
@ -457,6 +472,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
}
writer.append("</delete>");
}
return this;
}
// --------------------------------------------------------------------------
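The varargs add(String...), the commit(SolrClient, String) shortcut, and writeXML() now returning this all serve the same goal: fluent one-liners in the migrated tests. For example (pattern taken from the tests below; collection name illustrative):

new UpdateRequest()
    .add("id", "0", "a_t", "hello1")        // alternating field name/value pairs
    .add("id", "2", "a_t", "hello2")
    .commit(cluster.getSolrClient(), "collection1");  // sets commit=true, then process()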

View File

@ -38,8 +38,12 @@ public class SolrInputDocument extends SolrDocumentBase<SolrInputField, SolrInpu
private float _documentBoost = 1.0f;
private List<SolrInputDocument> _childDocuments;
public SolrInputDocument() {
public SolrInputDocument(String... fields) {
_fields = new LinkedHashMap<>();
assert fields.length % 2 == 0;
for (int i = 0; i < fields.length; i += 2) {
addField(fields[i], fields[i + 1]);
}
}
public SolrInputDocument(Map<String,SolrInputField> fields) {
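The new varargs constructor reads its arguments as alternating field name/value pairs (the assert above rejects odd-length argument lists), so test documents can be built inline:

SolrInputDocument doc = new SolrInputDocument("id", "1", "a_t", "hello1");
// equivalent to new SolrInputDocument() followed by two addField() calls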

View File

@ -16,6 +16,19 @@
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -26,12 +39,13 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@ -46,6 +60,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -53,163 +68,87 @@ import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
/**
* This test would be faster if we simulated the zk state instead.
*/
@Slow
public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public class CloudSolrClientTest extends SolrCloudTestCase {
private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
private static final String COLLECTION = "collection1";
private static final String id = "id";
private static final int TIMEOUT = 30;
@BeforeClass
public static void beforeSuperClass() {
// this is necessary because AbstractZkTestCase.buildZooKeeper is used by AbstractDistribZkTestBase
// and the auto-detected SOLRHOME=TEST_HOME() does not exist for solrj tests
// todo fix this
AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
protected String getCloudSolrConfig() {
return "solrconfig.xml";
}
@Override
public String getSolrHome() {
return SOLR_HOME;
}
public static String SOLR_HOME() {
return SOLR_HOME;
}
public CloudSolrClientTest() {
super();
sliceCount = 2;
fixShardCount(3);
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Before
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
}
@Test
public void test() throws Exception {
testParallelUpdateQTime();
checkCollectionParameters();
allTests();
stateVersionParamTest();
customHttpClientTest();
testOverwriteOption();
preferLocalShardsTest();
}
private void testParallelUpdateQTime() throws Exception {
public void testParallelUpdateQTime() throws Exception {
UpdateRequest req = new UpdateRequest();
for (int i=0; i<10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
req.add(doc);
}
UpdateResponse response = req.process(cloudClient);
UpdateResponse response = req.process(cluster.getSolrClient(), COLLECTION);
// See SOLR-6547, we just need to ensure that no exception is thrown here
assertTrue(response.getQTime() >= 0);
}
private void testOverwriteOption() throws Exception, SolrServerException,
IOException {
String collectionName = "overwriteCollection";
createCollection(collectionName, controlClientCloud, 1, 1);
waitForRecoveriesToFinish(collectionName, false);
try (CloudSolrClient cloudClient = createCloudClient(collectionName)) {
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField(id, "0");
doc1.addField("a_t", "hello1");
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField(id, "0");
doc2.addField("a_t", "hello2");
UpdateRequest request = new UpdateRequest();
request.add(doc1);
request.add(doc2);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
NamedList<Object> response = cloudClient.request(request);
QueryResponse resp = cloudClient.query(new SolrQuery("*:*"));
assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
doc1 = new SolrInputDocument();
doc1.addField(id, "1");
doc1.addField("a_t", "hello1");
doc2 = new SolrInputDocument();
doc2.addField(id, "1");
doc2.addField("a_t", "hello2");
request = new UpdateRequest();
// overwrite=false
request.add(doc1, false);
request.add(doc2, false);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
response = cloudClient.request(request);
resp = cloudClient.query(new SolrQuery("*:*"));
@Test
public void testOverwriteOption() throws Exception {
CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
AbstractDistribZkTestBase.waitForRecoveriesToFinish("overwrite", cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
new UpdateRequest()
.add("id", "0", "a_t", "hello1")
.add("id", "0", "a_t", "hello2")
.commit(cluster.getSolrClient(), "overwrite");
QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
new UpdateRequest()
.add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
.add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
.commit(cluster.getSolrClient(), "overwrite");
resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
}
}
private void allTests() throws Exception {
String collectionName = "clientTestExternColl";
createCollection(collectionName, controlClientCloud, 2, 2);
waitForRecoveriesToFinish(collectionName, false);
CloudSolrClient cloudClient = createCloudClient(collectionName);
assertNotNull(cloudClient);
@Test
public void testRouting() throws Exception {
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
this.cloudClient.commit();
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField(id, "0");
doc1.addField("a_t", "hello1");
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField(id, "2");
doc2.addField("a_t", "hello2");
UpdateRequest request = new UpdateRequest();
request.add(doc1);
request.add(doc2);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
AbstractUpdateRequest request = new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
// Test single threaded routed updates for UpdateRequest
NamedList<Object> response = cloudClient.request(request);
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
@ -234,22 +173,19 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
// Test the deleteById routing for UpdateRequest
UpdateRequest delRequest = new UpdateRequest();
delRequest.deleteById("0");
delRequest.deleteById("2");
delRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
cloudClient.request(delRequest);
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add("q", "*:*");
QueryRequest qRequest = new QueryRequest(qParams);
QueryResponse qResponse = qRequest.process(cloudClient);
new UpdateRequest()
.deleteById("0")
.deleteById("2")
.commit(cluster.getSolrClient(), COLLECTION);
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
SolrDocumentList docs = qResponse.getResults();
assertTrue(docs.getNumFound() == 0);
assertEquals(0, docs.getNumFound());
// Test Multi-Threaded routed updates for UpdateRequest
try (CloudSolrClient threadedClient = getCloudSolrClient(zkServer.getZkAddress())) {
try (CloudSolrClient threadedClient = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
threadedClient.setParallelUpdates(true);
threadedClient.setDefaultCollection(collectionName);
threadedClient.setDefaultCollection(COLLECTION);
response = threadedClient.request(request);
rr = (CloudSolrClient.RouteResponse) response;
routes = rr.getRoutes();
@ -277,13 +213,13 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
// Test that queries with _route_ params are routed by the client
// Track request counts on each node before query calls
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
DocCollection col = clusterState.getCollection(collectionName);
ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
DocCollection col = clusterState.getCollection(COLLECTION);
Map<String, Long> requestCountsMap = Maps.newHashMap();
for (Slice slice : col.getSlices()) {
for (Replica replica : slice.getReplicas()) {
String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
requestCountsMap.put(baseURL, getNumRequests(baseURL,collectionName));
requestCountsMap.put(baseURL, getNumRequests(baseURL, COLLECTION));
}
}
@ -328,7 +264,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(CommonParams.Q, "*:*");
solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
log.info("output : {}" ,cloudClient.query(solrParams));
log.info("output: {}", cluster.getSolrClient().query(COLLECTION, solrParams));
}
// Request counts increase from expected nodes should aggregate to 1000, while there should be
@ -341,7 +277,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
Long prevNumRequests = requestCountsMap.get(baseURL);
Long curNumRequests = getNumRequests(baseURL, collectionName);
Long curNumRequests = getNumRequests(baseURL, COLLECTION);
long delta = curNumRequests - prevNumRequests;
if (expectedBaseURLs.contains(baseURL)) {
@ -357,74 +293,36 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
0, increaseFromUnexpectedUrls);
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
cloudClient.commit();
cloudClient.close();
}
/**
* Tests if the specification of 'preferLocalShards' in the query-params
* limits the distributed query to locally hosted shards only
*/
private void preferLocalShardsTest() throws Exception {
@Test
public void preferLocalShardsTest() throws Exception {
String collectionName = "localShardsTestColl";
int liveNodes = getCommonCloudSolrClient()
.getZkStateReader().getClusterState().getLiveNodes().size();
int liveNodes = cluster.getJettySolrRunners().size();
// For preferLocalShards to succeed in a test, every shard should have
// all its cores on the same node.
// Hence the below configuration for our collection
Map<String, Object> props = makeMap(
REPLICATION_FACTOR, liveNodes,
MAX_SHARDS_PER_NODE, liveNodes,
NUM_SLICES, liveNodes);
Map<String,List<Integer>> collectionInfos = new HashMap<String,List<Integer>>();
createCollection(collectionInfos, collectionName, props, controlClientCloud);
waitForRecoveriesToFinish(collectionName, false);
CloudSolrClient cloudClient = createCloudClient(collectionName);
assertNotNull(cloudClient);
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
// Remove any documents from previous test (if any)
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
cloudClient.commit();
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
.setMaxShardsPerNode(liveNodes)
.processAndWait(cluster.getSolrClient(), TIMEOUT);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
// Add some new documents
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField(id, "0");
doc1.addField("a_t", "hello1");
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField(id, "2");
doc2.addField("a_t", "hello2");
SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField(id, "3");
doc3.addField("a_t", "hello2");
UpdateRequest request = new UpdateRequest();
request.add(doc1);
request.add(doc2);
request.add(doc3);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
new UpdateRequest()
.add(id, "0", "a_t", "hello1")
.add(id, "2", "a_t", "hello2")
.add(id, "3", "a_t", "hello2")
.commit(cluster.getSolrClient(), collectionName);
// Run the actual test for 'preferLocalShards'
queryWithPreferLocalShards(cloudClient, true, collectionName);
// Cleanup
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
cloudClient.commit();
cloudClient.close();
queryWithPreferLocalShards(cluster.getSolrClient(), true, collectionName);
}
private void queryWithPreferLocalShards(CloudSolrClient cloudClient,
@ -432,8 +330,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
String collectionName)
throws Exception
{
SolrQuery qRequest = new SolrQuery();
qRequest.setQuery("*:*");
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add("preferLocalShards", Boolean.toString(preferLocalShards));
@ -444,7 +341,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
// And since all the nodes are hosting cores from all shards, the
// distributed query formed by this node will select cores from the
// local shards only
QueryResponse qResponse = cloudClient.query (qRequest);
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
@ -495,21 +392,23 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
return (Long) resp.findRecursive("solr-mbeans", "QUERYHANDLER",
"standard", "stats", "requests");
}
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
private void checkCollectionParameters() throws Exception {
@Test
public void checkCollectionParameters() throws Exception {
try (CloudSolrClient client = createCloudClient("multicollection1")) {
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
createCollection("multicollection1", client, 2, 2);
createCollection("multicollection2", client, 2, 2);
waitForRecoveriesToFinish("multicollection1", false);
waitForRecoveriesToFinish("multicollection2", false);
String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
.processAsync(client);
String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
.processAsync(client);
CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
AbstractDistribZkTestBase.waitForRecoveriesToFinish("multicollection1", client.getZkStateReader(), false, true, TIMEOUT);
AbstractDistribZkTestBase.waitForRecoveriesToFinish("multicollection2", client.getZkStateReader(), false, true, TIMEOUT);
client.setDefaultCollection("multicollection1");
List<SolrInputDocument> docs = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
@ -540,73 +439,70 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
}
private void stateVersionParamTest() throws Exception {
@Test
public void stateVersionParamTest() throws Exception {
try (CloudSolrClient client = createCloudClient(null)) {
String collectionName = "checkStateVerCol";
createCollection(collectionName, client, 1, 3);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
SolrQuery q = new SolrQuery().setQuery("*:*");
HttpSolrClient.RemoteSolrException sse = null;
final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + collectionName;
final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + COLLECTION;
try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
log.info("should work query, result {}", solrClient.query(q));
//no problem
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + coll.getZNodeVersion());
log.info("2nd query , result {}", solrClient.query(q));
//no error yet good
log.info("should work query, result {}", solrClient.query(q));
//no problem
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
log.info("2nd query , result {}", solrClient.query(q));
//no error yet good
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
QueryResponse rsp = solrClient.query(q);
Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
assertNotNull(m.get(COLLECTION));
}
//now send the request to another node that does not serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
}
}
String theNode = null;
Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
for (String s : liveNodes) {
String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(n)){
theNode = n;
break;
}
}
log.info("the node which does not serve this collection{} ",theNode);
assertNotNull(theNode);
final String solrClientUrl = theNode + "/" + COLLECTION;
try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
try {
QueryResponse rsp = solrClient.query(q);
Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
assertNotNull(m.get(collectionName));
}
//now send the request to another node that does not serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
}
}
String theNode = null;
Set<String> liveNodes = client.getZkStateReader().getClusterState().getLiveNodes();
for (String s : liveNodes) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(n)){
theNode = n;
break;
}
}
log.info("the node which does not serve this collection{} ",theNode);
assertNotNull(theNode);
final String solrClientUrl = theNode + "/" + collectionName;
try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion()-1));
try {
QueryResponse rsp = solrClient.query(q);
log.info("error was expected");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
}
assertNotNull(sse);
assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
log.info("error was expected");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
}
assertNotNull(sse);
assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
}
}
@Test
public void testShutdown() throws IOException {
try (CloudSolrClient client = getCloudSolrClient("[ff01::114]:33332")) {
client.setZkConnectTimeout(100);
@ -620,22 +516,23 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testWrongZkChrootTest() throws IOException {
exception.expect(SolrException.class);
exception.expectMessage("cluster not found/not ready");
try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress() + "/xyz/foo")) {
client.setDefaultCollection(DEFAULT_COLLECTION);
try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
client.setZkClientTimeout(1000 * 60);
client.connect();
fail("Expected exception");
}
}
@Test
public void customHttpClientTest() throws IOException {
CloseableHttpClient client = HttpClientUtil.createClient(null);
try (CloudSolrClient solrClient = getCloudSolrClient(zkServer.getZkAddress(), client)) {
try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
assertTrue(solrClient.getLbClient().getHttpClient() == client);

View File

@ -17,14 +17,12 @@ package org.apache.solr.client.solrj.io.graph;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -34,18 +32,20 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.HashJoinStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.junit.After;
import org.junit.AfterClass;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -58,96 +58,52 @@ import org.junit.Test;
@Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
public class GraphExpressionTest extends SolrCloudTestCase {
private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
private static final String COLLECTION = "collection1";
static {
schemaString = "schema-streaming.xml";
}
private static final String id = "id";
private static final int TIMEOUT = 30;
@BeforeClass
public static void beforeSuperClass() {
AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
}
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
@AfterClass
public static void afterSuperClass() {
}
protected String getCloudSolrConfig() {
return "solrconfig-streaming.xml";
}
@Override
public String getSolrHome() {
return SOLR_HOME;
}
public static String SOLR_HOME() {
return SOLR_HOME;
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// we expect this type of exception as shards go up and down...
//ignoreException(".*");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
resetExceptionIgnores();
}
public GraphExpressionTest() {
super();
sliceCount = 2;
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
}
@Test
public void testAll() throws Exception{
assertNotNull(cloudClient);
public void testShortestPathStream() throws Exception {
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForRecoveriesToFinish(false);
del("*:*");
commit();
testShortestPathStream();
testGatherNodesStream();
testGatherNodesFriendsStream();
}
private void testShortestPathStream() throws Exception {
indexr(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows");
indexr(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows");
indexr(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows");
indexr(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows");
indexr(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows");
indexr(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows");
indexr(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows");
indexr(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows");
indexr(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows");
indexr(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows");
indexr(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows");
indexr(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows");
indexr(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows");
indexr(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows");
indexr(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows");
commit();
new UpdateRequest()
.add(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows")
.add(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows")
.add(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows")
.add(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows")
.add(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows")
.add(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows")
.add(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows")
.add(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows")
.add(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows")
.add(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows")
.add(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows")
.add(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows")
.add(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows")
.add(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows")
.add(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
Set<String> paths = null;
@ -157,7 +113,7 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
context.setSolrClientCache(cache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withFunctionName("shortestPath", ShortestPathStream.class);
Map params = new HashMap();
@ -271,27 +227,26 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
assertTrue(paths.contains("[jim, stan, mary, steve]"));
cache.close();
del("*:*");
commit();
}
@Test
public void testGatherNodesStream() throws Exception {
private void testGatherNodesStream() throws Exception {
indexr(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20");
indexr(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30");
indexr(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1");
indexr(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2");
indexr(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5");
indexr(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10");
indexr(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20");
indexr(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10");
indexr(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10");
indexr(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40");
indexr(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10");
indexr(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10");
commit();
new UpdateRequest()
.add(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20")
.add(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30")
.add(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1")
.add(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2")
.add(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5")
.add(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10")
.add(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20")
.add(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10")
.add(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10")
.add(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40")
.add(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10")
.add(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
Set<String> paths = null;
@ -301,7 +256,7 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
context.setSolrClientCache(cache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("count", CountMetric.class)
@ -417,20 +372,20 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
assertTrue(tuples.get(1).getString("node").equals("basket3"));
cache.close();
del("*:*");
commit();
}
private void testGatherNodesFriendsStream() throws Exception {
@Test
public void testGatherNodesFriendsStream() throws Exception {
indexr(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim");
indexr(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam");
indexr(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max");
indexr(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip");
indexr(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve");
indexr(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve");
commit();
new UpdateRequest()
.add(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim")
.add(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam")
.add(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max")
.add(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip")
.add(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve")
.add(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
Set<String> paths = null;
@ -440,7 +395,7 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
context.setSolrClientCache(cache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("count", CountMetric.class)
@ -628,10 +583,10 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
assertTrue(tuples.get(6).getLong("level").equals(new Long(2)));
//Add a cycle from jim to bill
indexr(id, "6", "from_s", "jim", "to_s", "bill", "message_t", "Hello steve");
indexr(id, "7", "from_s", "sam", "to_s", "bill", "message_t", "Hello steve");
commit();
new UpdateRequest()
.add(id, "6", "from_s", "jim", "to_s", "bill", "message_t", "Hello steve")
.add(id, "7", "from_s", "sam", "to_s", "bill", "message_t", "Hello steve")
.commit(cluster.getSolrClient(), COLLECTION);
expr = "gatherNodes(collection1, " +
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
@ -676,12 +631,9 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
assertTrue(tuples.get(6).getLong("level").equals(new Long(2)));
cache.close();
del("*:*");
commit();
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList();
@ -691,9 +643,7 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
tupleStream.close();
return tuples;
}
protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
return assertOrderOf(tuples, "id", ids);
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... ids) throws Exception {
int i = 0;
for(int val : ids) {
@ -707,56 +657,6 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
return true;
}
protected boolean assertMapOrder(List<Tuple> tuples, int... ids) throws Exception {
int i = 0;
for(int val : ids) {
Tuple t = tuples.get(i);
List<Map> tip = t.getMaps("group");
int id = (int)tip.get(0).get("id");
if(id != val) {
throw new Exception("Found value:"+id+" expecting:"+val);
}
++i;
}
return true;
}
protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
if(!tuple.fields.containsKey(field)){
throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
}
}
}
return true;
}
protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
if(tuple.fields.containsKey(field)){
throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
}
}
}
return true;
}
protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
List<?> group = (List<?>)tuple.get("tuples");
int i=0;
for(int val : ids) {
Map<?,?> t = (Map<?,?>)group.get(i);
Long tip = (Long)t.get("id");
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
long lv = (long)tuple.get(fieldName);
if(lv != l) {
@ -778,44 +678,4 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
return true;
}
protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
if(maps.size() != ids.length) {
throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
}
int i=0;
for(int val : ids) {
Map t = maps.get(i);
Long tip = (Long)t.get("id");
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
private boolean assertList(List list, Object... vals) throws Exception {
if(list.size() != vals.length) {
throw new Exception("Lists are not the same size:"+list.size() +" : "+vals.length);
}
for(int i=0; i<list.size(); i++) {
Object a = list.get(i);
Object b = vals[i];
if(!a.equals(b)) {
throw new Exception("List items not equals:"+a+" : "+b);
}
}
return true;
}
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
}

View File

@ -17,30 +17,26 @@ package org.apache.solr.client.solrj.io.graph;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.junit.After;
import org.junit.AfterClass;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Set;
import java.util.HashSet;
/**
* All base tests will be done with CloudSolrStream. Under the covers, CloudSolrStream uses SolrStream, so
* SolrStream will get fully exercised through these tests.
@ -49,86 +45,57 @@ import java.util.HashSet;
@LuceneTestCase.Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class GraphTest extends AbstractFullDistribZkTestBase {
public class GraphTest extends SolrCloudTestCase {
private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
private StreamFactory streamFactory;
private static final String COLLECTION = "collection1";
static {
schemaString = "schema-streaming.xml";
}
private static final String id = "id";
private static final int TIMEOUT = 30;
@BeforeClass
public static void beforeSuperClass() {
AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME());
}
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
@AfterClass
public static void afterSuperClass() {
}
protected String getCloudSolrConfig() {
return "solrconfig-streaming.xml";
}
@Override
public String getSolrHome() {
return SOLR_HOME;
}
public static String SOLR_HOME() {
return SOLR_HOME;
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// we expect this type of exception as shards go up and down...
//ignoreException(".*");
//System.setProperty("export.test", "true");
System.setProperty("numShards", Integer.toString(sliceCount));
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
resetExceptionIgnores();
}
@Test
public void testShortestPathStream() throws Exception {
public GraphTest() {
super();
sliceCount = 2;
}
private void testShortestPathStream() throws Exception {
indexr(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows");
indexr(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows");
indexr(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows");
indexr(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows");
indexr(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows");
indexr(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows");
indexr(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows");
indexr(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows");
indexr(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows");
indexr(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows");
indexr(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows");
indexr(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows");
indexr(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows");
indexr(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows");
indexr(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows");
commit();
new UpdateRequest()
.add(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows")
.add(id, "1", "from_s", "jim", "to_s", "dave", "predicate_s", "knows")
.add(id, "2", "from_s", "jim", "to_s", "stan", "predicate_s", "knows")
.add(id, "3", "from_s", "dave", "to_s", "stan", "predicate_s", "knows")
.add(id, "4", "from_s", "dave", "to_s", "bill", "predicate_s", "knows")
.add(id, "5", "from_s", "dave", "to_s", "mike", "predicate_s", "knows")
.add(id, "20", "from_s", "dave", "to_s", "alex", "predicate_s", "knows")
.add(id, "21", "from_s", "alex", "to_s", "steve", "predicate_s", "knows")
.add(id, "6", "from_s", "stan", "to_s", "alice", "predicate_s", "knows")
.add(id, "7", "from_s", "stan", "to_s", "mary", "predicate_s", "knows")
.add(id, "8", "from_s", "stan", "to_s", "dave", "predicate_s", "knows")
.add(id, "10", "from_s", "mary", "to_s", "mike", "predicate_s", "knows")
.add(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows")
.add(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows")
.add(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows")
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
Set<String> paths = null;
ShortestPathStream stream = null;
String zkHost = zkServer.getZkAddress();
String zkHost = cluster.getZkServer().getZkAddress();
StreamContext context = new StreamContext();
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
@ -260,40 +227,6 @@ public class GraphTest extends AbstractFullDistribZkTestBase {
assertTrue(paths.contains("[jim, stan, mary, steve]"));
cache.close();
del("*:*");
commit();
}
@Test
public void streamTests() throws Exception {
assertNotNull(cloudClient);
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForRecoveriesToFinish(false);
del("*:*");
commit();
testShortestPathStream();
}
protected Map mapParams(String... vals) {
Map params = new HashMap();
String k = null;
for(String val : vals) {
if(k == null) {
k = val;
} else {
params.put(k, val);
k = null;
}
}
return params;
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
@ -311,58 +244,6 @@ public class GraphTest extends AbstractFullDistribZkTestBase {
return tuples;
}
protected Tuple getTuple(TupleStream tupleStream) throws IOException {
tupleStream.open();
Tuple t = tupleStream.read();
tupleStream.close();
return t;
}
protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
int i = 0;
for(int val : ids) {
Tuple t = tuples.get(i);
Long tip = (Long)t.get("id");
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
List group = (List)tuple.get("tuples");
int i=0;
for(int val : ids) {
Map t = (Map)group.get(i);
Long tip = (Long)t.get("id");
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
protected boolean assertMaps(List<Map> maps, int... ids) throws Exception {
if(maps.size() != ids.length) {
throw new Exception("Expected id count != actual map count:"+ids.length+":"+maps.size());
}
int i=0;
for(int val : ids) {
Map t = maps.get(i);
Long tip = (Long)t.get("id");
if(tip.intValue() != val) {
throw new Exception("Found value:"+tip.intValue()+" expecting:"+val);
}
++i;
}
return true;
}
public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
long lv = (long)tuple.get(fieldName);
if(lv != l) {
@ -372,16 +253,5 @@ public class GraphTest extends AbstractFullDistribZkTestBase {
return true;
}
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
private void attachStreamFactory(TupleStream tupleStream) {
StreamContext streamContext = new StreamContext();
streamContext.setStreamFactory(streamFactory);
tupleStream.setStreamContext(streamContext);
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.solr.client.solrj.io.sql;
import java.io.File;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@ -32,11 +31,10 @@ import java.util.Properties;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.junit.After;
import org.junit.AfterClass;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.BeforeClass;
import org.junit.Test;
@ -47,68 +45,45 @@ import org.junit.Test;
@Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class JdbcTest extends AbstractFullDistribZkTestBase {
public class JdbcTest extends SolrCloudTestCase {
private static final String SOLR_HOME = getFile("solrj" + File.separator + "solr").getAbsolutePath();
private static final String COLLECTION = "collection1";
private static final String id = "id";
static {
schemaString = "schema-sql.xml";
}
private static final int TIMEOUT = 30;
private static String zkHost;
@BeforeClass
public static void beforeSuperClass() {
AbstractZkTestCase.SOLRHOME = new File(SOLR_HOME);
}
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
@AfterClass
public static void afterSuperClass() {
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "testnull_i", null)
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "testnull_i", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "testnull_i", null)
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "testnull_i", "4")
.add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "testnull_i", null)
.add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6", "testnull_i", "6")
.add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7", "testnull_i", null)
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8", "testnull_i", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9", "testnull_i", null)
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "testnull_i", "10")
.commit(cluster.getSolrClient(), COLLECTION);
protected String getCloudSolrConfig() {
return "solrconfig-sql.xml";
}
@Override
public String getSolrHome() {
return SOLR_HOME;
}
@Override
public void distribSetUp() throws Exception {
super.distribSetUp();
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
resetExceptionIgnores();
zkHost = cluster.getZkServer().getZkAddress();
}
@Test
@ShardsFixed(num = 2)
public void doTest() throws Exception {
waitForRecoveriesToFinish(false);
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "testnull_i", null);
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "testnull_i", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "testnull_i", null);
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "testnull_i", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "testnull_i", null);
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6", "testnull_i", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7", "testnull_i", null);
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8", "testnull_i", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9", "testnull_i", null);
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "testnull_i", "10");
commit();
String zkHost = zkServer.getZkAddress();
Properties props = new Properties();
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
@ -202,8 +177,13 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
@Test
public void testFacetAggregation() throws Exception {
//Test facet aggregation
props = new Properties();
Properties props = new Properties();
props.put("aggregationMode", "facet");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Statement stmt = con.createStatement()) {
@ -236,8 +216,13 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
@Test
public void testMapReduceAggregation() throws Exception {
//Test map / reduce aggregation
props = new Properties();
Properties props = new Properties();
props.put("aggregationMode", "map_reduce");
props.put("numWorkers", "2");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
@ -270,15 +255,20 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
}
@Test
public void testConnectionParams() throws Exception {
//Test params on the url
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost +
"?collection=collection1&aggregationMode=map_reduce&numWorkers=2")) {
Properties p = ((ConnectionImpl) con).getProperties();
assert (p.getProperty("aggregationMode").equals("map_reduce"));
assert (p.getProperty("numWorkers").equals("2"));
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
@ -310,6 +300,11 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
@Test
public void testJDBCUrlParameters() throws Exception {
// Test JDBC parameters in URL
try (Connection con = DriverManager.getConnection(
"jdbc:solr://" + zkHost + "?collection=collection1&username=&password=&testKey1=testValue&testKey2")) {
@ -350,6 +345,11 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
@Test
public void testJDBCPropertiesParameters() throws Exception {
// Test JDBC parameters in properties
Properties providedProperties = new Properties();
providedProperties.put("collection", "collection1");
@ -360,10 +360,10 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost, providedProperties)) {
Properties p = ((ConnectionImpl) con).getProperties();
assert (p.getProperty("username").equals(""));
assert (p.getProperty("password").equals(""));
assert (p.getProperty("testKey1").equals("testValue"));
assert (p.getProperty("testKey2").equals(""));
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
@ -394,10 +394,13 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
}
@Test
public void testErrorPropagation() throws Exception {
//Test error propagation
Properties props = new Properties();
props.put("aggregationMode", "facet");
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
try (Statement stmt = con.createStatement()) {
@ -410,20 +413,20 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
}
}
}
@Test
public void testDriverMetadata() throws Exception {
String collection = COLLECTION;
String connectionString1 = "jdbc:solr://" + zkServer.getZkAddress() + "?collection=" + collection +
String connectionString1 = "jdbc:solr://" + zkHost + "?collection=" + collection +
"&username=&password=&testKey1=testValue&testKey2";
Properties properties1 = new Properties();
String sql = "select id, a_i, a_s, a_f as my_float_col, testnull_i from " + collection +
" order by a_i desc";
String connectionString2 = "jdbc:solr://" + zkServer.getZkAddress() + "?collection=" + collection +
String connectionString2 = "jdbc:solr://" + zkHost + "?collection=" + collection +
"&aggregationMode=map_reduce&numWorkers=2&username=&password=&testKey1=testValue&testKey2";
Properties properties2 = new Properties();
@ -439,9 +442,9 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
try (Connection con = DriverManager.getConnection(connectionString, properties)) {
assertTrue(con.isValid(DEFAULT_CONNECTION_TIMEOUT));
assertEquals(zkHost, con.getCatalog());
con.setCatalog(zkHost);
assertEquals(zkHost, con.getCatalog());
assertEquals(null, con.getSchema());
con.setSchema("myschema");
@ -470,22 +473,22 @@ public class JdbcTest extends AbstractFullDistribZkTestBase {
try(ResultSet rs = databaseMetaData.getCatalogs()) {
assertTrue(rs.next());
assertEquals(zkHost, rs.getString("TABLE_CAT"));
assertFalse(rs.next());
}
List<String> collections = new ArrayList<>();
collections.addAll(cluster.getSolrClient().getZkStateReader().getClusterState().getCollections());
Collections.sort(collections);
try(ResultSet rs = databaseMetaData.getSchemas()) {
assertFalse(rs.next());
}
try(ResultSet rs = databaseMetaData.getTables(zkHost, null, "%", null)) {
for(String acollection : collections) {
assertTrue(rs.next());
assertEquals(zkHost, rs.getString("TABLE_CAT"));
assertNull(rs.getString("TABLE_SCHEM"));
assertEquals(acollection, rs.getString("TABLE_NAME"));
assertEquals("TABLE", rs.getString("TABLE_TYPE"));
View File
@ -16,7 +16,6 @@
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@ -26,10 +25,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -38,10 +35,10 @@ import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -50,19 +47,28 @@ import org.junit.Test;
/**
*/
@Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class JDBCStreamTest extends SolrCloudTestCase {
private static final String COLLECTION = "jdbc";

private static final int TIMEOUT = 30;

private static final String id = "id";
@BeforeClass
public static void setupCluster() throws Exception {
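// Start a 4-node test cluster with the streaming configset, create a
// 2-shard / 1-replica collection, and block until all replicas are active.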
configureCluster(4)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@BeforeClass
public static void setupDatabase() throws Exception {
// Initialize Database
// Ok, so.....hsqldb is doing something totally weird so I thought I'd take a moment to explain it.
@ -74,8 +80,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
// JDBCStream and is only a carryover from the driver we are testing with.
Class.forName("org.hsqldb.jdbcDriver").newInstance();
Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement();
statement.executeUpdate("create table COUNTRIES(CODE varchar(3) not null primary key, COUNTRY_NAME varchar(50), DELETED char(1) default 'N')");
statement.executeUpdate("create table PEOPLE(ID int not null primary key, NAME varchar(50), COUNTRY_CODE char(2), DELETED char(1) default 'N')");
statement.executeUpdate("create table PEOPLE_SPORTS(ID int not null primary key, PERSON_ID int, SPORT_NAME varchar(50), DELETED char(1) default 'N')");
@ -83,107 +88,48 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
}
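// Illustrative sketch, not part of the original test (the method name is
// hypothetical): every connection to the same hsqldb mem: URL within one JVM
// reaches the same in-memory database, which is why setupDatabase(), the tests
// and teardownDatabase() can each open their own connection yet share state.
private static void exampleSharedInMemoryDatabase() throws Exception {
  try (Connection c = DriverManager.getConnection("jdbc:hsqldb:mem:.");
       Statement s = c.createStatement()) {
    s.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('FR', 'France')");
  }
  try (Connection c = DriverManager.getConnection("jdbc:hsqldb:mem:.");
       Statement s = c.createStatement();
       ResultSet rs = s.executeQuery("select COUNTRY_NAME from COUNTRIES where CODE = 'FR'")) {
    assertTrue(rs.next()); // a brand-new connection sees the row inserted above
  }
}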
@AfterClass
public static void teardownDatabase() throws SQLException {
Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement();
statement.executeUpdate("shutdown");
}
@Before
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
}
@Before
public void cleanDatabase() throws Exception {
// Clear database
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("delete from COUNTRIES WHERE 1=1");
statement.executeUpdate("delete from PEOPLE WHERE 1=1");
statement.executeUpdate("delete from PEOPLE_SPORTS WHERE 1=1");
}
}
@Test
public void testJDBCSelect() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
}
TupleStream stream;
List<Tuple> tuples;
// Simple 1
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE",
new FieldComparator("CODE", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assert(tuples.size() == 4);
@ -191,7 +137,8 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "COUNTRY_NAME", "Netherlands", "Norway", "Nepal", "United States");
// Simple 2
stream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by COUNTRY_NAME",
new FieldComparator("COUNTRY_NAME", ComparatorOrder.ASCENDING));
tuples = getTuples(stream);
assertEquals(4, tuples.size());
@ -199,29 +146,28 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "COUNTRY_NAME", "Nepal", "Netherlands", "Norway", "United States");
}
@Test
public void testJDBCJoin() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','NI')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NG')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NF')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NE')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','NC')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NZ')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','NR')");
}
TupleStream stream;
List<Tuple> tuples;
@ -234,28 +180,28 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "ID", 11, 17, 19);
assertOrderOf(tuples, "NAME", "Emma", "Mia", "Olivia");
}
@Test
public void testJDBCSolrMerge() throws Exception {
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('AL', 'Algeria')");
}
// Load Solr
indexr(id, "0", "code_s", "GB", "name_s", "Great Britian");
indexr(id, "1", "code_s", "CA", "name_s", "Canada");
commit();
new UpdateRequest()
.add(id, "0", "code_s", "GB", "name_s", "Great Britian")
.add(id, "1", "code_s", "CA", "name_s", "Canada")
.commit(cluster.getSolrClient(), COLLECTION);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
List<Tuple> tuples;
@ -263,7 +209,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
// Simple 1
TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
TupleStream searchStream = factory.constructStream("search(collection1, fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream searchStream = factory.constructStream("search(" + COLLECTION + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
tuples = getTuples(mergeStream);
@ -272,49 +218,49 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "code_s", "AL","CA","GB","NL","NO","NP","US");
assertOrderOf(tuples, "name_s", "Algeria", "Canada", "Great Britian", "Netherlands", "Norway", "Nepal", "United States");
}
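// Condensed sketch of the merge pattern above, not part of the original test
// (the method name is hypothetical): rename the JDBC columns to Solr field
// names with a SelectStream, then interleave the two sorted sources on the
// shared sort key with a MergeStream.
private List<Tuple> exampleJdbcSolrMerge(StreamFactory factory) throws Exception {
  HashMap<String, String> rename = new HashMap<>();
  rename.put("CODE", "code_s");
  rename.put("COUNTRY_NAME", "name_s");
  TupleStream db = new SelectStream(
      new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE",
          new FieldComparator("CODE", ComparatorOrder.ASCENDING)),
      rename);
  TupleStream solr = factory.constructStream(
      "search(" + COLLECTION + ", fl=\"code_s,name_s\", q=\"*:*\", sort=\"code_s asc\")");
  TupleStream merged = new MergeStream(
      new FieldComparator("code_s", ComparatorOrder.ASCENDING),
      new TupleStream[]{db, solr});
  return getTuples(merged); // same helper the tests above use
}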
@Test
public void testJDBCSolrInnerJoinExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("jdbc", JDBCStream.class);
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
indexr(id, "1", "rating_f", "3.5", "personId_i", "11");
indexr(id, "2", "rating_f", "5", "personId_i", "12");
indexr(id, "3", "rating_f", "2.2", "personId_i", "13");
indexr(id, "4", "rating_f", "4.3", "personId_i", "14");
indexr(id, "5", "rating_f", "3.5", "personId_i", "15");
indexr(id, "6", "rating_f", "3", "personId_i", "16");
indexr(id, "7", "rating_f", "3", "personId_i", "17");
indexr(id, "8", "rating_f", "4", "personId_i", "18");
indexr(id, "9", "rating_f", "4.1", "personId_i", "19");
indexr(id, "10", "rating_f", "4.8", "personId_i", "20");
commit();
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
String expression;
TupleStream stream;
@ -324,7 +270,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
expression =
"innerJoin("
+ " select("
+ " search(collection1, fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -347,48 +293,48 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
}
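// Note on the expression above: innerJoin stream-merges its two inputs, so the
// search() tuples and the jdbc() tuples must both arrive sorted on the join
// key ("personId") for the join to produce correct results.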
@Test
public void testJDBCSolrInnerJoinExpressionWithProperties() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("jdbc", JDBCStream.class);
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
indexr(id, "1", "rating_f", "3.5", "personId_i", "11");
indexr(id, "2", "rating_f", "5", "personId_i", "12");
indexr(id, "3", "rating_f", "2.2", "personId_i", "13");
indexr(id, "4", "rating_f", "4.3", "personId_i", "14");
indexr(id, "5", "rating_f", "3.5", "personId_i", "15");
indexr(id, "6", "rating_f", "3", "personId_i", "16");
indexr(id, "7", "rating_f", "3", "personId_i", "17");
indexr(id, "8", "rating_f", "4", "personId_i", "18");
indexr(id, "9", "rating_f", "4.1", "personId_i", "19");
indexr(id, "10", "rating_f", "4.8", "personId_i", "20");
commit();
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
String expression;
TupleStream stream;
@ -401,7 +347,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
expression =
"innerJoin("
+ " select("
+ " search(collection1, fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -430,7 +376,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
expression =
"innerJoin("
+ " select("
+ " search(collection1, fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -453,12 +399,11 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
assertOrderOf(tuples, "country", "Netherlands","United States","Netherlands","Netherlands","Netherlands","United States","United States","Netherlands","Netherlands","United States");
}
@Test
public void testJDBCSolrInnerJoinRollupExpression() throws Exception{
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
@ -471,38 +416,37 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
;
// Load Database Data
try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:.");
Statement statement = connection.createStatement()) {
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('US', 'United States')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NL', 'Netherlands')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NP', 'Nepal')");
statement.executeUpdate("insert into COUNTRIES (CODE,COUNTRY_NAME) values ('NO', 'Norway')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (11,'Emma','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (12,'Grace','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (13,'Hailey','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (14,'Isabella','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (15,'Lily','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (16,'Madison','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (17,'Mia','US')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (18,'Natalie','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (19,'Olivia','NL')");
statement.executeUpdate("insert into PEOPLE (ID, NAME, COUNTRY_CODE) values (20,'Samantha','US')");
}
// Load solr data
indexr(id, "1", "rating_f", "3.5", "personId_i", "11");
indexr(id, "3", "rating_f", "2.2", "personId_i", "13");
indexr(id, "4", "rating_f", "4.3", "personId_i", "14");
indexr(id, "5", "rating_f", "3.5", "personId_i", "15");
indexr(id, "8", "rating_f", "4", "personId_i", "18");
indexr(id, "9", "rating_f", "4.1", "personId_i", "19");
indexr(id, "2", "rating_f", "5", "personId_i", "12");
indexr(id, "6", "rating_f", "3", "personId_i", "16");
indexr(id, "7", "rating_f", "3", "personId_i", "17");
indexr(id, "10", "rating_f", "4.8", "personId_i", "20");
commit();
new UpdateRequest()
.add(id, "1", "rating_f", "3.5", "personId_i", "11")
.add(id, "3", "rating_f", "2.2", "personId_i", "13")
.add(id, "4", "rating_f", "4.3", "personId_i", "14")
.add(id, "5", "rating_f", "3.5", "personId_i", "15")
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "2", "rating_f", "5", "personId_i", "12")
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
.commit(cluster.getSolrClient(), COLLECTION);
String expression;
TupleStream stream;
@ -513,7 +457,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
"rollup("
+ " hashJoin("
+ " hashed=select("
+ " search(collection1, fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@ -562,6 +506,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
tupleStream.close();
return tuples;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, int... values) throws Exception {
int i = 0;
for(int val : values) {
@ -574,6 +519,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
}
return true;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, double... values) throws Exception {
int i = 0;
for(double val : values) {
@ -586,6 +532,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
}
return true;
}
protected boolean assertOrderOf(List<Tuple> tuples, String fieldName, String... values) throws Exception {
int i = 0;
for(String val : values) {
@ -617,6 +564,7 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
}
return true;
}
protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
@ -649,9 +597,4 @@ public class JDBCStreamTest extends AbstractFullDistribZkTestBase {
return true;
}
}