SOLR-4112: Fixed DataImportHandler ZKAwarePropertiesWriter implementation so

import works fine with SolrCloud clusters

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1432045 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2013-01-11 13:38:23 +00:00
parent de17206f02
commit f41e563d7d
6 changed files with 183 additions and 10 deletions

View File

@ -499,6 +499,10 @@ Bug Fixes
* SOLR-3876: Solr Admin UI is completely dysfunctional on IE 9 (steffkes)
* SOLR-4112: Fixed DataImportHandler ZKAwarePropertiesWriter implementation so
import works fine with SolrCloud clusters (Deniz Durmus, James Dyer,
Erick Erickson, shalin)
Other Changes
----------------------

View File

@ -84,12 +84,7 @@ public class SimplePropertiesWriter extends DIHProperties {
} else {
filename = "dataimport.properties";
}
if(params.get(DIRECTORY) != null) {
configDir = params.get(DIRECTORY);
} else {
SolrCore core = dataImporter.getCore();
configDir = (core == null ? "." : core.getResourceLoader().getConfigDir());
}
findDirectory(dataImporter, params);
if(params.get(LOCALE) != null) {
String localeStr = params.get(LOCALE);
for (Locale l : Locale.getAvailableLocales()) {
@ -109,6 +104,14 @@ public class SimplePropertiesWriter extends DIHProperties {
} else {
dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
}
}
protected void findDirectory(DataImporter dataImporter, Map<String, String> params) {
if(params.get(DIRECTORY) != null) {
configDir = params.get(DIRECTORY);
} else {
SolrCore core = dataImporter.getCore();
configDir = (core == null ? "." : core.getResourceLoader().getConfigDir());
}
}
private File getPersistFile() {

View File

@ -42,12 +42,16 @@ public class ZKPropertiesWriter extends SimplePropertiesWriter {
@Override
public void init(DataImporter dataImporter, Map<String, String> params) {
super.init(dataImporter, params);
super.init(dataImporter, params);
zkClient = dataImporter.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getZkClient();
}
@Override
protected void findDirectory(DataImporter dataImporter, Map<String, String> params) {
String collection = dataImporter.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName();
path = "/configs/" + collection + "/" + filename;
zkClient = dataImporter.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getZkClient();
}
@Override

View File

@ -41,6 +41,8 @@
<field name="COUNTRY_NAME" type="text" indexed="true" stored="true" multiValued="true" />
<field name="SPORT_NAME" type="text" indexed="true" stored="true" multiValued="true" />
<field name="DO_NOT_INDEX" type="ignored" />
<field name="_version_" type="tlong" indexed="true" stored="true" multiValued="false"/>
<dynamicField name="*_i" type="tint" indexed="true" stored="true"/>
<dynamicField name="*_s" type="string" indexed="true" stored="true"/>

View File

@ -0,0 +1,160 @@
package org.apache.solr.handler.dataimport;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
public class TestZKPropertiesWriter extends AbstractDataImportHandlerTestCase {
protected static ZkTestServer zkServer;
protected static String zkDir;
private String dateFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS";
@BeforeClass
public static void dihZk_beforeClass() throws Exception {
System.out.println("Initializing DIH ZK stuff");
createTempDir();
zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
zkServer = new ZkTestServer(zkDir);
zkServer.run();
System.setProperty("solrcloud.skip.autorecovery", "true");
System.setProperty("zkHost", zkServer.getZkAddress());
System.setProperty("jetty.port", "0000");
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), getFile("dih/solr"),
"dataimport-solrconfig.xml", "dataimport-schema.xml");
initCore("dataimport-solrconfig.xml", "dataimport-schema.xml", getFile("dih/solr").getAbsolutePath());
}
@Before
public void beforeDihZKTest() throws Exception {
clearIndex();
assertU(commit());
}
@After
public void afterDihZkTest() throws Exception {
MockDataSource.clearCache();
}
@AfterClass
public static void dihZk_afterClass() throws Exception {
zkServer.shutdown();
zkServer = null;
zkDir = null;
// wait just a bit for any zk client threads to outlast timeout
Thread.sleep(2000);
}
@Test
public void testZKPropertiesWriter() throws Exception {
// test using ZooKeeper
assertTrue("Not using ZooKeeper", h.getCoreContainer().isZooKeeperAware());
// for the really slow/busy computer, we wait to make sure we have a leader before starting
h.getCoreContainer().getZkController().getZkStateReader().getLeaderUrl("collection1", "shard1", 30000);
assertQ("test query on empty index", request("qlkciyopsbgzyvkylsjhchghjrdf"),
"//result[@numFound='0']");
SimpleDateFormat errMsgFormat = new SimpleDateFormat(dateFormat, Locale.ROOT);
delQ("*:*");
commit();
SimpleDateFormat df = new SimpleDateFormat(dateFormat, Locale.ROOT);
Date oneSecondAgo = new Date(System.currentTimeMillis() - 1000);
Map<String, String> init = new HashMap<String, String>();
init.put("dateFormat", dateFormat);
ZKPropertiesWriter spw = new ZKPropertiesWriter();
spw.init(new DataImporter(h.getCore(), "dataimport"), init);
Map<String, Object> props = new HashMap<String, Object>();
props.put("SomeDates.last_index_time", oneSecondAgo);
props.put("last_index_time", oneSecondAgo);
spw.persist(props);
List rows = new ArrayList();
rows.add(createMap("id", "1", "year_s", "2013"));
MockDataSource.setIterator("select " + df.format(oneSecondAgo) + " from dummy", rows.iterator());
h.query("/dataimport", lrf.makeRequest("command", "full-import", "dataConfig",
generateConfig(), "clean", "true", "commit", "true", "synchronous",
"true", "indent", "true"));
props = spw.readIndexerProperties();
Date entityDate = df.parse((String) props.get("SomeDates.last_index_time"));
Date docDate = df.parse((String) props.get("last_index_time"));
Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo) + " should be prior to the document date: " + errMsgFormat.format(docDate), docDate.getTime() - oneSecondAgo.getTime() > 0);
Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo) + " should be prior to the entity date: " + errMsgFormat.format(entityDate), entityDate.getTime() - oneSecondAgo.getTime() > 0);
assertQ(request("*:*"), "//*[@numFound='1']", "//doc/str[@name=\"year_s\"]=\"2013\"");
}
public SolrQueryRequest request(String... q) {
LocalSolrQueryRequest req = lrf.makeRequest(q);
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(req.getParams());
params.set("distrib", true);
req.setParams(params);
return req;
}
protected String generateConfig() {
StringBuilder sb = new StringBuilder();
sb.append("<dataConfig> \n");
sb.append("<propertyWriter dateFormat=\"" + dateFormat + "\" type=\"ZKPropertiesWriter\" />\n");
sb.append("<dataSource name=\"mock\" type=\"MockDataSource\"/>\n");
sb.append("<document name=\"TestSimplePropertiesWriter\"> \n");
sb.append("<entity name=\"SomeDates\" processor=\"SqlEntityProcessor\" dataSource=\"mock\" ");
sb.append("query=\"select ${dih.last_index_time} from dummy\" >\n");
sb.append("<field column=\"AYEAR_S\" name=\"year_s\" /> \n");
sb.append("</entity>\n");
sb.append("</document> \n");
sb.append("</dataConfig> \n");
String config = sb.toString();
log.debug(config);
return config;
}
}

View File

@ -84,7 +84,7 @@ public abstract class AbstractZkTestCase extends SolrTestCaseJ4 {
}
// static to share with distrib test
static void buildZooKeeper(String zkHost, String zkAddress, File solrhome, String config,
public static void buildZooKeeper(String zkHost, String zkAddress, File solrhome, String config,
String schema) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
zkClient.makePath("/solr", false, true);