SOLR-4051: Configurable DIH Property Writers

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1408873 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Dyer 2012-11-13 18:30:51 +00:00
parent 130df2e8e1
commit 3e45aec329
14 changed files with 1240 additions and 803 deletions

View File

@ -0,0 +1,44 @@
package org.apache.solr.handler.dataimport;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in combstract clapliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Date;
import java.util.Map;
/**
* Implementations write out properties about the last data import
* for use by the next import. ex: to persist the last import timestamp
* so that future delta imports can know what needs to be updated.
*
* @lucene.experimental
*/
public abstract class DIHProperties {
public abstract void init(DataImporter dataImporter, Map<String, String> initParams);
public abstract boolean isWritable();
public abstract void persist(Map<String, Object> props);
public abstract Map<String, Object> readIndexerProperties();
public Date getCurrentTimestamp() {
return new Date();
}
}

View File

@ -29,6 +29,7 @@ import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.apache.solr.handler.dataimport.config.PropertyWriter;
import org.apache.solr.handler.dataimport.config.Script;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
@ -78,7 +79,6 @@ public class DataImporter {
public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
private SolrCore core;
private Map<String, Object> coreScopeSession = new ConcurrentHashMap<String,Object>();
private DIHPropertiesWriter propWriter;
private ReentrantLock importLock = new ReentrantLock();
private boolean isDeltaImportSupported = false;
private final String handlerName;
@ -88,8 +88,6 @@ public class DataImporter {
* Only for testing purposes
*/
DataImporter() {
createPropertyWriter();
propWriter.init(this);
this.handlerName = "dataimport" ;
}
@ -97,19 +95,10 @@ public class DataImporter {
this.handlerName = handlerName;
this.core = core;
this.schema = core.getSchema();
loadSchemaFieldMap();
createPropertyWriter();
loadSchemaFieldMap();
}
private void createPropertyWriter() {
if (this.core == null
|| !this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
propWriter = new SimplePropertiesWriter();
} else {
propWriter = new ZKPropertiesWriter();
}
propWriter.init(this);
}
boolean maybeReloadConfiguration(RequestInfo params,
@ -278,7 +267,7 @@ public class DataImporter {
}
}
}
List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, DATA_SRC);
List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.DATA_SRC);
if (!dataSourceTags.isEmpty()) {
for (Element element : dataSourceTags) {
Map<String,String> p = new HashMap<String,String>();
@ -295,7 +284,54 @@ public class DataImporter {
break;
}
}
return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources);
PropertyWriter pw = null;
List<Element> propertyWriterTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.PROPERTY_WRITER);
if (propertyWriterTags.isEmpty()) {
boolean zookeeper = false;
if (this.core != null
&& this.core.getCoreDescriptor().getCoreContainer()
.isZooKeeperAware()) {
zookeeper = true;
}
pw = new PropertyWriter(zookeeper ? "ZKPropertiesWriter"
: "SimplePropertiesWriter", Collections.<String,String> emptyMap());
} else if (propertyWriterTags.size() > 1) {
throw new DataImportHandlerException(SEVERE, "Only one "
+ ConfigNameConstants.PROPERTY_WRITER + " can be configured.");
} else {
Element pwElement = propertyWriterTags.get(0);
String type = null;
Map<String,String> params = new HashMap<String,String>();
for (Map.Entry<String,String> entry : ConfigParseUtil.getAllAttributes(
pwElement).entrySet()) {
if (TYPE.equals(entry.getKey())) {
type = entry.getValue();
} else {
params.put(entry.getKey(), entry.getValue());
}
}
if (type == null) {
throw new DataImportHandlerException(SEVERE, "The "
+ ConfigNameConstants.PROPERTY_WRITER + " element must specify "
+ TYPE);
}
pw = new PropertyWriter(type, params);
}
return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources, pw);
}
@SuppressWarnings("unchecked")
private DIHProperties createPropertyWriter() {
DIHProperties propWriter = null;
PropertyWriter configPw = config.getPropertyWriter();
try {
Class<DIHProperties> writerClass = DocBuilder.loadClass(configPw.getType(), this.core);
propWriter = writerClass.newInstance();
propWriter.init(this, configPw.getParameters());
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to PropertyWriter implementation:" + configPw.getType(), e);
}
return propWriter;
}
DIHConfiguration getConfig() {
@ -374,11 +410,11 @@ public class DataImporter {
LOG.info("Starting Full Import");
setStatus(Status.RUNNING_FULL_DUMP);
setIndexStartTime(new Date());
try {
docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer);
DIHProperties dihPropWriter = createPropertyWriter();
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
checkWritablePersistFile(writer, dihPropWriter);
docBuilder.execute();
if (!requestParams.isDebug())
cumulativeStatistics.add(docBuilder.importStatistics);
@ -392,10 +428,8 @@ public class DataImporter {
}
private void checkWritablePersistFile(SolrWriter writer) {
// File persistFile = propWriter.getPersistFile();
// boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
if (isDeltaImportSupported && !propWriter.isWritable()) {
private void checkWritablePersistFile(SolrWriter writer, DIHProperties dihPropWriter) {
if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
throw new DataImportHandlerException(SEVERE,
"Properties is not writable. Delta imports are supported by data config but will not work.");
}
@ -406,9 +440,10 @@ public class DataImporter {
setStatus(Status.RUNNING_DELTA_DUMP);
try {
setIndexStartTime(new Date());
docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer);
DIHProperties dihPropWriter = createPropertyWriter();
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
checkWritablePersistFile(writer, dihPropWriter);
docBuilder.execute();
if (!requestParams.isDebug())
cumulativeStatistics.add(docBuilder.importStatistics);

View File

@ -68,16 +68,16 @@ public class DocBuilder {
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
private Map<String, Object> functionsNamespace;
private Properties persistedProperties;
private Map<String, Object> persistedProperties;
private DIHPropertiesWriter propWriter;
private DIHProperties propWriter;
private static final String PARAM_WRITER_IMPL = "writerImpl";
private static final String DEFAULT_WRITER_NAME = "SolrWriter";
private DebugLogger debugLogger;
private final RequestInfo reqParams;
@SuppressWarnings("unchecked")
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, RequestInfo reqParams) {
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
INSTANCE.set(this);
this.dataImporter = dataImporter;
this.reqParams = reqParams;
@ -121,22 +121,22 @@ public class DocBuilder {
resolver = new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties());
} else resolver = new VariableResolverImpl();
Map<String, Object> indexerNamespace = new HashMap<String, Object>();
if (persistedProperties.getProperty(LAST_INDEX_TIME) != null) {
indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.getProperty(LAST_INDEX_TIME));
if (persistedProperties.get(LAST_INDEX_TIME) != null) {
indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
} else {
// set epoch
indexerNamespace.put(LAST_INDEX_TIME, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH));
indexerNamespace.put(LAST_INDEX_TIME, EPOCH);
}
indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
indexerNamespace.put("request", reqParams.getRawParams());
indexerNamespace.put("functions", functionsNamespace);
for (Entity entity : dataImporter.getConfig().getEntities()) {
String key = entity.getName() + "." + SolrWriter.LAST_INDEX_KEY;
String lastIndex = persistedProperties.getProperty(key);
if (lastIndex != null) {
Object lastIndex = persistedProperties.get(key);
if (lastIndex != null && lastIndex instanceof Date) {
indexerNamespace.put(key, lastIndex);
} else {
indexerNamespace.put(key, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH));
indexerNamespace.put(key, EPOCH);
}
}
resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
@ -206,9 +206,8 @@ public class DocBuilder {
}
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
Properties lastIndexTimeProps = new Properties();
lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
Map<String,Object> lastIndexTimeProps = new HashMap<String,Object>();
lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
epwList = new ArrayList<EntityProcessorWrapper>(config.getEntities().size());
for (Entity e : config.getEntities()) {
@ -217,8 +216,7 @@ public class DocBuilder {
for (EntityProcessorWrapper epw : epwList) {
if (entities != null && !entities.contains(epw.getEntity().getName()))
continue;
lastIndexTimeProps.setProperty(epw.getEntity().getName() + "." + LAST_INDEX_KEY,
DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
currentEntityProcessorWrapper = epw;
String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
@ -295,7 +293,7 @@ public class DocBuilder {
}
@SuppressWarnings("unchecked")
private void finish(Properties lastIndexTimeProps) {
private void finish(Map<String,Object> lastIndexTimeProps) {
LOG.info("Import completed successfully");
statusMessages.put("", "Indexing completed. Added/Updated: "
+ importStatistics.docCount + " documents. Deleted "

View File

@ -1,4 +1,5 @@
package org.apache.solr.handler.dataimport;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -16,108 +17,213 @@ package org.apache.solr.handler.dataimport;
* limitations under the License.
*/
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimplePropertiesWriter implements DIHPropertiesWriter {
private static final Logger log = LoggerFactory.getLogger(SimplePropertiesWriter.class);
static final String IMPORTER_PROPERTIES = "dataimport.properties";
/**
* <p>
* Writes properties using {@link Properties#store} .
* The special property "last_index_time" is converted to a formatted date.
* Users can configure the location, filename, locale and date format to use.
* </p>
*/
public class SimplePropertiesWriter extends DIHProperties {
private static final Logger log = LoggerFactory
.getLogger(SimplePropertiesWriter.class);
static final String LAST_INDEX_KEY = "last_index_time";
private String persistFilename = IMPORTER_PROPERTIES;
private String configDir = null;
public void init(DataImporter dataImporter) {
SolrCore core = dataImporter.getCore();
String configDir = core ==null ? ".": core.getResourceLoader().getConfigDir();
String persistFileName = dataImporter.getHandlerName();
this.configDir = configDir;
if(persistFileName != null){
persistFilename = persistFileName + ".properties";
}
protected String filename = null;
protected String configDir = null;
protected Locale locale = null;
protected SimpleDateFormat dateFormat = null;
/**
* The locale to use when writing the properties file. Default is {@link Locale#ROOT}
*/
public static final String LOCALE = "locale";
/**
* The date format to use when writing values for "last_index_time" to the properties file.
* See {@link SimpleDateFormat} for patterns. Default is yyyy-MM-dd HH:mm:ss .
*/
public static final String DATE_FORMAT = "dateFormat";
/**
* The directory to save the properties file in. Default is the current core's "config" directory.
*/
public static final String DIRECTORY = "directory";
/**
* The filename to save the properties file to. Default is this Handler's name from solrconfig.xml.
*/
public static final String FILENAME = "filename";
@Override
public void init(DataImporter dataImporter, Map<String, String> params) {
if(params.get(FILENAME) != null) {
filename = params.get(FILENAME);
} else if(dataImporter.getHandlerName()!=null) {
filename = dataImporter.getHandlerName() + ".properties";
} else {
filename = "dataimport.properties";
}
if(params.get(DIRECTORY) != null) {
configDir = params.get(DIRECTORY);
} else {
SolrCore core = dataImporter.getCore();
configDir = (core == null ? "." : core.getResourceLoader().getConfigDir());
}
if(params.get(LOCALE) != null) {
String localeStr = params.get(LOCALE);
for (Locale l : Locale.getAvailableLocales()) {
if(localeStr.equals(l.getDisplayName())) {
locale = l;
break;
}
}
if(locale==null) {
throw new DataImportHandlerException(SEVERE, "Unsupported locale for PropertWriter: " + localeStr);
}
} else {
locale = Locale.ROOT;
}
if(params.get(DATE_FORMAT) != null) {
dateFormat = new SimpleDateFormat(params.get(DATE_FORMAT), locale);
} else {
dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
}
}
private File getPersistFile() {
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator))
filePath += File.separator;
filePath += persistFilename;
if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
filePath += filename;
return new File(filePath);
}
public boolean isWritable() {
File persistFile = getPersistFile();
return persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
}
@Override
public void persist(Properties p) {
OutputStream propOutput = null;
Properties props = readIndexerProperties();
@Override
public boolean isWritable() {
File persistFile = getPersistFile();
return persistFile.exists() ? persistFile.canWrite() : persistFile
.getParentFile().canWrite();
}
protected String convertDateToString(Date d) {
return dateFormat.format(d);
}
protected Date convertStringToDate(String s) {
try {
props.putAll(p);
return dateFormat.parse(s);
} catch (ParseException e) {
throw new DataImportHandlerException(SEVERE, "Value for "
+ LAST_INDEX_KEY + " is invalid for date format "
+ dateFormat.toLocalizedPattern() + " : " + s);
}
}
/**
* {@link DocBuilder} sends the date as an Object because
* this class knows how to convert it to a String
*/
protected Properties mapToProperties(Map<String,Object> propObjs) {
Properties p = new Properties();
for(Map.Entry<String,Object> entry : propObjs.entrySet()) {
String key = entry.getKey();
String val = null;
String lastKeyPart = key;
int lastDotPos = key.lastIndexOf('.');
if(lastDotPos!=-1 && key.length() > lastDotPos+1) {
lastKeyPart = key.substring(lastDotPos + 1);
}
if(LAST_INDEX_KEY.equals(lastKeyPart) && entry.getValue() instanceof Date) {
val = convertDateToString((Date) entry.getValue());
} else {
val = entry.getValue().toString();
}
p.put(key, val);
}
return p;
}
/**
* We'll send everything back as Strings as this class has
* already converted them.
*/
protected Map<String,Object> propertiesToMap(Properties p) {
Map<String,Object> theMap = new HashMap<String,Object>();
for(Map.Entry<Object,Object> entry : p.entrySet()) {
String key = entry.getKey().toString();
Object val = entry.getValue().toString();
theMap.put(key, val);
}
return theMap;
}
@Override
public void persist(Map<String, Object> propObjs) {
OutputStream propOutput = null;
Properties existingProps = mapToProperties(readIndexerProperties());
Properties newProps = mapToProperties(propObjs);
try {
existingProps.putAll(newProps);
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator))
if (configDir != null && !configDir.endsWith(File.separator)) {
filePath += File.separator;
filePath += persistFilename;
}
filePath += filename;
propOutput = new FileOutputStream(filePath);
props.store(propOutput, null);
log.info("Wrote last indexed time to " + persistFilename);
existingProps.store(propOutput, null);
log.info("Wrote last indexed time to " + filename);
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to persist Index Start Time", e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to persist Index Start Time", e);
} finally {
try {
if (propOutput != null)
propOutput.close();
if (propOutput != null) propOutput.close();
} catch (IOException e) {
propOutput = null;
}
}
}
@Override
public Properties readIndexerProperties() {
public Map<String, Object> readIndexerProperties() {
Properties props = new Properties();
InputStream propInput = null;
InputStream propInput = null;
try {
propInput = new FileInputStream(configDir + persistFilename);
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator)) {
filePath += File.separator;
}
filePath += filename;
propInput = new FileInputStream(filePath);
props.load(propInput);
log.info("Read " + persistFilename);
log.info("Read " + filename);
} catch (Exception e) {
log.warn("Unable to read: " + persistFilename);
log.warn("Unable to read: " + filename);
} finally {
try {
if (propInput != null)
propInput.close();
if (propInput != null) propInput.close();
} catch (IOException e) {
propInput = null;
}
}
return props;
}
return propertiesToMap(props);
}
}

View File

@ -79,7 +79,7 @@ public class TemplateString {
String[] s = new String[variables.size()];
for (int i = 0; i < variables.size(); i++) {
Object val = resolver.resolve(variables.get(i));
s[i] = val == null ? "" : getObjectAsString(val);
s[i] = val == null ? "" : val.toString();
}
StringBuilder sb = new StringBuilder();
@ -93,14 +93,6 @@ public class TemplateString {
return sb.toString();
}
private String getObjectAsString(Object val) {
if (val instanceof Date) {
Date d = (Date) val;
return DataImporter.DATE_TIME_FORMAT.get().format(d);
}
return val.toString();
}
/**
* Returns the variables in the given string.
*

View File

@ -18,6 +18,7 @@ package org.apache.solr.handler.dataimport;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.Properties;
import org.apache.solr.common.cloud.SolrZkClient;
@ -25,7 +26,13 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZKPropertiesWriter implements DIHPropertiesWriter {
/**
* <p>
* A SolrCloud-friendly extension of {@link SimplePropertiesWriter}.
* This implementation ignores the "directory" parameter, saving
* the properties file under /configs/[solrcloud collection name]/
*/
public class ZKPropertiesWriter extends SimplePropertiesWriter {
private static final Logger log = LoggerFactory
.getLogger(ZKPropertiesWriter.class);
@ -34,16 +41,11 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter {
private SolrZkClient zkClient;
@Override
public void init(DataImporter dataImporter) {
public void init(DataImporter dataImporter, Map<String, String> params) {
super.init(dataImporter, params);
String collection = dataImporter.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName();
String persistFilename;
if(dataImporter.getHandlerName() != null){
persistFilename = dataImporter.getHandlerName() + ".properties";
} else {
persistFilename = SimplePropertiesWriter.IMPORTER_PROPERTIES;
}
path = "/configs/" + collection + "/" + persistFilename;
path = "/configs/" + collection + "/" + filename;
zkClient = dataImporter.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getZkClient();
}
@ -54,9 +56,9 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter {
}
@Override
public void persist(Properties props) {
Properties existing = readIndexerProperties();
existing.putAll(props);
public void persist(Map<String, Object> propObjs) {
Properties existing = mapToProperties(readIndexerProperties());
existing.putAll(mapToProperties(propObjs));
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
existing.store(output, "");
@ -78,7 +80,7 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter {
}
@Override
public Properties readIndexerProperties() {
public Map<String, Object> readIndexerProperties() {
Properties props = new Properties();
try {
byte[] data = zkClient.getData(path, null, null, false);
@ -90,6 +92,6 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter {
log.warn(
"Could not read DIH properties from " + path + " :" + e.getClass(), e);
}
return props;
return propertiesToMap(props);
}
}

View File

@ -28,6 +28,8 @@ public class ConfigNameConstants {
public static final String NAME = "name";
public static final String PROCESSOR = "processor";
public static final String PROPERTY_WRITER = "propertyWriter";
/**
* @deprecated use IMPORTER_NS_SHORT instead

View File

@ -43,13 +43,18 @@ import org.w3c.dom.Element;
public class DIHConfiguration {
// TODO - remove from here and add it to entity
private final String deleteQuery;
private final List<Entity> entities;
private final String onImportStart;
private final String onImportEnd;
private final List<Map<String, String>> functions;
private final Script script;
private final Map<String, Map<String,String>> dataSources;
public DIHConfiguration(Element element, DataImporter di, List<Map<String, String>> functions, Script script, Map<String, Map<String,String>> dataSources) {
private final PropertyWriter propertyWriter;
public DIHConfiguration(Element element, DataImporter di,
List<Map<String,String>> functions, Script script,
Map<String,Map<String,String>> dataSources, PropertyWriter pw) {
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
@ -73,6 +78,7 @@ public class DIHConfiguration {
this.functions = Collections.unmodifiableList(modFunc);
this.script = script;
this.dataSources = Collections.unmodifiableMap(dataSources);
this.propertyWriter = pw;
}
public String getDeleteQuery() {
return deleteQuery;
@ -95,4 +101,7 @@ public class DIHConfiguration {
public Script getScript() {
return script;
}
public PropertyWriter getPropertyWriter() {
return propertyWriter;
}
}

View File

@ -1,4 +1,8 @@
package org.apache.solr.handler.dataimport;
package org.apache.solr.handler.dataimport.config;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@ -17,16 +21,20 @@ package org.apache.solr.handler.dataimport;
* limitations under the License.
*/
import java.util.Properties;
public class PropertyWriter {
private final String type;
private final Map<String,String> parameters;
public PropertyWriter(String type, Map<String,String> parameters) {
this.type = type;
this.parameters = Collections.unmodifiableMap(new HashMap<String,String>(parameters));
}
public interface DIHPropertiesWriter {
public void init(DataImporter dataImporter);
public boolean isWritable();
public void persist(Properties props);
public Properties readIndexerProperties();
public Map<String,String> getParameters() {
return parameters;
}
public String getType() {
return type;
}
}

View File

@ -1,4 +1,5 @@
package org.apache.solr.handler.dataimport;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
@ -19,15 +20,8 @@ package org.apache.solr.handler.dataimport;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -38,25 +32,18 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* This sets up an in-memory Sql database with a little sample data.
*/
public abstract class AbstractDIHJdbcTestCase extends AbstractDataImportHandlerTestCase {
protected boolean underlyingDataModified;
public abstract class AbstractDIHJdbcTestCase extends
AbstractDataImportHandlerTestCase {
protected boolean useSimpleCaches;
protected boolean countryEntity;
protected boolean countryCached;
protected boolean sportsEntity;
protected boolean sportsCached;
protected String rootTransformerName;
protected boolean countryTransformer;
protected boolean sportsTransformer;
protected Database dbToUse;
protected Database db = Database.RANDOM;
private Database dbToUse;
public enum Database { RANDOM , DERBY , HSQLDB }
public enum Database {
RANDOM, DERBY, HSQLDB
}
private static final Pattern totalRequestsPattern = Pattern
.compile(".str name..Total Requests made to DataSource..(\\d+)..str.");
@ -68,685 +55,132 @@ public abstract class AbstractDIHJdbcTestCase extends AbstractDataImportHandlerT
String oldProp = System.getProperty("derby.stream.error.field");
System.setProperty("derby.stream.error.field", "DerbyUtil.DEV_NULL");
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
if(oldProp!=null) {
System.setProperty("derby.stream.error.field", oldProp);
if (oldProp != null) {
System.setProperty("derby.stream.error.field", oldProp);
}
} catch (Exception e) {
throw e;
}
}
initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
}
}
@AfterClass
public static void afterClassDihJdbcTest() throws Exception {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch(SQLException e) {
//ignore...we might not even be using derby this time...
}
} catch (SQLException e) {
// ignore...we might not even be using derby this time...
}
}
protected Database setAllowedDatabases() {
return Database.RANDOM;
}
@Before
public void beforeDihJdbcTest() throws Exception {
useSimpleCaches = false;
countryEntity = false;
countryCached = false;
sportsEntity = false;
sportsCached = false;
rootTransformerName = null;
countryTransformer = false;
sportsTransformer = false;
dbToUse = db;
if(db==Database.RANDOM) {
if(random().nextBoolean()) {
public void beforeDihJdbcTest() throws Exception {
dbToUse = setAllowedDatabases();
if (dbToUse == Database.RANDOM) {
if (random().nextBoolean()) {
dbToUse = Database.DERBY;
} else {
dbToUse = Database.HSQLDB;
}
}
}
clearIndex();
assertU(commit());
assertU(commit());
buildDatabase();
}
}
@After
public void afterDihJdbcTest() throws Exception {
Connection conn = null;
Statement s = null;
try {
if(dbToUse==Database.DERBY) {
try {
if (dbToUse == Database.DERBY) {
try {
conn = DriverManager.getConnection("jdbc:derby:memory:derbyDB;drop=true");
} catch(SQLException e) {
if(!"08006".equals(e.getSQLState())) {
conn = DriverManager
.getConnection("jdbc:derby:memory:derbyDB;drop=true");
} catch (SQLException e) {
if (!"08006".equals(e.getSQLState())) {
throw e;
}
}
} else if(dbToUse==Database.HSQLDB) {
conn = DriverManager.getConnection("jdbc:hsqldb:mem:.");
}
} else if (dbToUse == Database.HSQLDB) {
conn = DriverManager.getConnection("jdbc:hsqldb:mem:.");
s = conn.createStatement();
s.executeUpdate("shutdown");
}
} catch (SQLException e) {
throw e;
} finally {
try { s.close(); } catch(Exception ex) { }
try { conn.close(); } catch(Exception ex) { }
try {
s.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
}
private Connection newConnection() throws Exception {
if(dbToUse==Database.DERBY) {
return DriverManager.getConnection("jdbc:derby:memory:derbyDB;");
} else if(dbToUse==Database.HSQLDB) {
return DriverManager.getConnection("jdbc:hsqldb:mem:.");
}
throw new AssertionError("Invalid database to use: " + dbToUse);
}
protected void singleEntity(int numToExpect) throws Exception {
h.query("/dataimport", generateRequest());
assertQ("There should be 1 document per person in the database: "
+ totalPeople(), req("*:*"), "//*[@numFound='" + totalPeople() + "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
protected Connection newConnection() throws Exception {
if (dbToUse == Database.DERBY) {
return DriverManager.getConnection("jdbc:derby:memory:derbyDB;");
} else if (dbToUse == Database.HSQLDB) {
return DriverManager.getConnection("jdbc:hsqldb:mem:.");
}
throw new AssertionError("Invalid database to use: " + dbToUse);
}
protected void simpleTransform(int numToExpect) throws Exception {
rootTransformerName = "AddAColumnTransformer";
h.query("/dataimport", generateRequest());
assertQ("There should be 1 document with a transformer-added column per person is the database: "
+ totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='" + totalPeople() + "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
}
/**
* A delta update will not clean up documents added by a transformer
* even if the parent document that the transformer used to base the new documents
* were deleted
*/
protected void complexTransform(int numToExpect, int numDeleted) throws Exception {
rootTransformerName = "TripleThreatTransformer";
h.query("/dataimport", generateRequest());
int totalDocs = ((totalPeople() * 3) + (numDeleted * 2));
int totalAddedDocs = (totalPeople() + numDeleted);
assertQ(req("q", "*:*", "rows", "" + (totalPeople() * 3), "sort", "id asc"), "//*[@numFound='" + totalDocs + "']");
assertQ(req("id:TripleThreat-1-*"), "//*[@numFound='" + totalAddedDocs + "']");
assertQ(req("id:TripleThreat-2-*"), "//*[@numFound='" + totalAddedDocs + "']");
if(personNameExists("Michael") && countryCodeExists("NR"))
{
assertQ(
"Michael and NR are assured to be in the database. Therefore the transformer should have added leahciM and RN on the same document as id:TripleThreat-1-3",
req("+id:TripleThreat-1-3 +NAME_mult_s:Michael +NAME_mult_s:leahciM +COUNTRY_CODES_mult_s:NR +COUNTRY_CODES_mult_s:RN"),
"//*[@numFound='1']");
}
assertQ(req("AddAColumn_s:Added"), "//*[@numFound='" + totalAddedDocs + "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
}
protected void withChildEntities(boolean cached, boolean checkDatabaseRequests) throws Exception {
rootTransformerName = random().nextBoolean() ? null : "AddAColumnTransformer";
int numChildren = random().nextInt(1) + 1;
int numDatabaseRequests = 1;
if(underlyingDataModified) {
if (countryEntity) {
if (cached) {
numDatabaseRequests++;
} else {
numDatabaseRequests += totalPeople();
}
}
if (sportsEntity) {
if (cached) {
numDatabaseRequests++;
} else {
numDatabaseRequests += totalPeople();
}
}
} else {
countryEntity = true;
sportsEntity = true;
if(numChildren==1) {
countryEntity = random().nextBoolean();
sportsEntity = !countryEntity;
}
if(countryEntity) {
countryTransformer = random().nextBoolean();
if(cached) {
numDatabaseRequests++;
countryCached = true;
} else {
numDatabaseRequests += totalPeople();
}
}
if(sportsEntity) {
sportsTransformer = random().nextBoolean();
if(cached) {
numDatabaseRequests++;
sportsCached = true;
} else {
numDatabaseRequests += totalPeople();
}
}
}
h.query("/dataimport", generateRequest());
assertQ("There should be 1 document per person in the database: "
+ totalPeople(), req("*:*"), "//*[@numFound='" + (totalPeople()) + "']");
if(!underlyingDataModified && "AddAColumnTransformer".equals(rootTransformerName)) {
assertQ("There should be 1 document with a transformer-added column per person is the database: "
+ totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='" + (totalPeople()) + "']");
}
if(countryEntity) {
if(personNameExists("Jayden"))
{
String nrName = countryNameByCode("NP");
if(nrName!=null && nrName.length()>0) {
assertQ(req("NAME_mult_s:Jayden"),
"//*[@numFound='1']", "//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'");
}
}
String nrName = countryNameByCode("NR");
int num = numberPeopleByCountryCode("NR");
if(nrName!=null && num>0) {
assertQ(req("COUNTRY_CODES_mult_s:NR"),
"//*[@numFound='" + num + "']", "//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'");
}
if(countryTransformer && !underlyingDataModified) {
assertQ(req("countryAdded_s:country_added"), "//*[@numFound='" + totalPeople() + "']");
}
}
if(sportsEntity) {
if(!underlyingDataModified) {
assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']");
}
String michaelsName = personNameById(3);
String[] michaelsSports = sportNamesByPersonId(3);
if(michaelsName != null && michaelsSports.length>0) {
String[] xpath = new String[michaelsSports.length + 1];
xpath[0] = "//*[@numFound='1']";
int i=1;
for(String ms : michaelsSports) {
xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms + "'";
i++;
}
assertQ(req("NAME_mult_s:" + michaelsName.replaceAll("\\W", "\\\\$0")), xpath);
}
if (!underlyingDataModified && sportsTransformer) {
assertQ(req("sportsAdded_s:sport_added"), "//*[@numFound='" + (totalPeople()) + "']");
}
}
if(checkDatabaseRequests) {
Assert.assertTrue("Expecting " + numDatabaseRequests
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numDatabaseRequests);
}
}
protected void simpleCacheChildEntities(boolean checkDatabaseRequests) throws Exception {
useSimpleCaches = true;
countryEntity = true;
sportsEntity = true;
countryCached=true;
sportsCached=true;
int dbRequestsMoreThan = 3;
int dbRequestsLessThan = totalPeople() * 2 + 1;
h.query("/dataimport", generateRequest());
assertQ(req("*:*"), "//*[@numFound='" + (totalPeople()) + "']");
if(!underlyingDataModified || (personNameExists("Samantha") && "Nauru".equals(countryNameByCode("NR"))))
{
assertQ(req("NAME_mult_s:Samantha"),
"//*[@numFound='1']", "//doc/str[@name='COUNTRY_NAME_s']='Nauru'");
}
if(!underlyingDataModified)
{
assertQ(req("COUNTRY_CODES_mult_s:NR"),
"//*[@numFound='2']", "//doc/str[@name='COUNTRY_NAME_s']='Nauru'");
assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']");
}
String[] michaelsSports = sportNamesByPersonId(3);
if(!underlyingDataModified || michaelsSports.length>0) {
String[] xpath = new String[michaelsSports.length + 1];
xpath[0] = "//*[@numFound='1']";
int i=1;
for(String ms : michaelsSports) {
xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms + "'";
i++;
}
assertQ(req("NAME_mult_s:Michael"), xpath);
}
if(checkDatabaseRequests) {
Assert.assertTrue("Expecting more than " + dbRequestsMoreThan
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() > dbRequestsMoreThan);
Assert.assertTrue("Expecting fewer than " + dbRequestsLessThan
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() < dbRequestsLessThan);
}
}
public void buildDatabase() throws Exception
{
underlyingDataModified = false;
protected void buildDatabase() throws Exception {
Connection conn = null;
Statement s = null;
PreparedStatement ps = null;
Timestamp theTime = new Timestamp(System.currentTimeMillis() - 10000); //10 seconds ago
try {
if(dbToUse==Database.DERBY) {
conn = DriverManager.getConnection("jdbc:derby:memory:derbyDB;create=true");
} else if(dbToUse==Database.HSQLDB) {
conn = DriverManager.getConnection("jdbc:hsqldb:mem:.");
try {
if (dbToUse == Database.DERBY) {
conn = DriverManager
.getConnection("jdbc:derby:memory:derbyDB;create=true");
} else if (dbToUse == Database.HSQLDB) {
conn = DriverManager.getConnection("jdbc:hsqldb:mem:.");
} else {
throw new AssertionError("Invalid database to use: " + dbToUse);
}
s = conn.createStatement();
s.executeUpdate("create table countries(code varchar(3) not null primary key, country_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
s.executeUpdate("create table people(id int not null primary key, name varchar(50), country_code char(2), deleted char(1) default 'N', last_modified timestamp not null)");
s.executeUpdate("create table people_sports(id int not null primary key, person_id int, sport_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
populateData(conn);
} catch (Exception e) {
throw e;
} finally {
ps = conn.prepareStatement("insert into countries (code, country_name, last_modified) values (?,?,?)");
for(String[] country : countries) {
ps.setString(1, country[0]);
ps.setString(2, country[1]);
ps.setTimestamp(3, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
ps = conn.prepareStatement("insert into people (id, name, country_code, last_modified) values (?,?,?,?)");
for(Object[] person : people) {
ps.setInt(1, (Integer) person[0]);
ps.setString(2, (String) person[1]);
ps.setString(3, (String) person[2]);
ps.setTimestamp(4, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
ps = conn.prepareStatement("insert into people_sports (id, person_id, sport_name, last_modified) values (?,?,?,?)");
for(Object[] sport : people_sports) {
ps.setInt(1, (Integer) sport[0]);
ps.setInt(2, (Integer) sport[1]);
ps.setString(3, (String) sport[2]);
ps.setTimestamp(4, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
conn.commit();
conn.close();
} catch(Exception e) {
throw e;
} finally {
try { ps.close(); } catch(Exception ex) { }
try { s.close(); } catch(Exception ex) { }
try { conn.close(); } catch(Exception ex) { }
}
}
protected abstract String deltaQueriesCountryTable() ;
protected abstract String deltaQueriesPersonTable() ;
private int getIntFromQuery(String query) throws Exception {
Connection conn = null;
Statement s = null;
ResultSet rs = null;
try {
conn = newConnection();
s = conn.createStatement();
rs = s.executeQuery(query);
if(rs.next()) {
return rs.getInt(1);
}
return 0;
} catch (SQLException e) {
throw e;
} finally {
try { rs.close(); } catch(Exception ex) { }
try { s.close(); } catch(Exception ex) { }
try { conn.close(); } catch(Exception ex) { }
}
}
private String[] getStringsFromQuery(String query) throws Exception {
Connection conn = null;
Statement s = null;
ResultSet rs = null;
try {
conn = newConnection();
s = conn.createStatement();
rs = s.executeQuery(query);
List<String> results = new ArrayList<String>();
while(rs.next()) {
results.add(rs.getString(1));
}
return results.toArray(new String[results.size()]);
} catch (SQLException e) {
throw e;
} finally {
try { rs.close(); } catch(Exception ex) { }
try { s.close(); } catch(Exception ex) { }
try { conn.close(); } catch(Exception ex) { }
}
}
public int totalCountries() throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' ");
}
public int totalPeople() throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' ");
protected void populateData(Connection conn) throws Exception {
// no-op
}
public boolean countryCodeExists(String cc) throws Exception {
return getIntFromQuery("SELECT COUNT(1) country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + cc + "'")>0;
}
public String countryNameByCode(String cc) throws Exception {
String[] s = getStringsFromQuery("SELECT country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + cc + "'");
return s.length==0 ? null : s[0];
}
public int numberPeopleByCountryCode(String cc) throws Exception {
return getIntFromQuery(
"Select count(1) " +
"from people p " +
"inner join countries c on p.country_code=c.code " +
"where p.deleted!='Y' and c.deleted!='Y' and c.code='" + cc + "'");
}
public String[] sportNamesByPersonId(int personId) throws Exception {
return getStringsFromQuery(
"SELECT ps.SPORT_NAME " +
"FROM people_sports ps " +
"INNER JOIN PEOPLE p ON p.id = ps.person_Id " +
"WHERE ps.DELETED != 'Y' AND p.DELETED != 'Y' " +
"AND ps.person_id=" + personId + " " +
"ORDER BY ps.id"
);
}
public boolean personNameExists(String pn) throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' AND NAME='" + pn + "'")>0;
}
public String personNameById(int id) throws Exception {
String[] nameArr= getStringsFromQuery("SELECT NAME FROM PEOPLE WHERE ID=" + id);
if(nameArr.length==0) {
return null;
}
return nameArr[0];
}
public int totalDatabaseRequests(String dihHandlerName) throws Exception {
LocalSolrQueryRequest request = lrf.makeRequest("indent", "true");
String response = h.query(dihHandlerName, request);
Matcher m = totalRequestsPattern.matcher(response);
Assert.assertTrue("The handler " + dihHandlerName
+ " is not reporting any database requests. ", m.find()
&& m.groupCount() == 1);
+ " is not reporting any database requests. ",
m.find() && m.groupCount() == 1);
return Integer.parseInt(m.group(1));
}
public int totalDatabaseRequests() throws Exception {
return totalDatabaseRequests("/dataimport");
}
public IntChanges modifySomePeople() throws Exception {
underlyingDataModified = true;
int numberToChange = random().nextInt(people.length + 1);
Set<Integer> changeSet = new HashSet<Integer>();
Set<Integer> deleteSet = new HashSet<Integer>();
Set<Integer> addSet = new HashSet<Integer>();
Connection conn = null;
PreparedStatement change = null;
PreparedStatement delete = null;
PreparedStatement add = null;
//One second in the future ensures a change time after the last import (DIH uses second precision only)
Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
try {
conn = newConnection();
change = conn.prepareStatement("update people set name=?, last_modified=? where id=?");
delete = conn.prepareStatement("update people set deleted='Y', last_modified=? where id=?");
add = conn.prepareStatement("insert into people (id,name,country_code,last_modified) values (?,?,'ZZ',?)");
for(int i=0 ; i<numberToChange ; i++) {
int tryIndex = random().nextInt(people.length);
Integer id = (Integer) people[tryIndex][0];
if(!changeSet.contains(id) && !deleteSet.contains(id)) {
boolean changeDontDelete = random().nextBoolean();
if(changeDontDelete) {
changeSet.add(id);
change.setString(1, "MODIFIED " + people[tryIndex][1]);
change.setTimestamp(2, theTime);
change.setInt(3, id);
Assert.assertEquals(1, change.executeUpdate());
} else {
deleteSet.add(id);
delete.setTimestamp(1, theTime);
delete.setInt(2, id);
Assert.assertEquals(1, delete.executeUpdate());
}
}
}
int numberToAdd = random().nextInt(3);
for(int i=0 ; i<numberToAdd ; i++) {
int tryIndex = random().nextInt(people.length);
Integer id = (Integer) people[tryIndex][0];
Integer newId = id+1000;
String newDesc = "ADDED " + people[tryIndex][1];
if(!addSet.contains(newId)) {
addSet.add(newId);
add.setInt(1, newId);
add.setString(2, newDesc);
add.setTimestamp(3, theTime);
Assert.assertEquals(1, add.executeUpdate());
}
}
conn.commit();
} catch (SQLException e) {
throw e;
} finally {
try { change.close(); } catch(Exception ex) { }
try { conn.close(); } catch(Exception ex) { }
}
IntChanges c = new IntChanges();
c.changedKeys=changeSet.toArray(new Integer[changeSet.size()]);
c.deletedKeys=deleteSet.toArray(new Integer[deleteSet.size()]);
c.addedKeys=addSet.toArray(new Integer[addSet.size()]);
return c;
}
public String[] modifySomeCountries() throws Exception {
underlyingDataModified = true;
int numberToChange = random().nextInt(countries.length + 1);
Set<String> changeSet = new HashSet<String>();
Connection conn = null;
PreparedStatement change = null;
// One second in the future ensures a change time after the last import (DIH
// uses second precision only)
Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
try {
conn = newConnection();
change = conn
.prepareStatement("update countries set country_name=?, last_modified=? where code=?");
for (int i = 0; i < numberToChange; i++) {
int tryIndex = random().nextInt(countries.length);
String code = countries[tryIndex][0];
if (!changeSet.contains(code)) {
changeSet.add(code);
change.setString(1, "MODIFIED " + countries[tryIndex][1]);
change.setTimestamp(2, theTime);
change.setString(3, code);
Assert.assertEquals(1, change.executeUpdate());
}
}
} catch (SQLException e) {
throw e;
} finally {
try {
change.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
return changeSet.toArray(new String[changeSet.size()]);
}
class IntChanges {
public Integer[] changedKeys;
public Integer[] deletedKeys;
public Integer[] addedKeys;
}
protected LocalSolrQueryRequest generateRequest() {
return lrf.makeRequest("command", "full-import", "dataConfig", generateConfig(),
"clean", "true", "commit", "true", "synchronous", "true", "indent", "true");
return lrf.makeRequest("command", "full-import", "dataConfig",
generateConfig(), "clean", "true", "commit", "true", "synchronous",
"true", "indent", "true");
}
protected String generateConfig() {
String ds = null;
if (dbToUse == Database.DERBY) {
ds = "derby";
} else if (dbToUse == Database.HSQLDB) {
ds = "hsqldb";
} else {
throw new AssertionError("Invalid database to use: " + dbToUse);
}
StringBuilder sb = new StringBuilder();
sb.append("<dataConfig> \n");
sb.append("<dataSource name=''hsqldb'' driver=''org.hsqldb.jdbcDriver'' url=''jdbc:hsqldb:mem:.'' /> \n");
sb.append("<dataSource name=''derby'' driver=''org.apache.derby.jdbc.EmbeddedDriver'' url=''jdbc:derby:memory:derbyDB;'' /> \n");
sb.append("<document name=''TestSqlEntityProcessor''> \n");
sb.append("<entity name=''People'' ");
sb.append("pk=''" + (random().nextBoolean() ? "ID" : "People.ID") + "'' ");
sb.append("processor=''SqlEntityProcessor'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(rootTransformerName != null ? "transformer=''" + rootTransformerName + "'' " : "");
sb.append("query=''SELECT ID, NAME, COUNTRY_CODE FROM PEOPLE WHERE DELETED != 'Y' '' ");
sb.append(deltaQueriesPersonTable());
sb.append("> \n");
sb.append("<field column=''NAME'' name=''NAME_mult_s'' /> \n");
sb.append("<field column=''COUNTRY_CODE'' name=''COUNTRY_CODES_mult_s'' /> \n");
if (countryEntity) {
sb.append("<entity name=''Countries'' ");
sb.append("pk=''" + (random().nextBoolean() ? "CODE" : "Countries.CODE")
+ "'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(countryTransformer ? "transformer=''AddAColumnTransformer'' "
+ "newColumnName=''countryAdded_s'' newColumnValue=''country_added'' "
: "");
if (countryCached) {
sb.append(random().nextBoolean() ? "processor=''SqlEntityProcessor'' cacheImpl=''SortedMapBackedCache'' "
: "processor=''CachedSqlEntityProcessor'' ");
if (useSimpleCaches) {
sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' ''>\n");
} else {
sb.append(random().nextBoolean() ? "cacheKey=''CODE'' cacheLookup=''People.COUNTRY_CODE'' "
: "where=''CODE=People.COUNTRY_CODE'' ");
sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES'' ");
sb.append("> \n");
}
} else {
sb.append("processor=''SqlEntityProcessor'' query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' '' ");
sb.append(deltaQueriesCountryTable());
sb.append("> \n");
}
sb.append("<field column=''CODE'' name=''COUNTRY_CODE_s'' /> \n");
sb.append("<field column=''COUNTRY_NAME'' name=''COUNTRY_NAME_s'' /> \n");
sb.append("</entity> \n");
}
if (sportsEntity) {
sb.append("<entity name=''Sports'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(sportsTransformer ? "transformer=''AddAColumnTransformer'' "
+ "newColumnName=''sportsAdded_s'' newColumnValue=''sport_added'' "
: "");
if (sportsCached) {
sb.append(random().nextBoolean() ? "processor=''SqlEntityProcessor'' cacheImpl=''SortedMapBackedCache'' "
: "processor=''CachedSqlEntityProcessor'' ");
if (useSimpleCaches) {
sb.append("query=''SELECT ID, SPORT_NAME FROM PEOPLE_SPORTS WHERE DELETED != 'Y' AND PERSON_ID=${People.ID} ORDER BY ID'' ");
} else {
sb.append(random().nextBoolean() ? "cacheKey=''PERSON_ID'' cacheLookup=''People.ID'' "
: "where=''PERSON_ID=People.ID'' ");
sb.append("query=''SELECT ID, PERSON_ID, SPORT_NAME FROM PEOPLE_SPORTS ORDER BY ID'' ");
}
} else {
sb.append("processor=''SqlEntityProcessor'' query=''SELECT ID, SPORT_NAME FROM PEOPLE_SPORTS WHERE DELETED != 'Y' AND PERSON_ID=${People.ID} ORDER BY ID'' ");
}
sb.append("> \n");
sb.append("<field column=''SPORT_NAME'' name=''SPORT_NAME_mult_s'' /> \n");
sb.append("<field column=''id'' name=''SPORT_ID_mult_s'' /> \n");
sb.append("</entity> \n");
}
sb.append("</entity> \n");
sb.append("</document> \n");
sb.append("</dataConfig> \n");
String config = sb.toString().replaceAll("[']{2}", "\"");
log.debug(config);
return config;
}
protected abstract String generateConfig();
public static final String[][] countries = {
{"NA", "Namibia"},
{"NC", "New Caledonia"},
{"NE", "Niger"},
{"NF", "Norfolk Island"},
{"NG", "Nigeria"},
{"NI", "Nicaragua"},
{"NL", "Netherlands"},
{"NO", "Norway"},
{"NP", "Nepal"},
{"NR", "Nauru"},
{"NU", "Niue"},
{"NZ", "New Zealand"}
};
public static final Object[][] people = {
{1,"Jacob","NZ"},
{2,"Ethan","NU"},
{3,"Michael","NR"},
{4,"Jayden","NP"},
{5,"William","NO"},
{6,"Alexander","NL"},
{7,"Noah","NI"},
{8,"Daniel","NG"},
{9,"Aiden","NF"},
{10,"Anthony","NE"},
{11,"Emma","NL"},
{12,"Grace","NI"},
{13,"Hailey","NG"},
{14,"Isabella","NF"},
{15,"Lily","NE"},
{16,"Madison","NC"},
{17,"Mia","NA"},
{18,"Natalie","NZ"},
{19,"Olivia","NU"},
{20,"Samantha","NR"}
};
public static final Object[][] people_sports = {
{100, 1, "Swimming"},
{200, 2, "Triathlon"},
{300, 3, "Water polo"},
{310, 3, "Underwater rugby"},
{320, 3, "Kayaking"},
{400, 4, "Snorkeling"},
{500, 5, "Synchronized diving"},
{600, 6, "Underwater rugby"},
{700, 7, "Boating"},
{800, 8, "Bodyboarding"},
{900, 9, "Canoeing"},
{1000, 10, "Fishing"},
{1100, 11, "Jet Ski"},
{1110, 11, "Rowing"},
{1120, 11, "Sailing"},
{1200, 12, "Kayaking"},
{1210, 12, "Canoeing"},
{1300, 13, "Kite surfing"},
{1400, 14, "Parasailing"},
{1500, 15, "Rafting"},
{1600, 16, "Rowing"},
{1700, 17, "Sailing"},
{1800, 18, "White Water Rafting"},
{1900, 19, "Water skiing"},
{2000, 20, "Windsurfing"}
};
public static class DerbyUtil {
public static final OutputStream DEV_NULL = new OutputStream() {
public void write(int b) {}
public void write(int b) {}
};
}
}
}

View File

@ -0,0 +1,696 @@
package org.apache.solr.handler.dataimport;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.After;
import junit.framework.Assert;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public abstract class AbstractSqlEntityProcessorTestCase extends
AbstractDIHJdbcTestCase {
protected boolean underlyingDataModified;
protected boolean useSimpleCaches;
protected boolean countryEntity;
protected boolean countryCached;
protected boolean sportsEntity;
protected boolean sportsCached;
protected String rootTransformerName;
protected boolean countryTransformer;
protected boolean sportsTransformer;
@After
public void afterSqlEntitiyProcessorTestCase() {
useSimpleCaches = false;
countryEntity = false;
countryCached = false;
sportsEntity = false;
sportsCached = false;
rootTransformerName = null;
countryTransformer = false;
sportsTransformer = false;
underlyingDataModified = false;
}
protected abstract String deltaQueriesCountryTable();
protected abstract String deltaQueriesPersonTable();
protected void singleEntity(int numToExpect) throws Exception {
h.query("/dataimport", generateRequest());
assertQ("There should be 1 document per person in the database: "
+ totalPeople(), req("*:*"), "//*[@numFound='" + totalPeople() + "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
}
protected void simpleTransform(int numToExpect) throws Exception {
rootTransformerName = "AddAColumnTransformer";
h.query("/dataimport", generateRequest());
assertQ(
"There should be 1 document with a transformer-added column per person is the database: "
+ totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='"
+ totalPeople() + "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
}
/**
* A delta update will not clean up documents added by a transformer even if
* the parent document that the transformer used to base the new documents
* were deleted
*/
protected void complexTransform(int numToExpect, int numDeleted)
throws Exception {
rootTransformerName = "TripleThreatTransformer";
h.query("/dataimport", generateRequest());
int totalDocs = ((totalPeople() * 3) + (numDeleted * 2));
int totalAddedDocs = (totalPeople() + numDeleted);
assertQ(
req("q", "*:*", "rows", "" + (totalPeople() * 3), "sort", "id asc"),
"//*[@numFound='" + totalDocs + "']");
assertQ(req("id:TripleThreat-1-*"), "//*[@numFound='" + totalAddedDocs
+ "']");
assertQ(req("id:TripleThreat-2-*"), "//*[@numFound='" + totalAddedDocs
+ "']");
if (personNameExists("Michael") && countryCodeExists("NR")) {
assertQ(
"Michael and NR are assured to be in the database. Therefore the transformer should have added leahciM and RN on the same document as id:TripleThreat-1-3",
req("+id:TripleThreat-1-3 +NAME_mult_s:Michael +NAME_mult_s:leahciM +COUNTRY_CODES_mult_s:NR +COUNTRY_CODES_mult_s:RN"),
"//*[@numFound='1']");
}
assertQ(req("AddAColumn_s:Added"), "//*[@numFound='" + totalAddedDocs
+ "']");
Assert.assertTrue("Expecting " + numToExpect
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numToExpect);
}
protected void withChildEntities(boolean cached, boolean checkDatabaseRequests)
throws Exception {
rootTransformerName = random().nextBoolean() ? null
: "AddAColumnTransformer";
int numChildren = random().nextInt(1) + 1;
int numDatabaseRequests = 1;
if (underlyingDataModified) {
if (countryEntity) {
if (cached) {
numDatabaseRequests++;
} else {
numDatabaseRequests += totalPeople();
}
}
if (sportsEntity) {
if (cached) {
numDatabaseRequests++;
} else {
numDatabaseRequests += totalPeople();
}
}
} else {
countryEntity = true;
sportsEntity = true;
if (numChildren == 1) {
countryEntity = random().nextBoolean();
sportsEntity = !countryEntity;
}
if (countryEntity) {
countryTransformer = random().nextBoolean();
if (cached) {
numDatabaseRequests++;
countryCached = true;
} else {
numDatabaseRequests += totalPeople();
}
}
if (sportsEntity) {
sportsTransformer = random().nextBoolean();
if (cached) {
numDatabaseRequests++;
sportsCached = true;
} else {
numDatabaseRequests += totalPeople();
}
}
}
h.query("/dataimport", generateRequest());
assertQ("There should be 1 document per person in the database: "
+ totalPeople(), req("*:*"), "//*[@numFound='" + (totalPeople()) + "']");
if (!underlyingDataModified
&& "AddAColumnTransformer".equals(rootTransformerName)) {
assertQ(
"There should be 1 document with a transformer-added column per person is the database: "
+ totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='"
+ (totalPeople()) + "']");
}
if (countryEntity) {
if (personNameExists("Jayden")) {
String nrName = countryNameByCode("NP");
if (nrName != null && nrName.length() > 0) {
assertQ(req("NAME_mult_s:Jayden"), "//*[@numFound='1']",
"//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'");
}
}
String nrName = countryNameByCode("NR");
int num = numberPeopleByCountryCode("NR");
if (nrName != null && num > 0) {
assertQ(req("COUNTRY_CODES_mult_s:NR"), "//*[@numFound='" + num + "']",
"//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'");
}
if (countryTransformer && !underlyingDataModified) {
assertQ(req("countryAdded_s:country_added"), "//*[@numFound='"
+ totalPeople() + "']");
}
}
if (sportsEntity) {
if (!underlyingDataModified) {
assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']");
}
String michaelsName = personNameById(3);
String[] michaelsSports = sportNamesByPersonId(3);
if (michaelsName != null && michaelsSports.length > 0) {
String[] xpath = new String[michaelsSports.length + 1];
xpath[0] = "//*[@numFound='1']";
int i = 1;
for (String ms : michaelsSports) {
xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='"
+ ms + "'";
i++;
}
assertQ(req("NAME_mult_s:" + michaelsName.replaceAll("\\W", "\\\\$0")),
xpath);
}
if (!underlyingDataModified && sportsTransformer) {
assertQ(req("sportsAdded_s:sport_added"), "//*[@numFound='"
+ (totalPeople()) + "']");
}
}
if (checkDatabaseRequests) {
Assert.assertTrue("Expecting " + numDatabaseRequests
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() == numDatabaseRequests);
}
}
protected void simpleCacheChildEntities(boolean checkDatabaseRequests)
throws Exception {
useSimpleCaches = true;
countryEntity = true;
sportsEntity = true;
countryCached = true;
sportsCached = true;
int dbRequestsMoreThan = 3;
int dbRequestsLessThan = totalPeople() * 2 + 1;
h.query("/dataimport", generateRequest());
assertQ(req("*:*"), "//*[@numFound='" + (totalPeople()) + "']");
if (!underlyingDataModified
|| (personNameExists("Samantha") && "Nauru"
.equals(countryNameByCode("NR")))) {
assertQ(req("NAME_mult_s:Samantha"), "//*[@numFound='1']",
"//doc/str[@name='COUNTRY_NAME_s']='Nauru'");
}
if (!underlyingDataModified) {
assertQ(req("COUNTRY_CODES_mult_s:NR"), "//*[@numFound='2']",
"//doc/str[@name='COUNTRY_NAME_s']='Nauru'");
assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']");
}
String[] michaelsSports = sportNamesByPersonId(3);
if (!underlyingDataModified || michaelsSports.length > 0) {
String[] xpath = new String[michaelsSports.length + 1];
xpath[0] = "//*[@numFound='1']";
int i = 1;
for (String ms : michaelsSports) {
xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms
+ "'";
i++;
}
assertQ(req("NAME_mult_s:Michael"), xpath);
}
if (checkDatabaseRequests) {
Assert.assertTrue("Expecting more than " + dbRequestsMoreThan
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() > dbRequestsMoreThan);
Assert.assertTrue("Expecting fewer than " + dbRequestsLessThan
+ " database calls, but DIH reported " + totalDatabaseRequests(),
totalDatabaseRequests() < dbRequestsLessThan);
}
}
private int getIntFromQuery(String query) throws Exception {
Connection conn = null;
Statement s = null;
ResultSet rs = null;
try {
conn = newConnection();
s = conn.createStatement();
rs = s.executeQuery(query);
if (rs.next()) {
return rs.getInt(1);
}
return 0;
} catch (SQLException e) {
throw e;
} finally {
try {
rs.close();
} catch (Exception ex) {}
try {
s.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
}
private String[] getStringsFromQuery(String query) throws Exception {
Connection conn = null;
Statement s = null;
ResultSet rs = null;
try {
conn = newConnection();
s = conn.createStatement();
rs = s.executeQuery(query);
List<String> results = new ArrayList<String>();
while (rs.next()) {
results.add(rs.getString(1));
}
return results.toArray(new String[results.size()]);
} catch (SQLException e) {
throw e;
} finally {
try {
rs.close();
} catch (Exception ex) {}
try {
s.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
}
public int totalCountries() throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' ");
}
public int totalPeople() throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' ");
}
public boolean countryCodeExists(String cc) throws Exception {
return getIntFromQuery("SELECT COUNT(1) country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='"
+ cc + "'") > 0;
}
public String countryNameByCode(String cc) throws Exception {
String[] s = getStringsFromQuery("SELECT country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='"
+ cc + "'");
return s.length == 0 ? null : s[0];
}
public int numberPeopleByCountryCode(String cc) throws Exception {
return getIntFromQuery("Select count(1) " + "from people p "
+ "inner join countries c on p.country_code=c.code "
+ "where p.deleted!='Y' and c.deleted!='Y' and c.code='" + cc + "'");
}
public String[] sportNamesByPersonId(int personId) throws Exception {
return getStringsFromQuery("SELECT ps.SPORT_NAME "
+ "FROM people_sports ps "
+ "INNER JOIN PEOPLE p ON p.id = ps.person_Id "
+ "WHERE ps.DELETED != 'Y' AND p.DELETED != 'Y' " + "AND ps.person_id="
+ personId + " " + "ORDER BY ps.id");
}
public boolean personNameExists(String pn) throws Exception {
return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' AND NAME='"
+ pn + "'") > 0;
}
public String personNameById(int id) throws Exception {
String[] nameArr = getStringsFromQuery("SELECT NAME FROM PEOPLE WHERE ID="
+ id);
if (nameArr.length == 0) {
return null;
}
return nameArr[0];
}
public IntChanges modifySomePeople() throws Exception {
underlyingDataModified = true;
int numberToChange = random().nextInt(people.length + 1);
Set<Integer> changeSet = new HashSet<Integer>();
Set<Integer> deleteSet = new HashSet<Integer>();
Set<Integer> addSet = new HashSet<Integer>();
Connection conn = null;
PreparedStatement change = null;
PreparedStatement delete = null;
PreparedStatement add = null;
// One second in the future ensures a change time after the last import (DIH
// uses second precision only)
Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
try {
conn = newConnection();
change = conn
.prepareStatement("update people set name=?, last_modified=? where id=?");
delete = conn
.prepareStatement("update people set deleted='Y', last_modified=? where id=?");
add = conn
.prepareStatement("insert into people (id,name,country_code,last_modified) values (?,?,'ZZ',?)");
for (int i = 0; i < numberToChange; i++) {
int tryIndex = random().nextInt(people.length);
Integer id = (Integer) people[tryIndex][0];
if (!changeSet.contains(id) && !deleteSet.contains(id)) {
boolean changeDontDelete = random().nextBoolean();
if (changeDontDelete) {
changeSet.add(id);
change.setString(1, "MODIFIED " + people[tryIndex][1]);
change.setTimestamp(2, theTime);
change.setInt(3, id);
Assert.assertEquals(1, change.executeUpdate());
} else {
deleteSet.add(id);
delete.setTimestamp(1, theTime);
delete.setInt(2, id);
Assert.assertEquals(1, delete.executeUpdate());
}
}
}
int numberToAdd = random().nextInt(3);
for (int i = 0; i < numberToAdd; i++) {
int tryIndex = random().nextInt(people.length);
Integer id = (Integer) people[tryIndex][0];
Integer newId = id + 1000;
String newDesc = "ADDED " + people[tryIndex][1];
if (!addSet.contains(newId)) {
addSet.add(newId);
add.setInt(1, newId);
add.setString(2, newDesc);
add.setTimestamp(3, theTime);
Assert.assertEquals(1, add.executeUpdate());
}
}
conn.commit();
} catch (SQLException e) {
throw e;
} finally {
try {
change.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
IntChanges c = new IntChanges();
c.changedKeys = changeSet.toArray(new Integer[changeSet.size()]);
c.deletedKeys = deleteSet.toArray(new Integer[deleteSet.size()]);
c.addedKeys = addSet.toArray(new Integer[addSet.size()]);
return c;
}
public String[] modifySomeCountries() throws Exception {
underlyingDataModified = true;
int numberToChange = random().nextInt(countries.length + 1);
Set<String> changeSet = new HashSet<String>();
Connection conn = null;
PreparedStatement change = null;
// One second in the future ensures a change time after the last import (DIH
// uses second precision only)
Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
try {
conn = newConnection();
change = conn
.prepareStatement("update countries set country_name=?, last_modified=? where code=?");
for (int i = 0; i < numberToChange; i++) {
int tryIndex = random().nextInt(countries.length);
String code = countries[tryIndex][0];
if (!changeSet.contains(code)) {
changeSet.add(code);
change.setString(1, "MODIFIED " + countries[tryIndex][1]);
change.setTimestamp(2, theTime);
change.setString(3, code);
Assert.assertEquals(1, change.executeUpdate());
}
}
} catch (SQLException e) {
throw e;
} finally {
try {
change.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
return changeSet.toArray(new String[changeSet.size()]);
}
class IntChanges {
public Integer[] changedKeys;
public Integer[] deletedKeys;
public Integer[] addedKeys;
}
@Override
protected String generateConfig() {
String ds = null;
if (dbToUse == Database.DERBY) {
ds = "derby";
} else if (dbToUse == Database.HSQLDB) {
ds = "hsqldb";
} else {
throw new AssertionError("Invalid database to use: " + dbToUse);
}
StringBuilder sb = new StringBuilder();
sb.append("<dataConfig> \n");
sb.append("<dataSource name=''hsqldb'' driver=''org.hsqldb.jdbcDriver'' url=''jdbc:hsqldb:mem:.'' /> \n");
sb.append("<dataSource name=''derby'' driver=''org.apache.derby.jdbc.EmbeddedDriver'' url=''jdbc:derby:memory:derbyDB;'' /> \n");
sb.append("<document name=''TestSqlEntityProcessor''> \n");
sb.append("<entity name=''People'' ");
sb.append("pk=''" + (random().nextBoolean() ? "ID" : "People.ID") + "'' ");
sb.append("processor=''SqlEntityProcessor'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(rootTransformerName != null ? "transformer=''"
+ rootTransformerName + "'' " : "");
sb.append("query=''SELECT ID, NAME, COUNTRY_CODE FROM PEOPLE WHERE DELETED != 'Y' '' ");
sb.append(deltaQueriesPersonTable());
sb.append("> \n");
sb.append("<field column=''NAME'' name=''NAME_mult_s'' /> \n");
sb.append("<field column=''COUNTRY_CODE'' name=''COUNTRY_CODES_mult_s'' /> \n");
if (countryEntity) {
sb.append("<entity name=''Countries'' ");
sb.append("pk=''" + (random().nextBoolean() ? "CODE" : "Countries.CODE")
+ "'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(countryTransformer ? "transformer=''AddAColumnTransformer'' "
+ "newColumnName=''countryAdded_s'' newColumnValue=''country_added'' "
: "");
if (countryCached) {
sb.append(random().nextBoolean() ? "processor=''SqlEntityProcessor'' cacheImpl=''SortedMapBackedCache'' "
: "processor=''CachedSqlEntityProcessor'' ");
if (useSimpleCaches) {
sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' ''>\n");
} else {
sb.append(random().nextBoolean() ? "cacheKey=''CODE'' cacheLookup=''People.COUNTRY_CODE'' "
: "where=''CODE=People.COUNTRY_CODE'' ");
sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES'' ");
sb.append("> \n");
}
} else {
sb.append("processor=''SqlEntityProcessor'' query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' '' ");
sb.append(deltaQueriesCountryTable());
sb.append("> \n");
}
sb.append("<field column=''CODE'' name=''COUNTRY_CODE_s'' /> \n");
sb.append("<field column=''COUNTRY_NAME'' name=''COUNTRY_NAME_s'' /> \n");
sb.append("</entity> \n");
}
if (sportsEntity) {
sb.append("<entity name=''Sports'' ");
sb.append("dataSource=''" + ds + "'' ");
sb.append(sportsTransformer ? "transformer=''AddAColumnTransformer'' "
+ "newColumnName=''sportsAdded_s'' newColumnValue=''sport_added'' "
: "");
if (sportsCached) {
sb.append(random().nextBoolean() ? "processor=''SqlEntityProcessor'' cacheImpl=''SortedMapBackedCache'' "
: "processor=''CachedSqlEntityProcessor'' ");
if (useSimpleCaches) {
sb.append("query=''SELECT ID, SPORT_NAME FROM PEOPLE_SPORTS WHERE DELETED != 'Y' AND PERSON_ID=${People.ID} ORDER BY ID'' ");
} else {
sb.append(random().nextBoolean() ? "cacheKey=''PERSON_ID'' cacheLookup=''People.ID'' "
: "where=''PERSON_ID=People.ID'' ");
sb.append("query=''SELECT ID, PERSON_ID, SPORT_NAME FROM PEOPLE_SPORTS ORDER BY ID'' ");
}
} else {
sb.append("processor=''SqlEntityProcessor'' query=''SELECT ID, SPORT_NAME FROM PEOPLE_SPORTS WHERE DELETED != 'Y' AND PERSON_ID=${People.ID} ORDER BY ID'' ");
}
sb.append("> \n");
sb.append("<field column=''SPORT_NAME'' name=''SPORT_NAME_mult_s'' /> \n");
sb.append("<field column=''id'' name=''SPORT_ID_mult_s'' /> \n");
sb.append("</entity> \n");
}
sb.append("</entity> \n");
sb.append("</document> \n");
sb.append("</dataConfig> \n");
String config = sb.toString().replaceAll("[']{2}", "\"");
log.debug(config);
return config;
}
@Override
protected void populateData(Connection conn) throws Exception {
Statement s = null;
PreparedStatement ps = null;
Timestamp theTime = new Timestamp(System.currentTimeMillis() - 10000); // 10 seconds ago
try {
s = conn.createStatement();
s.executeUpdate("create table countries(code varchar(3) not null primary key, country_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
s.executeUpdate("create table people(id int not null primary key, name varchar(50), country_code char(2), deleted char(1) default 'N', last_modified timestamp not null)");
s.executeUpdate("create table people_sports(id int not null primary key, person_id int, sport_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
ps = conn
.prepareStatement("insert into countries (code, country_name, last_modified) values (?,?,?)");
for (String[] country : countries) {
ps.setString(1, country[0]);
ps.setString(2, country[1]);
ps.setTimestamp(3, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
ps = conn
.prepareStatement("insert into people (id, name, country_code, last_modified) values (?,?,?,?)");
for (Object[] person : people) {
ps.setInt(1, (Integer) person[0]);
ps.setString(2, (String) person[1]);
ps.setString(3, (String) person[2]);
ps.setTimestamp(4, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
ps = conn
.prepareStatement("insert into people_sports (id, person_id, sport_name, last_modified) values (?,?,?,?)");
for (Object[] sport : people_sports) {
ps.setInt(1, (Integer) sport[0]);
ps.setInt(2, (Integer) sport[1]);
ps.setString(3, (String) sport[2]);
ps.setTimestamp(4, theTime);
Assert.assertEquals(1, ps.executeUpdate());
}
ps.close();
conn.commit();
conn.close();
} catch (Exception e) {
throw e;
} finally {
try {
ps.close();
} catch (Exception ex) {}
try {
s.close();
} catch (Exception ex) {}
try {
conn.close();
} catch (Exception ex) {}
}
}
public static final String[][] countries = {
{"NA", "Namibia"},
{"NC", "New Caledonia"},
{"NE", "Niger"},
{"NF", "Norfolk Island"},
{"NG", "Nigeria"},
{"NI", "Nicaragua"},
{"NL", "Netherlands"},
{"NO", "Norway"},
{"NP", "Nepal"},
{"NR", "Nauru"},
{"NU", "Niue"},
{"NZ", "New Zealand"}
};
public static final Object[][] people = {
{1,"Jacob","NZ"},
{2,"Ethan","NU"},
{3,"Michael","NR"},
{4,"Jayden","NP"},
{5,"William","NO"},
{6,"Alexander","NL"},
{7,"Noah","NI"},
{8,"Daniel","NG"},
{9,"Aiden","NF"},
{10,"Anthony","NE"},
{11,"Emma","NL"},
{12,"Grace","NI"},
{13,"Hailey","NG"},
{14,"Isabella","NF"},
{15,"Lily","NE"},
{16,"Madison","NC"},
{17,"Mia","NA"},
{18,"Natalie","NZ"},
{19,"Olivia","NU"},
{20,"Samantha","NR"}
};
public static final Object[][] people_sports = {
{100, 1, "Swimming"},
{200, 2, "Triathlon"},
{300, 3, "Water polo"},
{310, 3, "Underwater rugby"},
{320, 3, "Kayaking"},
{400, 4, "Snorkeling"},
{500, 5, "Synchronized diving"},
{600, 6, "Underwater rugby"},
{700, 7, "Boating"},
{800, 8, "Bodyboarding"},
{900, 9, "Canoeing"},
{1000, 10, "Fishing"},
{1100, 11, "Jet Ski"},
{1110, 11, "Rowing"},
{1120, 11, "Sailing"},
{1200, 12, "Kayaking"},
{1210, 12, "Canoeing"},
{1300, 13, "Kite surfing"},
{1400, 14, "Parasailing"},
{1500, 15, "Rafting"},
{1600, 16, "Rowing"},
{1700, 17, "Sailing"},
{1800, 18, "White Water Rafting"},
{1900, 19, "Water skiing"},
{2000, 20, "Windsurfing"}
};
}

View File

@ -0,0 +1,127 @@
package org.apache.solr.handler.dataimport;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TestSimplePropertiesWriter extends AbstractDIHJdbcTestCase {
private boolean useJdbcEscapeSyntax;
private String dateFormat;
private String fileLocation;
private String fileName;
@Before
public void spwBefore() throws Exception {
File tmpdir = File.createTempFile("test", "tmp", TEMP_DIR);
tmpdir.delete();
tmpdir.mkdir();
fileLocation = tmpdir.getPath();
fileName = "the.properties";
}
@After
public void spwAfter() throws Exception {
new File(fileLocation + File.separatorChar + fileName).delete();
new File(fileLocation).delete();
}
@Test
public void testSimplePropertiesWriter() throws Exception {
SimpleDateFormat errMsgFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
String[] d = {
"{'ts' ''yyyy-MM-dd HH:mm:ss.SSSSSS''}",
"{'ts' ''yyyy-MM-dd HH:mm:ss''}",
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd HH:mm:ss.SSSSSS"
};
for(int i=0 ; i<d.length ; i++) {
delQ("*:*");
commit();
if(i<2) {
useJdbcEscapeSyntax = true;
} else {
useJdbcEscapeSyntax = false;
}
dateFormat = d[i];
SimpleDateFormat df = new SimpleDateFormat(dateFormat, Locale.ROOT);
Date oneSecondAgo = new Date(System.currentTimeMillis() - 1000);
Map<String,String> init = new HashMap<String,String>();
init.put("dateFormat", dateFormat);
init.put("filename", fileName);
init.put("directory", fileLocation);
SimplePropertiesWriter spw = new SimplePropertiesWriter();
spw.init(new DataImporter(), init);
Map<String, Object> props = new HashMap<String,Object>();
props.put("SomeDates.last_index_time", oneSecondAgo);
props.put("last_index_time", oneSecondAgo);
spw.persist(props);
h.query("/dataimport", generateRequest());
props = spw.readIndexerProperties();
Date entityDate = df.parse((String) props.get("SomeDates.last_index_time"));
Date docDate= df.parse((String) props.get("last_index_time"));
Calendar c = new GregorianCalendar(TimeZone.getTimeZone("GMT"), Locale.ROOT);
c.setTime(docDate);
int year = c.get(Calendar.YEAR);
Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo) + " should be prior to the document date: " + errMsgFormat.format(docDate), docDate.getTime() - oneSecondAgo.getTime() > 0);
Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo) + " should be prior to the entity date: " + errMsgFormat.format(entityDate), entityDate.getTime() - oneSecondAgo.getTime() > 0);
assertQ(req("*:*"), "//*[@numFound='1']", "//doc/str[@name=\"ayear_s\"]=\"" + year + "\"");
}
}
@Override
protected Database setAllowedDatabases() {
return Database.DERBY;
}
@Override
protected String generateConfig() {
StringBuilder sb = new StringBuilder();
String q = useJdbcEscapeSyntax ? "" : "'";
sb.append("<dataConfig> \n");
sb.append("<propertyWriter dateFormat=\"" + dateFormat + "\" type=\"SimplePropertiesWriter\" directory=\"" + fileLocation + "\" filename=\"" + fileName + "\" />\n");
sb.append("<dataSource name=\"derby\" driver=\"org.apache.derby.jdbc.EmbeddedDriver\" url=\"jdbc:derby:memory:derbyDB;\" /> \n");
sb.append("<document name=\"TestSimplePropertiesWriter\"> \n");
sb.append("<entity name=\"SomeDates\" processor=\"SqlEntityProcessor\" dataSource=\"derby\" ");
sb.append("query=\"select 1 as id, YEAR(" + q + "${dih.last_index_time}" + q + ") as AYEAR_S from sysibm.sysdummy1 \" >\n");
sb.append("<field column=\"AYEAR_S\" name=\"ayear_s\" /> \n");
sb.append("</entity>\n");
sb.append("</document> \n");
sb.append("</dataConfig> \n");
String config = sb.toString();
log.debug(config);
return config;
}
}

View File

@ -23,7 +23,7 @@ import org.junit.Test;
/**
* Test with various combinations of parameters, child entities, caches, transformers.
*/
public class TestSqlEntityProcessor extends AbstractDIHJdbcTestCase {
public class TestSqlEntityProcessor extends AbstractSqlEntityProcessorTestCase {
@Test
public void testSingleEntity() throws Exception {

View File

@ -1,11 +1,6 @@
package org.apache.solr.handler.dataimport;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@ -29,20 +24,12 @@ import org.junit.Test;
/**
* Test with various combinations of parameters, child entites, transformers.
*/
public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
public class TestSqlEntityProcessorDelta extends AbstractSqlEntityProcessorTestCase {
private boolean delta = false;
private boolean useParentDeltaQueryParam = false;
private IntChanges personChanges = null;
private String[] countryChanges = null;
//TODO: remove this on fixing SOLR-4051 / SOLR-1916
private void assumeIncomaptibleLocale() {
Date d = new Date();
String badDateFormat = DataImporter.DATE_TIME_FORMAT.get().format(d);
String betterDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(d);
Assume.assumeTrue(badDateFormat.equals(betterDateFormat));
}
@Before
public void setupDeltaTest() {
delta = false;
@ -51,7 +38,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
}
@Test
public void testSingleEntity() throws Exception {
assumeIncomaptibleLocale();
singleEntity(1);
changeStuff();
int c = calculateDatabaseCalls();
@ -60,7 +46,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
}
@Test
public void testWithSimpleTransformer() throws Exception {
assumeIncomaptibleLocale();
simpleTransform(1);
changeStuff();
simpleTransform(calculateDatabaseCalls());
@ -68,7 +53,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
}
@Test
public void testWithComplexTransformer() throws Exception {
assumeIncomaptibleLocale();
complexTransform(1, 0);
changeStuff();
complexTransform(calculateDatabaseCalls(), personChanges.deletedKeys.length);
@ -76,7 +60,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
}
@Test
public void testChildEntities() throws Exception {
assumeIncomaptibleLocale();
useParentDeltaQueryParam = random().nextBoolean();
withChildEntities(false, true);
changeStuff();
@ -137,12 +120,13 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
personChanges = modifySomePeople();
}
delta = true;
}
}
@Override
protected LocalSolrQueryRequest generateRequest() {
return lrf.makeRequest("command", (delta ? "delta-import" : "full-import"), "dataConfig", generateConfig(),
"clean", (delta ? "false" : "true"), "commit", "true", "synchronous", "true", "indent", "true");
}
@Override
protected String deltaQueriesPersonTable() {
return
"deletedPkQuery=''SELECT ID FROM PEOPLE WHERE DELETED='Y' AND last_modified &gt;='${dih.last_index_time}' '' " +