SOLR-3422: Refactor DIH - configuration data

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1332292 13f79535-47bb-0310-9956-ffa450edef68
James Dyer 2012-04-30 16:59:30 +00:00
parent 30bc808366
commit fc2749f24a
35 changed files with 1557 additions and 932 deletions

TestMailEntityProcessor.java View File

@ -17,6 +17,7 @@
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.handler.dataimport.config.Entity;
import org.junit.Ignore;
import org.junit.Test;
@ -63,10 +64,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("processAttachement", "false");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top1 did not return 2 messages", swi.docs.size(), 2);
@ -80,10 +79,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("processAttachement", "false");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its children did not return 8 messages", swi.docs.size(), 8);
@ -98,10 +95,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("exclude", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 5 messages", swi.docs.size(), 5);
@ -116,10 +111,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("include", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);
@ -135,10 +128,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("include", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);
@ -153,10 +144,8 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
paramMap.put("fetchMailsSince", "2008-12-26 00:00:00");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
DataConfig.Entity ent = di.getConfig().document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);

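Each test above migrates the same way: the mutable DataConfig.Entity and DataImporter.RequestParams pair is replaced by the new config.Entity and RequestInfo types, and the manual ent.isDocRoot = true tweak disappears, presumably because root entities are now resolved while the configuration is parsed. A condensed sketch of the shared setup, written as a hypothetical helper inside the test class (the method name is invented; getConfigFromMap and createMap are the helpers already used above):

// Hypothetical helper condensing the setup repeated in each test above.
private SolrWriterImpl runFullImport(Map<String, String> paramMap) throws Exception {
  DataImporter di = new DataImporter();
  di.loadAndInit(getConfigFromMap(paramMap));
  // Entities now come from DIHConfiguration#getEntities() rather than the
  // old public di.getConfig().document.entities list.
  Entity ent = di.getConfig().getEntities().get(0);
  // Request state is built once from a parameter map; no mutable fields.
  RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
  SolrWriterImpl swi = new SolrWriterImpl();
  di.runCmd(rp, swi);
  return swi;
}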
TestTikaEntityProcessor.java View File

@ -40,7 +40,7 @@ public class TestTikaEntityProcessor extends AbstractDataImportHandlerTestCase {
"<dataConfig>" +
" <dataSource type=\"BinFileDataSource\"/>" +
" <document>" +
" <entity processor=\"TikaEntityProcessor\" url=\"" + getFile("dihextras/solr-word.pdf").getAbsolutePath() + "\" >" +
" <entity name=\"Tika\" processor=\"TikaEntityProcessor\" url=\"" + getFile("dihextras/solr-word.pdf").getAbsolutePath() + "\" >" +
" <field column=\"Author\" meta=\"true\" name=\"author\"/>" +
" <field column=\"title\" meta=\"true\" name=\"title\"/>" +
" <field column=\"text\"/>" +
@ -67,17 +67,4 @@ public class TestTikaEntityProcessor extends AbstractDataImportHandlerTestCase {
assertQ(req("*:*"), tests );
}
@Test
public void testIndexingWithTikaEntityProcessorThreaded() throws Exception {
DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
Document doc = builder.parse(new InputSource(new StringReader(conf)));
((Element) doc.getElementsByTagName("entity").item(0)).setAttribute("threads", "1");
Transformer trans = TransformerFactory.newInstance().newTransformer();
StringWriter writer = new StringWriter();
trans.transform(new DOMSource(doc), new StreamResult(writer));
runFullImport(writer.toString());
assertQ(req("*:*"), tests );
}
}

CHANGES.txt View File

@ -10,7 +10,10 @@ HTTP data sources quick and easy.
$Id$
================== 4.0.0-dev ==============
(No Changes)
Other Changes
----------------------
* SOLR-3422: Refactored internal data classes.
All entities in data-config.xml must have a name (James Dyer)
================== 3.6.0 ==================

BinContentStreamDataSource.java View File

@ -45,7 +45,7 @@ public class BinContentStreamDataSource extends DataSource<InputStream> {
@Override
public InputStream getData(String query) {
contentStream = context.getDocBuilder().requestParameters.contentStream;
contentStream = context.getDocBuilder().getReqParams().getContentStream();
if (contentStream == null)
throw new DataImportHandlerException(SEVERE, "No stream available. The request has no body");
try {

ConfigParseUtil.java View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class ConfigParseUtil {
public static String getStringAttribute(Element e, String name, String def) {
String r = e.getAttribute(name);
if (r == null || "".equals(r.trim()))
r = def;
return r;
}
public static HashMap<String, String> getAllAttributes(Element e) {
HashMap<String, String> m = new HashMap<String, String>();
NamedNodeMap nnm = e.getAttributes();
for (int i = 0; i < nnm.getLength(); i++) {
m.put(nnm.item(i).getNodeName(), nnm.item(i).getNodeValue());
}
return m;
}
public static String getText(Node elem, StringBuilder buffer) {
if (elem.getNodeType() != Node.CDATA_SECTION_NODE) {
NodeList childs = elem.getChildNodes();
for (int i = 0; i < childs.getLength(); i++) {
Node child = childs.item(i);
short childType = child.getNodeType();
if (childType != Node.COMMENT_NODE
&& childType != Node.PROCESSING_INSTRUCTION_NODE) {
getText(child, buffer);
}
}
} else {
buffer.append(elem.getNodeValue());
}
return buffer.toString();
}
public static List<Element> getChildNodes(Element e, String byName) {
List<Element> result = new ArrayList<Element>();
NodeList l = e.getChildNodes();
for (int i = 0; i < l.getLength(); i++) {
if (e.equals(l.item(i).getParentNode())
&& byName.equals(l.item(i).getNodeName()))
result.add((Element) l.item(i));
}
return result;
}
}
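A minimal usage sketch of the new helpers, assuming a class in the same package (the demo class and XML snippet are invented for illustration):

import java.io.StringReader;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

public class ConfigParseUtilDemo {
  public static void main(String[] args) throws Exception {
    String xml = "<entity name=\"item\" pk=\"id\"><field column=\"id\"/></entity>";
    Element e = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new InputSource(new StringReader(xml))).getDocumentElement();
    // Attribute access with a default when the attribute is absent or blank.
    String name = ConfigParseUtil.getStringAttribute(e, "name", "anonymous");
    // All attributes as a map, and only the direct <field> children.
    Map<String, String> attrs = ConfigParseUtil.getAllAttributes(e);
    List<Element> fields = ConfigParseUtil.getChildNodes(e, "field");
    System.out.println(name + " " + attrs + " fields=" + fields.size());
  }
}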

ContentStreamDataSource.java View File

@ -45,7 +45,7 @@ public class ContentStreamDataSource extends DataSource<Reader> {
@Override
public Reader getData(String query) {
contentStream = context.getDocBuilder().requestParameters.contentStream;
contentStream = context.getDocBuilder().getReqParams().getContentStream();
if (contentStream == null)
throw new DataImportHandlerException(SEVERE, "No stream available. The request has no body");
try {

ContextImpl.java View File

@ -18,6 +18,7 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.dataimport.config.Script;
import java.util.Collections;
import java.util.HashMap;
@ -33,7 +34,7 @@ import java.util.Map;
* @since solr 1.3
*/
public class ContextImpl extends Context {
protected DataConfig.Entity entity;
protected EntityProcessorWrapper epw;
private ContextImpl parent;
@ -54,16 +55,16 @@ public class ContextImpl extends Context {
DocBuilder docBuilder;
public ContextImpl(DataConfig.Entity entity, VariableResolverImpl resolver,
public ContextImpl(EntityProcessorWrapper epw, VariableResolverImpl resolver,
DataSource ds, String currProcess,
Map<String, Object> global, ContextImpl parentContext, DocBuilder docBuilder) {
this.entity = entity;
this.epw = epw;
this.docBuilder = docBuilder;
this.resolver = resolver;
this.ds = ds;
this.currProcess = currProcess;
if (docBuilder != null) {
this.requestParams = docBuilder.requestParameters.requestParams;
this.requestParams = docBuilder.getReqParams().getRawParams();
dataImporter = docBuilder.dataImporter;
}
globalSession = global;
@ -72,17 +73,17 @@ public class ContextImpl extends Context {
@Override
public String getEntityAttribute(String name) {
return entity == null ? null : entity.allAttributes.get(name);
return epw==null || epw.getEntity() == null ? null : epw.getEntity().getAllAttributes().get(name);
}
@Override
public String getResolvedEntityAttribute(String name) {
return entity == null ? null : resolver.replaceTokens(entity.allAttributes.get(name));
return epw==null || epw.getEntity() == null ? null : resolver.replaceTokens(epw.getEntity().getAllAttributes().get(name));
}
@Override
public List<Map<String, String>> getAllEntityFields() {
return entity == null ? Collections.EMPTY_LIST : entity.allFieldsList;
return epw==null || epw.getEntity() == null ? Collections.EMPTY_LIST : epw.getEntity().getAllFieldsList();
}
@Override
@ -93,26 +94,26 @@ public class ContextImpl extends Context {
@Override
public DataSource getDataSource() {
if (ds != null) return ds;
if(entity == null) return null;
if (entity.dataSrc == null) {
entity.dataSrc = dataImporter.getDataSourceInstance(entity, entity.dataSource, this);
if(epw==null) { return null; }
if (epw!=null && epw.getDatasource() == null) {
epw.setDatasource(dataImporter.getDataSourceInstance(epw.getEntity(), epw.getEntity().getDataSourceName(), this));
}
if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug &&
if (epw!=null && epw.getDatasource() != null && docBuilder != null && docBuilder.verboseDebug &&
Context.FULL_DUMP.equals(currentProcess())) {
//debug is not yet implemented properly for deltas
entity.dataSrc = docBuilder.getDebugLogger().wrapDs(entity.dataSrc);
epw.setDatasource(docBuilder.getDebugLogger().wrapDs(epw.getDatasource()));
}
return entity.dataSrc;
return epw.getDatasource();
}
@Override
public DataSource getDataSource(String name) {
return dataImporter.getDataSourceInstance(entity, name, this);
return dataImporter.getDataSourceInstance(epw==null ? null : epw.getEntity(), name, this);
}
@Override
public boolean isRootEntity() {
return entity.isDocRoot;
return epw==null ? false : epw.getEntity().isDocRoot();
}
@Override
@ -127,7 +128,7 @@ public class ContextImpl extends Context {
@Override
public EntityProcessor getEntityProcessor() {
return entity == null ? null : entity.processor;
return epw;
}
@Override
@ -210,18 +211,18 @@ public class ContextImpl extends Context {
@Override
public String getScript() {
if(dataImporter != null) {
DataConfig.Script script = dataImporter.getConfig().script;
return script == null ? null : script.text;
if (dataImporter != null) {
Script script = dataImporter.getConfig().getScript();
return script == null ? null : script.getText();
}
return null;
}
@Override
public String getScriptLanguage() {
if (dataImporter != null) {
DataConfig.Script script = dataImporter.getConfig().script;
return script == null ? null : script.language;
Script script = dataImporter.getConfig().getScript();
return script == null ? null : script.getLanguage();
}
return null;
}
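The pattern in these hunks is uniform: ContextImpl no longer reaches into mutable fields on DataConfig.Entity, it goes through the EntityProcessorWrapper, which now owns the per-run Entity and DataSource state. A small sketch of the new read-only access path, assuming code in the same package (the method is invented; the getters are the ones shown above):

// Sketch of the new access path; each comment names the field it replaces.
static String describe(EntityProcessorWrapper epw, DataImporter dataImporter) {
  String query = (epw == null || epw.getEntity() == null)
      ? null : epw.getEntity().getAllAttributes().get("query"); // was entity.allAttributes
  boolean root = epw == null ? false : epw.getEntity().isDocRoot(); // was entity.isDocRoot
  Script script = dataImporter.getConfig().getScript();         // was config.script
  String lang = (script == null) ? null : script.getLanguage(); // was script.language
  return query + " root=" + root + " lang=" + lang;
}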

DIHCacheSupport.java View File

@ -21,6 +21,7 @@ import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrap
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -92,6 +93,7 @@ public class DIHCacheSupport {
}
public void initNewParent(Context context) {
dataSourceRowCache = null;
queryVsCacheIterator = new HashMap<String,Iterator<Map<String,Object>>>();
for (Map.Entry<String,DIHCache> entry : queryVsCache.entrySet()) {
queryVsCacheIterator.put(entry.getKey(), entry.getValue().iterator());
@ -166,18 +168,16 @@ public class DIHCacheSupport {
+ context.getEntityAttribute("name"));
}
DIHCache cache = queryVsCache.get(query);
if (cache == null) {
cache = instantiateCache(context);
queryVsCache.put(query, cache);
populateCache(query, rowIterator);
}
if (dataSourceRowCache == null) {
DIHCache cache = queryVsCache.get(query);
if (cache == null) {
cache = instantiateCache(context);
queryVsCache.put(query, cache);
populateCache(query, rowIterator);
}
dataSourceRowCache = cache.iterator(key);
}
if (dataSourceRowCache == null) {
return null;
}
}
return getFromRowCacheTransformed();
}
@ -191,25 +191,18 @@ public class DIHCacheSupport {
*/
protected Map<String,Object> getSimpleCacheData(Context context,
String query, Iterator<Map<String,Object>> rowIterator) {
DIHCache cache = queryVsCache.get(query);
if (cache == null) {
cache = instantiateCache(context);
queryVsCache.put(query, cache);
populateCache(query, rowIterator);
queryVsCacheIterator.put(query, cache.iterator());
}
if (dataSourceRowCache == null || !dataSourceRowCache.hasNext()) {
dataSourceRowCache = null;
Iterator<Map<String,Object>> cacheIter = queryVsCacheIterator.get(query);
if (cacheIter.hasNext()) {
List<Map<String,Object>> dsrcl = new ArrayList<Map<String,Object>>(1);
dsrcl.add(cacheIter.next());
dataSourceRowCache = dsrcl.iterator();
}
}
if (dataSourceRowCache == null) {
return null;
if (dataSourceRowCache == null) {
DIHCache cache = queryVsCache.get(query);
if (cache == null) {
cache = instantiateCache(context);
queryVsCache.put(query, cache);
populateCache(query, rowIterator);
queryVsCacheIterator.put(query, cache.iterator());
}
Iterator<Map<String,Object>> cacheIter = queryVsCacheIterator.get(query);
dataSourceRowCache = cacheIter;
}
return getFromRowCacheTransformed();
}
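Both lookup methods converge on the same shape after this change: the backing cache is created and populated only when no row iterator is active, instead of being primed unconditionally on every call. A condensed sketch mirroring the new getIdCacheData (the helper name is invented; getSimpleCacheData differs only in taking its iterator from the queryVsCacheIterator map initialized above):

// Condensed form of the deferred cache priming (mirrors getIdCacheData).
private Map<String, Object> lookup(Context context, String query,
    Iterator<Map<String, Object>> rowIterator, Object key) {
  if (dataSourceRowCache == null) {   // nothing buffered for this parent row
    DIHCache cache = queryVsCache.get(query);
    if (cache == null) {              // first use of this query: build and fill
      cache = instantiateCache(context);
      queryVsCache.put(query, cache);
      populateCache(query, rowIterator);
    }
    dataSourceRowCache = cache.iterator(key);
  }
  if (dataSourceRowCache == null) {
    return null;
  }
  return getFromRowCacheTransformed();
}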

DataConfig.java View File

@ -1,375 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.apache.solr.schema.SchemaField;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* <p>
* Mapping for data-config.xml
* </p>
* <p/>
* <p>
* Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
* for more details.
* </p>
* <p/>
* <b>This API is experimental and subject to change</b>
*
* @since solr 1.3
*/
public class DataConfig {
static final Logger LOG = LoggerFactory.getLogger(DataConfig.class);
public Document document;
public List<Map<String, String >> functions = new ArrayList<Map<String ,String>>();
public Script script;
public Map<String, Properties> dataSources = new HashMap<String, Properties>();
public Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
public static class Document {
// TODO - remove from here and add it to entity
public String deleteQuery;
public List<Entity> entities = new ArrayList<Entity>();
public String onImportStart, onImportEnd;
public Document() {
}
public Document(Element element) {
this.deleteQuery = getStringAttribute(element, "deleteQuery", null);
this.onImportStart = getStringAttribute(element, "onImportStart", null);
this.onImportEnd = getStringAttribute(element, "onImportEnd", null);
List<Element> l = getChildNodes(element, "entity");
for (Element e : l)
entities.add(new Entity(e));
}
}
public static class Entity {
public String name;
public String pk;
public String pkMappingFromSchema;
public String dataSource;
public Map<String, String> allAttributes;
public String proc;
public String docRoot;
public boolean isDocRoot = false;
public List<Field> fields = new ArrayList<Field>();
public List<Map<String, String>> allFieldsList = new ArrayList<Map<String, String>>();
public List<Entity> entities;
public Entity parentEntity;
public EntityProcessorWrapper processor;
@SuppressWarnings("unchecked")
public DataSource dataSrc;
public Map<String, List<Field>> colNameVsField = new HashMap<String, List<Field>>();
public boolean initalized = false;
public Entity() {
}
public Entity(Element element) {
name = getStringAttribute(element, NAME, null);
if(name == null){
LOG.warn("Entity does not have a name");
name= ""+System.nanoTime();
}
if(name.indexOf(".") != -1){
throw new DataImportHandlerException(SEVERE, "Entity name must not have period (.): '" + name);
}
if (RESERVED_WORDS.contains(name)) {
throw new DataImportHandlerException(SEVERE, "Entity name : '" + name
+ "' is a reserved keyword. Reserved words are: " + RESERVED_WORDS);
}
pk = getStringAttribute(element, "pk", null);
docRoot = getStringAttribute(element, ROOT_ENTITY, null);
proc = getStringAttribute(element, PROCESSOR, null);
dataSource = getStringAttribute(element, DataImporter.DATA_SRC, null);
allAttributes = getAllAttributes(element);
List<Element> n = getChildNodes(element, "field");
for (Element elem : n) {
Field field = new Field(elem);
fields.add(field);
List<Field> l = colNameVsField.get(field.column);
if(l == null) l = new ArrayList<Field>();
boolean alreadyFound = false;
for (Field f : l) {
if(f.getName().equals(field.getName())) {
alreadyFound = true;
break;
}
}
if(!alreadyFound) l.add(field);
colNameVsField.put(field.column, l);
}
n = getChildNodes(element, "entity");
if (!n.isEmpty())
entities = new ArrayList<Entity>();
for (Element elem : n)
entities.add(new Entity(elem));
}
public void clearCache() {
if (entities != null) {
for (Entity entity : entities)
entity.clearCache();
}
try {
processor.close();
} catch (Exception e) {
/*no op*/
}
processor = null;
if (dataSrc != null)
dataSrc.close();
dataSrc = null;
}
public String getPk(){
return pk == null ? pkMappingFromSchema : pk;
}
public String getSchemaPk(){
return pkMappingFromSchema != null ? pkMappingFromSchema : pk;
}
}
public static class Script {
public String language;
public String text;
public Script() {
}
public Script(Element e) {
this.language = getStringAttribute(e, "language", "JavaScript");
StringBuilder buffer = new StringBuilder();
String script = getTxt(e, buffer);
if (script != null)
this.text = script.trim();
}
}
public static class Field {
public String column;
public String name;
public Float boost = 1.0f;
public boolean toWrite = true;
public boolean multiValued = false;
boolean dynamicName;
public Map<String, String> allAttributes = new HashMap<String, String>() {
@Override
public String put(String key, String value) {
if (super.containsKey(key))
return super.get(key);
return super.put(key, value);
}
};
public Field() {
}
public Field(Element e) {
this.name = getStringAttribute(e, DataImporter.NAME, null);
this.column = getStringAttribute(e, DataImporter.COLUMN, null);
if (column == null) {
throw new DataImportHandlerException(SEVERE, "Field must have a column attribute");
}
this.boost = Float.parseFloat(getStringAttribute(e, "boost", "1.0f"));
allAttributes.putAll(getAllAttributes(e));
}
public String getName() {
return name == null ? column : name;
}
public Entity entity;
}
public void readFromXml(Element e) {
List<Element> n = getChildNodes(e, "document");
if (n.isEmpty()) {
throw new DataImportHandlerException(SEVERE, "DataImportHandler " +
"configuration file must have one <document> node.");
}
document = new Document(n.get(0));
n = getChildNodes(e, SCRIPT);
if (!n.isEmpty()) {
script = new Script(n.get(0));
}
// Add the provided evaluators
n = getChildNodes(e, FUNCTION);
if (!n.isEmpty()) {
for (Element element : n) {
String func = getStringAttribute(element, NAME, null);
String clz = getStringAttribute(element, CLASS, null);
if (func == null || clz == null){
throw new DataImportHandlerException(
SEVERE,
"<function> must have a 'name' and 'class' attributes");
} else {
functions.add(getAllAttributes(element));
}
}
}
n = getChildNodes(e, DATA_SRC);
if (!n.isEmpty()) {
for (Element element : n) {
Properties p = new Properties();
HashMap<String, String> attrs = getAllAttributes(element);
for (Map.Entry<String, String> entry : attrs.entrySet()) {
p.setProperty(entry.getKey(), entry.getValue());
}
dataSources.put(p.getProperty("name"), p);
}
}
if(dataSources.get(null) == null){
for (Properties properties : dataSources.values()) {
dataSources.put(null,properties);
break;
}
}
}
private static String getStringAttribute(Element e, String name, String def) {
String r = e.getAttribute(name);
if (r == null || "".equals(r.trim()))
r = def;
return r;
}
private static HashMap<String, String> getAllAttributes(Element e) {
HashMap<String, String> m = new HashMap<String, String>();
NamedNodeMap nnm = e.getAttributes();
for (int i = 0; i < nnm.getLength(); i++) {
m.put(nnm.item(i).getNodeName(), nnm.item(i).getNodeValue());
}
return m;
}
public static String getTxt(Node elem, StringBuilder buffer) {
if (elem.getNodeType() != Node.CDATA_SECTION_NODE) {
NodeList childs = elem.getChildNodes();
for (int i = 0; i < childs.getLength(); i++) {
Node child = childs.item(i);
short childType = child.getNodeType();
if (childType != Node.COMMENT_NODE
&& childType != Node.PROCESSING_INSTRUCTION_NODE) {
getTxt(child, buffer);
}
}
} else {
buffer.append(elem.getNodeValue());
}
return buffer.toString();
}
public static List<Element> getChildNodes(Element e, String byName) {
List<Element> result = new ArrayList<Element>();
NodeList l = e.getChildNodes();
for (int i = 0; i < l.getLength(); i++) {
if (e.equals(l.item(i).getParentNode())
&& byName.equals(l.item(i).getNodeName()))
result.add((Element) l.item(i));
}
return result;
}
public void clearCaches() {
for (Entity entity : document.entities)
entity.clearCache();
}
public static final String SCRIPT = "script";
public static final String NAME = "name";
public static final String PROCESSOR = "processor";
/**
* @deprecated use IMPORTER_NS_SHORT instead
*/
@Deprecated
public static final String IMPORTER_NS = "dataimporter";
public static final String IMPORTER_NS_SHORT = "dih";
public static final String ROOT_ENTITY = "rootEntity";
public static final String FUNCTION = "function";
public static final String CLASS = "class";
public static final String DATA_SRC = "dataSource";
private static final Set<String> RESERVED_WORDS = new HashSet<String>();
static{
RESERVED_WORDS.add(IMPORTER_NS);
RESERVED_WORDS.add(IMPORTER_NS_SHORT);
RESERVED_WORDS.add("request");
RESERVED_WORDS.add("delta");
RESERVED_WORDS.add("functions");
RESERVED_WORDS.add("session");
RESERVED_WORDS.add(SolrWriter.LAST_INDEX_KEY);
}
}
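The 375 deleted lines are relocated rather than dropped. A rough, best-effort map of where DataConfig's members resurface, inferred from the new imports and hunks elsewhere in this commit:

// DataConfig (outer class)        -> config.DIHConfiguration
// DataConfig.Document             -> folded into DIHConfiguration
//                                    (getDeleteQuery, getOnImportStart/End, getEntities)
// DataConfig.Entity               -> config.Entity, with getters instead of public fields
// DataConfig.Field                -> config.EntityField
// DataConfig.Script               -> config.Script
// SCRIPT, NAME, IMPORTER_NS, ...  -> config.ConfigNameConstants
// getStringAttribute / getAllAttributes / getTxt / getChildNodes
//                                 -> ConfigParseUtil (getTxt becomes getText)
// lowerNameVsSchemaField          -> DataImporter.loadSchemaFieldMap() / getSchemaField()
// readFromXml(Element)            -> DataImporter.readFromXml(Document)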

DataImportHandler.java View File

@ -125,16 +125,21 @@ public class DataImportHandler extends RequestHandlerBase implements
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
throws Exception {
rsp.setHttpCaching(false);
SolrParams params = req.getParams();
DataImporter.RequestParams requestParams = new DataImporter.RequestParams(getParamsMap(params));
String command = requestParams.command;
//TODO: figure out why just the first one is OK...
ContentStream contentStream = null;
Iterable<ContentStream> streams = req.getContentStreams();
if(streams != null){
for (ContentStream stream : streams) {
requestParams.contentStream = stream;
contentStream = stream;
break;
}
}
SolrParams params = req.getParams();
RequestInfo requestParams = new RequestInfo(getParamsMap(params), contentStream);
String command = requestParams.getCommand();
if (DataImporter.SHOW_CONF_CMD.equals(command)) {
// Modify incoming request params to add wt=raw
ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
@ -154,13 +159,13 @@ public class DataImportHandler extends RequestHandlerBase implements
if (command != null)
rsp.add("command", command);
if (requestParams.debug && (importer == null || !importer.isBusy())) {
if (requestParams.isDebug() && (importer == null || !importer.isBusy())) {
// Reload the data-config.xml
importer = null;
if (requestParams.dataConfig != null) {
if (requestParams.getDataConfig() != null) {
try {
processConfiguration((NamedList) initArgs.get("defaults"));
importer = new DataImporter(new InputSource(new StringReader(requestParams.dataConfig)), req.getCore()
importer = new DataImporter(new InputSource(new StringReader(requestParams.getDataConfig())), req.getCore()
, dataSources, coreScopeSession, myName);
} catch (RuntimeException e) {
rsp.add("exception", DebugLogger.getStacktraceString(e));
@ -194,23 +199,21 @@ public class DataImportHandler extends RequestHandlerBase implements
SolrResourceLoader loader = req.getCore().getResourceLoader();
SolrWriter sw = getSolrWriter(processor, loader, requestParams, req);
if (requestParams.debug) {
if (requestParams.isDebug()) {
if (debugEnabled) {
// Synchronous request for the debug mode
importer.runCmd(requestParams, sw);
rsp.add("mode", "debug");
rsp.add("documents", requestParams.debugDocuments);
if (requestParams.debugVerboseOutput != null) {
rsp.add("verbose-output", requestParams.debugVerboseOutput);
rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
if (requestParams.getDebugInfo().debugVerboseOutput != null) {
rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
}
requestParams.debugDocuments = new ArrayList<SolrInputDocument>(0);
requestParams.debugVerboseOutput = null;
} else {
message = DataImporter.MSG.DEBUG_NOT_ENABLED;
}
} else {
// Asynchronous request for normal mode
if(requestParams.contentStream == null && !requestParams.syncMode){
if(requestParams.getContentStream() == null && !requestParams.isSyncMode()){
importer.runAsync(requestParams, sw);
} else {
importer.runCmd(requestParams, sw);
@ -276,7 +279,7 @@ public class DataImportHandler extends RequestHandlerBase implements
}
private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
final SolrResourceLoader loader, final DataImporter.RequestParams requestParams, SolrQueryRequest req) {
final SolrResourceLoader loader, final RequestInfo requestParams, SolrQueryRequest req) {
return new SolrWriter(processor, req) {

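For reference, a hedged summary of the RequestInfo getters this handler now uses, next to the old DataImporter.RequestParams fields they replace (the sketch class is invented; all getter names come from the diff):

import org.apache.solr.common.util.ContentStream;

final class RequestInfoAccessSketch {
  static void show(RequestInfo info) {
    String command = info.getCommand();            // was requestParams.command
    boolean debug = info.isDebug();                // was requestParams.debug
    String dataConfig = info.getDataConfig();      // was requestParams.dataConfig
    ContentStream body = info.getContentStream();  // was requestParams.contentStream
    boolean sync = info.isSyncMode();              // was requestParams.syncMode
    DebugInfo dbg = info.getDebugInfo();           // holds debugDocuments and debugVerboseOutput
  }
}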
DataImporter.java View File

@ -18,15 +18,16 @@
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.util.SystemIdResolver;
import org.apache.solr.common.util.XMLErrorLogger;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.apache.solr.handler.dataimport.config.Script;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
@ -63,31 +64,20 @@ public class DataImporter {
private static final XMLErrorLogger XMLLOG = new XMLErrorLogger(LOG);
private Status status = Status.IDLE;
private DataConfig config;
private DIHConfiguration config;
private Date indexStartTime;
private Properties store = new Properties();
private Map<String, Properties> dataSourceProps = new HashMap<String, Properties>();
private IndexSchema schema;
public DocBuilder docBuilder;
public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
private SolrCore core;
private SolrCore core;
private DIHPropertiesWriter propWriter;
private ReentrantLock importLock = new ReentrantLock();
private final Map<String , Object> coreScopeSession;
private boolean isDeltaImportSupported = false;
private final String handlerName;
private boolean isDeltaImportSupported = false;
private final String handlerName;
private Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
/**
* Only for testing purposes
@ -106,96 +96,66 @@ public class DataImporter {
} else {
propWriter = new ZKPropertiesWriter();
}
propWriter.init(this);
}
DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
this.handlerName = handlerName;
if (dataConfig == null)
throw new DataImportHandlerException(SEVERE,
"Configuration not found");
this.handlerName = handlerName;
if (dataConfig == null) {
throw new DataImportHandlerException(SEVERE, "Configuration not found");
}
this.core = core;
this.schema = core.getSchema();
loadSchemaFieldMap();
createPropertyWriter();
propWriter.init(this);
dataSourceProps = ds;
if (session == null)
session = new HashMap<String, Object>();
coreScopeSession = session;
loadDataConfig(dataConfig);
for (Map.Entry<String, SchemaField> entry : schema.getFields().entrySet()) {
config.lowerNameVsSchemaField.put(entry.getKey().toLowerCase(Locale.ENGLISH), entry.getValue());
}
for (DataConfig.Entity e : config.document.entities) {
Map<String, DataConfig.Field> fields = new HashMap<String, DataConfig.Field>();
initEntity(e, fields, false);
verifyWithSchema(fields);
identifyPk(e);
if (e.allAttributes.containsKey(SqlEntityProcessor.DELTA_QUERY))
for (Entity e : config.getEntities()) {
if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
isDeltaImportSupported = true;
break;
}
}
}
private void loadSchemaFieldMap() {
Map<String, SchemaField> modLnvsf = new HashMap<String, SchemaField>();
for (Map.Entry<String, SchemaField> entry : schema.getFields().entrySet()) {
modLnvsf.put(entry.getKey().toLowerCase(Locale.ENGLISH), entry.getValue());
}
lowerNameVsSchemaField = Collections.unmodifiableMap(modLnvsf);
}
public SchemaField getSchemaField(String caseInsensitiveName) {
SchemaField schemaField = null;
if(schema!=null) {
schemaField = schema.getFieldOrNull(caseInsensitiveName);
}
if (schemaField == null) {
schemaField = lowerNameVsSchemaField.get(caseInsensitiveName.toLowerCase(Locale.ENGLISH));
}
return schemaField;
}
public String getHandlerName() {
return handlerName;
}
private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
Map<String, SchemaField> schemaFields = schema.getFields();
for (Map.Entry<String, SchemaField> entry : schemaFields.entrySet()) {
SchemaField sf = entry.getValue();
if (!fields.containsKey(sf.getName())) {
if (sf.isRequired()) {
LOG
.info(sf.getName()
+ " is a required field in SolrSchema . But not found in DataConfig");
}
}
}
for (Map.Entry<String, DataConfig.Field> entry : fields.entrySet()) {
DataConfig.Field fld = entry.getValue();
SchemaField field = schema.getFieldOrNull(fld.getName());
if (field == null) {
field = config.lowerNameVsSchemaField.get(fld.getName().toLowerCase(Locale.ENGLISH));
if (field == null) {
LOG.info("The field :" + fld.getName() + " present in DataConfig does not have a counterpart in Solr Schema");
}
}
}
}
/**
* Used by tests
*/
void loadAndInit(String configStr) {
loadDataConfig(new InputSource(new StringReader(configStr)));
Map<String, DataConfig.Field> fields = new HashMap<String, DataConfig.Field>();
for (DataConfig.Entity entity : config.document.entities) {
initEntity(entity, fields, false);
}
}
private void identifyPk(DataConfig.Entity entity) {
SchemaField uniqueKey = schema.getUniqueKeyField();
String schemaPk = "";
if (uniqueKey != null)
schemaPk = uniqueKey.getName();
else return;
//if no fields are mentioned . solr uniqueKey is same as dih 'pk'
entity.pkMappingFromSchema = schemaPk;
for (DataConfig.Field field : entity.fields) {
if(field.getName().equals(schemaPk)) {
entity.pkMappingFromSchema = field.column;
//get the corresponding column mapping for the solr uniqueKey
// But if there are multiple columns mapping to the solr uniqueKey, it will fail
// so , in one off cases we may need pk
break;
}
}
}
loadDataConfig(new InputSource(new StringReader(configStr)));
}
private void loadDataConfig(InputSource configFile) {
@ -224,71 +184,72 @@ public class DataImporter {
IOUtils.closeQuietly(configFile.getByteStream());
}
config = new DataConfig();
NodeList elems = document.getElementsByTagName("dataConfig");
if(elems == null || elems.getLength() == 0) {
throw new DataImportHandlerException(SEVERE, "the root node '<dataConfig>' is missing");
}
config.readFromXml((Element) elems.item(0));
config = readFromXml(document);
LOG.info("Data Configuration loaded successfully");
} catch (Exception e) {
throw new DataImportHandlerException(SEVERE,
"Exception occurred while initializing context", e);
}
}
private void initEntity(DataConfig.Entity e,
Map<String, DataConfig.Field> fields, boolean docRootFound) {
e.allAttributes.put(DATA_SRC, e.dataSource);
if (!docRootFound && !"false".equals(e.docRoot)) {
// if in this chain no document root is found()
e.isDocRoot = true;
public DIHConfiguration readFromXml(Document xmlDocument) {
DIHConfiguration config;
List<Map<String, String >> functions = new ArrayList<Map<String ,String>>();
Script script = null;
Map<String, Properties> dataSources = new HashMap<String, Properties>();
NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
if(dataConfigTags == null || dataConfigTags.getLength() == 0) {
throw new DataImportHandlerException(SEVERE, "the root node '<dataConfig>' is missing");
}
if (e.fields != null) {
for (DataConfig.Field f : e.fields) {
if (schema != null) {
if(f.name != null && f.name.contains("${")){
f.dynamicName = true;
continue;
}
SchemaField schemaField = schema.getFieldOrNull(f.getName());
if (schemaField == null) {
schemaField = config.lowerNameVsSchemaField.get(f.getName().toLowerCase(Locale.ENGLISH));
if (schemaField != null) f.name = schemaField.getName();
}
if (schemaField != null) {
f.multiValued = schemaField.multiValued();
f.allAttributes.put(MULTI_VALUED, Boolean.toString(schemaField
.multiValued()));
f.allAttributes.put(TYPE, schemaField.getType().getTypeName());
f.allAttributes.put("indexed", Boolean.toString(schemaField.indexed()));
f.allAttributes.put("stored", Boolean.toString(schemaField.stored()));
f.allAttributes.put("defaultValue", schemaField.getDefaultValue());
} else {
f.toWrite = false;
}
Element e = (Element) dataConfigTags.item(0);
List<Element> documentTags = ConfigParseUtil.getChildNodes(e, "document");
if (documentTags.isEmpty()) {
throw new DataImportHandlerException(SEVERE, "DataImportHandler " +
"configuration file must have one <document> node.");
}
List<Element> scriptTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.SCRIPT);
if (!scriptTags.isEmpty()) {
script = new Script(scriptTags.get(0));
}
// Add the provided evaluators
List<Element> functionTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.FUNCTION);
if (!functionTags.isEmpty()) {
for (Element element : functionTags) {
String func = ConfigParseUtil.getStringAttribute(element, NAME, null);
String clz = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.CLASS, null);
if (func == null || clz == null){
throw new DataImportHandlerException(
SEVERE,
"<function> must have a 'name' and 'class' attributes");
} else {
functions.add(ConfigParseUtil.getAllAttributes(element));
}
fields.put(f.getName(), f);
f.entity = e;
f.allAttributes.put("boost", f.boost.toString());
f.allAttributes.put("toWrite", Boolean.toString(f.toWrite));
e.allFieldsList.add(Collections.unmodifiableMap(f.allAttributes));
}
}
e.allFieldsList = Collections.unmodifiableList(e.allFieldsList);
e.allAttributes = Collections.unmodifiableMap(e.allAttributes);
if (e.entities == null)
return;
for (DataConfig.Entity e1 : e.entities) {
e1.parentEntity = e;
initEntity(e1, fields, e.isDocRoot || docRootFound);
List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, DATA_SRC);
if (!dataSourceTags.isEmpty()) {
for (Element element : dataSourceTags) {
Properties p = new Properties();
HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
for (Map.Entry<String, String> entry : attrs.entrySet()) {
p.setProperty(entry.getKey(), entry.getValue());
}
dataSources.put(p.getProperty("name"), p);
}
}
if(dataSources.get(null) == null){
for (Properties properties : dataSources.values()) {
dataSources.put(null,properties);
break;
}
}
return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources);
}
DataConfig getConfig() {
DIHConfiguration getConfig() {
return config;
}
@ -308,18 +269,17 @@ public class DataImporter {
return store.get(key);
}
DataSource getDataSourceInstance(DataConfig.Entity key, String name, Context ctx) {
DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
Properties p = dataSourceProps.get(name);
if (p == null)
p = config.dataSources.get(name);
p = config.getDataSources().get(name);
if (p == null)
p = dataSourceProps.get(null);// for default data source
if (p == null)
p = config.dataSources.get(null);
p = config.getDataSources().get(null);
if (p == null)
throw new DataImportHandlerException(SEVERE,
"No dataSource :" + name + " available for entity :"
+ key.name);
"No dataSource :" + name + " available for entity :" + key.getName());
String type = p.getProperty(TYPE);
DataSource dataSrc = null;
if (type == null) {
@ -344,7 +304,7 @@ public class DataImporter {
}
dataSrc.init(ctx, copyProps);
} catch (Exception e) {
wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.dataSource);
wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
}
return dataSrc;
}
@ -361,7 +321,7 @@ public class DataImporter {
return importLock.isLocked();
}
public void doFullImport(SolrWriter writer, RequestParams requestParams) {
public void doFullImport(SolrWriter writer, RequestInfo requestParams) {
LOG.info("Starting Full Import");
setStatus(Status.RUNNING_FULL_DUMP);
@ -371,14 +331,13 @@ public class DataImporter {
docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer);
docBuilder.execute();
if (!requestParams.debug)
if (!requestParams.isDebug())
cumulativeStatistics.add(docBuilder.importStatistics);
} catch (Throwable t) {
SolrException.log(LOG, "Full Import failed", t);
docBuilder.rollback();
} finally {
setStatus(Status.IDLE);
config.clearCaches();
DocBuilder.INSTANCE.set(null);
}
@ -393,7 +352,7 @@ public class DataImporter {
}
}
public void doDeltaImport(SolrWriter writer, RequestParams requestParams) {
public void doDeltaImport(SolrWriter writer, RequestInfo requestParams) {
LOG.info("Starting Delta Import");
setStatus(Status.RUNNING_DELTA_DUMP);
@ -402,20 +361,19 @@ public class DataImporter {
docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer);
docBuilder.execute();
if (!requestParams.debug)
if (!requestParams.isDebug())
cumulativeStatistics.add(docBuilder.importStatistics);
} catch (Throwable t) {
LOG.error("Delta Import Failed", t);
docBuilder.rollback();
} finally {
setStatus(Status.IDLE);
config.clearCaches();
DocBuilder.INSTANCE.set(null);
}
}
public void runAsync(final RequestParams reqParams, final SolrWriter sw) {
public void runAsync(final RequestInfo reqParams, final SolrWriter sw) {
new Thread() {
@Override
public void run() {
@ -424,8 +382,8 @@ public class DataImporter {
}.start();
}
void runCmd(RequestParams reqParams, SolrWriter sw) {
String command = reqParams.command;
void runCmd(RequestInfo reqParams, SolrWriter sw) {
String command = reqParams.getCommand();
if (command.equals(ABORT_CMD)) {
if (docBuilder != null) {
docBuilder.abort();
@ -514,90 +472,7 @@ public class DataImporter {
public static final String TOTAL_DOCS_SKIPPED = "Total Documents Skipped";
}
static final class RequestParams {
public String command = null;
public boolean debug = false;
public boolean verbose = false;
public boolean syncMode = false;
public boolean commit = true;
public boolean optimize = false;
public int start = 0;
public long rows = Integer.MAX_VALUE;
public boolean clean = true;
public List<String> entities;
public Map<String, Object> requestParams;
public String dataConfig;
public ContentStream contentStream;
public List<SolrInputDocument> debugDocuments = Collections.synchronizedList(new ArrayList<SolrInputDocument>());
public NamedList debugVerboseOutput = null;
public RequestParams() {
}
public RequestParams(Map<String, Object> requestParams) {
if (requestParams.containsKey("command"))
command = (String) requestParams.get("command");
if (StrUtils.parseBool((String)requestParams.get("debug"),false)) {
debug = true;
rows = 10;
// Set default values suitable for debug mode
commit = false;
clean = false;
verbose = StrUtils.parseBool((String)requestParams.get("verbose"),false);
}
syncMode = StrUtils.parseBool((String)requestParams.get("synchronous"),false);
if (DELTA_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
clean = false;
}
if (requestParams.containsKey("commit"))
commit = StrUtils.parseBool((String) requestParams.get("commit"),true);
if (requestParams.containsKey("start"))
start = Integer.parseInt((String) requestParams.get("start"));
if (requestParams.containsKey("rows"))
rows = Integer.parseInt((String) requestParams.get("rows"));
if (requestParams.containsKey("clean"))
clean = StrUtils.parseBool((String) requestParams.get("clean"),true);
if (requestParams.containsKey("optimize")) {
optimize = StrUtils.parseBool((String) requestParams.get("optimize"),false);
if (optimize)
commit = true;
}
Object o = requestParams.get("entity");
if (o != null && o instanceof String) {
entities = new ArrayList<String>();
entities.add((String) o);
} else if (o != null && o instanceof List) {
entities = (List<String>) requestParams.get("entity");
}
dataConfig = (String) requestParams.get("dataConfig");
if (dataConfig != null && dataConfig.trim().length() == 0) {
// Empty data-config param is not valid, change it to null
dataConfig = null;
}
this.requestParams = requestParams;
}
}
IndexSchema getSchema() {
public IndexSchema getSchema() {
return schema;
}

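Taken together, these hunks change DataImporter from mutating a shared DataConfig to building a DIHConfiguration once in readFromXml and reading it through getters. An illustrative sketch, assuming it sits in the org.apache.solr.handler.dataimport package (the config string is invented; every getter appears in the diff):

// Illustrative only: exercises the DIHConfiguration surface used above.
static void printConfigSummary() {
  DataImporter di = new DataImporter();  // the test-only constructor
  di.loadAndInit("<dataConfig><document>"
      + "<entity name=\"e\" query=\"select 1\"/>"
      + "</document></dataConfig>");     // test helper shown above
  DIHConfiguration cfg = di.getConfig();
  for (Entity e : cfg.getEntities()) {
    // Delta support is now detected from entity attributes at load time.
    boolean delta = e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY);
    System.out.println(e.getName() + " supportsDelta=" + delta);
  }
  Properties defaultDs = cfg.getDataSources().get(null); // the default data source slot
  System.out.println("default dataSource: " + defaultDs);
}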
DebugInfo.java View File

@ -0,0 +1,21 @@
package org.apache.solr.handler.dataimport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
public class DebugInfo {
public List<SolrInputDocument> debugDocuments = new ArrayList<SolrInputDocument>(0);
public NamedList<String> debugVerboseOutput = null;
public boolean verbose;
public DebugInfo(Map<String,Object> requestParams) {
verbose = StrUtils.parseBool((String) requestParams.get("verbose"), false);
debugVerboseOutput = new NamedList<String>();
}
}
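A minimal usage sketch of the new holder (the demo class is invented; the fields and constructor are as defined above):

import java.util.HashMap;
import java.util.Map;
import org.apache.solr.common.SolrInputDocument;

public class DebugInfoDemo {
  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<String, Object>();
    params.put("verbose", "true");
    DebugInfo info = new DebugInfo(params);  // parses "verbose" via StrUtils.parseBool
    info.debugDocuments.add(new SolrInputDocument());
    System.out.println(info.verbose + ", docs=" + info.debugDocuments.size());
  }
}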

DocBuilder.java View File

@ -20,6 +20,11 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.apache.solr.handler.dataimport.config.EntityField;
import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
import org.apache.solr.schema.SchemaField;
@ -46,9 +51,9 @@ public class DocBuilder {
DataImporter dataImporter;
private DataConfig.Document document;
private DIHConfiguration config;
private DataConfig.Entity root;
private EntityProcessorWrapper currentEntityProcessorWrapper;
@SuppressWarnings("unchecked")
private Map statusMessages = Collections.synchronizedMap(new LinkedHashMap());
@ -57,8 +62,6 @@ public class DocBuilder {
DIHWriter writer;
DataImporter.RequestParams requestParameters;
boolean verboseDebug = false;
Map<String, Object> session = new HashMap<String, Object>();
@ -71,23 +74,22 @@ public class DocBuilder {
private static final String PARAM_WRITER_IMPL = "writerImpl";
private static final String DEFAULT_WRITER_NAME = "SolrWriter";
private DebugLogger debugLogger;
private DataImporter.RequestParams reqParams;
private final RequestInfo reqParams;
@SuppressWarnings("unchecked")
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
@SuppressWarnings("unchecked")
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, RequestInfo reqParams) {
INSTANCE.set(this);
this.dataImporter = dataImporter;
this.reqParams = reqParams;
this.propWriter = propWriter;
DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
requestParameters = reqParams;
verboseDebug = requestParameters.debug && requestParameters.verbose;
verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
persistedProperties = propWriter.readIndexerProperties();
functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this, getVariableResolver());
functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().getFunctions(), this, getVariableResolver());
String writerClassStr = null;
if(reqParams!=null && reqParams.requestParams != null) {
writerClassStr = (String) reqParams.requestParams.get(PARAM_WRITER_IMPL);
if(reqParams!=null && reqParams.getRawParams() != null) {
writerClassStr = (String) reqParams.getRawParams().get(PARAM_WRITER_IMPL);
}
if(writerClassStr != null && !writerClassStr.equals(DEFAULT_WRITER_NAME) && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "." + DEFAULT_WRITER_NAME)) {
try {
@ -99,13 +101,11 @@ public class DocBuilder {
} else {
writer = solrWriter;
}
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.requestParams, null, this);
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
writer.init(ctx);
}
DebugLogger getDebugLogger(){
if (debugLogger == null) {
debugLogger = new DebugLogger();
@ -128,10 +128,10 @@ public class DocBuilder {
indexerNamespace.put(LAST_INDEX_TIME, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH));
}
indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
indexerNamespace.put("request", requestParameters.requestParams);
indexerNamespace.put("request", reqParams.getRawParams());
indexerNamespace.put("functions", functionsNamespace);
for (DataConfig.Entity entity : dataImporter.getConfig().document.entities) {
String key = entity.name + "." + SolrWriter.LAST_INDEX_KEY;
for (Entity entity : dataImporter.getConfig().getEntities()) {
String key = entity.getName() + "." + SolrWriter.LAST_INDEX_KEY;
String lastIndex = persistedProperties.getProperty(key);
if (lastIndex != null) {
indexerNamespace.put(key, lastIndex);
@ -139,8 +139,8 @@ public class DocBuilder {
indexerNamespace.put(key, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH));
}
}
resolver.addNamespace(DataConfig.IMPORTER_NS_SHORT, indexerNamespace);
resolver.addNamespace(DataConfig.IMPORTER_NS, indexerNamespace);
resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
resolver.addNamespace(ConfigNameConstants.IMPORTER_NS, indexerNamespace);
return resolver;
} catch (Exception e) {
wrapAndThrow(SEVERE, e);
@ -177,9 +177,10 @@ public class DocBuilder {
@SuppressWarnings("unchecked")
public void execute() {
List<EntityProcessorWrapper> epwList = null;
try {
dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
document = dataImporter.getConfig().document;
config = dataImporter.getConfig();
final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
statusMessages.put(TIME_ELAPSED, new Object() {
@Override
@ -197,28 +198,33 @@ public class DocBuilder {
statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
importStatistics.skipDocCount);
List<String> entities = requestParameters.entities;
List<String> entities = reqParams.getEntitiesToRun();
// Trigger onImportStart
if (document.onImportStart != null) {
invokeEventListener(document.onImportStart);
if (config.getOnImportStart() != null) {
invokeEventListener(config.getOnImportStart());
}
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
Properties lastIndexTimeProps = new Properties();
lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
for (DataConfig.Entity e : document.entities) {
if (entities != null && !entities.contains(e.name))
epwList = new ArrayList<EntityProcessorWrapper>(config.getEntities().size());
for (Entity e : config.getEntities()) {
epwList.add(getEntityProcessorWrapper(e));
}
for (EntityProcessorWrapper epw : epwList) {
if (entities != null && !entities.contains(epw.getEntity().getName()))
continue;
lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
lastIndexTimeProps.setProperty(epw.getEntity().getName() + "." + LAST_INDEX_KEY,
DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
root = e;
String delQuery = e.allAttributes.get("preImportDeleteQuery");
currentEntityProcessorWrapper = epw;
String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
cleanByQuery(delQuery, fullCleanDone);
doDelta();
delQuery = e.allAttributes.get("postImportDeleteQuery");
delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
@ -226,7 +232,7 @@ public class DocBuilder {
} else {
cleanByQuery(delQuery, fullCleanDone);
doFullDump();
delQuery = e.allAttributes.get("postImportDeleteQuery");
delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
@ -241,7 +247,7 @@ public class DocBuilder {
rollback();
} else {
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
if (!requestParameters.clean) {
if (!reqParams.isClean()) {
if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
finish(lastIndexTimeProps);
}
@ -250,8 +256,8 @@ public class DocBuilder {
finish(lastIndexTimeProps);
}
if (document.onImportEnd != null) {
invokeEventListener(document.onImportEnd);
if (config.getOnImportEnd() != null) {
invokeEventListener(config.getOnImportEnd());
}
}
@ -270,11 +276,23 @@ public class DocBuilder {
if (writer != null) {
writer.close();
}
if(requestParameters.debug) {
requestParameters.debugVerboseOutput = getDebugLogger().output;
if (epwList != null) {
closeEntityProcessorWrappers(epwList);
}
if(reqParams.isDebug()) {
reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
}
}
}
private void closeEntityProcessorWrappers(List<EntityProcessorWrapper> epwList) {
for(EntityProcessorWrapper epw : epwList) {
epw.close();
if(epw.getDatasource()!=null) {
epw.getDatasource().close();
}
closeEntityProcessorWrappers(epw.getChildren());
}
}
@SuppressWarnings("unchecked")
private void finish(Properties lastIndexTimeProps) {
@ -282,10 +300,10 @@ public class DocBuilder {
statusMessages.put("", "Indexing completed. Added/Updated: "
+ importStatistics.docCount + " documents. Deleted "
+ importStatistics.deletedDocCount + " documents.");
if(requestParameters.commit) {
writer.commit(requestParameters.optimize);
if(reqParams.isCommit()) {
writer.commit(reqParams.isOptimize());
addStatusMessage("Committed");
if (requestParameters.optimize)
if (reqParams.isOptimize())
addStatusMessage("Optimized");
}
try {
@ -305,7 +323,7 @@ public class DocBuilder {
private void doFullDump() {
addStatusMessage("Full Dump Started");
buildDocument(getVariableResolver(), null, null, root, true, null);
buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
}
@SuppressWarnings("unchecked")
@ -313,14 +331,14 @@ public class DocBuilder {
addStatusMessage("Delta Dump started");
VariableResolverImpl resolver = getVariableResolver();
if (document.deleteQuery != null) {
writer.deleteByQuery(document.deleteQuery);
if (config.getDeleteQuery() != null) {
writer.deleteByQuery(config.getDeleteQuery());
}
addStatusMessage("Identifying Delta");
LOG.info("Starting delta collection.");
Set<Map<String, Object>> deletedKeys = new HashSet<Map<String, Object>>();
Set<Map<String, Object>> allPks = collectDelta(root, resolver, deletedKeys);
Set<Map<String, Object>> allPks = collectDelta(currentEntityProcessorWrapper, resolver, deletedKeys);
if (stop.get())
return;
addStatusMessage("Deltas Obtained");
@ -338,8 +356,8 @@ public class DocBuilder {
Iterator<Map<String, Object>> pkIter = allPks.iterator();
while (pkIter.hasNext()) {
Map<String, Object> map = pkIter.next();
vri.addNamespace(DataConfig.IMPORTER_NS_SHORT + ".delta", map);
buildDocument(vri, null, map, root, true, null);
vri.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT + ".delta", map);
buildDocument(vri, null, map, currentEntityProcessorWrapper, true, null);
pkIter.remove();
// check for abort
if (stop.get())
@ -356,7 +374,7 @@ public class DocBuilder {
Iterator<Map<String, Object>> iter = deletedKeys.iterator();
while (iter.hasNext()) {
Map<String, Object> map = iter.next();
String keyName = root.isDocRoot ? root.getPk() : root.getSchemaPk();
String keyName = currentEntityProcessorWrapper.getEntity().isDocRoot() ? currentEntityProcessorWrapper.getEntity().getPk() : currentEntityProcessorWrapper.getEntity().getSchemaPk();
Object key = map.get(keyName);
if(key == null) {
keyName = findMatchingPkColumn(keyName, map);
@ -377,53 +395,50 @@ public class DocBuilder {
statusMessages.put(msg, DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
}
private void resetEntity(DataConfig.Entity entity) {
entity.initalized = false;
if (entity.entities != null) {
for (DataConfig.Entity child : entity.entities) {
resetEntity(child);
}
private void resetEntity(EntityProcessorWrapper epw) {
epw.setInitalized(false);
for (EntityProcessorWrapper child : epw.getChildren()) {
resetEntity(child);
}
}
private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
Map<String,Object> pk, DataConfig.Entity entity, boolean isRoot,
Map<String,Object> pk, EntityProcessorWrapper epw, boolean isRoot,
ContextImpl parentCtx) {
List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<EntityProcessorWrapper>();
try {
buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);
buildDocument(vr, doc, pk, epw, isRoot, parentCtx, entitiesToDestroy);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
entityWrapper.destroy();
}
resetEntity(entity);
resetEntity(epw);
}
}
@SuppressWarnings("unchecked")
private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot,
Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
ContextImpl ctx = new ContextImpl(entity, vr, null,
ContextImpl ctx = new ContextImpl(epw, vr, null,
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
entityProcessor.init(ctx);
if (!entity.initalized) {
entitiesToDestroy.add(entityProcessor);
entity.initalized = true;
epw.init(ctx);
if (!epw.isInitalized()) {
entitiesToDestroy.add(epw);
epw.setInitalized(true);
}
if (requestParameters.start > 0) {
if (reqParams.getStart() > 0) {
getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
}
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.START_ENTITY, entity.name, null);
getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
}
int seenDocCount = 0;
@ -432,66 +447,66 @@ public class DocBuilder {
while (true) {
if (stop.get())
return;
if(importStatistics.docCount.get() > (requestParameters.start + requestParameters.rows)) break;
if(importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
try {
seenDocCount++;
if (seenDocCount > requestParameters.start) {
if (seenDocCount > reqParams.getStart()) {
getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
}
if (verboseDebug && entity.isDocRoot) {
getDebugLogger().log(DIHLogLevels.START_DOC, entity.name, null);
if (verboseDebug && epw.getEntity().isDocRoot()) {
getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
}
if (doc == null && entity.isDocRoot) {
if (doc == null && epw.getEntity().isDocRoot()) {
doc = new DocWrapper();
ctx.setDoc(doc);
DataConfig.Entity e = entity;
while (e.parentEntity != null) {
addFields(e.parentEntity, doc, (Map<String, Object>) vr
.resolve(e.parentEntity.name), vr);
e = e.parentEntity;
Entity e = epw.getEntity();
while (e.getParentEntity() != null) {
addFields(e.getParentEntity(), doc, (Map<String, Object>) vr
.resolve(e.getParentEntity().getName()), vr);
e = e.getParentEntity();
}
}
Map<String, Object> arow = entityProcessor.nextRow();
Map<String, Object> arow = epw.nextRow();
if (arow == null) {
break;
}
// Support for start parameter in debug mode
if (entity.isDocRoot) {
if (seenDocCount <= requestParameters.start)
if (epw.getEntity().isDocRoot()) {
if (seenDocCount <= reqParams.getStart())
continue;
if (seenDocCount > requestParameters.start + requestParameters.rows) {
if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
LOG.info("Indexing stopped at docCount = " + importStatistics.docCount);
break;
}
}
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_OUT, entity.name, arow);
getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
}
importStatistics.rowsCount.incrementAndGet();
if (doc != null) {
handleSpecialCommands(arow, doc);
addFields(entity, doc, arow, vr);
addFields(epw.getEntity(), doc, arow, vr);
}
if (entity.entities != null) {
vr.addNamespace(entity.name, arow);
for (DataConfig.Entity child : entity.entities) {
if (epw.getEntity().getChildren() != null) {
vr.addNamespace(epw.getEntity().getName(), arow);
for (EntityProcessorWrapper child : epw.getChildren()) {
buildDocument(vr, doc,
child.isDocRoot ? pk : null, child, false, ctx, entitiesToDestroy);
child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
}
vr.removeNamespace(entity.name);
vr.removeNamespace(epw.getEntity().getName());
}
if (entity.isDocRoot) {
if (epw.getEntity().isDocRoot()) {
if (stop.get())
return;
if (!doc.isEmpty()) {
boolean result = writer.upload(doc);
if(reqParams.debug) {
reqParams.debugDocuments.add(doc);
if(reqParams.isDebug()) {
reqParams.getDebugInfo().debugDocuments.add(doc);
}
doc = null;
if (result){
@ -503,7 +518,7 @@ public class DocBuilder {
}
} catch (DataImportHandlerException e) {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, e);
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
}
if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
continue;
@ -514,7 +529,7 @@ public class DocBuilder {
doc = null;
} else {
SolrException.log(LOG, "Exception while processing: "
+ entity.name + " document : " + doc, e);
+ epw.getEntity().getName() + " document : " + doc, e);
}
if (e.getErrCode() == DataImportHandlerException.SEVERE)
throw e;
@ -522,13 +537,13 @@ public class DocBuilder {
throw e;
} catch (Throwable t) {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, t);
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
}
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
} finally {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ROW_END, entity.name, null);
if (entity.isDocRoot)
getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
if (epw.getEntity().isDocRoot())
getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
}
}
@ -609,19 +624,19 @@ public class DocBuilder {
}
@SuppressWarnings("unchecked")
private void addFields(DataConfig.Entity entity, DocWrapper doc,
private void addFields(Entity entity, DocWrapper doc,
Map<String, Object> arow, VariableResolver vr) {
for (Map.Entry<String, Object> entry : arow.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value == null) continue;
if (key.startsWith("$")) continue;
List<DataConfig.Field> field = entity.colNameVsField.get(key);
Set<EntityField> field = entity.getColNameVsField().get(key);
if (field == null && dataImporter.getSchema() != null) {
// This can be a dynamic field or a field which does not have an entry in data-config (an implicit field)
SchemaField sf = dataImporter.getSchema().getFieldOrNull(key);
if (sf == null) {
sf = dataImporter.getConfig().lowerNameVsSchemaField.get(key.toLowerCase(Locale.ENGLISH));
sf = dataImporter.getSchemaField(key);
}
if (sf != null) {
addFieldToDoc(entry.getValue(), sf.getName(), 1.0f, sf.multiValued(), doc);
@ -629,12 +644,14 @@ public class DocBuilder {
// else do nothing; if we add it, it may fail
} else {
if (field != null) {
for (DataConfig.Field f : field) {
for (EntityField f : field) {
String name = f.getName();
if(f.dynamicName){
if(f.isDynamicName()){
name = vr.replaceTokens(name);
}
if (f.toWrite) addFieldToDoc(entry.getValue(), name, f.boost, f.multiValued, doc);
if (f.isToWrite()) {
addFieldToDoc(entry.getValue(), name, f.getBoost(), f.isMultiValued(), doc);
}
}
}
}
@ -668,22 +685,25 @@ public class DocBuilder {
}
}
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
if (entity.processor != null)
return entity.processor;
private EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
EntityProcessor entityProcessor = null;
if (entity.proc == null) {
if (entity.getProcessorName() == null) {
entityProcessor = new SqlEntityProcessor();
} else {
try {
entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
entityProcessor = (EntityProcessor) loadClass(entity.getProcessorName(), dataImporter.getCore())
.newInstance();
} catch (Exception e) {
wrapAndThrow (SEVERE,e,
"Unable to load EntityProcessor implementation for entity:" + entity.name);
"Unable to load EntityProcessor implementation for entity:" + entity.getName());
}
}
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
EntityProcessorWrapper epw = new EntityProcessorWrapper(entityProcessor, entity, this);
for(Entity e1 : entity.getChildren()) {
epw.getChildren().add(getEntityProcessorWrapper(e1));
}
return epw;
}
private String findMatchingPkColumn(String pk, Map<String, Object> row) {
@ -716,37 +736,34 @@ public class DocBuilder {
* @return the set of primary keys for which Solr documents should be updated.
*/
@SuppressWarnings("unchecked")
public Set<Map<String, Object>> collectDelta(DataConfig.Entity entity, VariableResolverImpl resolver,
public Set<Map<String, Object>> collectDelta(EntityProcessorWrapper epw, VariableResolverImpl resolver,
Set<Map<String, Object>> deletedRows) {
//someone called abort
if (stop.get())
return new HashSet();
EntityProcessor entityProcessor = getEntityProcessor(entity);
ContextImpl context1 = new ContextImpl(entity, resolver, null, Context.FIND_DELTA, session, null, this);
entityProcessor.init(context1);
ContextImpl context1 = new ContextImpl(epw, resolver, null, Context.FIND_DELTA, session, null, this);
epw.init(context1);
Set<Map<String, Object>> myModifiedPks = new HashSet<Map<String, Object>>();
if (entity.entities != null) {
for (DataConfig.Entity entity1 : entity.entities) {
//this ensures that we start from the leaf nodes
myModifiedPks.addAll(collectDelta(entity1, resolver, deletedRows));
//someone called abort
if (stop.get())
return new HashSet();
}
for (EntityProcessorWrapper childEpw : epw.getChildren()) {
//this ensures that we start from the leaf nodes
myModifiedPks.addAll(collectDelta(childEpw, resolver, deletedRows));
//someone called abort
if (stop.get())
return new HashSet();
}
// identifying the modified rows for this entity
Map<String, Map<String, Object>> deltaSet = new HashMap<String, Map<String, Object>>();
LOG.info("Running ModifiedRowKey() for Entity: " + entity.name);
LOG.info("Running ModifiedRowKey() for Entity: " + epw.getEntity().getName());
//get the modified rows in this entity
String pk = entity.getPk();
String pk = epw.getEntity().getPk();
while (true) {
Map<String, Object> row = entityProcessor.nextModifiedRowKey();
Map<String, Object> row = epw.nextModifiedRowKey();
if (row == null)
break;
@ -766,7 +783,7 @@ public class DocBuilder {
//get the deleted rows for this entity
Set<Map<String, Object>> deletedSet = new HashSet<Map<String, Object>>();
while (true) {
Map<String, Object> row = entityProcessor.nextDeletedRowKey();
Map<String, Object> row = epw.nextDeletedRowKey();
if (row == null)
break;
@ -790,36 +807,36 @@ public class DocBuilder {
return new HashSet();
}
LOG.info("Completed ModifiedRowKey for Entity: " + entity.name + " rows obtained : " + deltaSet.size());
LOG.info("Completed DeletedRowKey for Entity: " + entity.name + " rows obtained : " + deletedSet.size());
LOG.info("Completed ModifiedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deltaSet.size());
LOG.info("Completed DeletedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deletedSet.size());
myModifiedPks.addAll(deltaSet.values());
Set<Map<String, Object>> parentKeyList = new HashSet<Map<String, Object>>();
//all that we have captured is useless (in a sub-entity) if no rows in the parent are modified because of these
//propagate the changes up the chain
if (entity.parentEntity != null) {
if (epw.getEntity().getParentEntity() != null) {
// identifying deleted rows with deltas
for (Map<String, Object> row : myModifiedPks) {
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, entityProcessor, parentKeyList);
getModifiedParentRows(resolver.addNamespace(epw.getEntity().getName(), row), epw.getEntity().getName(), epw, parentKeyList);
// check for abort
if (stop.get())
return new HashSet();
}
// run the same for the deleted rows
for (Map<String, Object> row : deletedSet) {
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, entityProcessor, parentKeyList);
getModifiedParentRows(resolver.addNamespace(epw.getEntity().getName(), row), epw.getEntity().getName(), epw, parentKeyList);
// check for abort
if (stop.get())
return new HashSet();
}
}
LOG.info("Completed parentDeltaQuery for Entity: " + entity.name);
if (entity.isDocRoot)
LOG.info("Completed parentDeltaQuery for Entity: " + epw.getEntity().getName());
if (epw.getEntity().isDocRoot())
deletedRows.addAll(deletedSet);
// Do not use epw.getEntity().isDocRoot() here because one of the descendant entities may set rootEntity="true"
return entity.parentEntity == null ?
return epw.getEntity().getParentEntity() == null ?
myModifiedPks : new HashSet<Map<String, Object>>(parentKeyList);
}
@ -859,6 +876,13 @@ public class DocBuilder {
% 60 + "." + l % 1000;
}
public RequestInfo getReqParams() {
return reqParams;
}
@SuppressWarnings("unchecked")
static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
try {
@ -913,7 +937,7 @@ public class DocBuilder {
private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
delQuery = getVariableResolver().replaceTokens(delQuery);
if (requestParameters.clean) {
if (reqParams.isClean()) {
if (delQuery == null && !completeCleanDone.get()) {
writer.doDeleteAll();
completeCleanDone.set(true);
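
A conceptual sketch (not part of this patch) of the delta flow implemented by collectDelta above: child deltas are gathered first, translated to this entity's keys, and propagated upward until only root-entity primary keys remain. The method name here is hypothetical.

private Set<Map<String, Object>> collectDeltaSketch(EntityProcessorWrapper epw) {
  Set<Map<String, Object>> pks = new HashSet<Map<String, Object>>();
  for (EntityProcessorWrapper child : epw.getChildren()) {
    pks.addAll(collectDeltaSketch(child)); // leaf-first recursion
  }
  // ...then add epw.nextModifiedRowKey() results and, if a parent entity
  // exists, map every key through getModifiedParentRows(...) before returning
  return pks;
}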

View File

@ -17,6 +17,9 @@
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrException;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.Entity;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
import static org.apache.solr.handler.dataimport.EntityProcessorBase.*;
import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP;
@ -38,19 +41,23 @@ public class EntityProcessorWrapper extends EntityProcessor {
private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class);
private EntityProcessor delegate;
private Entity entity;
private DataSource datasource;
private List<EntityProcessorWrapper> children = new ArrayList<EntityProcessorWrapper>();
private DocBuilder docBuilder;
private boolean initalized;
private String onError;
private Context context;
private VariableResolverImpl resolver;
private Context context;
private VariableResolverImpl resolver;
private String entityName;
protected List<Transformer> transformers;
protected List<Map<String, Object>> rowcache;
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
public EntityProcessorWrapper(EntityProcessor delegate, Entity entity, DocBuilder docBuilder) {
this.delegate = delegate;
this.entity = entity;
this.docBuilder = docBuilder;
}
@ -62,7 +69,7 @@ public class EntityProcessorWrapper extends EntityProcessor {
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) onError = ABORT;
entityName = context.getEntityAttribute(DataConfig.NAME);
entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
}
delegate.init(context);
@ -293,4 +300,28 @@ public class EntityProcessorWrapper extends EntityProcessor {
public void close() {
delegate.close();
}
public Entity getEntity() {
return entity;
}
public List<EntityProcessorWrapper> getChildren() {
return children;
}
public DataSource getDatasource() {
return datasource;
}
public void setDatasource(DataSource datasource) {
this.datasource = datasource;
}
public boolean isInitalized() {
return initalized;
}
public void setInitalized(boolean initalized) {
this.initalized = initalized;
}
}

View File

@ -17,11 +17,12 @@ package org.apache.solr.handler.dataimport;
*/
import org.apache.solr.core.SolrCore;
import static org.apache.solr.handler.dataimport.DataConfig.CLASS;
import static org.apache.solr.handler.dataimport.DataConfig.NAME;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import static org.apache.solr.handler.dataimport.DocBuilder.loadClass;
import static org.apache.solr.handler.dataimport.config.ConfigNameConstants.CLASS;
import static org.apache.solr.handler.dataimport.config.ConfigNameConstants.NAME;
import org.apache.solr.util.DateMathParser;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.slf4j.Logger;

View File

@ -0,0 +1,149 @@
package org.apache.solr.handler.dataimport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.StrUtils;
public class RequestInfo {
private final String command;
private final boolean debug;
private final boolean syncMode;
private final boolean commit;
private final boolean optimize;
private final int start;
private final long rows;
private final boolean clean;
private final List<String> entitiesToRun;
private final Map<String,Object> rawParams;
private final String dataConfig;
//TODO: find a different home for these two...
private final ContentStream contentStream;
private final DebugInfo debugInfo;
public RequestInfo(Map<String,Object> requestParams, ContentStream stream) {
this.contentStream = stream;
if (requestParams.containsKey("command")) {
command = (String) requestParams.get("command");
} else {
command = null;
}
boolean debugMode = StrUtils.parseBool((String) requestParams.get("debug"), false);
if (debugMode) {
debug = true;
debugInfo = new DebugInfo(requestParams);
} else {
debug = false;
debugInfo = null;
}
if (requestParams.containsKey("clean")) {
clean = StrUtils.parseBool( (String) requestParams.get("clean"), true);
} else if (DataImporter.DELTA_IMPORT_CMD.equals(command) || DataImporter.IMPORT_CMD.equals(command)) {
clean = false;
} else {
clean = debug ? false : true;
}
optimize = StrUtils.parseBool((String) requestParams.get("optimize"), false);
if(optimize) {
commit = true;
} else {
commit = StrUtils.parseBool( (String) requestParams.get("commit"), (debug ? false : true));
}
if (requestParams.containsKey("rows")) {
rows = Integer.parseInt((String) requestParams.get("rows"));
} else {
rows = debug ? 10 : Long.MAX_VALUE;
}
if (requestParams.containsKey("start")) {
start = Integer.parseInt((String) requestParams.get("start"));
} else {
start = 0;
}
syncMode = StrUtils.parseBool((String) requestParams.get("synchronous"), false);
Object o = requestParams.get("entity");
List<String> modifiableEntities = null;
if(o != null) {
if (o instanceof String) {
modifiableEntities = new ArrayList<String>();
modifiableEntities.add((String) o);
} else if (o instanceof List<?>) {
@SuppressWarnings("unchecked")
List<String> modifiableEntities1 = new ArrayList<String>((List<String>) o);
modifiableEntities = modifiableEntities1;
}
entitiesToRun = Collections.unmodifiableList(modifiableEntities);
} else {
entitiesToRun = null;
}
String dataConfigParam = (String) requestParams.get("dataConfig");
if (dataConfigParam != null && dataConfigParam.trim().length() == 0) {
// Empty data-config param is not valid, change it to null
dataConfigParam = null;
}
dataConfig = dataConfigParam;
this.rawParams = Collections.unmodifiableMap(new HashMap<String,Object>(requestParams));
}
public String getCommand() {
return command;
}
public boolean isDebug() {
return debug;
}
public boolean isSyncMode() {
return syncMode;
}
public boolean isCommit() {
return commit;
}
public boolean isOptimize() {
return optimize;
}
public int getStart() {
return start;
}
public long getRows() {
return rows;
}
public boolean isClean() {
return clean;
}
/**
 * Returns null if we are to run all entities, otherwise just run the entities named in the list.
 * @return the entity names to run, or null to run all entities
 */
public List<String> getEntitiesToRun() {
return entitiesToRun;
}
public String getDataConfig() {
return dataConfig;
}
public Map<String,Object> getRawParams() {
return rawParams;
}
public ContentStream getContentStream() {
return contentStream;
}
public DebugInfo getDebugInfo() {
return debugInfo;
}
}
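
A minimal usage sketch (not part of this patch) showing the debug-mode defaults RequestInfo encodes; the demo class name is hypothetical.

import java.util.HashMap;
import java.util.Map;

public class RequestInfoDemo {
  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<String, Object>();
    params.put("command", "full-import");
    params.put("debug", "true");
    RequestInfo info = new RequestInfo(params, null);
    // with debug on, clean and commit default to false and rows to 10
    System.out.println(info.isClean());  // false
    System.out.println(info.isCommit()); // false
    System.out.println(info.getRows());  // 10
  }
}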

View File

@ -39,7 +39,7 @@ import java.io.*;
public class SolrWriter extends DIHWriterBase implements DIHWriter {
private static final Logger log = LoggerFactory.getLogger(SolrWriter.class);
static final String LAST_INDEX_KEY = "last_index_time";
public static final String LAST_INDEX_KEY = "last_index_time";
private final UpdateRequestProcessor processor;
private final int commitWithin;

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport.config;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.solr.handler.dataimport.SolrWriter;
public class ConfigNameConstants {
public static final String SCRIPT = "script";
public static final String NAME = "name";
public static final String PROCESSOR = "processor";
/**
* @deprecated use IMPORTER_NS_SHORT instead
*/
@Deprecated
public static final String IMPORTER_NS = "dataimporter";
public static final String IMPORTER_NS_SHORT = "dih";
public static final String ROOT_ENTITY = "rootEntity";
public static final String FUNCTION = "function";
public static final String CLASS = "class";
public static final String DATA_SRC = "dataSource";
public static final Set<String> RESERVED_WORDS;
static{
Set<String> rw = new HashSet<String>();
rw.add(IMPORTER_NS);
rw.add(IMPORTER_NS_SHORT);
rw.add("request");
rw.add("delta");
rw.add("functions");
rw.add("session");
rw.add(SolrWriter.LAST_INDEX_KEY);
RESERVED_WORDS = Collections.unmodifiableSet(rw);
}
}
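
A short sketch (not part of this patch) of how the reserved-word set is consulted, mirroring the check in the new Entity constructor below:

String candidate = "session";
if (ConfigNameConstants.RESERVED_WORDS.contains(candidate)) {
  // "session" is reserved, so it cannot be used as an entity name
  throw new IllegalArgumentException("'" + candidate + "' is a reserved word: "
      + ConfigNameConstants.RESERVED_WORDS);
}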

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.handler.dataimport.DataImporter;
import org.apache.solr.schema.SchemaField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class ConfigParseUtil {
private static final Logger LOG = LoggerFactory.getLogger(ConfigParseUtil.class);
public static String getStringAttribute(Element e, String name, String def) {
String r = e.getAttribute(name);
if (r == null || "".equals(r.trim())) r = def;
return r;
}
public static HashMap<String,String> getAllAttributes(Element e) {
HashMap<String,String> m = new HashMap<String,String>();
NamedNodeMap nnm = e.getAttributes();
for (int i = 0; i < nnm.getLength(); i++) {
m.put(nnm.item(i).getNodeName(), nnm.item(i).getNodeValue());
}
return m;
}
public static String getText(Node elem, StringBuilder buffer) {
if (elem.getNodeType() != Node.CDATA_SECTION_NODE) {
NodeList childs = elem.getChildNodes();
for (int i = 0; i < childs.getLength(); i++) {
Node child = childs.item(i);
short childType = child.getNodeType();
if (childType != Node.COMMENT_NODE
&& childType != Node.PROCESSING_INSTRUCTION_NODE) {
getText(child, buffer);
}
}
} else {
buffer.append(elem.getNodeValue());
}
return buffer.toString();
}
public static List<Element> getChildNodes(Element e, String byName) {
List<Element> result = new ArrayList<Element>();
NodeList l = e.getChildNodes();
for (int i = 0; i < l.getLength(); i++) {
if (e.equals(l.item(i).getParentNode())
&& byName.equals(l.item(i).getNodeName())) result.add((Element) l
.item(i));
}
return result;
}
public static void verifyWithSchema(DataImporter di, Map<String,EntityField> fields) {
Map<String,SchemaField> schemaFields = null;
if (di.getSchema() == null) {
schemaFields = Collections.emptyMap();
} else {
schemaFields = di.getSchema().getFields();
}
for (Map.Entry<String,SchemaField> entry : schemaFields.entrySet()) {
SchemaField sf = entry.getValue();
if (!fields.containsKey(sf.getName())) {
if (sf.isRequired()) {
LOG.info(sf.getName() + " is a required field in SolrSchema, but was not found in DataConfig");
}
}
}
for (Map.Entry<String,EntityField> entry : fields.entrySet()) {
EntityField fld = entry.getValue();
SchemaField field = di.getSchemaField(fld.getName());
if (field == null) {
LOG.info("The field: " + fld.getName() + " present in DataConfig does not have a counterpart in Solr Schema");
}
}
}
public static Map<String,EntityField> gatherAllFields(DataImporter di, Entity e) {
Map<String,EntityField> fields = new HashMap<String,EntityField>();
if (e.getFields() != null) {
for (EntityField f : e.getFields()) {
fields.put(f.getName(), f);
}
}
for (Entity e1 : e.getChildren()) {
fields.putAll(gatherAllFields(di, e1));
}
return fields;
}
}
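
A self-contained sketch (not part of this patch) exercising the DOM helpers above on an in-memory fragment; the demo class name is hypothetical.

import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

public class ConfigParseUtilDemo {
  public static void main(String[] args) throws Exception {
    String xml = "<document><entity name=\"a\" pk=\"id\"/><entity name=\"b\"/></document>";
    Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new InputSource(new StringReader(xml))).getDocumentElement();
    for (Element e : ConfigParseUtil.getChildNodes(root, "entity")) {
      // getStringAttribute falls back to the default for missing or blank values
      System.out.println(ConfigParseUtil.getStringAttribute(e, "pk", "(none)"));
    }
    // prints "id" for entity a, then "(none)" for entity b
  }
}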

View File

@ -0,0 +1,99 @@
package org.apache.solr.handler.dataimport.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* <p>
* Mapping for data-config.xml
* </p>
* <p/>
* <p>
* Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
* for more details.
* </p>
* <p/>
* <b>This API is experimental and subject to change</b>
*
* @since solr 1.3
*/
public class DIHConfiguration {
// TODO - remove from here and add it to entity
private final String deleteQuery;
private final List<Entity> entities;
private final String onImportStart;
private final String onImportEnd;
private final List<Map<String, String>> functions;
private final Script script;
private final Map<String, Properties> dataSources;
public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Properties> dataSources) {
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
List<Entity> modEntities = new ArrayList<Entity>();
List<Element> l = ConfigParseUtil.getChildNodes(element, "entity");
boolean docRootFound = false;
for (Element e : l) {
Entity entity = new Entity(docRootFound, e, di, null);
Map<String, EntityField> fields = ConfigParseUtil.gatherAllFields(di, entity);
ConfigParseUtil.verifyWithSchema(di, fields);
modEntities.add(entity);
}
this.entities = Collections.unmodifiableList(modEntities);
if(functions==null) {
functions = Collections.emptyList();
}
List<Map<String, String>> modFunc = new ArrayList<Map<String, String>>(functions.size());
for(Map<String, String> f : functions) {
modFunc.add(Collections.unmodifiableMap(f));
}
this.functions = Collections.unmodifiableList(modFunc);
this.script = script;
this.dataSources = Collections.unmodifiableMap(dataSources);
}
public String getDeleteQuery() {
return deleteQuery;
}
public List<Entity> getEntities() {
return entities;
}
public String getOnImportStart() {
return onImportStart;
}
public String getOnImportEnd() {
return onImportEnd;
}
public List<Map<String,String>> getFunctions() {
return functions;
}
public Map<String,Properties> getDataSources() {
return dataSources;
}
public Script getScript() {
return script;
}
}
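
Per the updated TestDataConfig later in this patch, a DIHConfiguration is normally obtained through DataImporter rather than constructed directly; a fragment-style sketch, where doc is a parsed data-config.xml DOM:

DataImporter di = new DataImporter();
DIHConfiguration cfg = di.readFromXml(doc);
for (Entity e : cfg.getEntities()) {
  System.out.println(e.getName() + " docRoot=" + e.isDocRoot());
}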

View File

@ -0,0 +1,99 @@
package org.apache.solr.handler.dataimport.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* <p>
* Mapping for data-config.xml
* </p>
* <p/>
* <p>
* Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
* for more details.
* </p>
* <p/>
* <b>This API is experimental and subject to change</b>
*
* @since solr 1.3
*/
public class Document {
// TODO - remove from here and add it to entity
private final String deleteQuery;
private final List<Entity> entities;
private final String onImportStart;
private final String onImportEnd;
private final List<Map<String, String>> functions;
private final Script script;
private final Map<String, Properties> dataSources;
public Document(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Properties> dataSources) {
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
List<Entity> modEntities = new ArrayList<Entity>();
List<Element> l = ConfigParseUtil.getChildNodes(element, "entity");
boolean docRootFound = false;
for (Element e : l) {
Entity entity = new Entity(docRootFound, e, di, null);
Map<String, EntityField> fields = ConfigParseUtil.gatherAllFields(di, entity);
ConfigParseUtil.verifyWithSchema(di, fields);
modEntities.add(entity);
}
this.entities = Collections.unmodifiableList(modEntities);
if(functions==null) {
functions = Collections.emptyList();
}
List<Map<String, String>> modFunc = new ArrayList<Map<String, String>>(functions.size());
for(Map<String, String> f : functions) {
modFunc.add(Collections.unmodifiableMap(f));
}
this.functions = Collections.unmodifiableList(modFunc);
this.script = script;
this.dataSources = Collections.unmodifiableMap(dataSources);
}
public String getDeleteQuery() {
return deleteQuery;
}
public List<Entity> getEntities() {
return entities;
}
public String getOnImportStart() {
return onImportStart;
}
public String getOnImportEnd() {
return onImportEnd;
}
public List<Map<String,String>> getFunctions() {
return functions;
}
public Map<String,Properties> getDataSources() {
return dataSources;
}
public Script getScript() {
return script;
}
}

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.solr.handler.dataimport.config;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.handler.dataimport.DataImportHandlerException;
import org.apache.solr.handler.dataimport.DataImporter;
import org.apache.solr.schema.SchemaField;
import org.w3c.dom.Element;
public class Entity {
private final String name;
private final String pk;
private final String pkMappingFromSchema;
private final String dataSourceName;
private final String processorName;
private final Entity parentEntity;
private final boolean docRoot;
private final List<Entity> children;
private final List<EntityField> fields;
private final Map<String,Set<EntityField>> colNameVsField;
private final Map<String,String> allAttributes;
private final List<Map<String,String>> allFieldAttributes;
public Entity(boolean docRootFound, Element element, DataImporter di, Entity parent) {
this.parentEntity = parent;
String modName = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.NAME, null);
if (modName == null) {
throw new DataImportHandlerException(SEVERE, "Entity must have a name.");
}
if (modName.indexOf(".") != -1) {
throw new DataImportHandlerException(SEVERE,
"Entity name must not have period (.): '" + modName);
}
if (ConfigNameConstants.RESERVED_WORDS.contains(modName)) {
throw new DataImportHandlerException(SEVERE, "Entity name : '" + modName
+ "' is a reserved keyword. Reserved words are: " + ConfigNameConstants.RESERVED_WORDS);
}
this.name = modName;
this.pk = ConfigParseUtil.getStringAttribute(element, "pk", null);
this.processorName = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.PROCESSOR,null);
this.dataSourceName = ConfigParseUtil.getStringAttribute(element, DataImporter.DATA_SRC, null);
String rawDocRootValue = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.ROOT_ENTITY, null);
if (!docRootFound && !"false".equals(rawDocRootValue)) {
// if no document root has been found yet in this chain
docRoot = true;
} else {
docRoot = false;
}
Map<String,String> modAttributes = ConfigParseUtil
.getAllAttributes(element);
modAttributes.put(ConfigNameConstants.DATA_SRC, this.dataSourceName);
this.allAttributes = Collections.unmodifiableMap(modAttributes);
List<Element> n = ConfigParseUtil.getChildNodes(element, "field");
List<EntityField> modFields = new ArrayList<EntityField>(n.size());
Map<String,Set<EntityField>> modColNameVsField = new HashMap<String,Set<EntityField>>();
List<Map<String,String>> modAllFieldAttributes = new ArrayList<Map<String,String>>();
for (Element elem : n) {
EntityField.Builder fieldBuilder = new EntityField.Builder(elem);
if (di.getSchema() != null) {
if (fieldBuilder.getNameOrColumn() != null
&& fieldBuilder.getNameOrColumn().contains("${")) {
fieldBuilder.dynamicName = true;
} else {
SchemaField schemaField = di.getSchemaField(fieldBuilder
.getNameOrColumn());
if (schemaField != null) {
fieldBuilder.name = schemaField.getName();
fieldBuilder.multiValued = schemaField.multiValued();
fieldBuilder.allAttributes.put(DataImporter.MULTI_VALUED, Boolean
.toString(schemaField.multiValued()));
fieldBuilder.allAttributes.put(DataImporter.TYPE, schemaField
.getType().getTypeName());
fieldBuilder.allAttributes.put("indexed", Boolean
.toString(schemaField.indexed()));
fieldBuilder.allAttributes.put("stored", Boolean
.toString(schemaField.stored()));
fieldBuilder.allAttributes.put("defaultValue", schemaField
.getDefaultValue());
} else {
fieldBuilder.toWrite = false;
}
}
}
Set<EntityField> fieldSet = modColNameVsField.get(fieldBuilder.column);
if (fieldSet == null) {
fieldSet = new HashSet<EntityField>();
modColNameVsField.put(fieldBuilder.column, fieldSet);
}
fieldBuilder.allAttributes.put("boost", Float
.toString(fieldBuilder.boost));
fieldBuilder.allAttributes.put("toWrite", Boolean
.toString(fieldBuilder.toWrite));
modAllFieldAttributes.add(fieldBuilder.allAttributes);
fieldBuilder.entity = this;
EntityField field = new EntityField(fieldBuilder);
fieldSet.add(field);
modFields.add(field);
}
Map<String,Set<EntityField>> modColNameVsField1 = new HashMap<String,Set<EntityField>>();
for (Map.Entry<String,Set<EntityField>> entry : modColNameVsField
.entrySet()) {
if (entry.getValue().size() > 0) {
modColNameVsField1.put(entry.getKey(), Collections
.unmodifiableSet(entry.getValue()));
}
}
this.colNameVsField = Collections.unmodifiableMap(modColNameVsField1);
this.fields = Collections.unmodifiableList(modFields);
this.allFieldAttributes = Collections
.unmodifiableList(modAllFieldAttributes);
String modPkMappingFromSchema = null;
if (di.getSchema() != null) {
SchemaField uniqueKey = di.getSchema().getUniqueKeyField();
if (uniqueKey != null) {
modPkMappingFromSchema = uniqueKey.getName();
// if no fields are mentioned, the Solr uniqueKey is the same as the DIH 'pk'
for (EntityField field : fields) {
if (field.getName().equals(modPkMappingFromSchema)) {
modPkMappingFromSchema = field.getColumn();
// get the corresponding column mapping for the Solr uniqueKey.
// But if there are multiple columns mapping to the Solr uniqueKey,
// it will fail, so in one-off cases we may still need 'pk'
break;
}
}
}
}
pkMappingFromSchema = modPkMappingFromSchema;
n = ConfigParseUtil.getChildNodes(element, "entity");
List<Entity> modEntities = new ArrayList<Entity>();
for (Element elem : n) {
modEntities
.add(new Entity((docRootFound || this.docRoot), elem, di, this));
}
this.children = Collections.unmodifiableList(modEntities);
}
public String getPk() {
return pk == null ? pkMappingFromSchema : pk;
}
public String getSchemaPk() {
return pkMappingFromSchema != null ? pkMappingFromSchema : pk;
}
public String getName() {
return name;
}
public String getPkMappingFromSchema() {
return pkMappingFromSchema;
}
public String getDataSourceName() {
return dataSourceName;
}
public String getProcessorName() {
return processorName;
}
public Entity getParentEntity() {
return parentEntity;
}
public boolean isDocRoot() {
return docRoot;
}
public List<Entity> getChildren() {
return children;
}
public List<EntityField> getFields() {
return fields;
}
public Map<String,Set<EntityField>> getColNameVsField() {
return colNameVsField;
}
public Map<String,String> getAllAttributes() {
return allAttributes;
}
public List<Map<String,String>> getAllFieldsList() {
return allFieldAttributes;
}
}
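
A sketch (hypothetical config, not part of this patch) of the rootEntity resolution in the constructor above: the first entity in a chain that does not opt out with rootEntity="false" becomes the document root.

String xml = "<dataConfig><document>"
    + "<entity name=\"outer\" rootEntity=\"false\" query=\"q1\">"
    + "<entity name=\"inner\" query=\"q2\"/>"
    + "</entity></document></dataConfig>";
DataImporter di = new DataImporter();
di.loadAndInit(xml);
Entity outer = di.getConfig().getEntities().get(0);
System.out.println(outer.isDocRoot());                      // false
System.out.println(outer.getChildren().get(0).isDocRoot()); // true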

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport.config;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.handler.dataimport.ConfigParseUtil;
import org.apache.solr.handler.dataimport.DataImportHandlerException;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
public class EntityField {
private final String column;
private final String name;
private final float boost;
private final boolean toWrite;
private final boolean multiValued;
private final boolean dynamicName;
private final Entity entity;
private final Map<String, String> allAttributes;
public EntityField(Builder b) {
this.column = b.column;
this.name = b.name;
this.boost = b.boost;
this.toWrite = b.toWrite;
this.multiValued = b.multiValued;
this.dynamicName = b.dynamicName;
this.entity = b.entity;
this.allAttributes = Collections.unmodifiableMap(new HashMap<String,String>(b.allAttributes));
}
public String getName() {
return name == null ? column : name;
}
public Entity getEntity() {
return entity;
}
public String getColumn() {
return column;
}
public float getBoost() {
return boost;
}
public boolean isToWrite() {
return toWrite;
}
public boolean isMultiValued() {
return multiValued;
}
public boolean isDynamicName() {
return dynamicName;
}
public Map<String,String> getAllAttributes() {
return allAttributes;
}
public static class Builder {
public String column;
public String name;
public float boost;
public boolean toWrite = true;
public boolean multiValued = false;
public boolean dynamicName = false;
public Entity entity;
public Map<String, String> allAttributes = new HashMap<String,String>();
public Builder(Element e) {
this.name = ConfigParseUtil.getStringAttribute(e, DataImporter.NAME, null);
this.column = ConfigParseUtil.getStringAttribute(e, DataImporter.COLUMN, null);
if (column == null) {
throw new DataImportHandlerException(SEVERE, "Field must have a column attribute");
}
this.boost = Float.parseFloat(ConfigParseUtil.getStringAttribute(e, "boost", "1.0f"));
this.allAttributes = new HashMap<String, String>(ConfigParseUtil.getAllAttributes(e));
}
public String getNameOrColumn() {
return name==null ? column : name;
}
}
}
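
EntityField is immutable and built from a <field> element through its Builder; a fragment-style sketch (not part of this patch):

Element fe = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    .parse(new InputSource(new StringReader(
        "<field column=\"desc\" name=\"description\" boost=\"2.0\"/>")))
    .getDocumentElement();
EntityField f = new EntityField(new EntityField.Builder(fe));
System.out.println(f.getName());  // "description" (falls back to column when name is absent)
System.out.println(f.getBoost()); // 2.0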

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport.config;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.solr.handler.dataimport.ConfigParseUtil;
import org.apache.solr.handler.dataimport.DataImportHandlerException;
import org.apache.solr.handler.dataimport.DataImporter;
import org.w3c.dom.Element;
public class Field {
private final String column;
private final String name;
private final float boost;
private final boolean toWrite;
private final boolean multiValued;
private final boolean dynamicName;
private final Entity entity;
private final Map<String, String> allAttributes;
public Field(Builder b) {
this.column = b.column;
this.name = b.name;
this.boost = b.boost;
this.toWrite = b.toWrite;
this.multiValued = b.multiValued;
this.dynamicName = b.dynamicName;
this.entity = b.entity;
this.allAttributes = Collections.unmodifiableMap(new HashMap<String,String>(b.allAttributes));
}
public String getName() {
return name == null ? column : name;
}
public Entity getEntity() {
return entity;
}
public String getColumn() {
return column;
}
public float getBoost() {
return boost;
}
public boolean isToWrite() {
return toWrite;
}
public boolean isMultiValued() {
return multiValued;
}
public boolean isDynamicName() {
return dynamicName;
}
public Map<String,String> getAllAttributes() {
return allAttributes;
}
public static class Builder {
public String column;
public String name;
public float boost;
public boolean toWrite = true;
public boolean multiValued = false;
public boolean dynamicName;
public Entity entity;
public Map<String, String> allAttributes = new HashMap<String,String>();
public Builder(Element e) {
this.name = ConfigParseUtil.getStringAttribute(e, DataImporter.NAME, null);
this.column = ConfigParseUtil.getStringAttribute(e, DataImporter.COLUMN, null);
if (column == null) {
throw new DataImportHandlerException(SEVERE, "Field must have a column attribute");
}
this.boost = Float.parseFloat(ConfigParseUtil.getStringAttribute(e, "boost", "1.0f"));
this.allAttributes = new HashMap<String, String>(ConfigParseUtil.getAllAttributes(e));
}
public String getNameOrColumn() {
return name==null ? column : name;
}
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.solr.handler.dataimport.config;
import org.w3c.dom.Element;
public class Script {
private final String language;
private final String text;
public Script(Element e) {
this.language = ConfigParseUtil.getStringAttribute(e, "language", "JavaScript");
StringBuilder buffer = new StringBuilder();
String script = ConfigParseUtil.getText(e, buffer);
if (script != null) {
this.text = script.trim();
} else {
this.text = null;
}
}
public String getLanguage() {
return language;
}
public String getText() {
return text;
}
}
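
Script captures the language attribute (defaulting to JavaScript) and the trimmed element text; a fragment-style sketch (not part of this patch):

Element se = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    .parse(new InputSource(new StringReader(
        "<script><![CDATA[function f(row){ return row; }]]></script>")))
    .getDocumentElement();
Script s = new Script(se);
System.out.println(s.getLanguage()); // JavaScript
System.out.println(s.getText());     // function f(row){ return row; }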

View File

@ -132,15 +132,15 @@ public abstract class AbstractDataImportHandlerTestCase extends
* Helper for creating a Context instance. Useful for testing Transformers
*/
@SuppressWarnings("unchecked")
public static TestContext getContext(DataConfig.Entity parentEntity,
public static TestContext getContext(EntityProcessorWrapper parent,
VariableResolverImpl resolver, DataSource parentDataSource,
String currProcess, final List<Map<String, String>> entityFields,
final Map<String, String> entityAttrs) {
if (resolver == null) resolver = new VariableResolverImpl();
final Context delegate = new ContextImpl(parentEntity, resolver,
final Context delegate = new ContextImpl(parent, resolver,
parentDataSource, currProcess,
new HashMap<String, Object>(), null, null);
return new TestContext(entityAttrs, delegate, entityFields, parentEntity == null);
return new TestContext(entityAttrs, delegate, entityFields, parent == null);
}
/**

View File

@ -49,7 +49,7 @@ public class TestCachedSqlEntityProcessor extends AbstractDataImportHandlerTestC
rows.add(createMap("id", 1, "desc", "one"));
rows.add(createMap("id", 1, "desc", "another one"));
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
EntityProcessor csep = new EntityProcessorWrapper(new CachedSqlEntityProcessor(), null, null);
csep.init(context);
rows = new ArrayList<Map<String, Object>>();
while (true) {
@ -90,7 +90,7 @@ public class TestCachedSqlEntityProcessor extends AbstractDataImportHandlerTestC
rows.add(createMap("id", 1, "desc", "one"));
rows.add(createMap("id", 1, "desc", "another one"));
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null, null);
csep.init(context);
rows = new ArrayList<Map<String, Object>>();
while (true) {
@ -132,7 +132,7 @@ public class TestCachedSqlEntityProcessor extends AbstractDataImportHandlerTestC
rows.add(createMap("id", 1, "desc", "one"));
rows.add(createMap("id", 1, "desc", "another one"));
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null, null);
csep.init(context);
rows = new ArrayList<Map<String, Object>>();
while (true) {
@ -223,7 +223,7 @@ public class TestCachedSqlEntityProcessor extends AbstractDataImportHandlerTestC
rows.add(createMap("id", 3, "desc", "another three"));
rows.add(createMap("id", 3, "desc", "another another three"));
MockDataSource.setIterator(q, rows.iterator());
EntityProcessor csep = new EntityProcessorWrapper(new CachedSqlEntityProcessor(), null);
EntityProcessor csep = new EntityProcessorWrapper(new CachedSqlEntityProcessor(), null, null);
csep.init(context);
rows = new ArrayList<Map<String, Object>>();
while (true) {

View File

@ -18,15 +18,14 @@ package org.apache.solr.handler.dataimport;
import java.util.HashMap;
import org.apache.solr.handler.dataimport.DataConfig.Entity;
import org.apache.solr.handler.dataimport.DataImporter.RequestParams;
import org.apache.solr.handler.dataimport.RequestInfo;
import org.junit.Test;
public class TestContextImpl extends AbstractDataImportHandlerTestCase {
@Test
public void testEntityScope() {
ContextImpl ctx = new ContextImpl(new Entity(), new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
ContextImpl ctx = new ContextImpl(null, new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
String lala = new String("lala");
ctx.setSessionAttribute("huhu", lala, Context.SCOPE_ENTITY);
Object got = ctx.getSessionAttribute("huhu", Context.SCOPE_ENTITY);
@ -38,8 +37,8 @@ public class TestContextImpl extends AbstractDataImportHandlerTestCase {
public void testCoreScope() {
DataImporter di = new DataImporter();
di.loadAndInit("<dataConfig><document /></dataConfig>");
DocBuilder db = new DocBuilder(di, new SolrWriter(null, null),new SimplePropertiesWriter(), new RequestParams());
ContextImpl ctx = new ContextImpl(new Entity(), new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, db);
DocBuilder db = new DocBuilder(di, new SolrWriter(null, null),new SimplePropertiesWriter(), new RequestInfo(new HashMap<String,Object>(), null));
ContextImpl ctx = new ContextImpl(null, new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, db);
String lala = new String("lala");
ctx.setSessionAttribute("huhu", lala, Context.SCOPE_SOLR_CORE);
Object got = ctx.getSessionAttribute("huhu", Context.SCOPE_SOLR_CORE);
@ -48,7 +47,7 @@ public class TestContextImpl extends AbstractDataImportHandlerTestCase {
}
@Test
public void testDocumentScope() {
ContextImpl ctx = new ContextImpl(new Entity(), new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
ContextImpl ctx = new ContextImpl(null, new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
ctx.setDoc(new DocBuilder.DocWrapper());
String lala = new String("lala");
ctx.setSessionAttribute("huhu", lala, Context.SCOPE_DOC);
@ -59,7 +58,7 @@ public class TestContextImpl extends AbstractDataImportHandlerTestCase {
}
@Test
public void testGlobalScope() {
ContextImpl ctx = new ContextImpl(new Entity(), new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
ContextImpl ctx = new ContextImpl(null, new VariableResolverImpl(), null, "something", new HashMap<String,Object>(), null, null);
String lala = new String("lala");
ctx.setSessionAttribute("huhu", lala, Context.SCOPE_GLOBAL);
Object got = ctx.getSessionAttribute("huhu", Context.SCOPE_GLOBAL);

View File

@ -16,6 +16,8 @@
*/
package org.apache.solr.handler.dataimport;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
@ -57,10 +59,9 @@ public class TestDataConfig extends AbstractDataImportHandlerTestCase {
javax.xml.parsers.DocumentBuilder builder = DocumentBuilderFactory
.newInstance().newDocumentBuilder();
Document doc = builder.parse(new InputSource(new StringReader(xml)));
DataConfig dc = new DataConfig();
dc.readFromXml(doc.getDocumentElement());
assertEquals("atrimlisting", dc.document.entities.get(0).name);
DataImporter di = new DataImporter();
DIHConfiguration dc = di.readFromXml(doc);
assertEquals("atrimlisting", dc.getEntities().get(0).getName());
}
private static final String xml = "<dataConfig>\n"

View File

@ -17,6 +17,9 @@
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.junit.After;
import org.junit.Test;
@ -42,7 +45,8 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
@Test
public void loadClass() throws Exception {
Class clz = DocBuilder.loadClass("RegexTransformer", null);
@SuppressWarnings("unchecked")
Class<Transformer> clz = DocBuilder.loadClass("RegexTransformer", null);
assertNotNull(clz);
}
@ -50,13 +54,10 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
public void singleEntityNoRows() {
DataImporter di = new DataImporter();
di.loadAndInit(dc_singleEntity);
DataConfig cfg = di.getConfig();
DataConfig.Entity ent = cfg.document.entities.get(0);
MockDataSource.setIterator("select * from x", new ArrayList().iterator());
ent.dataSrc = new MockDataSource();
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
DIHConfiguration cfg = di.getConfig();
Entity ent = cfg.getEntities().get(0);
MockDataSource.setIterator("select * from x", new ArrayList<Map<String, Object>>().iterator());
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals(Boolean.TRUE, swi.deleteAllCalled);
@ -72,13 +73,11 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
public void testDeltaImportNoRows_MustNotCommit() {
DataImporter di = new DataImporter();
di.loadAndInit(dc_deltaConfig);
DataConfig cfg = di.getConfig();
DataConfig.Entity ent = cfg.document.entities.get(0);
MockDataSource.setIterator("select * from x", new ArrayList().iterator());
MockDataSource.setIterator("select id from x", new ArrayList().iterator());
ent.dataSrc = new MockDataSource();
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams(createMap("command", "delta-import"));
DIHConfiguration cfg = di.getConfig();
Entity ent = cfg.getEntities().get(0);
MockDataSource.setIterator("select * from x", new ArrayList<Map<String, Object>>().iterator());
MockDataSource.setIterator("select id from x", new ArrayList<Map<String, Object>>().iterator());
RequestInfo rp = new RequestInfo(createMap("command", "delta-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals(Boolean.FALSE, swi.deleteAllCalled);
@ -94,15 +93,12 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
public void singleEntityOneRow() {
DataImporter di = new DataImporter();
di.loadAndInit(dc_singleEntity);
DataConfig cfg = di.getConfig();
DataConfig.Entity ent = cfg.document.entities.get(0);
List l = new ArrayList();
DIHConfiguration cfg = di.getConfig();
Entity ent = cfg.getEntities().get(0);
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("id", 1, "desc", "one"));
MockDataSource.setIterator("select * from x", l.iterator());
ent.dataSrc = new MockDataSource();
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals(Boolean.TRUE, swi.deleteAllCalled);
@ -114,7 +110,7 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
assertEquals(1, di.getDocBuilder().importStatistics.rowsCount.get());
for (int i = 0; i < l.size(); i++) {
Map<String, Object> map = (Map<String, Object>) l.get(i);
Map<String, Object> map = l.get(i);
SolrInputDocument doc = swi.docs.get(i);
for (Map.Entry<String, Object> entry : map.entrySet()) {
assertEquals(entry.getValue(), doc.getFieldValue(entry.getKey()));
@ -126,14 +122,12 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
public void testImportCommand() {
DataImporter di = new DataImporter();
di.loadAndInit(dc_singleEntity);
DataConfig cfg = di.getConfig();
DataConfig.Entity ent = cfg.document.entities.get(0);
List l = new ArrayList();
DIHConfiguration cfg = di.getConfig();
Entity ent = cfg.getEntities().get(0);
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("id", 1, "desc", "one"));
MockDataSource.setIterator("select * from x", l.iterator());
ent.dataSrc = new MockDataSource();
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams(createMap("command", "import"));
RequestInfo rp = new RequestInfo(createMap("command", "import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals(Boolean.FALSE, swi.deleteAllCalled);
@ -157,18 +151,15 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
public void singleEntityMultipleRows() {
DataImporter di = new DataImporter();
di.loadAndInit(dc_singleEntity);
DataConfig cfg = di.getConfig();
DataConfig.Entity ent = cfg.document.entities.get(0);
ent.isDocRoot = true;
DataImporter.RequestParams rp = new DataImporter.RequestParams();
rp.command = "full-import";
List l = new ArrayList();
DIHConfiguration cfg = di.getConfig();
Entity ent = cfg.getEntities().get(0);
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("id", 1, "desc", "one"));
l.add(createMap("id", 2, "desc", "two"));
l.add(createMap("id", 3, "desc", "three"));
MockDataSource.setIterator("select * from x", l.iterator());
ent.dataSrc = new MockDataSource();
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals(Boolean.TRUE, swi.deleteAllCalled);
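
Note: also gone from every test are the `ent.dataSrc = new MockDataSource()` and `ent.isDocRoot = true` assignments; Entity is now read-only configuration, so the mock rows are wired entirely through MockDataSource's static registry, keyed by query string. A sketch of that wiring, using only calls from the hunks above.

// Register the rows the mock should return for a given query; the
// keyed lookup replaces the old writable per-entity dataSrc field.
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("id", 1, "desc", "one"));
l.add(createMap("id", 2, "desc", "two"));
MockDataSource.setIterator("select * from x", l.iterator());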

View File

@ -47,7 +47,7 @@ public class TestEntityProcessorBase extends AbstractDataImportHandlerTestCase {
Map<String, Object> src = new HashMap<String, Object>();
src.put("A", "NA");
src.put("B", "NA");
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
sep.init(context);
Map<String, Object> res = sep.applyTransformer(src);
assertNotNull(res.get("T1"));
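
Note: EntityProcessorWrapper gains a third constructor argument throughout this commit, and the tests pass null for both trailing parameters. The diff shows only the nulls, so treating them as the config Entity and a DocBuilder reference is an inference from the wrapper's role, not confirmed here.

// Sketch: delegate processor plus two collaborators the tests stub out
// with null (assumed: the Entity config node and the DocBuilder).
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
sep.init(context);
Map<String, Object> res = sep.applyTransformer(src);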

View File

@ -36,7 +36,7 @@ public class TestFieldReader extends AbstractDataImportHandlerTestCase {
DataImporter di = new DataImporter();
di.loadAndInit(config);
TestDocBuilder.SolrWriterImpl sw = new TestDocBuilder.SolrWriterImpl();
DataImporter.RequestParams rp = new DataImporter.RequestParams(createMap("command", "full-import"));
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
List<Map<String, Object>> l = new ArrayList<Map<String, Object>>();
l.add(createMap("xml", xml));
MockDataSource.setIterator("select * from a", l.iterator());

View File

@ -34,7 +34,7 @@ public class TestPlainTextEntityProcessor extends AbstractDataImportHandlerTestCase {
DataImporter di = new DataImporter();
di.loadAndInit(DATA_CONFIG);
TestDocBuilder.SolrWriterImpl sw = new TestDocBuilder.SolrWriterImpl();
DataImporter.RequestParams rp = new DataImporter.RequestParams(createMap("command", "full-import"));
RequestInfo rp = new RequestInfo(createMap("command", "full-import"), null);
di.runCmd(rp, sw);
assertEquals(DS.s, sw.docs.get(0).getFieldValue("x"));
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.solr.handler.dataimport;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@ -48,7 +50,7 @@ public class TestScriptTransformer extends AbstractDataImportHandlerTestCase {
Context context = getContext("f1", script);
Map<String, Object> map = new HashMap<String, Object>();
map.put("name", "Scott");
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
sep.init(context);
sep.applyTransformer(map);
assertEquals(map.get("name"), "Hello Scott");
@ -81,7 +83,7 @@ public class TestScriptTransformer extends AbstractDataImportHandlerTestCase {
Context context = getContext("f1", script);
Map<String, Object> map = new HashMap<String, Object>();
map.put("name", "Scott");
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
sep.init(context);
sep.applyTransformer(map);
assertEquals(map.get("name"), "Hello Scott");
@ -98,10 +100,9 @@ public class TestScriptTransformer extends AbstractDataImportHandlerTestCase {
DocumentBuilder builder = DocumentBuilderFactory.newInstance()
.newDocumentBuilder();
Document document = builder.parse(new InputSource(new StringReader(xml)));
DataConfig config = new DataConfig();
config.readFromXml((Element) document.getElementsByTagName("dataConfig")
.item(0));
assertTrue(config.script.text.indexOf("checkNextToken") > -1);
DataImporter di = new DataImporter();
DIHConfiguration dc = di.readFromXml(document);
assertTrue(dc.getScript().getText().indexOf("checkNextToken") > -1);
} catch (DataImportHandlerException e) {
assumeFalse("This JVM does not have Rhino installed. Test Skipped.", e
.getMessage().startsWith("Cannot load Script Engine for language"));
@ -115,15 +116,13 @@ public class TestScriptTransformer extends AbstractDataImportHandlerTestCase {
DocumentBuilder builder = DocumentBuilderFactory.newInstance()
.newDocumentBuilder();
Document document = builder.parse(new InputSource(new StringReader(xml)));
DataConfig config = new DataConfig();
config.readFromXml((Element) document.getElementsByTagName("dataConfig")
.item(0));
Context c = getContext("checkNextToken", config.script.text);
DataImporter di = new DataImporter();
DIHConfiguration dc = di.readFromXml(document);
Context c = getContext("checkNextToken", dc.getScript().getText());
Map map = new HashMap();
map.put("nextToken", "hello");
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
sep.init(c);
sep.applyTransformer(map);
assertEquals("true", map.get("$hasMore"));

View File

@ -53,7 +53,7 @@ public class TestSqlEntityProcessor extends AbstractDataImportHandlerTestCase {
@Test
public void testTranformer() {
EntityProcessor sep = new EntityProcessorWrapper( new SqlEntityProcessor(), null);
EntityProcessor sep = new EntityProcessorWrapper( new SqlEntityProcessor(), null, null);
List<Map<String, Object>> rows = getRows(2);
VariableResolverImpl vr = new VariableResolverImpl();
HashMap<String, String> ea = new HashMap<String, String>();
@ -76,7 +76,7 @@ public class TestSqlEntityProcessor extends AbstractDataImportHandlerTestCase {
@Test
public void testTranformerWithReflection() {
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null, null);
List<Map<String, Object>> rows = getRows(2);
VariableResolverImpl vr = new VariableResolverImpl();
HashMap<String, String> ea = new HashMap<String, String>();
@ -99,7 +99,7 @@ public class TestSqlEntityProcessor extends AbstractDataImportHandlerTestCase {
@Test
public void testTranformerList() {
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(),null);
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(),null, null);
List<Map<String, Object>> rows = getRows(2);
VariableResolverImpl vr = new VariableResolverImpl();