SOLR-2115: more flexible loading of DIH configuration

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1368993 13f79535-47bb-0310-9956-ffa450edef68
Author: James Dyer
Date: 2012-08-03 14:45:20 +00:00
Parent: 91833f0bc8
Commit: 386c8ddeac
8 changed files with 141 additions and 133 deletions

View File

@@ -210,6 +210,14 @@ Other Changes
* SOLR-3682: Fail to parse schema.xml if uniqueKeyField is multivalued (hossman)
* SOLR-2115: DIH no longer requires the "config" parameter to be specified in solrconfig.xml.
Instead, the configuration is loaded and parsed with every import. This allows the use of
a different configuration with each import, and makes correcting configuration errors simpler.
Also, the configuration itself can be passed using the "dataConfig" parameter rather than
using a file (this previously worked in debug mode only). When configuration errors are
encountered, the error message is returned in XML format. (James Dyer)
================== 4.0.0-ALPHA ==================
More information about this release, including any errata related to the
release notes, upgrade instructions, or other changes may be found online at:
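To illustrate the request-time configuration loading described in the CHANGES entry above, here is a minimal SolrJ sketch. It is illustrative only: the core URL, the /dataimport handler path, and the data-config.xml file name are assumptions for the example, not part of this commit.

    import org.apache.solr.client.solrj.impl.HttpSolrServer;
    import org.apache.solr.client.solrj.request.QueryRequest;
    import org.apache.solr.common.params.ModifiableSolrParams;
    import org.apache.solr.common.util.NamedList;

    public class DIHRequestTimeConfigExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical core URL; adjust for your deployment.
        HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");

        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set("command", "full-import");
        // The configuration is now resolved on each request, so the file can be
        // named here instead of in a "defaults" section of solrconfig.xml.
        params.set("config", "data-config.xml");
        // Alternatively, pass the configuration inline (no longer debug-only):
        // params.set("dataConfig", "<dataConfig>...</dataConfig>");
        params.set("clean", "true");
        params.set("commit", "true");

        QueryRequest request = new QueryRequest(params);
        request.setPath("/dataimport"); // handler name assumed, as in the test below
        NamedList<Object> response = server.request(request);
        System.out.println(response);
      }
    }

Per the handler changes in this commit, the same request with command=show-config should return the configuration resolved from either parameter, and command=reload-config re-parses it without starting an import.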

View File

@@ -152,7 +152,7 @@ public class ContextImpl extends Context {
}
} else if (SCOPE_SOLR_CORE.equals(scope)){
if(dataImporter != null) {
dataImporter.getCoreScopeSession().put(name, val);
dataImporter.putToCoreScopeSession(name, val);
}
}
}
@@ -171,7 +171,7 @@ public class ContextImpl extends Context {
DocBuilder.DocWrapper doc = getDocument();
return doc == null ? null: doc.getSessionAttribute(name);
} else if (SCOPE_SOLR_CORE.equals(scope)){
return dataImporter == null ? null : dataImporter.getCoreScopeSession().get(name);
return dataImporter == null ? null : dataImporter.getFromCoreScopeSession(name);
}
return null;
}

View File

@@ -71,14 +71,10 @@ public class DataImportHandler extends RequestHandlerBase implements
private DataImporter importer;
private Map<String, Properties> dataSources = new HashMap<String, Properties>();
private boolean debugEnabled = true;
private String myName = "dataimport";
private Map<String , Object> coreScopeSession = new HashMap<String, Object>();
@Override
@SuppressWarnings("unchecked")
public void init(NamedList args) {
@@ -102,21 +98,10 @@ public class DataImportHandler extends RequestHandlerBase implements
}
}
debugEnabled = StrUtils.parseBool((String)initArgs.get(ENABLE_DEBUG), true);
NamedList defaults = (NamedList) initArgs.get("defaults");
if (defaults != null) {
String configLoc = (String) defaults.get("config");
if (configLoc != null && configLoc.length() != 0) {
processConfiguration(defaults);
final InputSource is = new InputSource(core.getResourceLoader().openResource(configLoc));
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc));
importer = new DataImporter(is, core,
dataSources, coreScopeSession, myName);
}
}
importer = new DataImporter(core, myName);
} catch (Throwable e) {
LOG.error( DataImporter.MSG.LOAD_EXP, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
DataImporter.MSG.INVALID_CONFIG, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, DataImporter.MSG.LOAD_EXP, e);
}
}
@@ -136,48 +121,35 @@ public class DataImportHandler extends RequestHandlerBase implements
}
}
SolrParams params = req.getParams();
NamedList defaultParams = (NamedList) initArgs.get("defaults");
RequestInfo requestParams = new RequestInfo(getParamsMap(params), contentStream);
String command = requestParams.getCommand();
if (DataImporter.SHOW_CONF_CMD.equals(command)) {
// Modify incoming request params to add wt=raw
ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
rawParams.set(CommonParams.WT, "raw");
req.setParams(rawParams);
String dataConfigFile = defaults.get("config");
ContentStreamBase content = new ContentStreamBase.StringStream(SolrWriter
.getResourceAsString(req.getCore().getResourceLoader().openResource(
dataConfigFile)));
rsp.add(RawResponseWriter.CONTENT, content);
if (DataImporter.SHOW_CONF_CMD.equals(command)) {
String dataConfigFile = params.get("config");
String dataConfig = params.get("dataConfig");
if(dataConfigFile != null) {
dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));
}
if(dataConfig==null) {
rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);
} else {
// Modify incoming request params to add wt=raw
ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
rawParams.set(CommonParams.WT, "raw");
req.setParams(rawParams);
ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);
rsp.add(RawResponseWriter.CONTENT, content);
}
return;
}
rsp.add("initArgs", initArgs);
String message = "";
if (command != null)
if (command != null) {
rsp.add("command", command);
if (requestParams.isDebug() && (importer == null || !importer.isBusy())) {
// Reload the data-config.xml
importer = null;
if (requestParams.getDataConfig() != null) {
try {
processConfiguration((NamedList) initArgs.get("defaults"));
importer = new DataImporter(new InputSource(new StringReader(requestParams.getDataConfig())), req.getCore()
, dataSources, coreScopeSession, myName);
} catch (RuntimeException e) {
rsp.add("exception", DebugLogger.getStacktraceString(e));
importer = null;
return;
}
} else {
inform(req.getCore());
}
message = DataImporter.MSG.CONFIG_RELOADED;
}
// If importer is still null
if (importer == null) {
rsp.add("status", DataImporter.MSG.NO_INIT);
@@ -192,7 +164,7 @@ public class DataImportHandler extends RequestHandlerBase implements
if (DataImporter.FULL_IMPORT_CMD.equals(command)
|| DataImporter.DELTA_IMPORT_CMD.equals(command) ||
IMPORT_CMD.equals(command)) {
importer.maybeReloadConfiguration(requestParams, defaultParams);
UpdateRequestProcessorChain processorChain =
req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
@@ -219,10 +191,12 @@ public class DataImportHandler extends RequestHandlerBase implements
importer.runCmd(requestParams, sw);
}
}
} else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
importer = null;
inform(req.getCore());
message = DataImporter.MSG.CONFIG_RELOADED;
} else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
if(importer.maybeReloadConfiguration(requestParams, defaultParams)) {
message = DataImporter.MSG.CONFIG_RELOADED;
} else {
message = DataImporter.MSG.CONFIG_NOT_RELOADED;
}
}
}
rsp.add("status", importer.isBusy() ? "busy" : "idle");
@@ -248,36 +222,6 @@ public class DataImportHandler extends RequestHandlerBase implements
return result;
}
@SuppressWarnings("unchecked")
private void processConfiguration(NamedList defaults) {
if (defaults == null) {
LOG.info("No configuration specified in solrconfig.xml for DataImportHandler");
return;
}
LOG.info("Processing configuration from solrconfig.xml: " + defaults);
dataSources = new HashMap<String, Properties>();
int position = 0;
while (position < defaults.size()) {
if (defaults.getName(position) == null)
break;
String name = defaults.getName(position);
if (name.equals("datasource")) {
NamedList dsConfig = (NamedList) defaults.getVal(position);
Properties props = new Properties();
for (int i = 0; i < dsConfig.size(); i++)
props.put(dsConfig.getName(i), dsConfig.getVal(i));
LOG.info("Adding properties to datasource: " + props);
dataSources.put((String) dsConfig.get("name"), props);
}
position++;
}
}
private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
final SolrResourceLoader loader, final RequestInfo requestParams, SolrQueryRequest req) {

View File

@@ -22,6 +22,8 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.util.SystemIdResolver;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.XMLErrorLogger;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
@@ -41,9 +43,12 @@ import org.apache.commons.io.IOUtils;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.IOException;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,14 +72,14 @@ public class DataImporter {
private DIHConfiguration config;
private Date indexStartTime;
private Properties store = new Properties();
private Map<String, Properties> dataSourceProps = new HashMap<String, Properties>();
private Map<String, Map<String,String>> requestLevelDataSourceProps = new HashMap<String, Map<String,String>>();
private IndexSchema schema;
public DocBuilder docBuilder;
public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
private SolrCore core;
private Map<String, Object> coreScopeSession = new ConcurrentHashMap<String,Object>();
private DIHPropertiesWriter propWriter;
private ReentrantLock importLock = new ReentrantLock();
private final Map<String , Object> coreScopeSession;
private boolean isDeltaImportSupported = false;
private final String handlerName;
private Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
@@ -83,12 +88,19 @@
* Only for testing purposes
*/
DataImporter() {
coreScopeSession = new HashMap<String, Object>();
createPropertyWriter();
propWriter.init(this);
this.handlerName = "dataimport" ;
}
DataImporter(SolrCore core, String handlerName) {
this.handlerName = handlerName;
this.core = core;
this.schema = core.getSchema();
loadSchemaFieldMap();
createPropertyWriter();
}
private void createPropertyWriter() {
if (this.core == null
|| !this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
@@ -99,27 +111,58 @@
propWriter.init(this);
}
DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
this.handlerName = handlerName;
if (dataConfig == null) {
throw new DataImportHandlerException(SEVERE, "Configuration not found");
}
this.core = core;
this.schema = core.getSchema();
loadSchemaFieldMap();
createPropertyWriter();
dataSourceProps = ds;
if (session == null)
session = new HashMap<String, Object>();
coreScopeSession = session;
loadDataConfig(dataConfig);
for (Entity e : config.getEntities()) {
if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
isDeltaImportSupported = true;
break;
boolean maybeReloadConfiguration(RequestInfo params,
NamedList<?> defaultParams) throws IOException {
if (importLock.tryLock()) {
boolean success = false;
try {
String dataConfigText = params.getDataConfig();
String dataconfigFile = (String) params.getConfigFile();
InputSource is = null;
if(dataConfigText!=null && dataConfigText.length()>0) {
is = new InputSource(new StringReader(dataConfigText));
} else if(dataconfigFile!=null) {
is = new InputSource(core.getResourceLoader().openResource(dataconfigFile));
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(dataconfigFile));
LOG.info("Loading DIH Configuration: " + dataconfigFile);
}
if(is!=null) {
loadDataConfig(is);
success = true;
}
Map<String,Map<String,String>> dsProps = new HashMap<String,Map<String,String>>();
if(defaultParams!=null) {
int position = 0;
while (position < defaultParams.size()) {
if (defaultParams.getName(position) == null) {
break;
}
String name = defaultParams.getName(position);
if (name.equals("datasource")) {
success = true;
NamedList dsConfig = (NamedList) defaultParams.getVal(position);
LOG.info("Getting configuration for Global Datasource...");
Map<String,String> props = new HashMap<String,String>();
for (int i = 0; i < dsConfig.size(); i++) {
props.put(dsConfig.getName(i), dsConfig.getVal(i).toString());
}
LOG.info("Adding properties to datasource: " + props);
dsProps.put((String) dsConfig.get("name"), props);
}
position++;
}
}
requestLevelDataSourceProps = Collections.unmodifiableMap(dsProps);
} catch(IOException ioe) {
throw ioe;
} finally {
importLock.unlock();
}
return success;
} else {
return false;
}
}
@@ -188,7 +231,13 @@ public class DataImporter {
LOG.info("Data Configuration loaded successfully");
} catch (Exception e) {
throw new DataImportHandlerException(SEVERE,
"Exception occurred while initializing context", e);
"Data Config problem: " + e.getMessage(), e);
}
for (Entity e : config.getEntities()) {
if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
isDeltaImportSupported = true;
break;
}
}
}
@@ -196,7 +245,7 @@
DIHConfiguration config;
List<Map<String, String >> functions = new ArrayList<Map<String ,String>>();
Script script = null;
Map<String, Properties> dataSources = new HashMap<String, Properties>();
Map<String, Map<String,String>> dataSources = new HashMap<String, Map<String,String>>();
NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
if(dataConfigTags == null || dataConfigTags.getLength() == 0) {
@@ -232,16 +281,16 @@
List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, DATA_SRC);
if (!dataSourceTags.isEmpty()) {
for (Element element : dataSourceTags) {
Properties p = new Properties();
Map<String,String> p = new HashMap<String,String>();
HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
for (Map.Entry<String, String> entry : attrs.entrySet()) {
p.setProperty(entry.getKey(), entry.getValue());
p.put(entry.getKey(), entry.getValue());
}
dataSources.put(p.getProperty("name"), p);
dataSources.put(p.get("name"), p);
}
}
if(dataSources.get(null) == null){
for (Properties properties : dataSources.values()) {
for (Map<String,String> properties : dataSources.values()) {
dataSources.put(null,properties);
break;
}
@@ -270,17 +319,17 @@
}
DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
Properties p = dataSourceProps.get(name);
Map<String,String> p = requestLevelDataSourceProps.get(name);
if (p == null)
p = config.getDataSources().get(name);
if (p == null)
p = dataSourceProps.get(null);// for default data source
p = requestLevelDataSourceProps.get(null);// for default data source
if (p == null)
p = config.getDataSources().get(null);
if (p == null)
throw new DataImportHandlerException(SEVERE,
"No dataSource :" + name + " available for entity :" + key.getName());
String type = p.getProperty(TYPE);
String type = p.get(TYPE);
DataSource dataSrc = null;
if (type == null) {
dataSrc = new JdbcDataSource();
@@ -458,6 +507,8 @@ public class DataImporter {
public static final String DEBUG_NOT_ENABLED = "Debug not enabled. Add a tag <str name=\"enableDebug\">true</str> in solrconfig.xml";
public static final String CONFIG_RELOADED = "Configuration Re-loaded sucessfully";
public static final String CONFIG_NOT_RELOADED = "Configuration NOT Re-loaded...Data Importer is busy.";
public static final String TOTAL_DOC_PROCESSED = "Total Documents Processed";
@@ -476,13 +527,16 @@ public class DataImporter {
return schema;
}
Map<String, Object> getCoreScopeSession() {
return coreScopeSession;
}
SolrCore getCore() {
return core;
}
void putToCoreScopeSession(String key, Object val) {
coreScopeSession.put(key, val);
}
Object getFromCoreScopeSession(String key) {
return coreScopeSession.get(key);
}
public static final String COLUMN = "column";

View File

@@ -36,6 +36,7 @@ public class RequestInfo {
private final boolean clean;
private final List<String> entitiesToRun;
private final Map<String,Object> rawParams;
private final String configFile;
private final String dataConfig;
//TODO: find a different home for these two...
@@ -98,7 +99,8 @@
} else {
entitiesToRun = null;
}
String configFileParam = (String) requestParams.get("config");
configFile = configFileParam;
String dataConfigParam = (String) requestParams.get("dataConfig");
if (dataConfigParam != null && dataConfigParam.trim().length() == 0) {
// Empty data-config param is not valid, change it to null
@@ -161,4 +163,8 @@
public DebugInfo getDebugInfo() {
return debugInfo;
}
public String getConfigFile() {
return configFile;
}
}

View File

@@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
@@ -49,8 +48,8 @@ public class DIHConfiguration {
private final String onImportEnd;
private final List<Map<String, String>> functions;
private final Script script;
private final Map<String, Properties> dataSources;
public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Properties> dataSources) {
private final Map<String, Map<String,String>> dataSources;
public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Map<String,String>> dataSources) {
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
@@ -90,7 +89,7 @@ public class DIHConfiguration {
public List<Map<String,String>> getFunctions() {
return functions;
}
public Map<String,Properties> getDataSources() {
public Map<String,Map<String,String>> getDataSources() {
return dataSources;
}
public Script getScript() {

View File

@@ -31,11 +31,7 @@
<str name="echoParams">explicit</str>
</lst>
</requestHandler>
<requestHandler name="/dataimport-end-to-end" class="org.apache.solr.handler.dataimport.DataImportHandler">
<lst name="defaults">
<str name="config">data-config-end-to-end.xml</str>
</lst>
</requestHandler>
<requestHandler name="/dataimport-end-to-end" class="org.apache.solr.handler.dataimport.DataImportHandler" />
<requestHandler name="/search" class="org.apache.solr.handler.component.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>

View File

@@ -31,7 +31,8 @@ public class TestDIHEndToEnd extends AbstractDIHJdbcTestCase {
}
@Test
public void testEndToEnd() throws Exception {
LocalSolrQueryRequest request = lrf.makeRequest("command", "full-import",
LocalSolrQueryRequest request = lrf.makeRequest(
"command", "full-import", "config", "data-config-end-to-end.xml",
"clean", "true", "commit", "true", "synchronous", "true", "indent", "true");
h.query("/dataimport-end-to-end", request);
assertQ(req("*:*"), "//*[@numFound='20']");