SOLR-3165: make DIH work with solrcloud

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1294990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sami Siren 2012-02-29 07:07:40 +00:00
parent 6dc05950ae
commit dcaf6be3e4
4 changed files with 110 additions and 6 deletions

View File

@ -261,6 +261,9 @@ Optimizations
Bug Fixes Bug Fixes
---------------------- ----------------------
* SOLR-3165: Cannot use DIH in Solrcloud + Zookeeper (Alexey Serba,
Mark Miller, siren)
* SOLR-3068: Occasional NPE in ThreadDumpHandler (siren) * SOLR-3068: Occasional NPE in ThreadDumpHandler (siren)
* SOLR-2762: FSTLookup could return duplicate results or one results less * SOLR-2762: FSTLookup could return duplicate results or one results less

View File

@ -28,7 +28,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SystemIdResolver; import org.apache.solr.common.util.SystemIdResolver;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.RequestHandlerBase;
@ -39,7 +38,6 @@ import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.request.SolrRequestHandler; import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain; import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.plugin.SolrCoreAware; import org.apache.solr.util.plugin.SolrCoreAware;
import java.util.*; import java.util.*;
@ -109,7 +107,7 @@ public class DataImportHandler extends RequestHandlerBase implements
String configLoc = (String) defaults.get("config"); String configLoc = (String) defaults.get("config");
if (configLoc != null && configLoc.length() != 0) { if (configLoc != null && configLoc.length() != 0) {
processConfiguration(defaults); processConfiguration(defaults);
final InputSource is = new InputSource(core.getResourceLoader().openConfig(configLoc)); final InputSource is = new InputSource(core.getResourceLoader().openResource(configLoc));
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc)); is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc));
importer = new DataImporter(is, core, importer = new DataImporter(is, core,
dataSources, coreScopeSession, myName); dataSources, coreScopeSession, myName);

View File

@ -19,7 +19,6 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
@ -96,11 +95,20 @@ public class DataImporter {
*/ */
DataImporter() { DataImporter() {
coreScopeSession = new ConcurrentHashMap<String, Object>(); coreScopeSession = new ConcurrentHashMap<String, Object>();
this.propWriter = new SimplePropertiesWriter(); createPropertyWriter();
propWriter.init(this); propWriter.init(this);
this.handlerName = "dataimport" ; this.handlerName = "dataimport" ;
} }
private void createPropertyWriter() {
if (this.core == null
|| !this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
propWriter = new SimplePropertiesWriter();
} else {
propWriter = new ZKPropertiesWriter();
}
}
DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) { DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
this.handlerName = handlerName; this.handlerName = handlerName;
if (dataConfig == null) if (dataConfig == null)
@ -108,7 +116,7 @@ public class DataImporter {
"Configuration not found"); "Configuration not found");
this.core = core; this.core = core;
this.schema = core.getSchema(); this.schema = core.getSchema();
this.propWriter = new SimplePropertiesWriter(); createPropertyWriter();
propWriter.init(this); propWriter.init(this);
dataSourceProps = ds; dataSourceProps = ds;
if (session == null) if (session == null)

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Properties;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZKPropertiesWriter implements DIHPropertiesWriter {
private static final Logger log = LoggerFactory
.getLogger(ZKPropertiesWriter.class);
private String path;
private SolrZkClient zkClient;
@Override
public void init(DataImporter dataImporter) {
String collection = dataImporter.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName();
String persistFilename;
if(dataImporter.getHandlerName() != null){
persistFilename = dataImporter.getHandlerName() + ".properties";
} else {
persistFilename = SimplePropertiesWriter.IMPORTER_PROPERTIES;
}
path = "/configs/" + collection + "/" + persistFilename;
zkClient = dataImporter.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getZkClient();
}
@Override
public boolean isWritable() {
return true;
}
@Override
public void persist(Properties props) {
Properties existing = readIndexerProperties();
existing.putAll(props);
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
existing.store(output, "");
byte[] bytes = output.toByteArray();
if (!zkClient.exists(path, false)) {
try {
zkClient.makePath(path, false);
} catch (NodeExistsException e) {}
}
zkClient.setData(path, bytes, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(
"Could not persist properties to " + path + " :" + e.getClass(), e);
} catch (Exception e) {
log.warn(
"Could not persist properties to " + path + " :" + e.getClass(), e);
}
}
@Override
public Properties readIndexerProperties() {
Properties props = new Properties();
try {
byte[] data = zkClient.getData(path, null, null, false);
if (data != null) {
ByteArrayInputStream input = new ByteArrayInputStream(data);
props.load(input);
}
} catch (Throwable e) {
log.warn(
"Could not read DIH properties from " + path + " :" + e.getClass(), e);
}
return props;
}
}