SOLR-3671: DIHWriter fix

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1592583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Dyer 2014-05-05 17:40:32 +00:00
parent 88846a958d
commit 3c14dc6a57
5 changed files with 133 additions and 42 deletions

View File

@ -158,6 +158,8 @@ Other Changes
* SOLR-6022: Deprecate getAnalyzer() in IndexField and FieldType, and add getIndexAnalyzer(). * SOLR-6022: Deprecate getAnalyzer() in IndexField and FieldType, and add getIndexAnalyzer().
(Ryan Ernst) (Ryan Ernst)
* SOLR_3671: Fix DIHWriter interface usage so users may implement writers that output
documents to a location external to Solr (ex. a NoSql db). (Roman Chyla via James Dyer)
Build Build
--------------------- ---------------------

View File

@ -17,6 +17,7 @@
package org.apache.solr.handler.dataimport; package org.apache.solr.handler.dataimport;
import static org.apache.solr.handler.dataimport.DataImporter.IMPORT_CMD; import static org.apache.solr.handler.dataimport.DataImporter.IMPORT_CMD;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
@ -43,6 +44,8 @@ import org.apache.solr.util.plugin.SolrCoreAware;
import java.util.*; import java.util.*;
import java.io.StringReader; import java.io.StringReader;
import java.lang.reflect.Constructor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
@ -76,6 +79,9 @@ public class DataImportHandler extends RequestHandlerBase implements
private String myName = "dataimport"; private String myName = "dataimport";
private static final String PARAM_WRITER_IMPL = "writerImpl";
private static final String DEFAULT_WRITER_NAME = "SolrWriter";
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void init(NamedList args) { public void init(NamedList args) {
@ -171,7 +177,7 @@ public class DataImportHandler extends RequestHandlerBase implements
req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN)); req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp); UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
SolrResourceLoader loader = req.getCore().getResourceLoader(); SolrResourceLoader loader = req.getCore().getResourceLoader();
SolrWriter sw = getSolrWriter(processor, loader, requestParams, req); DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
if (requestParams.isDebug()) { if (requestParams.isDebug()) {
if (debugEnabled) { if (debugEnabled) {
@ -224,23 +230,44 @@ public class DataImportHandler extends RequestHandlerBase implements
return result; return result;
} }
private SolrWriter getSolrWriter(final UpdateRequestProcessor processor, private DIHWriter getSolrWriter(final UpdateRequestProcessor processor,
final SolrResourceLoader loader, final RequestInfo requestParams, SolrQueryRequest req) { final SolrResourceLoader loader, final RequestInfo requestParams,
SolrQueryRequest req) {
return new SolrWriter(processor, req) { SolrParams reqParams = req.getParams();
String writerClassStr = null;
@Override if (reqParams != null && reqParams.get(PARAM_WRITER_IMPL) != null) {
public boolean upload(SolrInputDocument document) { writerClassStr = (String) reqParams.get(PARAM_WRITER_IMPL);
try { }
return super.upload(document); DIHWriter writer;
} catch (RuntimeException e) { if (writerClassStr != null
LOG.error( "Exception while adding: " + document, e); && !writerClassStr.equals(DEFAULT_WRITER_NAME)
return false; && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "."
} + DEFAULT_WRITER_NAME)) {
try {
@SuppressWarnings("unchecked")
Class<DIHWriter> writerClass = DocBuilder.loadClass(writerClassStr, req.getCore());
Constructor<DIHWriter> cnstr = writerClass.getConstructor(new Class[] {
UpdateRequestProcessor.class, SolrQueryRequest.class});
return cnstr.newInstance((Object) processor, (Object) req);
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to load Writer implementation:" + writerClassStr, e);
} }
}; } else {
return new SolrWriter(processor, req) {
@Override
public boolean upload(SolrInputDocument document) {
try {
return super.upload(document);
} catch (RuntimeException e) {
LOG.error("Exception while adding: " + document, e);
return false;
}
}
};
}
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public NamedList getStatistics() { public NamedList getStatistics() {

View File

@ -399,7 +399,7 @@ public class DataImporter {
return importLock.isLocked(); return importLock.isLocked();
} }
public void doFullImport(SolrWriter writer, RequestInfo requestParams) { public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
LOG.info("Starting Full Import"); LOG.info("Starting Full Import");
setStatus(Status.RUNNING_FULL_DUMP); setStatus(Status.RUNNING_FULL_DUMP);
boolean success = false; boolean success = false;
@ -425,14 +425,14 @@ public class DataImporter {
} }
private void checkWritablePersistFile(SolrWriter writer, DIHProperties dihPropWriter) { private void checkWritablePersistFile(DIHWriter writer, DIHProperties dihPropWriter) {
if (isDeltaImportSupported && !dihPropWriter.isWritable()) { if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
throw new DataImportHandlerException(SEVERE, throw new DataImportHandlerException(SEVERE,
"Properties is not writable. Delta imports are supported by data config but will not work."); "Properties is not writable. Delta imports are supported by data config but will not work.");
} }
} }
public void doDeltaImport(SolrWriter writer, RequestInfo requestParams) { public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
LOG.info("Starting Delta Import"); LOG.info("Starting Delta Import");
setStatus(Status.RUNNING_DELTA_DUMP); setStatus(Status.RUNNING_DELTA_DUMP);
boolean success = false; boolean success = false;
@ -457,7 +457,7 @@ public class DataImporter {
} }
public void runAsync(final RequestInfo reqParams, final SolrWriter sw) { public void runAsync(final RequestInfo reqParams, final DIHWriter sw) {
new Thread() { new Thread() {
@Override @Override
public void run() { public void run() {
@ -466,7 +466,7 @@ public class DataImporter {
}.start(); }.start();
} }
void runCmd(RequestInfo reqParams, SolrWriter sw) { void runCmd(RequestInfo reqParams, DIHWriter sw) {
String command = reqParams.getCommand(); String command = reqParams.getCommand();
if (command.equals(ABORT_CMD)) { if (command.equals(ABORT_CMD)) {
if (docBuilder != null) { if (docBuilder != null) {

View File

@ -80,13 +80,10 @@ public class DocBuilder {
private Map<String, Object> persistedProperties; private Map<String, Object> persistedProperties;
private DIHProperties propWriter; private DIHProperties propWriter;
private static final String PARAM_WRITER_IMPL = "writerImpl";
private static final String DEFAULT_WRITER_NAME = "SolrWriter";
private DebugLogger debugLogger; private DebugLogger debugLogger;
private final RequestInfo reqParams; private final RequestInfo reqParams;
@SuppressWarnings("unchecked") public DocBuilder(DataImporter dataImporter, DIHWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
INSTANCE.set(this); INSTANCE.set(this);
this.dataImporter = dataImporter; this.dataImporter = dataImporter;
this.reqParams = reqParams; this.reqParams = reqParams;
@ -95,20 +92,7 @@ public class DocBuilder {
verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose; verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
persistedProperties = propWriter.readIndexerProperties(); persistedProperties = propWriter.readIndexerProperties();
String writerClassStr = null; writer = solrWriter;
if(reqParams!=null && reqParams.getRawParams() != null) {
writerClassStr = (String) reqParams.getRawParams().get(PARAM_WRITER_IMPL);
}
if(writerClassStr != null && !writerClassStr.equals(DEFAULT_WRITER_NAME) && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "." + DEFAULT_WRITER_NAME)) {
try {
Class<DIHWriter> writerClass = loadClass(writerClassStr, dataImporter.getCore());
this.writer = writerClass.newInstance();
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load Writer implementation:" + writerClassStr, e);
}
} else {
writer = solrWriter;
}
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this); ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
writer.init(ctx); writer.init(ctx);
} }
@ -905,9 +889,6 @@ public class DocBuilder {
return reqParams; return reqParams;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static Class loadClass(String name, SolrCore core) throws ClassNotFoundException { static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
try { try {

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.*;
/**
* <p>
* Test for writerImpl paramater (to provide own SolrWriter)
* </p>
*
*
* @since solr 4.0
*/
public class TestWriterImpl extends AbstractDataImportHandlerTestCase {
@BeforeClass
public static void beforeClass() throws Exception {
initCore("dataimport-nodatasource-solrconfig.xml", "dataimport-schema.xml");
}
@Test
@SuppressWarnings("unchecked")
public void testDataConfigWithDataSource() throws Exception {
List rows = new ArrayList();
rows.add(createMap("id", "1", "desc", "one"));
rows.add(createMap("id", "2", "desc", "two"));
rows.add(createMap("id", "3", "desc", "break"));
rows.add(createMap("id", "4", "desc", "four"));
MockDataSource.setIterator("select * from x", rows.iterator());
Map extraParams = createMap("writerImpl", TestSolrWriter.class.getName(),
"commit", "true");
runFullImport(loadDataConfig("data-config-with-datasource.xml"),
extraParams);
assertQ(req("id:1"), "//*[@numFound='1']");
assertQ(req("id:2"), "//*[@numFound='1']");
assertQ(req("id:3"), "//*[@numFound='0']");
assertQ(req("id:4"), "//*[@numFound='1']");
}
public static class TestSolrWriter extends SolrWriter {
public TestSolrWriter(UpdateRequestProcessor processor, SolrQueryRequest req) {
super(processor, req);
}
@Override
public boolean upload(SolrInputDocument doc) {
if (doc.getField("desc").getFirstValue().equals("break")) {
return false;
}
return super.upload(doc);
}
}
}