SOLR-1352 . Multithreaded implementation

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@898209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2010-01-12 07:49:40 +00:00
parent c66a2853bb
commit 9f74e4337b
14 changed files with 615 additions and 98 deletions

View File

@ -216,9 +216,11 @@ public abstract class Context {
/** Resolve variables in a template
* @param template
*
* @return The string w/ variables resolved
*/
public abstract String replaceTokens(String template);
static final ThreadLocal<Context> CURRENT_CONTEXT = new ThreadLocal<Context>();
}

View File

@ -20,9 +20,9 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.core.SolrCore;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
@ -122,23 +122,32 @@ public class ContextImpl extends Context {
}
public void setSessionAttribute(String name, Object val, String scope) {
if(name == null) return;
if (Context.SCOPE_ENTITY.equals(scope)) {
if (entitySession == null)
entitySession = new HashMap<String, Object>();
entitySession.put(name, val);
entitySession = new ConcurrentHashMap<String, Object>();
putVal(name, val,entitySession);
} else if (Context.SCOPE_GLOBAL.equals(scope)) {
if (globalSession != null) {
globalSession.put(name, val);
putVal(name, val,globalSession);
}
} else if (Context.SCOPE_DOC.equals(scope)) {
DocBuilder.DocWrapper doc = getDocument();
if (doc != null)
doc.setSessionAttribute(name, val);
} else if (SCOPE_SOLR_CORE.equals(scope)){
if(dataImporter != null) dataImporter.getCoreScopeSession().put(name, val);
if(dataImporter != null) {
putVal(name, val,dataImporter.getCoreScopeSession());
}
}
}
private void putVal(String name, Object val, Map map) {
if(val == null) map.remove(name);
else entitySession.put(name, val);
}
public Object getSessionAttribute(String name, String scope) {
if (Context.SCOPE_ENTITY.equals(scope)) {
if (entitySession == null)

View File

@ -56,6 +56,8 @@ public class DataConfig {
public Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
boolean isMultiThreaded = false;
public static class Document {
// TODO - remove from here and add it to entity
public String deleteQuery;

View File

@ -38,6 +38,7 @@ import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p> Stores all configuration information for pulling and indexing data. </p>
@ -85,7 +86,7 @@ public class DataImporter {
* Only for testing purposes
*/
DataImporter() {
coreScopeSession = new HashMap<String, Object>();
coreScopeSession = new ConcurrentHashMap<String, Object>();
}
DataImporter(String dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session) {
@ -206,6 +207,10 @@ public class DataImporter {
// if in this chain no document root is found()
e.isDocRoot = true;
}
if (e.allAttributes.get("threads") != null) {
if(docRootFound) throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "'threads' not allowed below rootEntity ");
config.isMultiThreaded = true;
}
if (e.fields != null) {
for (DataConfig.Field f : e.fields) {

View File

@ -20,6 +20,8 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrCore;
import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import org.apache.solr.schema.SchemaField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.text.ParseException;
import java.util.concurrent.*;
/**
* <p> DocBuilder is responsible for creating Solr documents out of the given configuration. It also maintains
@ -61,7 +63,7 @@ public class DocBuilder {
boolean verboseDebug = false;
private Map<String, Object> session = new HashMap<String, Object>();
Map<String, Object> session = new ConcurrentHashMap<String, Object>();
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
Map<String, Object> functionsNamespace;
@ -81,8 +83,9 @@ public class DocBuilder {
public VariableResolverImpl getVariableResolver() {
try {
VariableResolverImpl resolver = null;
if(dataImporter != null && dataImporter.getCore() != null) resolver = new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties());
else resolver = new VariableResolverImpl();
if(dataImporter != null && dataImporter.getCore() != null){
resolver = new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties());
} else resolver = new VariableResolverImpl();
Map<String, Object> indexerNamespace = new HashMap<String, Object>();
if (persistedProperties.getProperty(LAST_INDEX_TIME) != null) {
indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.getProperty(LAST_INDEX_TIME));
@ -106,7 +109,7 @@ public class DocBuilder {
resolver.addNamespace(DataConfig.IMPORTER_NS, indexerNamespace);
return resolver;
} catch (Exception e) {
DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e);
wrapAndThrow(SEVERE, e);
// unreachable statement
return null;
}
@ -117,7 +120,7 @@ public class DocBuilder {
EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
notifyListener(listener);
} catch (Exception e) {
DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e, "Unable to load class : " + className);
wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
}
}
@ -247,8 +250,17 @@ public class DocBuilder {
@SuppressWarnings("unchecked")
private void doFullDump() {
addStatusMessage("Full Dump Started");
buildDocument(getVariableResolver(), null, null, root, true,
null);
if(dataImporter.getConfig().isMultiThreaded && !verboseDebug){
try {
LOG.info("running multithreaded full-import");
new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
} catch (Exception e) {
LOG.error("error in import", e);
}
} else {
buildDocument(getVariableResolver(), null, null, root, true, null);
}
}
@SuppressWarnings("unchecked")
@ -269,7 +281,7 @@ public class DocBuilder {
addStatusMessage("Deltas Obtained");
addStatusMessage("Building documents");
if (!deletedKeys.isEmpty()) {
allPks.removeAll(deletedKeys);
allPks.removeAll(deletedKeys);
deleteAll(deletedKeys);
// Make sure that documents are not re-created
}
@ -298,21 +310,222 @@ public class DocBuilder {
Iterator<Map<String, Object>> iter = deletedKeys.iterator();
while (iter.hasNext()) {
Map<String, Object> map = iter.next();
Object key = map.get(root.getPk());
Object key = map.get(root.getPk());
if(key == null) {
LOG.warn("no key was available for deleteted pk query");
continue;
}
writer.deleteDoc(key);
importStatistics.deletedDocCount.incrementAndGet();
importStatistics.deletedDocCount.incrementAndGet();
iter.remove();
}
}
Executor executorSvc = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
new SynchronousQueue<Runnable>() // directly hand off tasks
);
@SuppressWarnings("unchecked")
public void addStatusMessage(String msg) {
statusMessages.put(msg, DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
}
EntityRunner createRunner(DataConfig.Entity entity, EntityRunner parent){
return new EntityRunner(entity, parent);
}
/**This class is a just a structure to hold runtime information of one entity
*
*/
class EntityRunner {
final DataConfig.Entity entity;
private EntityProcessor entityProcessor;
private final List<ThreadedEntityProcessorWrapper> entityProcessorWrapper = new ArrayList<ThreadedEntityProcessorWrapper>();
private DocWrapper docWrapper;
private volatile boolean entityInitialized ;
String currentProcess;
ThreadLocal<ThreadedEntityProcessorWrapper> currentEntityProcWrapper = new ThreadLocal<ThreadedEntityProcessorWrapper>();
private ContextImpl context;
EntityRunner parent;
AtomicBoolean entityEnded = new AtomicBoolean(false);
private Exception exception;
public EntityRunner(DataConfig.Entity entity, EntityRunner parent) {
this.parent = parent;
this.entity = entity;
if (entity.proc == null) {
entityProcessor = new SqlEntityProcessor();
} else {
try {
entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
.newInstance();
} catch (Exception e) {
wrapAndThrow(SEVERE, e,
"Unable to load EntityProcessor implementation for entity:" + entity.name);
}
}
int threads = 1;
if (entity.allAttributes.get("threads") != null) {
threads = Integer.parseInt(entity.allAttributes.get("threads"));
}
for (int i = 0; i < threads; i++) {
entityProcessorWrapper.add(new ThreadedEntityProcessorWrapper(entityProcessor, DocBuilder.this, this, getVariableResolver()));
}
context = new ThreadedContext(this, DocBuilder.this);
}
public void run(DocWrapper docWrapper, final String currProcess, final EntityRow rows) throws Exception {
entityInitialized = false;
this.docWrapper = docWrapper;
this.currentProcess = currProcess;
entityEnded.set(false);
try {
if(entityProcessorWrapper.size() <= 1){
runAThread(entityProcessorWrapper.get(0), rows, currProcess);
} else {
final CountDownLatch latch = new CountDownLatch(entityProcessorWrapper.size());
for (final ThreadedEntityProcessorWrapper processorWrapper : entityProcessorWrapper) {
Runnable runnable = new Runnable() {
public void run() {
try {
runAThread(processorWrapper, rows, currProcess);
}catch(Exception e) {
entityEnded.set(true);
exception = e;
} finally {
latch.countDown();
}
}
};
executorSvc.execute(runnable);
}
try {
latch.await();
} catch (InterruptedException e) {
//TODO
}
Exception copy = exception;
if(copy != null){
exception = null;
throw copy;
}
}
} finally {
entityProcessor.destroy();
}
}
private void runAThread(ThreadedEntityProcessorWrapper epw, EntityRow rows, String currProcess) throws Exception {
currentEntityProcWrapper.set(epw);
epw.threadedInit(context);
initEntity();
try {
epw.init(rows);
DocWrapper docWrapper = this.docWrapper;
Context.CURRENT_CONTEXT.set(context);
for (; ;) {
try {
Map<String, Object> arow = epw.nextRow();
if (arow == null) {
break;
} else {
importStatistics.rowsCount.incrementAndGet();
if (docWrapper == null && entity.isDocRoot) {
docWrapper = new DocWrapper();
context.setDoc(docWrapper);
DataConfig.Entity e = entity.parentEntity;
for (EntityRow row = rows; row != null&& e !=null; row = row.tail,e=e.parentEntity) {
addFields(e, docWrapper, row.row, epw.resolver);
}
}
if (docWrapper != null) {
handleSpecialCommands(arow, docWrapper);
addFields(entity, docWrapper, arow, epw.resolver);
}
if (entity.entities != null) {
EntityRow nextRow = new EntityRow(arow, rows, entity.name);
for (DataConfig.Entity e : entity.entities) {
epw.children.get(e).run(docWrapper,currProcess,nextRow);
}
}
}
if (entity.isDocRoot) {
LOG.info("a row on docroot" + docWrapper);
if (!docWrapper.isEmpty()) {
LOG.info("adding a doc "+docWrapper);
boolean result = writer.upload(docWrapper);
docWrapper = null;
if (result){
importStatistics.docCount.incrementAndGet();
} else {
importStatistics.failedDocCount.incrementAndGet();
}
}
}
} catch (DataImportHandlerException dihe) {
exception = dihe;
if(dihe.getErrCode() == SKIP_ROW || dihe.getErrCode() == SKIP) {
importStatistics.skipDocCount.getAndIncrement();
exception = null;//should not propogate up
continue;
}
if (entity.isDocRoot) {
if (dihe.getErrCode() == DataImportHandlerException.SKIP) {
importStatistics.skipDocCount.getAndIncrement();
exception = null;//should not propogate up
} else {
LOG.error("Exception while processing: "
+ entity.name + " document : " + docWrapper, dihe);
}
if (dihe.getErrCode() == DataImportHandlerException.SEVERE)
throw dihe;
} else {
//if this is not the docRoot then the execution has happened in the same thread. so propogate up,
// it will be handled at the docroot
entityEnded.set(true);
throw dihe;
}
entityEnded.set(true);
}
}
} finally {
epw.destroy();
currentEntityProcWrapper.remove();
Context.CURRENT_CONTEXT.remove();
}
}
private void initEntity() {
if (!entityInitialized) {
synchronized (this) {
if (!entityInitialized) {
entityProcessor.init(context);
entityInitialized = true;
}
}
}
}
}
/**A reverse linked list .
*
*/
static class EntityRow {
final Map<String, Object> row;
final EntityRow tail;
final String name;
EntityRow(Map<String, Object> row, EntityRow tail, String name) {
this.row = row;
this.tail = tail;
this.name = name;
}
}
@SuppressWarnings("unchecked")
private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
@ -325,7 +538,8 @@ public class DocBuilder {
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
entityProcessor.init(ctx);
Context.CURRENT_CONTEXT.set(ctx);
if (requestParameters.start > 0) {
writer.log(SolrWriter.DISABLE_LOGGING, null, null);
}
@ -392,6 +606,10 @@ public class DocBuilder {
}
vr.removeNamespace(entity.name);
}
/*The child entities would have changed the CURRENT_CONTEXT. So when they are done, set it back to the old.
*
*/
Context.CURRENT_CONTEXT.set(ctx);
if (entity.isDocRoot) {
if (stop.get())
@ -402,7 +620,7 @@ public class DocBuilder {
if (result){
importStatistics.docCount.incrementAndGet();
} else {
importStatistics.failedDocCount.incrementAndGet();
importStatistics.failedDocCount.incrementAndGet();
}
}
}
@ -435,6 +653,7 @@ public class DocBuilder {
writer.log(SolrWriter.ROW_END, entity.name, null);
if (entity.isDocRoot)
writer.log(SolrWriter.END_DOC, null, null);
Context.CURRENT_CONTEXT.remove();
}
}
}
@ -573,7 +792,7 @@ public class DocBuilder {
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
if (entity.processor != null)
return entity.processor;
EntityProcessor entityProcessor;
EntityProcessor entityProcessor = null;
if (entity.proc == null) {
entityProcessor = new SqlEntityProcessor();
} else {
@ -581,9 +800,8 @@ public class DocBuilder {
entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
.newInstance();
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to load EntityProcessor implementation for entity:"
+ entity.name, e);
wrapAndThrow (SEVERE,e,
"Unable to load EntityProcessor implementation for entity:" + entity.name);
}
}
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);

View File

@ -37,20 +37,18 @@ import java.util.Map;
public class EntityProcessorWrapper extends EntityProcessor {
private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class);
private EntityProcessor delegate;
EntityProcessor delegate;
private DocBuilder docBuilder;
private String onError;
private Context context;
private VariableResolverImpl resolver;
private String entityName;
String onError;
protected Context context;
protected VariableResolverImpl resolver;
String entityName;
protected List<Transformer> transformers;
protected List<Map<String, Object>> rowcache;
private Context contextCopy;
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
this.delegate = delegate;
this.docBuilder = docBuilder;
@ -61,8 +59,6 @@ public class EntityProcessorWrapper extends EntityProcessor {
this.context = context;
resolver = (VariableResolverImpl) context.getVariableResolver();
//context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
contextCopy = resolver.context;
resolver.context = context;
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) onError = ABORT;
@ -276,8 +272,6 @@ public class EntityProcessorWrapper extends EntityProcessor {
public void destroy() {
delegate.destroy();
resolver.context = contextCopy;
contextCopy = null;
}
public VariableResolverImpl getVariableResolver() {

View File

@ -212,8 +212,7 @@ public class EvaluatorBag {
if (evaluator == null)
return null;
VariableResolverImpl vri = VariableResolverImpl.CURRENT_VARIABLE_RESOLVER.get();
Context ctx = vri == null ? null : vri.context;
return evaluator.evaluate(m.group(2), ctx);
return evaluator.evaluate(m.group(2), Context.CURRENT_CONTEXT.get());
}
};

View File

@ -0,0 +1,76 @@
package org.apache.solr.handler.dataimport;
public class ThreadedContext extends ContextImpl{
private DocBuilder.EntityRunner entityRunner;
private boolean limitedContext = false;
public ThreadedContext(DocBuilder.EntityRunner entityRunner, DocBuilder docBuilder) {
super(entityRunner.entity,
null,//to be fethed realtime
null,
null,
docBuilder.session,
null,
docBuilder);
this.entityRunner = entityRunner;
}
@Override
public VariableResolver getVariableResolver() {
checkLimited();
return entityRunner.currentEntityProcWrapper.get().resolver;
}
@Override
public Context getParentContext() {
ThreadedContext ctx = new ThreadedContext(entityRunner.parent, docBuilder);
ctx.limitedContext = true;
return ctx;
}
@Override
public String currentProcess() {
return entityRunner.currentProcess;
}
@Override
public EntityProcessor getEntityProcessor() {
return entityRunner.currentEntityProcWrapper.get().delegate;
}
@Override
public DataSource getDataSource() {
checkLimited();
return super.getDataSource();
}
private void checkLimited() {
if(limitedContext) throw new RuntimeException("parentContext does not support this method");
}
@Override
public String getResolvedEntityAttribute(String name) {
checkLimited();
return super.getResolvedEntityAttribute(name);
}
@Override
public void setSessionAttribute(String name, Object val, String scope) {
checkLimited();
super.setSessionAttribute(name, val, scope);
}
@Override
public Object resolve(String var) {
return getVariableResolver().resolve(var);
}
@Override
public String replaceTokens(String template) {
return getVariableResolver().replaceTokens(template);
}
}

View File

@ -0,0 +1,98 @@
package org.apache.solr.handler.dataimport;
import static org.apache.solr.handler.dataimport.EntityProcessorBase.ON_ERROR;
import static org.apache.solr.handler.dataimport.EntityProcessorBase.ABORT;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
/**
* Each Entity may have only a single EntityProcessor . But the same entity can be run by
* multiple EntityProcessorWrapper (1 per thread) . thhis helps running transformations in multiple threads
*/
public class ThreadedEntityProcessorWrapper extends EntityProcessorWrapper {
private static final Logger LOG = LoggerFactory.getLogger(ThreadedEntityProcessorWrapper.class);
DocBuilder.EntityRunner entityRunner;
/**For each child entity there is one EntityRunner
*/
Map<DataConfig.Entity ,DocBuilder.EntityRunner> children;
public ThreadedEntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder,
DocBuilder.EntityRunner entityRunner,
VariableResolverImpl resolver) {
super(delegate, docBuilder);
this.entityRunner = entityRunner;
this.resolver = resolver;
if (entityRunner.entity.entities == null) {
children = Collections.emptyMap();
} else {
children = new HashMap<DataConfig.Entity, DocBuilder.EntityRunner>(entityRunner.entity.entities.size());
for (DataConfig.Entity e : entityRunner.entity.entities) {
DocBuilder.EntityRunner runner = docBuilder.createRunner(e, entityRunner);
children.put(e, runner);
}
}
}
void threadedInit(Context context){
rowcache = null;
this.context = context;
resolver = (VariableResolverImpl) context.getVariableResolver();
//context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) onError = ABORT;
entityName = context.getEntityAttribute(DataConfig.NAME);
}
}
@Override
public Map<String, Object> nextRow() {
if (rowcache != null) {
return getFromRowCache();
}
while (true) {
Map<String, Object> arow = null;
synchronized (delegate) {
if(entityRunner.entityEnded.get()) return null;
try {
arow = delegate.nextRow();
} catch (Exception e) {
if (ABORT.equals(onError)) {
wrapAndThrow(SEVERE, e);
} else {
//SKIP is not really possible. If this calls the nextRow() again the Entityprocessor would be in an inconisttent state
LOG.error("Exception in entity : " + entityName, e);
return null;
}
}
LOG.info("arow : "+arow);
if(arow == null) entityRunner.entityEnded.set(true);
}
if (arow == null) {
return null;
} else {
arow = applyTransformer(arow);
if (arow != null) {
delegate.postTransform(arow);
return arow;
}
}
}
}
public void init(DocBuilder.EntityRow rows) {
for (DocBuilder.EntityRow row = rows; row != null; row = row.tail) resolver.addNamespace(row.name, row.row);
}
}

View File

@ -239,6 +239,7 @@ public class TestDocBuilder {
}
public static final String dc_singleEntity = "<dataConfig>\n"
+ "<dataSource type=\"MockDataSource\"/>\n"
+ " <document name=\"X\" >\n"
+ " <entity name=\"x\" query=\"select * from x\">\n"
+ " <field column=\"id\"/>\n"
@ -247,6 +248,7 @@ public class TestDocBuilder {
+ " </document>\n" + "</dataConfig>";
public static final String dc_deltaConfig = "<dataConfig>\n"
+ "<dataSource type=\"MockDataSource\"/>\n"
+ " <document name=\"X\" >\n"
+ " <entity name=\"x\" query=\"select * from x\" deltaQuery=\"select id from x\">\n"
+ " <field column=\"id\"/>\n"

View File

@ -98,15 +98,19 @@ public class TestEvaluatorBag {
public void testEscapeSolrQueryFunction() {
final VariableResolverImpl resolver = new VariableResolverImpl();
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
resolver.context = context;
Map m= new HashMap();
m.put("query","c:t");
resolver.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST, null));
resolver.addNamespace("e",m);
String s = resolver
.replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
org.junit.Assert.assertEquals("c\\:t", s);
Context.CURRENT_CONTEXT.set(context);
try {
Map m= new HashMap();
m.put("query","c:t");
resolver.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST, null));
resolver.addNamespace("e",m);
String s = resolver
.replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
org.junit.Assert.assertEquals("c\\:t", s);
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
/**
@ -115,31 +119,39 @@ public class TestEvaluatorBag {
@Test
public void testGetDateFormatEvaluator() {
Evaluator dateFormatEval = EvaluatorBag.getDateFormatEvaluator();
resolver.context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
Context.CURRENT_CONTEXT.set(context);
try {
long time = System.currentTimeMillis();
assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time - 2*86400*1000)),
dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
long time = System.currentTimeMillis();
assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time - 2*86400*1000)),
dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", resolver.context));
Map<String, Object> map = new HashMap<String, Object>();
map.put("key", new Date(time));
resolver.addNamespace("A", map);
Map<String, Object> map = new HashMap<String, Object>();
map.put("key", new Date(time));
resolver.addNamespace("A", map);
assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time)),
dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", resolver.context));
assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time)),
dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
private void runTests(Map<String, String> tests, Evaluator evaluator) {
ContextImpl ctx = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
resolver.context = ctx;
for (Map.Entry<String, String> entry : tests.entrySet()) {
Map<String, Object> values = new HashMap<String, Object>();
values.put("key", entry.getKey());
resolver.addNamespace("A", values);
Context.CURRENT_CONTEXT.set(ctx);
try {
for (Map.Entry<String, String> entry : tests.entrySet()) {
Map<String, Object> values = new HashMap<String, Object>();
values.put("key", entry.getKey());
resolver.addNamespace("A", values);
String expected = (String) entry.getValue();
String actual = evaluator.evaluate("A.key", ctx);
assertEquals(expected, actual);
String expected = (String) entry.getValue();
String actual = evaluator.evaluate("A.key", ctx);
assertEquals(expected, actual);
}
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
}

View File

@ -71,6 +71,26 @@ public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTest {
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("desc:hello"), "//*[@numFound='1']");
}
@Test
@SuppressWarnings("unchecked")
public void testCompositePk_FullImport_MT() throws Exception {
List parentRow = new ArrayList();
parentRow.add(createMap("id", "1"));
parentRow.add(createMap("id", "2"));
MockDataSource.setIterator("select * from x", parentRow.iterator());
List childRow = new ArrayList();
childRow.add(createMap("desc", "hello"));
MockDataSource.setIterator("select * from y where y.A=1", childRow.iterator());
MockDataSource.setIterator("select * from y where y.A=2", childRow.iterator());
super.runFullImport(dataConfig_2threads);
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("id:2"), "//*[@numFound='1']");
assertQ(req("desc:hello"), "//*[@numFound='2']");
}
@Test
@SuppressWarnings("unchecked")
@ -234,6 +254,15 @@ public class TestSqlEntityProcessor2 extends AbstractDataImportHandlerTest {
+ " </entity>\n" + " </entity>\n"
+ " </document>\n" + "</dataConfig>\n";
private static String dataConfig_2threads = "<dataConfig><dataSource type=\"MockDataSource\"/>\n"
+ " <document>\n"
+ " <entity name=\"x\" pk=\"id\" query=\"select * from x\" threads=\"2\">\n"
+ " <field column=\"id\" />\n"
+ " <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
+ " <field column=\"desc\" />\n"
+ " </entity>\n" + " </entity>\n"
+ " </document>\n" + "</dataConfig>\n";
private static String dataConfig_deltaimportquery = "<dataConfig><dataSource type=\"MockDataSource\"/>\n"
+ " <document>\n"
+ " <entity name=\"x\" deltaImportQuery=\"select * from x where id=${dataimporter.delta.id}\" deltaQuery=\"select id from x where last_modified > NOW\">\n"

View File

@ -0,0 +1,56 @@
package org.apache.solr.handler.dataimport;
import org.junit.Test;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
public class TestThreaded extends AbstractDataImportHandlerTest {
@Test
@SuppressWarnings("unchecked")
public void testCompositePk_FullImport() throws Exception {
List parentRow = new ArrayList();
// parentRow.add(createMap("id", "1"));
parentRow.add(createMap("id", "2"));
parentRow.add(createMap("id", "3"));
parentRow.add(createMap("id", "4"));
parentRow.add(createMap("id", "1"));
MockDataSource.setIterator("select * from x", parentRow.iterator());
List childRow = new ArrayList();
Map map = createMap("desc", "hello");
childRow.add(map);
MockDataSource.setIterator("select * from y where y.A=1", childRow.iterator());
MockDataSource.setIterator("select * from y where y.A=2", childRow.iterator());
MockDataSource.setIterator("select * from y where y.A=3", childRow.iterator());
MockDataSource.setIterator("select * from y where y.A=4", childRow.iterator());
super.runFullImport(dataConfig);
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("*:*"), "//*[@numFound='4']");
assertQ(req("desc:hello"), "//*[@numFound='4']");
}
@Override
public String getSchemaFile() {
return "dataimport-schema.xml";
}
@Override
public String getSolrConfigFile() {
return "dataimport-solrconfig.xml";
}
private static String dataConfig = "<dataConfig>\n"
+"<dataSource type=\"MockDataSource\"/>\n"
+ " <document>\n"
+ " <entity name=\"x\" threads=\"2\" query=\"select * from x\" deletedPkQuery=\"select id from x where last_modified > NOW AND deleted='true'\" deltaQuery=\"select id from x where last_modified > NOW\">\n"
+ " <field column=\"id\" />\n"
+ " <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
+ " <field column=\"desc\" />\n"
+ " </entity>\n" + " </entity>\n"
+ " </document>\n" + "</dataConfig>";
}

View File

@ -84,30 +84,40 @@ public class TestVariableResolver {
@Test
public void dateNamespaceWithValue() {
VariableResolverImpl vri = new VariableResolverImpl();
vri.context = new ContextImpl(null,vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
vri.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST, null));
Map<String, Object> ns = new HashMap<String, Object>();
Date d = new Date();
ns.put("dt", d);
vri.addNamespace("A", ns);
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
Context.CURRENT_CONTEXT.set(context);
try {
vri.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST, null));
Map<String, Object> ns = new HashMap<String, Object>();
Date d = new Date();
ns.put("dt", d);
vri.addNamespace("A", ns);
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
@Test
public void dateNamespaceWithExpr() throws Exception {
VariableResolverImpl vri = new VariableResolverImpl();
vri.context = new ContextImpl(null,vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
vri.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST,null));
ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
Context.CURRENT_CONTEXT.set(context);
try {
vri.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(Collections.EMPTY_LIST,null));
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
@Test
@ -130,26 +140,31 @@ public class TestVariableResolver {
@Test
public void testFunctionNamespace1() throws Exception {
final VariableResolverImpl resolver = new VariableResolverImpl();
resolver.context = new ContextImpl(null,resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
Map<String ,String > m = new HashMap<String, String>();
m.put("name","test");
m.put("class",E.class.getName());
l.add(m);
VariableResolverImpl resolver = new VariableResolverImpl();
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
Context.CURRENT_CONTEXT.set(context);
try {
final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
Map<String ,String > m = new HashMap<String, String>();
m.put("name","test");
m.put("class",E.class.getName());
l.add(m);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
format.setTimeZone(TimeZone.getTimeZone("UTC"));
DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
resolver.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(l,null));
String s = resolver
.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
.format(dmp.parseMath("/DAY")), s);
Assert.assertEquals("Hello World", resolver
.replaceTokens("${dataimporter.functions.test('TEST')}"));
resolver.addNamespace("dataimporter.functions", EvaluatorBag
.getFunctionsNamespace(l,null));
String s = resolver
.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
.format(dmp.parseMath("/DAY")), s);
Assert.assertEquals("Hello World", resolver
.replaceTokens("${dataimporter.functions.test('TEST')}"));
} finally {
Context.CURRENT_CONTEXT.remove();
}
}
public static class E extends Evaluator{