diff --git a/contrib/dataimporthandler/CHANGES.txt b/contrib/dataimporthandler/CHANGES.txt index ef4f2a8db40..d7d82deee8a 100644 --- a/contrib/dataimporthandler/CHANGES.txt +++ b/contrib/dataimporthandler/CHANGES.txt @@ -24,6 +24,11 @@ The Context API has been changed in a non back-compatible way. In particular, th now returns a String describing the type of the current import process instead of an int. Similarily, the public constants in Context viz. FULL_DUMP, DELTA_DUMP and FIND_DELTA are changed to a String type. See SOLR-969 for details. +The EntityProcessor API has been simplified by moving logic for applying transformers and handling multi-row outputs +from Transformers into an EntityProcessorWrapper class. The EntityProcessor#destroy is now called once per +parent-row at the end of row (end of data). A new method EntityProcessor#close is added which is called at the end +of import. + Detailed Change List ---------------------- @@ -253,6 +258,13 @@ Other Change Context.currentProcess() to return a string instead of an integer. (Kay Kay, Noble Paul, shalin) +9. SOLR-1120: Simplified EntityProcessor API by moving logic for applying transformers and handling multi-row outputs + from Transformers into an EntityProcessorWrapper class. The behavior of the method + EntityProcessor#destroy has been modified to be called once per parent-row at the end of row. A new + method EntityProcessor#close is added which is called at the end of import. A new method + Context#getResolvedEntityAttribute is added which returns the resolved value of an entity's attribute. + (Noble Paul, shalin) + ================== Release 1.3.0 20080915 ================== Status diff --git a/contrib/dataimporthandler/src/extras/main/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java b/contrib/dataimporthandler/src/extras/main/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java index 5da7d0557aa..1366c3f34ec 100644 --- a/contrib/dataimporthandler/src/extras/main/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java +++ b/contrib/dataimporthandler/src/extras/main/java/org/apache/solr/handler/dataimport/MailEntityProcessor.java @@ -103,11 +103,7 @@ public class MailEntityProcessor extends EntityProcessorBase { mail = getNextMail(); if (mail != null) row = getDocumentFromMail(mail); - } - while (row == null && mail != null); - if (row != null) { - row = super.applyTransformer(row); - } + } while (row == null && mail != null); return row; } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/AbstractDataImportHandlerTest.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/AbstractDataImportHandlerTest.java index 257624d502e..3bf3ce1ac5d 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/AbstractDataImportHandlerTest.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/AbstractDataImportHandlerTest.java @@ -118,8 +118,12 @@ public abstract class AbstractDataImportHandlerTest extends } public String getEntityAttribute(String name) { - return entityAttrs == null ? delegate.getEntityAttribute(name) - : entityAttrs.get(name); + return entityAttrs == null ? delegate.getEntityAttribute(name) : entityAttrs.get(name); + } + + public String getResolvedEntityAttribute(String name) { + return entityAttrs == null ? delegate.getResolvedEntityAttribute(name) : + delegate.getResolvedEntityAttribute(entityAttrs.get(name)); } public List> getAllEntityFields() { diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java index 9840091a514..6ba535d6614 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java @@ -46,7 +46,6 @@ public class CachedSqlEntityProcessor extends SqlEntityProcessor { } public Map nextRow() { - if (rowcache != null) return getFromRowCache(); if (dataSourceRowCache != null) return getFromRowCacheTransformed(); if (!isFirst) diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java index eb11ad1d395..76e07079f95 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java @@ -70,6 +70,13 @@ public abstract class Context { */ public abstract String getEntityAttribute(String name); + /** + * Get the value of any attribute put into this entity after resolving all variables found in the attribute value + * @param name name of the attribute + * @return value of the named attribute after resolving all variables + */ + public abstract String getResolvedEntityAttribute(String name); + /** * Returns all the fields put into an entity. each item (which is a map ) in * the list corresponds to one field. each if the map contains the attribute diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java index 83256a2c34e..c542af1601f 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java @@ -72,6 +72,10 @@ public class ContextImpl extends Context { return entity == null ? null : entity.allAttributes.get(name); } + public String getResolvedEntityAttribute(String name) { + return entity == null ? null : resolver.replaceTokens(entity.allAttributes.get(name)); + } + public List> getAllEntityFields() { return entity == null ? Collections.EMPTY_LIST : entity.allFieldsList; } @@ -88,7 +92,8 @@ public class ContextImpl extends Context { if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug && currProcess == Context.FULL_DUMP) { //debug is not yet implemented properly for deltas - return DebugLogger.wrapDs(entity.dataSrc); + + entity.dataSrc = docBuilder.writer.getDebugLogger().wrapDs(entity.dataSrc); } return entity.dataSrc; } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java index d1af2b8b81c..2acaa0bcfa0 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java @@ -149,9 +149,8 @@ public class DataConfig { for (Entity entity : entities) entity.clearCache(); } - try { - processor.destroy(); + processor.close(); } catch (Exception e) { /*no op*/ } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java index 3afc0ff2306..8d4e53b9b5c 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java @@ -46,6 +46,7 @@ public class DebugLogger { private Stack debugStack; NamedList output; + SolrWriter writer; private static final String LINE = "---------------------------------------------"; @@ -69,7 +70,11 @@ public class DebugLogger { output = debugStack.peek().lst; } - private DebugInfo peekStack() { + public DebugLogger(SolrWriter solrWriter) { + writer = solrWriter; + } + + private DebugInfo peekStack() { return debugStack.isEmpty() ? null : debugStack.peek(); } @@ -165,8 +170,7 @@ public class DebugLogger { } } - static DataSource wrapDs(final DataSource ds) { - final SolrWriter writer = DocBuilder.INSTANCE.get().writer; + DataSource wrapDs(final DataSource ds) { return new DataSource() { public void init(Context context, Properties initProps) { ds.init(context, initProps); @@ -182,11 +186,11 @@ public class DebugLogger { try { return ds.getData(query); } catch (DataImportHandlerException de) { - DocBuilder.INSTANCE.get().writer.log(SolrWriter.ENTITY_EXCEPTION, + writer.log(SolrWriter.ENTITY_EXCEPTION, null, de); throw de; } catch (Exception e) { - DocBuilder.INSTANCE.get().writer.log(SolrWriter.ENTITY_EXCEPTION, + writer.log(SolrWriter.ENTITY_EXCEPTION, null, e); DataImportHandlerException de = new DataImportHandlerException( DataImportHandlerException.SEVERE, "", e); @@ -200,38 +204,28 @@ public class DebugLogger { }; } - static Transformer wrapTransformer(final Transformer t) { - if (DocBuilder.INSTANCE.get() != null - && DocBuilder.INSTANCE.get().verboseDebug) { - return new Transformer() { - public Object transformRow(Map row, Context context) { - DocBuilder.INSTANCE.get().writer.log(SolrWriter.PRE_TRANSFORMER_ROW, - null, row); - String tName = getTransformerName(t); - Object result = null; - try { - result = t.transformRow(row, context); - DocBuilder.INSTANCE.get().writer.log(SolrWriter.TRANSFORMED_ROW, - tName, result); - } catch (DataImportHandlerException de) { - DocBuilder.INSTANCE.get().writer.log( - SolrWriter.TRANSFORMER_EXCEPTION, tName, de); - de.debugged = true; - throw de; - } catch (Exception e) { - DocBuilder.INSTANCE.get().writer.log( - SolrWriter.TRANSFORMER_EXCEPTION, tName, e); - DataImportHandlerException de = new DataImportHandlerException( - DataImportHandlerException.SEVERE, "", e); - de.debugged = true; - throw de; - } - return result; + Transformer wrapTransformer(final Transformer t) { + return new Transformer() { + public Object transformRow(Map row, Context context) { + writer.log(SolrWriter.PRE_TRANSFORMER_ROW, null, row); + String tName = getTransformerName(t); + Object result = null; + try { + result = t.transformRow(row, context); + writer.log(SolrWriter.TRANSFORMED_ROW, tName, result); + } catch (DataImportHandlerException de) { + writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, de); + de.debugged = true; + throw de; + } catch (Exception e) { + writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, e); + DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e); + de.debugged = true; + throw de; } - }; - } else { - return t; - } + return result; + } + }; } public static String getStacktraceString(Exception e) { @@ -242,8 +236,8 @@ public class DebugLogger { static String getTransformerName(Transformer t) { Class transClass = t.getClass(); - if (t instanceof EntityProcessorBase.ReflectionTransformer) { - return ((EntityProcessorBase.ReflectionTransformer) t).trans; + if (t instanceof EntityProcessorWrapper.ReflectionTransformer) { + return ((EntityProcessorWrapper.ReflectionTransformer) t).trans; } if (t instanceof ScriptTransformer) { ScriptTransformer scriptTransformer = (ScriptTransformer) t; diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java index 6d691a49644..330a96769be 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java @@ -100,18 +100,22 @@ public class DocBuilder { private void invokeEventListener(String className) { try { EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance(); - String currentProcess; - if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) { - currentProcess = Context.DELTA_DUMP; - } else { - currentProcess = Context.FULL_DUMP; - } - listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this)); + notifyListener(listener); } catch (Exception e) { DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e, "Unable to load class : " + className); } } + private void notifyListener(EventListener listener) { + String currentProcess; + if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) { + currentProcess = Context.DELTA_DUMP; + } else { + currentProcess = Context.FULL_DUMP; + } + listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this)); + } + @SuppressWarnings("unchecked") public void execute() { dataImporter.store(DataImporter.STATUS_MSGS, statusMessages); @@ -333,8 +337,10 @@ public class DocBuilder { } Map arow = entityProcessor.nextRow(); - if (arow == null) - break; + if (arow == null) { + entityProcessor.destroy(); + break; + } if (arow.containsKey(DOC_BOOST)) { setDocumentBoost(doc, arow); @@ -539,7 +545,7 @@ public class DocBuilder { + entity.name, e); } } - return entity.processor = entityProcessor; + return entity.processor = new EntityProcessorWrapper(entityProcessor, this); } /** diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java index 266f412de76..bde4c3f0437 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java @@ -90,7 +90,16 @@ public abstract class EntityProcessor { public abstract Map nextModifiedParentRowKey(); /** - * Invoked when the Entity processor is detroyed. towards the end of injestion. Called only once + * Invoked for each parent-row after the last row for this entity is processed. If this is the root-most + * entity, it will be called only once in the import, at the very end. + * */ public abstract void destroy(); + + /** + * Invoked when the Entity processor is detroyed. towards the end of import. + */ + public void close() { + //no-op + } } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java index dccca079133..a348b2c5870 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java @@ -20,7 +20,6 @@ import static org.apache.solr.handler.dataimport.DataImportHandlerException.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Method; import java.util.*; /** @@ -47,15 +46,13 @@ public class EntityProcessorBase extends EntityProcessor { protected List transformers; - protected List> rowcache; - protected String query; protected String onError = ABORT; + public void init(Context context) { rowIterator = null; - rowcache = null; this.context = context; if (isFirstInit) { entityName = context.getEntityAttribute("name"); @@ -68,158 +65,6 @@ public class EntityProcessorBase extends EntityProcessor { } - @SuppressWarnings("unchecked") - void loadTransformers() { - String transClasses = context.getEntityAttribute(TRANSFORMER); - - if (transClasses == null) { - transformers = Collections.EMPTY_LIST; - return; - } - - String[] transArr = transClasses.split(","); - transformers = new ArrayList() { - public boolean add(Transformer transformer) { - return super.add(DebugLogger.wrapTransformer(transformer)); - } - }; - for (String aTransArr : transArr) { - String trans = aTransArr.trim(); - if (trans.startsWith("script:")) { - String functionName = trans.substring("script:".length()); - ScriptTransformer scriptTransformer = new ScriptTransformer(); - scriptTransformer.setFunctionName(functionName); - transformers.add(scriptTransformer); - continue; - } - try { - Class clazz = DocBuilder.loadClass(trans, context.getSolrCore()); - if (clazz.newInstance() instanceof Transformer) { - transformers.add((Transformer) clazz.newInstance()); - } else { - final Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class); - if (meth == null) { - String msg = "Transformer :" - + trans - + "does not implement Transformer interface or does not have a transformRow(Map m)method"; - log.error(msg); - throw new DataImportHandlerException( - SEVERE, msg); - } - transformers.add(new ReflectionTransformer(meth, clazz, trans)); - } - } catch (Exception e) { - log.error("Unable to load Transformer: " + aTransArr, e); - throw new DataImportHandlerException(SEVERE, - e); - } - } - - } - - @SuppressWarnings("unchecked") - static class ReflectionTransformer extends Transformer { - final Method meth; - - final Class clazz; - - final String trans; - - final Object o; - - public ReflectionTransformer(Method meth, Class clazz, String trans) - throws Exception { - this.meth = meth; - this.clazz = clazz; - this.trans = trans; - o = clazz.newInstance(); - } - - public Object transformRow(Map aRow, Context context) { - try { - return meth.invoke(o, aRow); - } catch (Exception e) { - log.warn("method invocation failed on transformer : " + trans, e); - throw new DataImportHandlerException(WARN, e); - } - } - } - - protected Map getFromRowCache() { - Map r = rowcache.remove(0); - if (rowcache.isEmpty()) - rowcache = null; - return r; - } - - @SuppressWarnings("unchecked") - protected Map applyTransformer(Map row) { - if (transformers == null) - loadTransformers(); - if (transformers == Collections.EMPTY_LIST) - return row; - Map transformedRow = row; - List> rows = null; - boolean stopTransform = checkStopTransform(row); - for (Transformer t : transformers) { - if(stopTransform) break; - try { - if (rows != null) { - List> tmpRows = new ArrayList>(); - for (Map map : rows) { - resolver.addNamespace(entityName, map); - Object o = t.transformRow(map, context); - if (o == null) - continue; - if (o instanceof Map) { - Map oMap = (Map) o; - stopTransform = checkStopTransform(oMap); - tmpRows.add((Map) o); - } else if (o instanceof List) { - tmpRows.addAll((List) o); - } else { - log.error("Transformer must return Map or a List>"); - } - } - rows = tmpRows; - } else { - resolver.addNamespace(entityName, transformedRow); - Object o = t.transformRow(transformedRow, context); - if (o == null) - return null; - if (o instanceof Map) { - Map oMap = (Map) o; - stopTransform = checkStopTransform(oMap); - transformedRow = (Map) o; - } else if (o instanceof List) { - rows = (List) o; - } else { - log.error("Transformer must return Map or a List>"); - } - } - } catch (Exception e) { - log.warn("transformer threw error", e); - if (ABORT.equals(onError)) { - wrapAndThrow(SEVERE, e); - } else if (SKIP.equals(onError)) { - wrapAndThrow(DataImportHandlerException.SKIP, e); - } - // onError = continue - } - } - if (rows == null) { - return transformedRow; - } else { - rowcache = rows; - return getFromRowCache(); - } - - } - - private boolean checkStopTransform(Map oMap) { - return oMap.get("$stopTransform") != null - && Boolean.parseBoolean(oMap.get("$stopTransform").toString()); - } protected Map getNext() { try { @@ -417,7 +262,7 @@ public class EntityProcessorBase extends EntityProcessor { Map r = dataSourceRowCache.remove(0); if (dataSourceRowCache.isEmpty()) dataSourceRowCache = null; - return r == null ? null : applyTransformer(r); + return r; } public static final String TRANSFORMER = "transformer"; diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java new file mode 100644 index 00000000000..419c0fe7a04 --- /dev/null +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.dataimport; + +import static org.apache.solr.handler.dataimport.DataImportHandlerException.*; +import static org.apache.solr.handler.dataimport.EntityProcessorBase.*; +import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A Wrapper over EntityProcessor instance which performs transforms and handles multi-row outputs correctly. + * + * @version $Id$ + * @since solr 1.4 + */ +public class EntityProcessorWrapper extends EntityProcessor { + private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class); + + private EntityProcessor delegate; + private DocBuilder docBuilder; + + private String onError; + private Context context; + private VariableResolverImpl resolver; + private String entityName; + + protected List transformers; + + protected List> rowcache; + + public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) { + this.delegate = delegate; + this.docBuilder = docBuilder; + } + + public void init(Context context) { + rowcache = null; + this.context = context; + resolver = (VariableResolverImpl) context.getVariableResolver(); + if (entityName == null) { + onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR)); + if (onError == null) onError = ABORT; + entityName = context.getEntityAttribute(DataConfig.NAME); + } + delegate.init(context); + + } + + @SuppressWarnings("unchecked") + void loadTransformers() { + String transClasses = context.getEntityAttribute(TRANSFORMER); + + if (transClasses == null) { + transformers = Collections.EMPTY_LIST; + return; + } + + String[] transArr = transClasses.split(","); + transformers = new ArrayList() { + public boolean add(Transformer transformer) { + if (docBuilder != null && docBuilder.verboseDebug) { + transformer = docBuilder.writer.getDebugLogger().wrapTransformer(transformer); + } + return super.add(transformer); + } + }; + for (String aTransArr : transArr) { + String trans = aTransArr.trim(); + if (trans.startsWith("script:")) { + String functionName = trans.substring("script:".length()); + ScriptTransformer scriptTransformer = new ScriptTransformer(); + scriptTransformer.setFunctionName(functionName); + transformers.add(scriptTransformer); + continue; + } + try { + Class clazz = DocBuilder.loadClass(trans, context.getSolrCore()); + if (clazz.newInstance() instanceof Transformer) { + transformers.add((Transformer) clazz.newInstance()); + } else { + final Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class); + if (meth == null) { + String msg = "Transformer :" + + trans + + "does not implement Transformer interface or does not have a transformRow(Map m)method"; + log.error(msg); + throw new DataImportHandlerException( + SEVERE, msg); + } + transformers.add(new ReflectionTransformer(meth, clazz, trans)); + } + } catch (Exception e) { + log.error("Unable to load Transformer: " + aTransArr, e); + throw new DataImportHandlerException(SEVERE, + e); + } + } + + } + + @SuppressWarnings("unchecked") + static class ReflectionTransformer extends Transformer { + final Method meth; + + final Class clazz; + + final String trans; + + final Object o; + + public ReflectionTransformer(Method meth, Class clazz, String trans) + throws Exception { + this.meth = meth; + this.clazz = clazz; + this.trans = trans; + o = clazz.newInstance(); + } + + public Object transformRow(Map aRow, Context context) { + try { + return meth.invoke(o, aRow); + } catch (Exception e) { + log.warn("method invocation failed on transformer : " + trans, e); + throw new DataImportHandlerException(WARN, e); + } + } + } + + protected Map getFromRowCache() { + Map r = rowcache.remove(0); + if (rowcache.isEmpty()) + rowcache = null; + return r; + } + + @SuppressWarnings("unchecked") + protected Map applyTransformer(Map row) { + if (transformers == null) + loadTransformers(); + if (transformers == Collections.EMPTY_LIST) + return row; + Map transformedRow = row; + List> rows = null; + boolean stopTransform = checkStopTransform(row); + for (Transformer t : transformers) { + if (stopTransform) break; + try { + if (rows != null) { + List> tmpRows = new ArrayList>(); + for (Map map : rows) { + resolver.addNamespace(entityName, map); + Object o = t.transformRow(map, context); + if (o == null) + continue; + if (o instanceof Map) { + Map oMap = (Map) o; + stopTransform = checkStopTransform(oMap); + tmpRows.add((Map) o); + } else if (o instanceof List) { + tmpRows.addAll((List) o); + } else { + log.error("Transformer must return Map or a List>"); + } + } + rows = tmpRows; + } else { + resolver.addNamespace(entityName, transformedRow); + Object o = t.transformRow(transformedRow, context); + if (o == null) + return null; + if (o instanceof Map) { + Map oMap = (Map) o; + stopTransform = checkStopTransform(oMap); + transformedRow = (Map) o; + } else if (o instanceof List) { + rows = (List) o; + } else { + log.error("Transformer must return Map or a List>"); + } + } + } catch (Exception e) { + log.warn("transformer threw error", e); + if (ABORT.equals(onError)) { + wrapAndThrow(SEVERE, e); + } else if (SKIP.equals(onError)) { + wrapAndThrow(DataImportHandlerException.SKIP, e); + } + // onError = continue + } + } + if (rows == null) { + return transformedRow; + } else { + rowcache = rows; + return getFromRowCache(); + } + + } + + private boolean checkStopTransform(Map oMap) { + return oMap.get("$stopTransform") != null + && Boolean.parseBoolean(oMap.get("$stopTransform").toString()); + } + + public Map nextRow() { + if (rowcache != null) { + return getFromRowCache(); + } + while (true) { + Map arow = delegate.nextRow(); + if (arow == null) { + return null; + } else { + arow = applyTransformer(arow); + if (arow != null) return arow; + } + } + } + + public Map nextModifiedRowKey() { + return delegate.nextModifiedRowKey(); + } + + public Map nextDeletedRowKey() { + return delegate.nextDeletedRowKey(); + } + + public Map nextModifiedParentRowKey() { + return delegate.nextModifiedParentRowKey(); + } + + public void destroy() { + delegate.destroy(); + } + + @Override + public void close() { + delegate.close(); + } +} diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java index 6da9519a501..96dd7b34edb 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java @@ -134,16 +134,10 @@ public class FileListEntityProcessor extends EntityProcessorBase { return getAndApplyTrans(); } - private Map getAndApplyTrans() { - if (rowcache != null) - return getFromRowCache(); + private Map getAndApplyTrans() { while (true) { Map r = getNext(); - if (r == null) - return null; - r = applyTransformer(r); - if (r != null) - return r; + return r; } } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/PlainTextEntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/PlainTextEntityProcessor.java index b991c51612f..11a96d9122c 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/PlainTextEntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/PlainTextEntityProcessor.java @@ -77,7 +77,7 @@ public class PlainTextEntityProcessor extends EntityProcessorBase { Map row = new HashMap(); row.put(PLAIN_TEXT, sw.toString()); ended = true; - return super.applyTransformer(row); + return row; } public static final String PLAIN_TEXT = "plainText"; diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java index 390f51831a4..c7ee8cae681 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java @@ -230,6 +230,13 @@ public class SolrWriter { return null; } + public DebugLogger getDebugLogger() { + if (debugLogger == null) { + debugLogger = new DebugLogger(this); + } + return debugLogger; + } + /** * This method is used for verbose debugging * @@ -238,10 +245,7 @@ public class SolrWriter { * @param row The actual data . Can be a Map or a List> */ public void log(int event, String name, Object row) { - if (debugLogger == null) { - debugLogger = new DebugLogger(); - } - debugLogger.log(event, name, row); + getDebugLogger().log(event, name, row); } public static final int START_ENTITY = 1, END_ENTITY = 2, diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java index 5f9a1d71c2e..9609720c01d 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java @@ -65,20 +65,14 @@ public class SqlEntityProcessor extends EntityProcessorBase { } } - public Map nextRow() { - if (rowcache != null) - return getFromRowCache(); + public Map nextRow() { if (rowIterator == null) { String q = getQuery(); initQuery(resolver.replaceTokens(q)); } while (true) { Map r = getNext(); - if (r == null) - return null; - r = applyTransformer(r); - if (r != null) - return r; + return r; } } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java index 1b720b8b330..7f509addd78 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java @@ -176,8 +176,6 @@ public class XPathEntityProcessor extends EntityProcessorBase { private Map fetchNextRow() { Map r = null; while (true) { - if (rowcache != null) - return getFromRowCache(); if (rowIterator == null) initQuery(resolver.replaceTokens(context.getEntityAttribute(URL))); r = getNext(); @@ -197,9 +195,7 @@ public class XPathEntityProcessor extends EntityProcessorBase { } } addCommonFields(r); - r = applyTransformer(r); - if (r != null) - return readUsefulVars(r); + return r; } } diff --git a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java index 3ed67922b2e..633a17645bd 100644 --- a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java +++ b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java @@ -51,7 +51,7 @@ public class TestCachedSqlEntityProcessor { rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc", "another one")); MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator()); - CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor(); + EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null); csep.init(context); rows = new ArrayList>(); while (true) { @@ -93,7 +93,7 @@ public class TestCachedSqlEntityProcessor { rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc", "another one")); MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator()); - CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor(); + EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null); csep.init(context); rows = new ArrayList>(); while (true) { @@ -136,7 +136,7 @@ public class TestCachedSqlEntityProcessor { rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc", "another one")); MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator()); - CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor(); + EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null); csep.init(context); rows = new ArrayList>(); while (true) { @@ -226,7 +226,7 @@ public class TestCachedSqlEntityProcessor { rows.add(AbstractDataImportHandlerTest.createMap("id", 3, "desc", "another three")); rows.add(AbstractDataImportHandlerTest.createMap("id", 3, "desc", "another another three")); MockDataSource.setIterator(q, rows.iterator()); - CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor(); + EntityProcessor csep = new EntityProcessorWrapper(new CachedSqlEntityProcessor(), null); csep.init(context); rows = new ArrayList>(); while (true) { diff --git a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEntityProcessorBase.java b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEntityProcessorBase.java index c14a8d23c90..811979d2eca 100644 --- a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEntityProcessorBase.java +++ b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEntityProcessorBase.java @@ -48,7 +48,7 @@ public class TestEntityProcessorBase { Map src = new HashMap(); src.put("A", "NA"); src.put("B", "NA"); - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null); sep.init(context); Map res = sep.applyTransformer(src); Assert.assertNotNull(res.get("T1")); diff --git a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestScriptTransformer.java b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestScriptTransformer.java index 60b0d3d18ba..b8dea989534 100644 --- a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestScriptTransformer.java +++ b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestScriptTransformer.java @@ -52,7 +52,7 @@ public class TestScriptTransformer { Context context = getContext("f1", script); Map map = new HashMap(); map.put("name", "Scott"); - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null); sep.init(context); sep.applyTransformer(map); Assert.assertEquals(map.get("name"), "Hello Scott"); @@ -82,7 +82,7 @@ public class TestScriptTransformer { Context context = getContext("f1", script); Map map = new HashMap(); map.put("name", "Scott"); - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null); sep.init(context); sep.applyTransformer(map); Assert.assertEquals(map.get("name"), "Hello Scott"); @@ -115,7 +115,7 @@ public class TestScriptTransformer { Map map = new HashMap(); map.put("nextToken", "hello"); - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null); sep.init(c); sep.applyTransformer(map); Assert.assertEquals("true", map.get("$hasMore")); diff --git a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java index 02464db91e2..15c790bdb36 100644 --- a/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java +++ b/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java @@ -55,7 +55,7 @@ public class TestSqlEntityProcessor { @Test public void tranformer() { - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessor sep = new EntityProcessorWrapper( new SqlEntityProcessor(), null); List> rows = getRows(2); VariableResolverImpl vr = new VariableResolverImpl(); HashMap ea = new HashMap(); @@ -79,7 +79,7 @@ public class TestSqlEntityProcessor { @Test public void tranformerWithReflection() { - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null); List> rows = getRows(2); VariableResolverImpl vr = new VariableResolverImpl(); HashMap ea = new HashMap(); @@ -103,7 +103,7 @@ public class TestSqlEntityProcessor { @Test public void tranformerList() { - SqlEntityProcessor sep = new SqlEntityProcessor(); + EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(),null); List> rows = getRows(2); VariableResolverImpl vr = new VariableResolverImpl();