mirror of https://github.com/apache/lucene.git
SOLR-1120 -- Simplify EntityProcessor API
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@766608 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8911cc1907
commit
a336753fdc
|
@ -24,6 +24,11 @@ The Context API has been changed in a non back-compatible way. In particular, th
|
|||
now returns a String describing the type of the current import process instead of an int. Similarily, the public
|
||||
constants in Context viz. FULL_DUMP, DELTA_DUMP and FIND_DELTA are changed to a String type. See SOLR-969 for details.
|
||||
|
||||
The EntityProcessor API has been simplified by moving logic for applying transformers and handling multi-row outputs
|
||||
from Transformers into an EntityProcessorWrapper class. The EntityProcessor#destroy is now called once per
|
||||
parent-row at the end of row (end of data). A new method EntityProcessor#close is added which is called at the end
|
||||
of import.
|
||||
|
||||
Detailed Change List
|
||||
----------------------
|
||||
|
||||
|
@ -253,6 +258,13 @@ Other
|
|||
Change Context.currentProcess() to return a string instead of an integer.
|
||||
(Kay Kay, Noble Paul, shalin)
|
||||
|
||||
9. SOLR-1120: Simplified EntityProcessor API by moving logic for applying transformers and handling multi-row outputs
|
||||
from Transformers into an EntityProcessorWrapper class. The behavior of the method
|
||||
EntityProcessor#destroy has been modified to be called once per parent-row at the end of row. A new
|
||||
method EntityProcessor#close is added which is called at the end of import. A new method
|
||||
Context#getResolvedEntityAttribute is added which returns the resolved value of an entity's attribute.
|
||||
(Noble Paul, shalin)
|
||||
|
||||
================== Release 1.3.0 20080915 ==================
|
||||
|
||||
Status
|
||||
|
|
|
@ -103,11 +103,7 @@ public class MailEntityProcessor extends EntityProcessorBase {
|
|||
mail = getNextMail();
|
||||
if (mail != null)
|
||||
row = getDocumentFromMail(mail);
|
||||
}
|
||||
while (row == null && mail != null);
|
||||
if (row != null) {
|
||||
row = super.applyTransformer(row);
|
||||
}
|
||||
} while (row == null && mail != null);
|
||||
return row;
|
||||
}
|
||||
|
||||
|
|
|
@ -118,8 +118,12 @@ public abstract class AbstractDataImportHandlerTest extends
|
|||
}
|
||||
|
||||
public String getEntityAttribute(String name) {
|
||||
return entityAttrs == null ? delegate.getEntityAttribute(name)
|
||||
: entityAttrs.get(name);
|
||||
return entityAttrs == null ? delegate.getEntityAttribute(name) : entityAttrs.get(name);
|
||||
}
|
||||
|
||||
public String getResolvedEntityAttribute(String name) {
|
||||
return entityAttrs == null ? delegate.getResolvedEntityAttribute(name) :
|
||||
delegate.getResolvedEntityAttribute(entityAttrs.get(name));
|
||||
}
|
||||
|
||||
public List<Map<String, String>> getAllEntityFields() {
|
||||
|
|
|
@ -46,7 +46,6 @@ public class CachedSqlEntityProcessor extends SqlEntityProcessor {
|
|||
}
|
||||
|
||||
public Map<String, Object> nextRow() {
|
||||
if (rowcache != null) return getFromRowCache();
|
||||
if (dataSourceRowCache != null)
|
||||
return getFromRowCacheTransformed();
|
||||
if (!isFirst)
|
||||
|
|
|
@ -70,6 +70,13 @@ public abstract class Context {
|
|||
*/
|
||||
public abstract String getEntityAttribute(String name);
|
||||
|
||||
/**
|
||||
* Get the value of any attribute put into this entity after resolving all variables found in the attribute value
|
||||
* @param name name of the attribute
|
||||
* @return value of the named attribute after resolving all variables
|
||||
*/
|
||||
public abstract String getResolvedEntityAttribute(String name);
|
||||
|
||||
/**
|
||||
* Returns all the fields put into an entity. each item (which is a map ) in
|
||||
* the list corresponds to one field. each if the map contains the attribute
|
||||
|
|
|
@ -72,6 +72,10 @@ public class ContextImpl extends Context {
|
|||
return entity == null ? null : entity.allAttributes.get(name);
|
||||
}
|
||||
|
||||
public String getResolvedEntityAttribute(String name) {
|
||||
return entity == null ? null : resolver.replaceTokens(entity.allAttributes.get(name));
|
||||
}
|
||||
|
||||
public List<Map<String, String>> getAllEntityFields() {
|
||||
return entity == null ? Collections.EMPTY_LIST : entity.allFieldsList;
|
||||
}
|
||||
|
@ -88,7 +92,8 @@ public class ContextImpl extends Context {
|
|||
if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug &&
|
||||
currProcess == Context.FULL_DUMP) {
|
||||
//debug is not yet implemented properly for deltas
|
||||
return DebugLogger.wrapDs(entity.dataSrc);
|
||||
|
||||
entity.dataSrc = docBuilder.writer.getDebugLogger().wrapDs(entity.dataSrc);
|
||||
}
|
||||
return entity.dataSrc;
|
||||
}
|
||||
|
|
|
@ -149,9 +149,8 @@ public class DataConfig {
|
|||
for (Entity entity : entities)
|
||||
entity.clearCache();
|
||||
}
|
||||
|
||||
try {
|
||||
processor.destroy();
|
||||
processor.close();
|
||||
} catch (Exception e) {
|
||||
/*no op*/
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ public class DebugLogger {
|
|||
private Stack<DebugInfo> debugStack;
|
||||
|
||||
NamedList output;
|
||||
SolrWriter writer;
|
||||
|
||||
private static final String LINE = "---------------------------------------------";
|
||||
|
||||
|
@ -69,7 +70,11 @@ public class DebugLogger {
|
|||
output = debugStack.peek().lst;
|
||||
}
|
||||
|
||||
private DebugInfo peekStack() {
|
||||
public DebugLogger(SolrWriter solrWriter) {
|
||||
writer = solrWriter;
|
||||
}
|
||||
|
||||
private DebugInfo peekStack() {
|
||||
return debugStack.isEmpty() ? null : debugStack.peek();
|
||||
}
|
||||
|
||||
|
@ -165,8 +170,7 @@ public class DebugLogger {
|
|||
}
|
||||
}
|
||||
|
||||
static DataSource wrapDs(final DataSource ds) {
|
||||
final SolrWriter writer = DocBuilder.INSTANCE.get().writer;
|
||||
DataSource wrapDs(final DataSource ds) {
|
||||
return new DataSource() {
|
||||
public void init(Context context, Properties initProps) {
|
||||
ds.init(context, initProps);
|
||||
|
@ -182,11 +186,11 @@ public class DebugLogger {
|
|||
try {
|
||||
return ds.getData(query);
|
||||
} catch (DataImportHandlerException de) {
|
||||
DocBuilder.INSTANCE.get().writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
null, de);
|
||||
throw de;
|
||||
} catch (Exception e) {
|
||||
DocBuilder.INSTANCE.get().writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
null, e);
|
||||
DataImportHandlerException de = new DataImportHandlerException(
|
||||
DataImportHandlerException.SEVERE, "", e);
|
||||
|
@ -200,38 +204,28 @@ public class DebugLogger {
|
|||
};
|
||||
}
|
||||
|
||||
static Transformer wrapTransformer(final Transformer t) {
|
||||
if (DocBuilder.INSTANCE.get() != null
|
||||
&& DocBuilder.INSTANCE.get().verboseDebug) {
|
||||
return new Transformer() {
|
||||
public Object transformRow(Map<String, Object> row, Context context) {
|
||||
DocBuilder.INSTANCE.get().writer.log(SolrWriter.PRE_TRANSFORMER_ROW,
|
||||
null, row);
|
||||
String tName = getTransformerName(t);
|
||||
Object result = null;
|
||||
try {
|
||||
result = t.transformRow(row, context);
|
||||
DocBuilder.INSTANCE.get().writer.log(SolrWriter.TRANSFORMED_ROW,
|
||||
tName, result);
|
||||
} catch (DataImportHandlerException de) {
|
||||
DocBuilder.INSTANCE.get().writer.log(
|
||||
SolrWriter.TRANSFORMER_EXCEPTION, tName, de);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
} catch (Exception e) {
|
||||
DocBuilder.INSTANCE.get().writer.log(
|
||||
SolrWriter.TRANSFORMER_EXCEPTION, tName, e);
|
||||
DataImportHandlerException de = new DataImportHandlerException(
|
||||
DataImportHandlerException.SEVERE, "", e);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
}
|
||||
return result;
|
||||
Transformer wrapTransformer(final Transformer t) {
|
||||
return new Transformer() {
|
||||
public Object transformRow(Map<String, Object> row, Context context) {
|
||||
writer.log(SolrWriter.PRE_TRANSFORMER_ROW, null, row);
|
||||
String tName = getTransformerName(t);
|
||||
Object result = null;
|
||||
try {
|
||||
result = t.transformRow(row, context);
|
||||
writer.log(SolrWriter.TRANSFORMED_ROW, tName, result);
|
||||
} catch (DataImportHandlerException de) {
|
||||
writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, de);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
} catch (Exception e) {
|
||||
writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, e);
|
||||
DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return t;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static String getStacktraceString(Exception e) {
|
||||
|
@ -242,8 +236,8 @@ public class DebugLogger {
|
|||
|
||||
static String getTransformerName(Transformer t) {
|
||||
Class transClass = t.getClass();
|
||||
if (t instanceof EntityProcessorBase.ReflectionTransformer) {
|
||||
return ((EntityProcessorBase.ReflectionTransformer) t).trans;
|
||||
if (t instanceof EntityProcessorWrapper.ReflectionTransformer) {
|
||||
return ((EntityProcessorWrapper.ReflectionTransformer) t).trans;
|
||||
}
|
||||
if (t instanceof ScriptTransformer) {
|
||||
ScriptTransformer scriptTransformer = (ScriptTransformer) t;
|
||||
|
|
|
@ -100,18 +100,22 @@ public class DocBuilder {
|
|||
private void invokeEventListener(String className) {
|
||||
try {
|
||||
EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
|
||||
String currentProcess;
|
||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
||||
currentProcess = Context.DELTA_DUMP;
|
||||
} else {
|
||||
currentProcess = Context.FULL_DUMP;
|
||||
}
|
||||
listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this));
|
||||
notifyListener(listener);
|
||||
} catch (Exception e) {
|
||||
DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e, "Unable to load class : " + className);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyListener(EventListener listener) {
|
||||
String currentProcess;
|
||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
||||
currentProcess = Context.DELTA_DUMP;
|
||||
} else {
|
||||
currentProcess = Context.FULL_DUMP;
|
||||
}
|
||||
listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void execute() {
|
||||
dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
|
||||
|
@ -333,8 +337,10 @@ public class DocBuilder {
|
|||
}
|
||||
|
||||
Map<String, Object> arow = entityProcessor.nextRow();
|
||||
if (arow == null)
|
||||
if (arow == null) {
|
||||
entityProcessor.destroy();
|
||||
break;
|
||||
}
|
||||
|
||||
if (arow.containsKey(DOC_BOOST)) {
|
||||
setDocumentBoost(doc, arow);
|
||||
|
@ -539,7 +545,7 @@ public class DocBuilder {
|
|||
+ entity.name, e);
|
||||
}
|
||||
}
|
||||
return entity.processor = entityProcessor;
|
||||
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -90,7 +90,16 @@ public abstract class EntityProcessor {
|
|||
public abstract Map<String, Object> nextModifiedParentRowKey();
|
||||
|
||||
/**
|
||||
* Invoked when the Entity processor is detroyed. towards the end of injestion. Called only once
|
||||
* Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
|
||||
* entity, it will be called only once in the import, at the very end.
|
||||
*
|
||||
*/
|
||||
public abstract void destroy();
|
||||
|
||||
/**
|
||||
* Invoked when the Entity processor is detroyed. towards the end of import.
|
||||
*/
|
||||
public void close() {
|
||||
//no-op
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
@ -47,15 +46,13 @@ public class EntityProcessorBase extends EntityProcessor {
|
|||
|
||||
protected List<Transformer> transformers;
|
||||
|
||||
protected List<Map<String, Object>> rowcache;
|
||||
|
||||
protected String query;
|
||||
|
||||
protected String onError = ABORT;
|
||||
|
||||
|
||||
public void init(Context context) {
|
||||
rowIterator = null;
|
||||
rowcache = null;
|
||||
this.context = context;
|
||||
if (isFirstInit) {
|
||||
entityName = context.getEntityAttribute("name");
|
||||
|
@ -68,158 +65,6 @@ public class EntityProcessorBase extends EntityProcessor {
|
|||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void loadTransformers() {
|
||||
String transClasses = context.getEntityAttribute(TRANSFORMER);
|
||||
|
||||
if (transClasses == null) {
|
||||
transformers = Collections.EMPTY_LIST;
|
||||
return;
|
||||
}
|
||||
|
||||
String[] transArr = transClasses.split(",");
|
||||
transformers = new ArrayList<Transformer>() {
|
||||
public boolean add(Transformer transformer) {
|
||||
return super.add(DebugLogger.wrapTransformer(transformer));
|
||||
}
|
||||
};
|
||||
for (String aTransArr : transArr) {
|
||||
String trans = aTransArr.trim();
|
||||
if (trans.startsWith("script:")) {
|
||||
String functionName = trans.substring("script:".length());
|
||||
ScriptTransformer scriptTransformer = new ScriptTransformer();
|
||||
scriptTransformer.setFunctionName(functionName);
|
||||
transformers.add(scriptTransformer);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Class clazz = DocBuilder.loadClass(trans, context.getSolrCore());
|
||||
if (clazz.newInstance() instanceof Transformer) {
|
||||
transformers.add((Transformer) clazz.newInstance());
|
||||
} else {
|
||||
final Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class);
|
||||
if (meth == null) {
|
||||
String msg = "Transformer :"
|
||||
+ trans
|
||||
+ "does not implement Transformer interface or does not have a transformRow(Map m)method";
|
||||
log.error(msg);
|
||||
throw new DataImportHandlerException(
|
||||
SEVERE, msg);
|
||||
}
|
||||
transformers.add(new ReflectionTransformer(meth, clazz, trans));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to load Transformer: " + aTransArr, e);
|
||||
throw new DataImportHandlerException(SEVERE,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static class ReflectionTransformer extends Transformer {
|
||||
final Method meth;
|
||||
|
||||
final Class clazz;
|
||||
|
||||
final String trans;
|
||||
|
||||
final Object o;
|
||||
|
||||
public ReflectionTransformer(Method meth, Class clazz, String trans)
|
||||
throws Exception {
|
||||
this.meth = meth;
|
||||
this.clazz = clazz;
|
||||
this.trans = trans;
|
||||
o = clazz.newInstance();
|
||||
}
|
||||
|
||||
public Object transformRow(Map<String, Object> aRow, Context context) {
|
||||
try {
|
||||
return meth.invoke(o, aRow);
|
||||
} catch (Exception e) {
|
||||
log.warn("method invocation failed on transformer : " + trans, e);
|
||||
throw new DataImportHandlerException(WARN, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, Object> getFromRowCache() {
|
||||
Map<String, Object> r = rowcache.remove(0);
|
||||
if (rowcache.isEmpty())
|
||||
rowcache = null;
|
||||
return r;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Object> applyTransformer(Map<String, Object> row) {
|
||||
if (transformers == null)
|
||||
loadTransformers();
|
||||
if (transformers == Collections.EMPTY_LIST)
|
||||
return row;
|
||||
Map<String, Object> transformedRow = row;
|
||||
List<Map<String, Object>> rows = null;
|
||||
boolean stopTransform = checkStopTransform(row);
|
||||
for (Transformer t : transformers) {
|
||||
if(stopTransform) break;
|
||||
try {
|
||||
if (rows != null) {
|
||||
List<Map<String, Object>> tmpRows = new ArrayList<Map<String, Object>>();
|
||||
for (Map<String, Object> map : rows) {
|
||||
resolver.addNamespace(entityName, map);
|
||||
Object o = t.transformRow(map, context);
|
||||
if (o == null)
|
||||
continue;
|
||||
if (o instanceof Map) {
|
||||
Map oMap = (Map) o;
|
||||
stopTransform = checkStopTransform(oMap);
|
||||
tmpRows.add((Map) o);
|
||||
} else if (o instanceof List) {
|
||||
tmpRows.addAll((List) o);
|
||||
} else {
|
||||
log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
|
||||
}
|
||||
}
|
||||
rows = tmpRows;
|
||||
} else {
|
||||
resolver.addNamespace(entityName, transformedRow);
|
||||
Object o = t.transformRow(transformedRow, context);
|
||||
if (o == null)
|
||||
return null;
|
||||
if (o instanceof Map) {
|
||||
Map oMap = (Map) o;
|
||||
stopTransform = checkStopTransform(oMap);
|
||||
transformedRow = (Map) o;
|
||||
} else if (o instanceof List) {
|
||||
rows = (List) o;
|
||||
} else {
|
||||
log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("transformer threw error", e);
|
||||
if (ABORT.equals(onError)) {
|
||||
wrapAndThrow(SEVERE, e);
|
||||
} else if (SKIP.equals(onError)) {
|
||||
wrapAndThrow(DataImportHandlerException.SKIP, e);
|
||||
}
|
||||
// onError = continue
|
||||
}
|
||||
}
|
||||
if (rows == null) {
|
||||
return transformedRow;
|
||||
} else {
|
||||
rowcache = rows;
|
||||
return getFromRowCache();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean checkStopTransform(Map oMap) {
|
||||
return oMap.get("$stopTransform") != null
|
||||
&& Boolean.parseBoolean(oMap.get("$stopTransform").toString());
|
||||
}
|
||||
|
||||
protected Map<String, Object> getNext() {
|
||||
try {
|
||||
|
@ -417,7 +262,7 @@ public class EntityProcessorBase extends EntityProcessor {
|
|||
Map<String, Object> r = dataSourceRowCache.remove(0);
|
||||
if (dataSourceRowCache.isEmpty())
|
||||
dataSourceRowCache = null;
|
||||
return r == null ? null : applyTransformer(r);
|
||||
return r;
|
||||
}
|
||||
|
||||
public static final String TRANSFORMER = "transformer";
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.handler.dataimport;
|
||||
|
||||
import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
|
||||
import static org.apache.solr.handler.dataimport.EntityProcessorBase.*;
|
||||
import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A Wrapper over EntityProcessor instance which performs transforms and handles multi-row outputs correctly.
|
||||
*
|
||||
* @version $Id$
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public class EntityProcessorWrapper extends EntityProcessor {
|
||||
private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class);
|
||||
|
||||
private EntityProcessor delegate;
|
||||
private DocBuilder docBuilder;
|
||||
|
||||
private String onError;
|
||||
private Context context;
|
||||
private VariableResolverImpl resolver;
|
||||
private String entityName;
|
||||
|
||||
protected List<Transformer> transformers;
|
||||
|
||||
protected List<Map<String, Object>> rowcache;
|
||||
|
||||
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
|
||||
this.delegate = delegate;
|
||||
this.docBuilder = docBuilder;
|
||||
}
|
||||
|
||||
public void init(Context context) {
|
||||
rowcache = null;
|
||||
this.context = context;
|
||||
resolver = (VariableResolverImpl) context.getVariableResolver();
|
||||
if (entityName == null) {
|
||||
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
|
||||
if (onError == null) onError = ABORT;
|
||||
entityName = context.getEntityAttribute(DataConfig.NAME);
|
||||
}
|
||||
delegate.init(context);
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void loadTransformers() {
|
||||
String transClasses = context.getEntityAttribute(TRANSFORMER);
|
||||
|
||||
if (transClasses == null) {
|
||||
transformers = Collections.EMPTY_LIST;
|
||||
return;
|
||||
}
|
||||
|
||||
String[] transArr = transClasses.split(",");
|
||||
transformers = new ArrayList<Transformer>() {
|
||||
public boolean add(Transformer transformer) {
|
||||
if (docBuilder != null && docBuilder.verboseDebug) {
|
||||
transformer = docBuilder.writer.getDebugLogger().wrapTransformer(transformer);
|
||||
}
|
||||
return super.add(transformer);
|
||||
}
|
||||
};
|
||||
for (String aTransArr : transArr) {
|
||||
String trans = aTransArr.trim();
|
||||
if (trans.startsWith("script:")) {
|
||||
String functionName = trans.substring("script:".length());
|
||||
ScriptTransformer scriptTransformer = new ScriptTransformer();
|
||||
scriptTransformer.setFunctionName(functionName);
|
||||
transformers.add(scriptTransformer);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Class clazz = DocBuilder.loadClass(trans, context.getSolrCore());
|
||||
if (clazz.newInstance() instanceof Transformer) {
|
||||
transformers.add((Transformer) clazz.newInstance());
|
||||
} else {
|
||||
final Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class);
|
||||
if (meth == null) {
|
||||
String msg = "Transformer :"
|
||||
+ trans
|
||||
+ "does not implement Transformer interface or does not have a transformRow(Map m)method";
|
||||
log.error(msg);
|
||||
throw new DataImportHandlerException(
|
||||
SEVERE, msg);
|
||||
}
|
||||
transformers.add(new ReflectionTransformer(meth, clazz, trans));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to load Transformer: " + aTransArr, e);
|
||||
throw new DataImportHandlerException(SEVERE,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static class ReflectionTransformer extends Transformer {
|
||||
final Method meth;
|
||||
|
||||
final Class clazz;
|
||||
|
||||
final String trans;
|
||||
|
||||
final Object o;
|
||||
|
||||
public ReflectionTransformer(Method meth, Class clazz, String trans)
|
||||
throws Exception {
|
||||
this.meth = meth;
|
||||
this.clazz = clazz;
|
||||
this.trans = trans;
|
||||
o = clazz.newInstance();
|
||||
}
|
||||
|
||||
public Object transformRow(Map<String, Object> aRow, Context context) {
|
||||
try {
|
||||
return meth.invoke(o, aRow);
|
||||
} catch (Exception e) {
|
||||
log.warn("method invocation failed on transformer : " + trans, e);
|
||||
throw new DataImportHandlerException(WARN, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, Object> getFromRowCache() {
|
||||
Map<String, Object> r = rowcache.remove(0);
|
||||
if (rowcache.isEmpty())
|
||||
rowcache = null;
|
||||
return r;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Object> applyTransformer(Map<String, Object> row) {
|
||||
if (transformers == null)
|
||||
loadTransformers();
|
||||
if (transformers == Collections.EMPTY_LIST)
|
||||
return row;
|
||||
Map<String, Object> transformedRow = row;
|
||||
List<Map<String, Object>> rows = null;
|
||||
boolean stopTransform = checkStopTransform(row);
|
||||
for (Transformer t : transformers) {
|
||||
if (stopTransform) break;
|
||||
try {
|
||||
if (rows != null) {
|
||||
List<Map<String, Object>> tmpRows = new ArrayList<Map<String, Object>>();
|
||||
for (Map<String, Object> map : rows) {
|
||||
resolver.addNamespace(entityName, map);
|
||||
Object o = t.transformRow(map, context);
|
||||
if (o == null)
|
||||
continue;
|
||||
if (o instanceof Map) {
|
||||
Map oMap = (Map) o;
|
||||
stopTransform = checkStopTransform(oMap);
|
||||
tmpRows.add((Map) o);
|
||||
} else if (o instanceof List) {
|
||||
tmpRows.addAll((List) o);
|
||||
} else {
|
||||
log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
|
||||
}
|
||||
}
|
||||
rows = tmpRows;
|
||||
} else {
|
||||
resolver.addNamespace(entityName, transformedRow);
|
||||
Object o = t.transformRow(transformedRow, context);
|
||||
if (o == null)
|
||||
return null;
|
||||
if (o instanceof Map) {
|
||||
Map oMap = (Map) o;
|
||||
stopTransform = checkStopTransform(oMap);
|
||||
transformedRow = (Map) o;
|
||||
} else if (o instanceof List) {
|
||||
rows = (List) o;
|
||||
} else {
|
||||
log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("transformer threw error", e);
|
||||
if (ABORT.equals(onError)) {
|
||||
wrapAndThrow(SEVERE, e);
|
||||
} else if (SKIP.equals(onError)) {
|
||||
wrapAndThrow(DataImportHandlerException.SKIP, e);
|
||||
}
|
||||
// onError = continue
|
||||
}
|
||||
}
|
||||
if (rows == null) {
|
||||
return transformedRow;
|
||||
} else {
|
||||
rowcache = rows;
|
||||
return getFromRowCache();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean checkStopTransform(Map oMap) {
|
||||
return oMap.get("$stopTransform") != null
|
||||
&& Boolean.parseBoolean(oMap.get("$stopTransform").toString());
|
||||
}
|
||||
|
||||
public Map<String, Object> nextRow() {
|
||||
if (rowcache != null) {
|
||||
return getFromRowCache();
|
||||
}
|
||||
while (true) {
|
||||
Map<String, Object> arow = delegate.nextRow();
|
||||
if (arow == null) {
|
||||
return null;
|
||||
} else {
|
||||
arow = applyTransformer(arow);
|
||||
if (arow != null) return arow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Object> nextModifiedRowKey() {
|
||||
return delegate.nextModifiedRowKey();
|
||||
}
|
||||
|
||||
public Map<String, Object> nextDeletedRowKey() {
|
||||
return delegate.nextDeletedRowKey();
|
||||
}
|
||||
|
||||
public Map<String, Object> nextModifiedParentRowKey() {
|
||||
return delegate.nextModifiedParentRowKey();
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
delegate.destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
}
|
|
@ -135,15 +135,9 @@ public class FileListEntityProcessor extends EntityProcessorBase {
|
|||
}
|
||||
|
||||
private Map<String, Object> getAndApplyTrans() {
|
||||
if (rowcache != null)
|
||||
return getFromRowCache();
|
||||
while (true) {
|
||||
Map<String, Object> r = getNext();
|
||||
if (r == null)
|
||||
return null;
|
||||
r = applyTransformer(r);
|
||||
if (r != null)
|
||||
return r;
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ public class PlainTextEntityProcessor extends EntityProcessorBase {
|
|||
Map<String, Object> row = new HashMap<String, Object>();
|
||||
row.put(PLAIN_TEXT, sw.toString());
|
||||
ended = true;
|
||||
return super.applyTransformer(row);
|
||||
return row;
|
||||
}
|
||||
|
||||
public static final String PLAIN_TEXT = "plainText";
|
||||
|
|
|
@ -230,6 +230,13 @@ public class SolrWriter {
|
|||
return null;
|
||||
}
|
||||
|
||||
public DebugLogger getDebugLogger() {
|
||||
if (debugLogger == null) {
|
||||
debugLogger = new DebugLogger(this);
|
||||
}
|
||||
return debugLogger;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used for verbose debugging
|
||||
*
|
||||
|
@ -238,10 +245,7 @@ public class SolrWriter {
|
|||
* @param row The actual data . Can be a Map<String,object> or a List<Map<String,object>>
|
||||
*/
|
||||
public void log(int event, String name, Object row) {
|
||||
if (debugLogger == null) {
|
||||
debugLogger = new DebugLogger();
|
||||
}
|
||||
debugLogger.log(event, name, row);
|
||||
getDebugLogger().log(event, name, row);
|
||||
}
|
||||
|
||||
public static final int START_ENTITY = 1, END_ENTITY = 2,
|
||||
|
|
|
@ -66,19 +66,13 @@ public class SqlEntityProcessor extends EntityProcessorBase {
|
|||
}
|
||||
|
||||
public Map<String, Object> nextRow() {
|
||||
if (rowcache != null)
|
||||
return getFromRowCache();
|
||||
if (rowIterator == null) {
|
||||
String q = getQuery();
|
||||
initQuery(resolver.replaceTokens(q));
|
||||
}
|
||||
while (true) {
|
||||
Map<String, Object> r = getNext();
|
||||
if (r == null)
|
||||
return null;
|
||||
r = applyTransformer(r);
|
||||
if (r != null)
|
||||
return r;
|
||||
return r;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -176,8 +176,6 @@ public class XPathEntityProcessor extends EntityProcessorBase {
|
|||
private Map<String, Object> fetchNextRow() {
|
||||
Map<String, Object> r = null;
|
||||
while (true) {
|
||||
if (rowcache != null)
|
||||
return getFromRowCache();
|
||||
if (rowIterator == null)
|
||||
initQuery(resolver.replaceTokens(context.getEntityAttribute(URL)));
|
||||
r = getNext();
|
||||
|
@ -197,9 +195,7 @@ public class XPathEntityProcessor extends EntityProcessorBase {
|
|||
}
|
||||
}
|
||||
addCommonFields(r);
|
||||
r = applyTransformer(r);
|
||||
if (r != null)
|
||||
return readUsefulVars(r);
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TestCachedSqlEntityProcessor {
|
|||
rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc",
|
||||
"another one"));
|
||||
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
|
||||
CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor();
|
||||
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
|
||||
csep.init(context);
|
||||
rows = new ArrayList<Map<String, Object>>();
|
||||
while (true) {
|
||||
|
@ -93,7 +93,7 @@ public class TestCachedSqlEntityProcessor {
|
|||
rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc",
|
||||
"another one"));
|
||||
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
|
||||
CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor();
|
||||
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
|
||||
csep.init(context);
|
||||
rows = new ArrayList<Map<String, Object>>();
|
||||
while (true) {
|
||||
|
@ -136,7 +136,7 @@ public class TestCachedSqlEntityProcessor {
|
|||
rows.add(AbstractDataImportHandlerTest.createMap("id", 1, "desc",
|
||||
"another one"));
|
||||
MockDataSource.setIterator(vr.replaceTokens(q), rows.iterator());
|
||||
CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor();
|
||||
EntityProcessor csep = new EntityProcessorWrapper( new CachedSqlEntityProcessor(), null);
|
||||
csep.init(context);
|
||||
rows = new ArrayList<Map<String, Object>>();
|
||||
while (true) {
|
||||
|
@ -226,7 +226,7 @@ public class TestCachedSqlEntityProcessor {
|
|||
rows.add(AbstractDataImportHandlerTest.createMap("id", 3, "desc", "another three"));
|
||||
rows.add(AbstractDataImportHandlerTest.createMap("id", 3, "desc", "another another three"));
|
||||
MockDataSource.setIterator(q, rows.iterator());
|
||||
CachedSqlEntityProcessor csep = new CachedSqlEntityProcessor();
|
||||
EntityProcessor csep = new EntityProcessorWrapper(new CachedSqlEntityProcessor(), null);
|
||||
csep.init(context);
|
||||
rows = new ArrayList<Map<String, Object>>();
|
||||
while (true) {
|
||||
|
|
|
@ -48,7 +48,7 @@ public class TestEntityProcessorBase {
|
|||
Map<String, Object> src = new HashMap<String, Object>();
|
||||
src.put("A", "NA");
|
||||
src.put("B", "NA");
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
|
||||
sep.init(context);
|
||||
Map<String, Object> res = sep.applyTransformer(src);
|
||||
Assert.assertNotNull(res.get("T1"));
|
||||
|
|
|
@ -52,7 +52,7 @@ public class TestScriptTransformer {
|
|||
Context context = getContext("f1", script);
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
map.put("name", "Scott");
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
|
||||
sep.init(context);
|
||||
sep.applyTransformer(map);
|
||||
Assert.assertEquals(map.get("name"), "Hello Scott");
|
||||
|
@ -82,7 +82,7 @@ public class TestScriptTransformer {
|
|||
Context context = getContext("f1", script);
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
map.put("name", "Scott");
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
|
||||
sep.init(context);
|
||||
sep.applyTransformer(map);
|
||||
Assert.assertEquals(map.get("name"), "Hello Scott");
|
||||
|
@ -115,7 +115,7 @@ public class TestScriptTransformer {
|
|||
|
||||
Map map = new HashMap();
|
||||
map.put("nextToken", "hello");
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessorWrapper sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
|
||||
sep.init(c);
|
||||
sep.applyTransformer(map);
|
||||
Assert.assertEquals("true", map.get("$hasMore"));
|
||||
|
|
|
@ -55,7 +55,7 @@ public class TestSqlEntityProcessor {
|
|||
|
||||
@Test
|
||||
public void tranformer() {
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessor sep = new EntityProcessorWrapper( new SqlEntityProcessor(), null);
|
||||
List<Map<String, Object>> rows = getRows(2);
|
||||
VariableResolverImpl vr = new VariableResolverImpl();
|
||||
HashMap<String, String> ea = new HashMap<String, String>();
|
||||
|
@ -79,7 +79,7 @@ public class TestSqlEntityProcessor {
|
|||
|
||||
@Test
|
||||
public void tranformerWithReflection() {
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(), null);
|
||||
List<Map<String, Object>> rows = getRows(2);
|
||||
VariableResolverImpl vr = new VariableResolverImpl();
|
||||
HashMap<String, String> ea = new HashMap<String, String>();
|
||||
|
@ -103,7 +103,7 @@ public class TestSqlEntityProcessor {
|
|||
|
||||
@Test
|
||||
public void tranformerList() {
|
||||
SqlEntityProcessor sep = new SqlEntityProcessor();
|
||||
EntityProcessor sep = new EntityProcessorWrapper(new SqlEntityProcessor(),null);
|
||||
List<Map<String, Object>> rows = getRows(2);
|
||||
VariableResolverImpl vr = new VariableResolverImpl();
|
||||
|
||||
|
|
Loading…
Reference in New Issue