From 3e45aec329684015367df334876908e0cd16d772 Mon Sep 17 00:00:00 2001
From: James Dyer
Date: Tue, 13 Nov 2012 18:30:51 +0000
Subject: [PATCH] SOLR-4051: Configurable DIH Property Writers

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1408873 13f79535-47bb-0310-9956-ffa450edef68
---
 .../handler/dataimport/DIHProperties.java      |  44 ++
 .../solr/handler/dataimport/DataImporter.java  |  89 ++-
 .../solr/handler/dataimport/DocBuilder.java    |  28 +-
 .../dataimport/SimplePropertiesWriter.java     | 234 ++++--
 .../handler/dataimport/TemplateString.java     |  10 +-
 .../dataimport/ZKPropertiesWriter.java         |  30 +-
 .../config/ConfigNameConstants.java            |   2 +
 .../dataimport/config/DIHConfiguration.java    |  11 +-
 .../PropertyWriter.java}                       |  32 +-
 .../dataimport/AbstractDIHJdbcTestCase.java    | 714 ++----------------
 .../AbstractSqlEntityProcessorTestCase.java    | 696 +++++++++++++++++
 .../TestSimplePropertiesWriter.java            | 127 ++++
 .../dataimport/TestSqlEntityProcessor.java     |   2 +-
 .../TestSqlEntityProcessorDelta.java           |  24 +-
 14 files changed, 1240 insertions(+), 803 deletions(-)
 create mode 100644 solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java
 rename solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/{DIHPropertiesWriter.java => config/PropertyWriter.java} (59%)
 create mode 100644 solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractSqlEntityProcessorTestCase.java
 create mode 100644 solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSimplePropertiesWriter.java

diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java
new file mode 100644
index 00000000000..0ea391c4974
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHProperties.java
@@ -0,0 +1,44 @@
+package org.apache.solr.handler.dataimport;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Implementations write out properties about the last data import
+ * for use by the next import; e.g. to persist the last import timestamp
+ * so that future delta imports can know what needs to be updated.
+ * + * @lucene.experimental + */ +public abstract class DIHProperties { + + public abstract void init(DataImporter dataImporter, Map initParams); + + public abstract boolean isWritable(); + + public abstract void persist(Map props); + + public abstract Map readIndexerProperties(); + + public Date getCurrentTimestamp() { + return new Date(); + } + +} diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java index baaa5dd9d18..5daf08d0e4c 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java @@ -29,6 +29,7 @@ import org.apache.solr.handler.dataimport.config.ConfigNameConstants; import org.apache.solr.handler.dataimport.config.ConfigParseUtil; import org.apache.solr.handler.dataimport.config.DIHConfiguration; import org.apache.solr.handler.dataimport.config.Entity; +import org.apache.solr.handler.dataimport.config.PropertyWriter; import org.apache.solr.handler.dataimport.config.Script; import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow; @@ -78,7 +79,6 @@ public class DataImporter { public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics(); private SolrCore core; private Map coreScopeSession = new ConcurrentHashMap(); - private DIHPropertiesWriter propWriter; private ReentrantLock importLock = new ReentrantLock(); private boolean isDeltaImportSupported = false; private final String handlerName; @@ -88,8 +88,6 @@ public class DataImporter { * Only for testing purposes */ DataImporter() { - createPropertyWriter(); - propWriter.init(this); this.handlerName = "dataimport" ; } @@ -97,19 +95,10 @@ public class DataImporter { this.handlerName = handlerName; this.core = core; this.schema = core.getSchema(); - loadSchemaFieldMap(); - createPropertyWriter(); + loadSchemaFieldMap(); } - private void createPropertyWriter() { - if (this.core == null - || !this.core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) { - propWriter = new SimplePropertiesWriter(); - } else { - propWriter = new ZKPropertiesWriter(); - } - propWriter.init(this); - } + boolean maybeReloadConfiguration(RequestInfo params, @@ -278,7 +267,7 @@ public class DataImporter { } } } - List dataSourceTags = ConfigParseUtil.getChildNodes(e, DATA_SRC); + List dataSourceTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.DATA_SRC); if (!dataSourceTags.isEmpty()) { for (Element element : dataSourceTags) { Map p = new HashMap(); @@ -295,7 +284,54 @@ public class DataImporter { break; } } - return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources); + PropertyWriter pw = null; + List propertyWriterTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.PROPERTY_WRITER); + if (propertyWriterTags.isEmpty()) { + boolean zookeeper = false; + if (this.core != null + && this.core.getCoreDescriptor().getCoreContainer() + .isZooKeeperAware()) { + zookeeper = true; + } + pw = new PropertyWriter(zookeeper ? "ZKPropertiesWriter" + : "SimplePropertiesWriter", Collections. 
<String,String> emptyMap());
+    } else if (propertyWriterTags.size() > 1) {
+      throw new DataImportHandlerException(SEVERE, "Only one "
+          + ConfigNameConstants.PROPERTY_WRITER + " can be configured.");
+    } else {
+      Element pwElement = propertyWriterTags.get(0);
+      String type = null;
+      Map<String,String> params = new HashMap<String,String>();
+      for (Map.Entry<String,String> entry : ConfigParseUtil.getAllAttributes(
+          pwElement).entrySet()) {
+        if (TYPE.equals(entry.getKey())) {
+          type = entry.getValue();
+        } else {
+          params.put(entry.getKey(), entry.getValue());
+        }
+      }
+      if (type == null) {
+        throw new DataImportHandlerException(SEVERE, "The "
+            + ConfigNameConstants.PROPERTY_WRITER + " element must specify "
+            + TYPE);
+      }
+      pw = new PropertyWriter(type, params);
+    }
+    return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources, pw);
+  }
+  
+  @SuppressWarnings("unchecked")
+  private DIHProperties createPropertyWriter() {
+    DIHProperties propWriter = null;
+    PropertyWriter configPw = config.getPropertyWriter();
+    try {
+      Class<DIHProperties> writerClass = DocBuilder.loadClass(configPw.getType(), this.core);
+      propWriter = writerClass.newInstance();
+      propWriter.init(this, configPw.getParameters());
+    } catch (Exception e) {
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load PropertyWriter implementation: " + configPw.getType(), e);
+    }
+    return propWriter;
+  }
 
   DIHConfiguration getConfig() {
@@ -374,11 +410,11 @@ public class DataImporter {
     LOG.info("Starting Full Import");
     setStatus(Status.RUNNING_FULL_DUMP);
-    setIndexStartTime(new Date());
-
     try {
-      docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
-      checkWritablePersistFile(writer);
+      DIHProperties dihPropWriter = createPropertyWriter();
+      setIndexStartTime(dihPropWriter.getCurrentTimestamp());
+      docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
+      checkWritablePersistFile(writer, dihPropWriter);
       docBuilder.execute();
       if (!requestParams.isDebug())
         cumulativeStatistics.add(docBuilder.importStatistics);
@@ -392,10 +428,8 @@
     }
 
-  private void checkWritablePersistFile(SolrWriter writer) {
-//    File persistFile = propWriter.getPersistFile();
-//    boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
-    if (isDeltaImportSupported && !propWriter.isWritable()) {
+  private void checkWritablePersistFile(SolrWriter writer, DIHProperties dihPropWriter) {
+    if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
       throw new DataImportHandlerException(SEVERE, "Properties is not writable. 
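For reference, the new element parsed above is declared in data-config.xml. The element name and the fact that `type` is required while every other attribute is passed through to the writer as an init parameter come from the parsing code in this patch; the attribute values below are purely illustrative:

```xml
<dataConfig>
  <!-- type is required; remaining attributes become the writer's init params -->
  <propertyWriter type="SimplePropertiesWriter"
                  directory="conf"
                  filename="my-dih.properties"
                  dateFormat="yyyy-MM-dd HH:mm:ss"/>
  <document>
    <!-- entities as before -->
  </document>
</dataConfig>
```

If no propertyWriter element is present, the code above falls back to ZKPropertiesWriter when the core is ZooKeeper-aware and SimplePropertiesWriter otherwise.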
Delta imports are supported by data config but will not work."); } @@ -406,9 +440,10 @@ public class DataImporter { setStatus(Status.RUNNING_DELTA_DUMP); try { - setIndexStartTime(new Date()); - docBuilder = new DocBuilder(this, writer, propWriter, requestParams); - checkWritablePersistFile(writer); + DIHProperties dihPropWriter = createPropertyWriter(); + setIndexStartTime(dihPropWriter.getCurrentTimestamp()); + docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams); + checkWritablePersistFile(writer, dihPropWriter); docBuilder.execute(); if (!requestParams.isDebug()) cumulativeStatistics.add(docBuilder.importStatistics); diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java index 9cfe89d93ae..7f34c601796 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java @@ -68,16 +68,16 @@ public class DocBuilder { static final ThreadLocal INSTANCE = new ThreadLocal(); private Map functionsNamespace; - private Properties persistedProperties; + private Map persistedProperties; - private DIHPropertiesWriter propWriter; + private DIHProperties propWriter; private static final String PARAM_WRITER_IMPL = "writerImpl"; private static final String DEFAULT_WRITER_NAME = "SolrWriter"; private DebugLogger debugLogger; private final RequestInfo reqParams; @SuppressWarnings("unchecked") - public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, RequestInfo reqParams) { + public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) { INSTANCE.set(this); this.dataImporter = dataImporter; this.reqParams = reqParams; @@ -121,22 +121,22 @@ public class DocBuilder { resolver = new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties()); } else resolver = new VariableResolverImpl(); Map indexerNamespace = new HashMap(); - if (persistedProperties.getProperty(LAST_INDEX_TIME) != null) { - indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.getProperty(LAST_INDEX_TIME)); + if (persistedProperties.get(LAST_INDEX_TIME) != null) { + indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME)); } else { // set epoch - indexerNamespace.put(LAST_INDEX_TIME, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH)); + indexerNamespace.put(LAST_INDEX_TIME, EPOCH); } indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime()); indexerNamespace.put("request", reqParams.getRawParams()); indexerNamespace.put("functions", functionsNamespace); for (Entity entity : dataImporter.getConfig().getEntities()) { String key = entity.getName() + "." 
+ SolrWriter.LAST_INDEX_KEY; - String lastIndex = persistedProperties.getProperty(key); - if (lastIndex != null) { + Object lastIndex = persistedProperties.get(key); + if (lastIndex != null && lastIndex instanceof Date) { indexerNamespace.put(key, lastIndex); } else { - indexerNamespace.put(key, DataImporter.DATE_TIME_FORMAT.get().format(EPOCH)); + indexerNamespace.put(key, EPOCH); } } resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace); @@ -206,9 +206,8 @@ public class DocBuilder { } AtomicBoolean fullCleanDone = new AtomicBoolean(false); //we must not do a delete of *:* multiple times if there are multiple root entities to be run - Properties lastIndexTimeProps = new Properties(); - lastIndexTimeProps.setProperty(LAST_INDEX_KEY, - DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime())); + Map lastIndexTimeProps = new HashMap(); + lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime()); epwList = new ArrayList(config.getEntities().size()); for (Entity e : config.getEntities()) { @@ -217,8 +216,7 @@ public class DocBuilder { for (EntityProcessorWrapper epw : epwList) { if (entities != null && !entities.contains(epw.getEntity().getName())) continue; - lastIndexTimeProps.setProperty(epw.getEntity().getName() + "." + LAST_INDEX_KEY, - DataImporter.DATE_TIME_FORMAT.get().format(new Date())); + lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp()); currentEntityProcessorWrapper = epw; String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery"); if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) { @@ -295,7 +293,7 @@ public class DocBuilder { } @SuppressWarnings("unchecked") - private void finish(Properties lastIndexTimeProps) { + private void finish(Map lastIndexTimeProps) { LOG.info("Import completed successfully"); statusMessages.put("", "Indexing completed. Added/Updated: " + importStatistics.docCount + " documents. Deleted " diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java index dcdb4b5a0b7..a30ede5a3e5 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java @@ -1,4 +1,5 @@ package org.apache.solr.handler.dataimport; + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,108 +17,213 @@ package org.apache.solr.handler.dataimport; * limitations under the License. 
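DocBuilder continues to publish last_index_time (or the 1970 epoch on a first run) into the importer namespace, so delta queries keep their usual shape. An illustrative entity, with table and column names invented and the standard `${dataimporter...}` placeholders:

```xml
<entity name="People" pk="ID"
        query="SELECT ID, NAME, COUNTRY_CODE FROM PEOPLE"
        deltaQuery="SELECT ID FROM PEOPLE WHERE LAST_MODIFIED &gt; '${dataimporter.last_index_time}'"
        deltaImportQuery="SELECT ID, NAME, COUNTRY_CODE FROM PEOPLE WHERE ID=${dataimporter.delta.ID}"/>
```

The difference after this patch is only in who formats the timestamp: the property writer, using its configured dateFormat, rather than a hard-coded format in DocBuilder.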
*/ +import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE; + import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; import java.util.Properties; import org.apache.solr.core.SolrCore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public class SimplePropertiesWriter implements DIHPropertiesWriter { - private static final Logger log = LoggerFactory.getLogger(SimplePropertiesWriter.class); - - static final String IMPORTER_PROPERTIES = "dataimport.properties"; - +/** + *

+ * Writes properties using {@link Properties#store}.
+ * The special property "last_index_time" is converted to a formatted date.
+ * Users can configure the location, filename, locale and date format to use.

+ */
+public class SimplePropertiesWriter extends DIHProperties {
+  private static final Logger log = LoggerFactory
+      .getLogger(SimplePropertiesWriter.class);
+  
   static final String LAST_INDEX_KEY = "last_index_time";
-
-  private String persistFilename = IMPORTER_PROPERTIES;
-
-  private String configDir = null;
-
-
-
-  public void init(DataImporter dataImporter) {
-    SolrCore core = dataImporter.getCore();
-    String configDir = core ==null ? ".": core.getResourceLoader().getConfigDir();
-    String persistFileName = dataImporter.getHandlerName();
-
-    this.configDir = configDir;
-    if(persistFileName != null){
-      persistFilename = persistFileName + ".properties";
-    }
+  
+  protected String filename = null;
+  
+  protected String configDir = null;
+  
+  protected Locale locale = null;
+  
+  protected SimpleDateFormat dateFormat = null;
+  
+  /**
+   * The locale to use when writing the properties file. Default is {@link Locale#ROOT}
+   */
+  public static final String LOCALE = "locale";
+  /**
+   * The date format to use when writing values for "last_index_time" to the properties file.
+   * See {@link SimpleDateFormat} for patterns. Default is yyyy-MM-dd HH:mm:ss.
+   */
+  public static final String DATE_FORMAT = "dateFormat";
+  /**
+   * The directory to save the properties file in. Default is the current core's "config" directory.
+   */
+  public static final String DIRECTORY = "directory";
+  /**
+   * The filename to save the properties file to. Default is this Handler's name from solrconfig.xml.
+   */
+  public static final String FILENAME = "filename";
+  
+  @Override
+  public void init(DataImporter dataImporter, Map<String,String> params) {
+    if(params.get(FILENAME) != null) {
+      filename = params.get(FILENAME);
+    } else if(dataImporter.getHandlerName()!=null) {
+      filename = dataImporter.getHandlerName() + ".properties";
+    } else {
+      filename = "dataimport.properties";
     }
-
-
-
-
+    if(params.get(DIRECTORY) != null) {
+      configDir = params.get(DIRECTORY);
+    } else {
+      SolrCore core = dataImporter.getCore();
+      configDir = (core == null ? "." : core.getResourceLoader().getConfigDir());
+    }
+    if(params.get(LOCALE) != null) {
+      String localeStr = params.get(LOCALE);
+      for (Locale l : Locale.getAvailableLocales()) {
+        if(localeStr.equals(l.getDisplayName())) {
+          locale = l;
+          break;
+        }
+      }
+      if(locale==null) {
+        throw new DataImportHandlerException(SEVERE, "Unsupported locale for PropertyWriter: " + localeStr);
+      }
+    } else {
+      locale = Locale.ROOT;
+    }
+    if(params.get(DATE_FORMAT) != null) {
+      dateFormat = new SimpleDateFormat(params.get(DATE_FORMAT), locale);
+    } else {
+      dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
+    }
+  }
+  
   private File getPersistFile() {
     String filePath = configDir;
-    if (configDir != null && !configDir.endsWith(File.separator))
-      filePath += File.separator;
-    filePath += persistFilename;
+    if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
+    filePath += filename;
     return new File(filePath);
   }
-
-  public boolean isWritable() {
-    File persistFile = getPersistFile();
-    return persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
-
-  }
-
-  @Override
-  public void persist(Properties p) {
-    OutputStream propOutput = null;
-
-    Properties props = readIndexerProperties();
-
+  @Override
+  public boolean isWritable() {
+    File persistFile = getPersistFile();
+    return persistFile.exists() ? 
persistFile.canWrite() : persistFile + .getParentFile().canWrite(); + + } + + protected String convertDateToString(Date d) { + return dateFormat.format(d); + } + protected Date convertStringToDate(String s) { try { - props.putAll(p); + return dateFormat.parse(s); + } catch (ParseException e) { + throw new DataImportHandlerException(SEVERE, "Value for " + + LAST_INDEX_KEY + " is invalid for date format " + + dateFormat.toLocalizedPattern() + " : " + s); + } + } + /** + * {@link DocBuilder} sends the date as an Object because + * this class knows how to convert it to a String + */ + protected Properties mapToProperties(Map propObjs) { + Properties p = new Properties(); + for(Map.Entry entry : propObjs.entrySet()) { + String key = entry.getKey(); + String val = null; + String lastKeyPart = key; + int lastDotPos = key.lastIndexOf('.'); + if(lastDotPos!=-1 && key.length() > lastDotPos+1) { + lastKeyPart = key.substring(lastDotPos + 1); + } + if(LAST_INDEX_KEY.equals(lastKeyPart) && entry.getValue() instanceof Date) { + val = convertDateToString((Date) entry.getValue()); + } else { + val = entry.getValue().toString(); + } + p.put(key, val); + } + return p; + } + /** + * We'll send everything back as Strings as this class has + * already converted them. + */ + protected Map propertiesToMap(Properties p) { + Map theMap = new HashMap(); + for(Map.Entry entry : p.entrySet()) { + String key = entry.getKey().toString(); + Object val = entry.getValue().toString(); + theMap.put(key, val); + } + return theMap; + } + + @Override + public void persist(Map propObjs) { + OutputStream propOutput = null; + Properties existingProps = mapToProperties(readIndexerProperties()); + Properties newProps = mapToProperties(propObjs); + try { + existingProps.putAll(newProps); String filePath = configDir; - if (configDir != null && !configDir.endsWith(File.separator)) + if (configDir != null && !configDir.endsWith(File.separator)) { filePath += File.separator; - filePath += persistFilename; + } + filePath += filename; propOutput = new FileOutputStream(filePath); - props.store(propOutput, null); - log.info("Wrote last indexed time to " + persistFilename); + existingProps.store(propOutput, null); + log.info("Wrote last indexed time to " + filename); } catch (Exception e) { - throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to persist Index Start Time", e); + throw new DataImportHandlerException(DataImportHandlerException.SEVERE, + "Unable to persist Index Start Time", e); } finally { try { - if (propOutput != null) - propOutput.close(); + if (propOutput != null) propOutput.close(); } catch (IOException e) { propOutput = null; } } } - + @Override - public Properties readIndexerProperties() { + public Map readIndexerProperties() { Properties props = new Properties(); - InputStream propInput = null; - + InputStream propInput = null; try { - propInput = new FileInputStream(configDir + persistFilename); + String filePath = configDir; + if (configDir != null && !configDir.endsWith(File.separator)) { + filePath += File.separator; + } + filePath += filename; + propInput = new FileInputStream(filePath); props.load(propInput); - log.info("Read " + persistFilename); + log.info("Read " + filename); } catch (Exception e) { - log.warn("Unable to read: " + persistFilename); + log.warn("Unable to read: " + filename); } finally { try { - if (propInput != null) - propInput.close(); + if (propInput != null) propInput.close(); } catch (IOException e) { propInput = null; } - } - - return props; + } + return 
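The conversion helpers above boil down to a single SimpleDateFormat. A small self-contained sketch of the default behavior (the format string and Locale.ROOT come from the code above; the class name is invented for the demo):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Demonstrates the default "last_index_time" round trip performed by
// convertDateToString/convertStringToDate: pattern "yyyy-MM-dd HH:mm:ss"
// in Locale.ROOT, unless overridden via the dateFormat/locale init params.
public class LastIndexTimeFormatDemo {
  public static void main(String[] args) throws Exception {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
    Date indexStart = new Date();
    String stored = fmt.format(indexStart); // value written to the properties file
    Date readBack = fmt.parse(stored);      // value handed to the next delta import
    System.out.println(stored + " parses back to " + readBack);
  }
}
```

In the file itself, Properties.store additionally escapes the colons, so an entry typically reads like `last_index_time=2012-11-13 18\:30\:51`.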
propertiesToMap(props); } - + } diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateString.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateString.java index 77128dc456d..60249e2956b 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateString.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateString.java @@ -79,7 +79,7 @@ public class TemplateString { String[] s = new String[variables.size()]; for (int i = 0; i < variables.size(); i++) { Object val = resolver.resolve(variables.get(i)); - s[i] = val == null ? "" : getObjectAsString(val); + s[i] = val == null ? "" : val.toString(); } StringBuilder sb = new StringBuilder(); @@ -93,14 +93,6 @@ public class TemplateString { return sb.toString(); } - private String getObjectAsString(Object val) { - if (val instanceof Date) { - Date d = (Date) val; - return DataImporter.DATE_TIME_FORMAT.get().format(d); - } - return val.toString(); - } - /** * Returns the variables in the given string. * diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ZKPropertiesWriter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ZKPropertiesWriter.java index ddec036140f..3838a286533 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ZKPropertiesWriter.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ZKPropertiesWriter.java @@ -18,6 +18,7 @@ package org.apache.solr.handler.dataimport; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.util.Map; import java.util.Properties; import org.apache.solr.common.cloud.SolrZkClient; @@ -25,7 +26,13 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ZKPropertiesWriter implements DIHPropertiesWriter { +/** + *

+ * A SolrCloud-friendly extension of {@link SimplePropertiesWriter}. + * This implementation ignores the "directory" parameter, saving + * the properties file under /configs/[solrcloud collection name]/ + */ +public class ZKPropertiesWriter extends SimplePropertiesWriter { private static final Logger log = LoggerFactory .getLogger(ZKPropertiesWriter.class); @@ -34,16 +41,11 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter { private SolrZkClient zkClient; @Override - public void init(DataImporter dataImporter) { + public void init(DataImporter dataImporter, Map params) { + super.init(dataImporter, params); String collection = dataImporter.getCore().getCoreDescriptor() .getCloudDescriptor().getCollectionName(); - String persistFilename; - if(dataImporter.getHandlerName() != null){ - persistFilename = dataImporter.getHandlerName() + ".properties"; - } else { - persistFilename = SimplePropertiesWriter.IMPORTER_PROPERTIES; - } - path = "/configs/" + collection + "/" + persistFilename; + path = "/configs/" + collection + "/" + filename; zkClient = dataImporter.getCore().getCoreDescriptor().getCoreContainer() .getZkController().getZkClient(); } @@ -54,9 +56,9 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter { } @Override - public void persist(Properties props) { - Properties existing = readIndexerProperties(); - existing.putAll(props); + public void persist(Map propObjs) { + Properties existing = mapToProperties(readIndexerProperties()); + existing.putAll(mapToProperties(propObjs)); ByteArrayOutputStream output = new ByteArrayOutputStream(); try { existing.store(output, ""); @@ -78,7 +80,7 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter { } @Override - public Properties readIndexerProperties() { + public Map readIndexerProperties() { Properties props = new Properties(); try { byte[] data = zkClient.getData(path, null, null, false); @@ -90,6 +92,6 @@ public class ZKPropertiesWriter implements DIHPropertiesWriter { log.warn( "Could not read DIH properties from " + path + " :" + e.getClass(), e); } - return props; + return propertiesToMap(props); } } diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/ConfigNameConstants.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/ConfigNameConstants.java index c92feef6855..ad18e3a7179 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/ConfigNameConstants.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/ConfigNameConstants.java @@ -28,6 +28,8 @@ public class ConfigNameConstants { public static final String NAME = "name"; public static final String PROCESSOR = "processor"; + + public static final String PROPERTY_WRITER = "propertyWriter"; /** * @deprecated use IMPORTER_NS_SHORT instead diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java index b583656b400..7a92501cae8 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/DIHConfiguration.java @@ -43,13 +43,18 @@ import org.w3c.dom.Element; public class DIHConfiguration { // TODO - remove from here and add it to entity private final String deleteQuery; + private final List 
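For orientation, the ZooKeeper variant amounts to a path computation plus the read/write and conversion logic inherited from SimplePropertiesWriter. A sketch of where the file ends up, mirroring the init() shown above (collection and handler names illustrative):

```java
// Persist location used by ZKPropertiesWriter: the "directory" init param
// is ignored and the collection's config node in ZooKeeper is used instead.
String collection = "collection1";             // illustrative collection name
String filename = "dataimport.properties";     // handler name + ".properties" by default
String path = "/configs/" + collection + "/" + filename;
// e.g. /configs/collection1/dataimport.properties
```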
entities; private final String onImportStart; private final String onImportEnd; private final List> functions; private final Script script; private final Map> dataSources; - public DIHConfiguration(Element element, DataImporter di, List> functions, Script script, Map> dataSources) { + private final PropertyWriter propertyWriter; + + public DIHConfiguration(Element element, DataImporter di, + List> functions, Script script, + Map> dataSources, PropertyWriter pw) { this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null); this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null); this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null); @@ -73,6 +78,7 @@ public class DIHConfiguration { this.functions = Collections.unmodifiableList(modFunc); this.script = script; this.dataSources = Collections.unmodifiableMap(dataSources); + this.propertyWriter = pw; } public String getDeleteQuery() { return deleteQuery; @@ -95,4 +101,7 @@ public class DIHConfiguration { public Script getScript() { return script; } + public PropertyWriter getPropertyWriter() { + return propertyWriter; + } } \ No newline at end of file diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/PropertyWriter.java similarity index 59% rename from solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java rename to solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/PropertyWriter.java index 5bdfc2e8940..5925131331a 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/config/PropertyWriter.java @@ -1,4 +1,8 @@ -package org.apache.solr.handler.dataimport; +package org.apache.solr.handler.dataimport.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -17,16 +21,20 @@ package org.apache.solr.handler.dataimport; * limitations under the License. */ -import java.util.Properties; +public class PropertyWriter { + private final String type; + private final Map parameters; + + public PropertyWriter(String type, Map parameters) { + this.type = type; + this.parameters = Collections.unmodifiableMap(new HashMap(parameters)); + } -public interface DIHPropertiesWriter { - - public void init(DataImporter dataImporter); - - public boolean isWritable(); - - public void persist(Properties props); - - public Properties readIndexerProperties(); - + public Map getParameters() { + return parameters; + } + + public String getType() { + return type; + } } diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHJdbcTestCase.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHJdbcTestCase.java index 6cb82b9ef34..0a8d3426146 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHJdbcTestCase.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHJdbcTestCase.java @@ -1,4 +1,5 @@ package org.apache.solr.handler.dataimport; + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with this @@ -19,15 +20,8 @@ package org.apache.solr.handler.dataimport; import java.io.OutputStream; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,25 +32,18 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; + /** * This sets up an in-memory Sql database with a little sample data. */ -public abstract class AbstractDIHJdbcTestCase extends AbstractDataImportHandlerTestCase { - protected boolean underlyingDataModified; +public abstract class AbstractDIHJdbcTestCase extends + AbstractDataImportHandlerTestCase { - protected boolean useSimpleCaches; - protected boolean countryEntity; - protected boolean countryCached; - protected boolean sportsEntity; - protected boolean sportsCached; - protected String rootTransformerName; - protected boolean countryTransformer; - protected boolean sportsTransformer; + protected Database dbToUse; - protected Database db = Database.RANDOM; - private Database dbToUse; - - public enum Database { RANDOM , DERBY , HSQLDB } + public enum Database { + RANDOM, DERBY, HSQLDB + } private static final Pattern totalRequestsPattern = Pattern .compile(".str name..Total Requests made to DataSource..(\\d+)..str."); @@ -68,685 +55,132 @@ public abstract class AbstractDIHJdbcTestCase extends AbstractDataImportHandlerT String oldProp = System.getProperty("derby.stream.error.field"); System.setProperty("derby.stream.error.field", "DerbyUtil.DEV_NULL"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance(); - if(oldProp!=null) { - System.setProperty("derby.stream.error.field", oldProp); + if (oldProp != null) { + System.setProperty("derby.stream.error.field", oldProp); } } catch (Exception e) { throw e; - } + } initCore("dataimport-solrconfig.xml", "dataimport-schema.xml"); - } + } + @AfterClass public static void afterClassDihJdbcTest() throws Exception { try { DriverManager.getConnection("jdbc:derby:;shutdown=true"); - } catch(SQLException e) { - //ignore...we might not even be using derby this time... - } + } catch (SQLException e) { + // ignore...we might not even be using derby this time... 
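The refactored base class now exposes two hooks for subclasses: setAllowedDatabases() here, and populateData(Connection) plus the abstract generateConfig() shown further below. A hypothetical subclass (table, class and row values invented for illustration, assumed to live in the same test package) would look like:

```java
import java.sql.Connection;
import java.sql.Statement;

// Hypothetical subclass: pins the database choice and loads its own schema/rows.
public class TestWidgetImport extends AbstractDIHJdbcTestCase {
  @Override
  protected Database setAllowedDatabases() {
    return Database.DERBY; // or Database.RANDOM to let the test pick per run
  }

  @Override
  protected void populateData(Connection conn) throws Exception {
    Statement s = conn.createStatement();
    try {
      s.executeUpdate("create table widgets(id int not null primary key, name varchar(50))");
      s.executeUpdate("insert into widgets values (1, 'sprocket')");
    } finally {
      s.close();
    }
  }

  @Override
  protected String generateConfig() {
    // a data-config selecting from WIDGETS, built the same way the sibling tests do
    return "<dataConfig> ... </dataConfig>";
  }
  // plus @Test methods calling h.query("/dataimport", generateRequest())
}
```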
+ } } + + protected Database setAllowedDatabases() { + return Database.RANDOM; + } + @Before - public void beforeDihJdbcTest() throws Exception { - useSimpleCaches = false; - countryEntity = false; - countryCached = false; - sportsEntity = false; - sportsCached = false; - rootTransformerName = null; - countryTransformer = false; - sportsTransformer = false; - - dbToUse = db; - if(db==Database.RANDOM) { - if(random().nextBoolean()) { + public void beforeDihJdbcTest() throws Exception { + dbToUse = setAllowedDatabases(); + if (dbToUse == Database.RANDOM) { + if (random().nextBoolean()) { dbToUse = Database.DERBY; } else { dbToUse = Database.HSQLDB; } - } + } clearIndex(); - assertU(commit()); + assertU(commit()); buildDatabase(); - } + } + @After public void afterDihJdbcTest() throws Exception { Connection conn = null; Statement s = null; - try { - if(dbToUse==Database.DERBY) { + try { + if (dbToUse == Database.DERBY) { try { - conn = DriverManager.getConnection("jdbc:derby:memory:derbyDB;drop=true"); - } catch(SQLException e) { - if(!"08006".equals(e.getSQLState())) { + conn = DriverManager + .getConnection("jdbc:derby:memory:derbyDB;drop=true"); + } catch (SQLException e) { + if (!"08006".equals(e.getSQLState())) { throw e; } - } - } else if(dbToUse==Database.HSQLDB) { - conn = DriverManager.getConnection("jdbc:hsqldb:mem:."); + } + } else if (dbToUse == Database.HSQLDB) { + conn = DriverManager.getConnection("jdbc:hsqldb:mem:."); s = conn.createStatement(); s.executeUpdate("shutdown"); } } catch (SQLException e) { throw e; } finally { - try { s.close(); } catch(Exception ex) { } - try { conn.close(); } catch(Exception ex) { } + try { + s.close(); + } catch (Exception ex) {} + try { + conn.close(); + } catch (Exception ex) {} } } - private Connection newConnection() throws Exception { - if(dbToUse==Database.DERBY) { - return DriverManager.getConnection("jdbc:derby:memory:derbyDB;"); - } else if(dbToUse==Database.HSQLDB) { - return DriverManager.getConnection("jdbc:hsqldb:mem:."); - } - throw new AssertionError("Invalid database to use: " + dbToUse); - } - protected void singleEntity(int numToExpect) throws Exception { - h.query("/dataimport", generateRequest()); - assertQ("There should be 1 document per person in the database: " - + totalPeople(), req("*:*"), "//*[@numFound='" + totalPeople() + "']"); - Assert.assertTrue("Expecting " + numToExpect - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() == numToExpect); + protected Connection newConnection() throws Exception { + if (dbToUse == Database.DERBY) { + return DriverManager.getConnection("jdbc:derby:memory:derbyDB;"); + } else if (dbToUse == Database.HSQLDB) { + return DriverManager.getConnection("jdbc:hsqldb:mem:."); + } + throw new AssertionError("Invalid database to use: " + dbToUse); } - protected void simpleTransform(int numToExpect) throws Exception { - rootTransformerName = "AddAColumnTransformer"; - h.query("/dataimport", generateRequest()); - assertQ("There should be 1 document with a transformer-added column per person is the database: " - + totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='" + totalPeople() + "']"); - Assert.assertTrue("Expecting " + numToExpect - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() == numToExpect); - } - /** - * A delta update will not clean up documents added by a transformer - * even if the parent document that the transformer used to base the new documents - * were deleted - */ - protected void 
complexTransform(int numToExpect, int numDeleted) throws Exception { - rootTransformerName = "TripleThreatTransformer"; - h.query("/dataimport", generateRequest()); - int totalDocs = ((totalPeople() * 3) + (numDeleted * 2)); - int totalAddedDocs = (totalPeople() + numDeleted); - assertQ(req("q", "*:*", "rows", "" + (totalPeople() * 3), "sort", "id asc"), "//*[@numFound='" + totalDocs + "']"); - assertQ(req("id:TripleThreat-1-*"), "//*[@numFound='" + totalAddedDocs + "']"); - assertQ(req("id:TripleThreat-2-*"), "//*[@numFound='" + totalAddedDocs + "']"); - if(personNameExists("Michael") && countryCodeExists("NR")) - { - assertQ( - "Michael and NR are assured to be in the database. Therefore the transformer should have added leahciM and RN on the same document as id:TripleThreat-1-3", - req("+id:TripleThreat-1-3 +NAME_mult_s:Michael +NAME_mult_s:leahciM +COUNTRY_CODES_mult_s:NR +COUNTRY_CODES_mult_s:RN"), - "//*[@numFound='1']"); - } - assertQ(req("AddAColumn_s:Added"), "//*[@numFound='" + totalAddedDocs + "']"); - Assert.assertTrue("Expecting " + numToExpect - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() == numToExpect); - } - protected void withChildEntities(boolean cached, boolean checkDatabaseRequests) throws Exception { - rootTransformerName = random().nextBoolean() ? null : "AddAColumnTransformer"; - int numChildren = random().nextInt(1) + 1; - int numDatabaseRequests = 1; - if(underlyingDataModified) { - if (countryEntity) { - if (cached) { - numDatabaseRequests++; - } else { - numDatabaseRequests += totalPeople(); - } - } - if (sportsEntity) { - if (cached) { - numDatabaseRequests++; - } else { - numDatabaseRequests += totalPeople(); - } - } - } else { - countryEntity = true; - sportsEntity = true; - if(numChildren==1) { - countryEntity = random().nextBoolean(); - sportsEntity = !countryEntity; - } - if(countryEntity) { - countryTransformer = random().nextBoolean(); - if(cached) { - numDatabaseRequests++; - countryCached = true; - } else { - numDatabaseRequests += totalPeople(); - } - } - if(sportsEntity) { - sportsTransformer = random().nextBoolean(); - if(cached) { - numDatabaseRequests++; - sportsCached = true; - } else { - numDatabaseRequests += totalPeople(); - } - } - } - h.query("/dataimport", generateRequest()); - - assertQ("There should be 1 document per person in the database: " - + totalPeople(), req("*:*"), "//*[@numFound='" + (totalPeople()) + "']"); - if(!underlyingDataModified && "AddAColumnTransformer".equals(rootTransformerName)) { - assertQ("There should be 1 document with a transformer-added column per person is the database: " - + totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='" + (totalPeople()) + "']"); - } - if(countryEntity) { - if(personNameExists("Jayden")) - { - String nrName = countryNameByCode("NP"); - if(nrName!=null && nrName.length()>0) { - assertQ(req("NAME_mult_s:Jayden"), - "//*[@numFound='1']", "//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'"); - } - } - String nrName = countryNameByCode("NR"); - int num = numberPeopleByCountryCode("NR"); - if(nrName!=null && num>0) { - assertQ(req("COUNTRY_CODES_mult_s:NR"), - "//*[@numFound='" + num + "']", "//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'"); - } - if(countryTransformer && !underlyingDataModified) { - assertQ(req("countryAdded_s:country_added"), "//*[@numFound='" + totalPeople() + "']"); - } - } - if(sportsEntity) { - if(!underlyingDataModified) { - assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']"); - } - String 
michaelsName = personNameById(3); - String[] michaelsSports = sportNamesByPersonId(3); - if(michaelsName != null && michaelsSports.length>0) { - String[] xpath = new String[michaelsSports.length + 1]; - xpath[0] = "//*[@numFound='1']"; - int i=1; - for(String ms : michaelsSports) { - xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms + "'"; - i++; - } - assertQ(req("NAME_mult_s:" + michaelsName.replaceAll("\\W", "\\\\$0")), xpath); - } - if (!underlyingDataModified && sportsTransformer) { - assertQ(req("sportsAdded_s:sport_added"), "//*[@numFound='" + (totalPeople()) + "']"); - } - } - if(checkDatabaseRequests) { - Assert.assertTrue("Expecting " + numDatabaseRequests - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() == numDatabaseRequests); - } - } - protected void simpleCacheChildEntities(boolean checkDatabaseRequests) throws Exception { - useSimpleCaches = true; - countryEntity = true; - sportsEntity = true; - countryCached=true; - sportsCached=true; - int dbRequestsMoreThan = 3; - int dbRequestsLessThan = totalPeople() * 2 + 1; - h.query("/dataimport", generateRequest()); - assertQ(req("*:*"), "//*[@numFound='" + (totalPeople()) + "']"); - if(!underlyingDataModified || (personNameExists("Samantha") && "Nauru".equals(countryNameByCode("NR")))) - { - assertQ(req("NAME_mult_s:Samantha"), - "//*[@numFound='1']", "//doc/str[@name='COUNTRY_NAME_s']='Nauru'"); - } - if(!underlyingDataModified) - { - assertQ(req("COUNTRY_CODES_mult_s:NR"), - "//*[@numFound='2']", "//doc/str[@name='COUNTRY_NAME_s']='Nauru'"); - assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']"); - } - String[] michaelsSports = sportNamesByPersonId(3); - if(!underlyingDataModified || michaelsSports.length>0) { - String[] xpath = new String[michaelsSports.length + 1]; - xpath[0] = "//*[@numFound='1']"; - int i=1; - for(String ms : michaelsSports) { - xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms + "'"; - i++; - } - assertQ(req("NAME_mult_s:Michael"), xpath); - } - if(checkDatabaseRequests) { - Assert.assertTrue("Expecting more than " + dbRequestsMoreThan - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() > dbRequestsMoreThan); - Assert.assertTrue("Expecting fewer than " + dbRequestsLessThan - + " database calls, but DIH reported " + totalDatabaseRequests(), - totalDatabaseRequests() < dbRequestsLessThan); - } - } - public void buildDatabase() throws Exception - { - underlyingDataModified = false; + + protected void buildDatabase() throws Exception { Connection conn = null; - Statement s = null; - PreparedStatement ps = null; - Timestamp theTime = new Timestamp(System.currentTimeMillis() - 10000); //10 seconds ago - try { - if(dbToUse==Database.DERBY) { - conn = DriverManager.getConnection("jdbc:derby:memory:derbyDB;create=true"); - } else if(dbToUse==Database.HSQLDB) { - conn = DriverManager.getConnection("jdbc:hsqldb:mem:."); + try { + if (dbToUse == Database.DERBY) { + conn = DriverManager + .getConnection("jdbc:derby:memory:derbyDB;create=true"); + } else if (dbToUse == Database.HSQLDB) { + conn = DriverManager.getConnection("jdbc:hsqldb:mem:."); } else { throw new AssertionError("Invalid database to use: " + dbToUse); } - s = conn.createStatement(); - s.executeUpdate("create table countries(code varchar(3) not null primary key, country_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)"); - s.executeUpdate("create table people(id int not null primary 
key, name varchar(50), country_code char(2), deleted char(1) default 'N', last_modified timestamp not null)"); - s.executeUpdate("create table people_sports(id int not null primary key, person_id int, sport_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)"); + populateData(conn); + } catch (Exception e) { + throw e; + } finally { - ps = conn.prepareStatement("insert into countries (code, country_name, last_modified) values (?,?,?)"); - for(String[] country : countries) { - ps.setString(1, country[0]); - ps.setString(2, country[1]); - ps.setTimestamp(3, theTime); - Assert.assertEquals(1, ps.executeUpdate()); - } - ps.close(); - - ps = conn.prepareStatement("insert into people (id, name, country_code, last_modified) values (?,?,?,?)"); - for(Object[] person : people) { - ps.setInt(1, (Integer) person[0]); - ps.setString(2, (String) person[1]); - ps.setString(3, (String) person[2]); - ps.setTimestamp(4, theTime); - Assert.assertEquals(1, ps.executeUpdate()); - } - ps.close(); - - ps = conn.prepareStatement("insert into people_sports (id, person_id, sport_name, last_modified) values (?,?,?,?)"); - for(Object[] sport : people_sports) { - ps.setInt(1, (Integer) sport[0]); - ps.setInt(2, (Integer) sport[1]); - ps.setString(3, (String) sport[2]); - ps.setTimestamp(4, theTime); - Assert.assertEquals(1, ps.executeUpdate()); - } - ps.close(); - conn.commit(); - conn.close(); - } catch(Exception e) { - throw e; - } finally { - try { ps.close(); } catch(Exception ex) { } - try { s.close(); } catch(Exception ex) { } - try { conn.close(); } catch(Exception ex) { } - } - } - protected abstract String deltaQueriesCountryTable() ; - protected abstract String deltaQueriesPersonTable() ; - - - private int getIntFromQuery(String query) throws Exception { - Connection conn = null; - Statement s = null; - ResultSet rs = null; - try { - conn = newConnection(); - s = conn.createStatement(); - rs = s.executeQuery(query); - if(rs.next()) { - return rs.getInt(1); - } - return 0; - } catch (SQLException e) { - throw e; - } finally { - try { rs.close(); } catch(Exception ex) { } - try { s.close(); } catch(Exception ex) { } - try { conn.close(); } catch(Exception ex) { } - } - } - private String[] getStringsFromQuery(String query) throws Exception { - Connection conn = null; - Statement s = null; - ResultSet rs = null; - try { - conn = newConnection(); - s = conn.createStatement(); - rs = s.executeQuery(query); - List results = new ArrayList(); - while(rs.next()) { - results.add(rs.getString(1)); - } - return results.toArray(new String[results.size()]); - } catch (SQLException e) { - throw e; - } finally { - try { rs.close(); } catch(Exception ex) { } - try { s.close(); } catch(Exception ex) { } - try { conn.close(); } catch(Exception ex) { } } } - public int totalCountries() throws Exception { - return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' "); - } - public int totalPeople() throws Exception { - return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' "); + protected void populateData(Connection conn) throws Exception { + // no-op } - public boolean countryCodeExists(String cc) throws Exception { - return getIntFromQuery("SELECT COUNT(1) country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + cc + "'")>0; - } - public String countryNameByCode(String cc) throws Exception { - String[] s = getStringsFromQuery("SELECT country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + cc + "'"); - return s.length==0 ? 
null : s[0]; - } - public int numberPeopleByCountryCode(String cc) throws Exception { - return getIntFromQuery( - "Select count(1) " + - "from people p " + - "inner join countries c on p.country_code=c.code " + - "where p.deleted!='Y' and c.deleted!='Y' and c.code='" + cc + "'"); - } - public String[] sportNamesByPersonId(int personId) throws Exception { - return getStringsFromQuery( - "SELECT ps.SPORT_NAME " + - "FROM people_sports ps " + - "INNER JOIN PEOPLE p ON p.id = ps.person_Id " + - "WHERE ps.DELETED != 'Y' AND p.DELETED != 'Y' " + - "AND ps.person_id=" + personId + " " + - "ORDER BY ps.id" - ); - } - public boolean personNameExists(String pn) throws Exception { - return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' AND NAME='" + pn + "'")>0; - } - public String personNameById(int id) throws Exception { - String[] nameArr= getStringsFromQuery("SELECT NAME FROM PEOPLE WHERE ID=" + id); - if(nameArr.length==0) { - return null; - } - return nameArr[0]; - } - + public int totalDatabaseRequests(String dihHandlerName) throws Exception { LocalSolrQueryRequest request = lrf.makeRequest("indent", "true"); String response = h.query(dihHandlerName, request); Matcher m = totalRequestsPattern.matcher(response); Assert.assertTrue("The handler " + dihHandlerName - + " is not reporting any database requests. ", m.find() - && m.groupCount() == 1); + + " is not reporting any database requests. ", + m.find() && m.groupCount() == 1); return Integer.parseInt(m.group(1)); } public int totalDatabaseRequests() throws Exception { return totalDatabaseRequests("/dataimport"); } - public IntChanges modifySomePeople() throws Exception { - underlyingDataModified = true; - int numberToChange = random().nextInt(people.length + 1); - Set changeSet = new HashSet(); - Set deleteSet = new HashSet(); - Set addSet = new HashSet(); - Connection conn = null; - PreparedStatement change = null; - PreparedStatement delete = null; - PreparedStatement add = null; - //One second in the future ensures a change time after the last import (DIH uses second precision only) - Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000); - try { - conn = newConnection(); - change = conn.prepareStatement("update people set name=?, last_modified=? where id=?"); - delete = conn.prepareStatement("update people set deleted='Y', last_modified=? where id=?"); - add = conn.prepareStatement("insert into people (id,name,country_code,last_modified) values (?,?,'ZZ',?)"); - for(int i=0 ; i changeSet = new HashSet(); - Connection conn = null; - PreparedStatement change = null; - // One second in the future ensures a change time after the last import (DIH - // uses second precision only) - Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000); - try { - conn = newConnection(); - change = conn - .prepareStatement("update countries set country_name=?, last_modified=? 
where code=?"); - for (int i = 0; i < numberToChange; i++) { - int tryIndex = random().nextInt(countries.length); - String code = countries[tryIndex][0]; - if (!changeSet.contains(code)) { - changeSet.add(code); - change.setString(1, "MODIFIED " + countries[tryIndex][1]); - change.setTimestamp(2, theTime); - change.setString(3, code); - Assert.assertEquals(1, change.executeUpdate()); - - } - } - } catch (SQLException e) { - throw e; - } finally { - try { - change.close(); - } catch (Exception ex) {} - try { - conn.close(); - } catch (Exception ex) {} - } - return changeSet.toArray(new String[changeSet.size()]); - } - class IntChanges { - public Integer[] changedKeys; - public Integer[] deletedKeys; - public Integer[] addedKeys; - } protected LocalSolrQueryRequest generateRequest() { - return lrf.makeRequest("command", "full-import", "dataConfig", generateConfig(), - "clean", "true", "commit", "true", "synchronous", "true", "indent", "true"); + return lrf.makeRequest("command", "full-import", "dataConfig", + generateConfig(), "clean", "true", "commit", "true", "synchronous", + "true", "indent", "true"); } - protected String generateConfig() { - String ds = null; - if (dbToUse == Database.DERBY) { - ds = "derby"; - } else if (dbToUse == Database.HSQLDB) { - ds = "hsqldb"; - } else { - throw new AssertionError("Invalid database to use: " + dbToUse); - } - StringBuilder sb = new StringBuilder(); - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - - sb.append(" \n"); - sb.append(" \n"); - - if (countryEntity) { - sb.append("\n"); - } else { - sb.append(random().nextBoolean() ? "cacheKey=''CODE'' cacheLookup=''People.COUNTRY_CODE'' " - : "where=''CODE=People.COUNTRY_CODE'' "); - sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES'' "); - sb.append("> \n"); - } - } else { - sb.append("processor=''SqlEntityProcessor'' query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' '' "); - sb.append(deltaQueriesCountryTable()); - sb.append("> \n"); - } - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - } - if (sportsEntity) { - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - } - - sb.append(" \n"); - sb.append(" \n"); - sb.append(" \n"); - String config = sb.toString().replaceAll("[']{2}", "\""); - log.debug(config); - return config; - } + protected abstract String generateConfig(); - public static final String[][] countries = { - {"NA", "Namibia"}, - {"NC", "New Caledonia"}, - {"NE", "Niger"}, - {"NF", "Norfolk Island"}, - {"NG", "Nigeria"}, - {"NI", "Nicaragua"}, - {"NL", "Netherlands"}, - {"NO", "Norway"}, - {"NP", "Nepal"}, - {"NR", "Nauru"}, - {"NU", "Niue"}, - {"NZ", "New Zealand"} - }; - - public static final Object[][] people = { - {1,"Jacob","NZ"}, - {2,"Ethan","NU"}, - {3,"Michael","NR"}, - {4,"Jayden","NP"}, - {5,"William","NO"}, - {6,"Alexander","NL"}, - {7,"Noah","NI"}, - {8,"Daniel","NG"}, - {9,"Aiden","NF"}, - {10,"Anthony","NE"}, - {11,"Emma","NL"}, - {12,"Grace","NI"}, - {13,"Hailey","NG"}, - {14,"Isabella","NF"}, - {15,"Lily","NE"}, - {16,"Madison","NC"}, - {17,"Mia","NA"}, - {18,"Natalie","NZ"}, - {19,"Olivia","NU"}, - {20,"Samantha","NR"} - }; - - public static final Object[][] people_sports = { - {100, 1, "Swimming"}, - {200, 2, "Triathlon"}, - {300, 3, "Water polo"}, - {310, 3, "Underwater rugby"}, - {320, 3, "Kayaking"}, - {400, 4, "Snorkeling"}, - {500, 5, "Synchronized diving"}, - {600, 6, "Underwater rugby"}, - {700, 7, "Boating"}, - 
{800, 8, "Bodyboarding"}, - {900, 9, "Canoeing"}, - {1000, 10, "Fishing"}, - {1100, 11, "Jet Ski"}, - {1110, 11, "Rowing"}, - {1120, 11, "Sailing"}, - {1200, 12, "Kayaking"}, - {1210, 12, "Canoeing"}, - {1300, 13, "Kite surfing"}, - {1400, 14, "Parasailing"}, - {1500, 15, "Rafting"}, - {1600, 16, "Rowing"}, - {1700, 17, "Sailing"}, - {1800, 18, "White Water Rafting"}, - {1900, 19, "Water skiing"}, - {2000, 20, "Windsurfing"} - }; public static class DerbyUtil { public static final OutputStream DEV_NULL = new OutputStream() { - public void write(int b) {} + public void write(int b) {} }; -} + } } \ No newline at end of file diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractSqlEntityProcessorTestCase.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractSqlEntityProcessorTestCase.java new file mode 100644 index 00000000000..220bec70441 --- /dev/null +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractSqlEntityProcessorTestCase.java @@ -0,0 +1,696 @@ +package org.apache.solr.handler.dataimport; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.After; + +import junit.framework.Assert; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+public abstract class AbstractSqlEntityProcessorTestCase extends
+    AbstractDIHJdbcTestCase {
+  protected boolean underlyingDataModified;
+  protected boolean useSimpleCaches;
+  protected boolean countryEntity;
+  protected boolean countryCached;
+  protected boolean sportsEntity;
+  protected boolean sportsCached;
+  protected String rootTransformerName;
+  protected boolean countryTransformer;
+  protected boolean sportsTransformer;
+
+  @After
+  public void afterSqlEntityProcessorTestCase() {
+    useSimpleCaches = false;
+    countryEntity = false;
+    countryCached = false;
+    sportsEntity = false;
+    sportsCached = false;
+    rootTransformerName = null;
+    countryTransformer = false;
+    sportsTransformer = false;
+    underlyingDataModified = false;
+  }
+
+  protected abstract String deltaQueriesCountryTable();
+
+  protected abstract String deltaQueriesPersonTable();
+
+  protected void singleEntity(int numToExpect) throws Exception {
+    h.query("/dataimport", generateRequest());
+    assertQ("There should be 1 document per person in the database: "
+        + totalPeople(), req("*:*"), "//*[@numFound='" + totalPeople() + "']");
+    Assert.assertTrue("Expecting " + numToExpect
+        + " database calls, but DIH reported " + totalDatabaseRequests(),
+        totalDatabaseRequests() == numToExpect);
+  }
+
+  protected void simpleTransform(int numToExpect) throws Exception {
+    rootTransformerName = "AddAColumnTransformer";
+    h.query("/dataimport", generateRequest());
+    assertQ(
+        "There should be 1 document with a transformer-added column per person in the database: "
+            + totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='"
+            + totalPeople() + "']");
+    Assert.assertTrue("Expecting " + numToExpect
+        + " database calls, but DIH reported " + totalDatabaseRequests(),
+        totalDatabaseRequests() == numToExpect);
+  }
+
+  /**
+   * A delta update will not clean up documents added by a transformer, even if
+   * the parent documents the transformer derived them from were deleted.
+   */
+  protected void complexTransform(int numToExpect, int numDeleted)
+      throws Exception {
+    rootTransformerName = "TripleThreatTransformer";
+    h.query("/dataimport", generateRequest());
+    int totalDocs = ((totalPeople() * 3) + (numDeleted * 2));
+    int totalAddedDocs = (totalPeople() + numDeleted);
+    assertQ(
+        req("q", "*:*", "rows", "" + (totalPeople() * 3), "sort", "id asc"),
+        "//*[@numFound='" + totalDocs + "']");
+    assertQ(req("id:TripleThreat-1-*"), "//*[@numFound='" + totalAddedDocs
+        + "']");
+    assertQ(req("id:TripleThreat-2-*"), "//*[@numFound='" + totalAddedDocs
+        + "']");
+    if (personNameExists("Michael") && countryCodeExists("NR")) {
+      assertQ(
+          "Michael and NR are assured to be in the database. Therefore the transformer should have added leahciM and RN on the same document as id:TripleThreat-1-3",
+          req("+id:TripleThreat-1-3 +NAME_mult_s:Michael +NAME_mult_s:leahciM +COUNTRY_CODES_mult_s:NR +COUNTRY_CODES_mult_s:RN"),
+          "//*[@numFound='1']");
+    }
+    assertQ(req("AddAColumn_s:Added"), "//*[@numFound='" + totalAddedDocs
+        + "']");
+    Assert.assertTrue("Expecting " + numToExpect
+        + " database calls, but DIH reported " + totalDatabaseRequests(),
+        totalDatabaseRequests() == numToExpect);
+  }
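+
+  // A minimal sketch of a concrete subclass (hypothetical names; the real
+  // ones are TestSqlEntityProcessor and TestSqlEntityProcessorDelta below).
+  // Only the delta-query fragments and the DIH config need to be supplied:
+  //
+  //   public class MySqlEntityProcessorTest extends AbstractSqlEntityProcessorTestCase {
+  //     @Override protected String deltaQueriesCountryTable() { return ""; } // no deltas
+  //     @Override protected String deltaQueriesPersonTable()  { return ""; }
+  //     @Override protected String generateConfig() { return "..."; } // build a data-config string
+  //   }
+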
+  protected void withChildEntities(boolean cached, boolean checkDatabaseRequests)
+      throws Exception {
+    rootTransformerName = random().nextBoolean() ? null
+        : "AddAColumnTransformer";
+    int numChildren = random().nextInt(2) + 1; // randomly test one or two child entities
+    int numDatabaseRequests = 1;
+    if (underlyingDataModified) {
+      if (countryEntity) {
+        if (cached) {
+          numDatabaseRequests++;
+        } else {
+          numDatabaseRequests += totalPeople();
+        }
+      }
+      if (sportsEntity) {
+        if (cached) {
+          numDatabaseRequests++;
+        } else {
+          numDatabaseRequests += totalPeople();
+        }
+      }
+    } else {
+      countryEntity = true;
+      sportsEntity = true;
+      if (numChildren == 1) {
+        countryEntity = random().nextBoolean();
+        sportsEntity = !countryEntity;
+      }
+      if (countryEntity) {
+        countryTransformer = random().nextBoolean();
+        if (cached) {
+          numDatabaseRequests++;
+          countryCached = true;
+        } else {
+          numDatabaseRequests += totalPeople();
+        }
+      }
+      if (sportsEntity) {
+        sportsTransformer = random().nextBoolean();
+        if (cached) {
+          numDatabaseRequests++;
+          sportsCached = true;
+        } else {
+          numDatabaseRequests += totalPeople();
+        }
+      }
+    }
+    h.query("/dataimport", generateRequest());
+
+    assertQ("There should be 1 document per person in the database: "
+        + totalPeople(), req("*:*"), "//*[@numFound='" + (totalPeople()) + "']");
+    if (!underlyingDataModified
+        && "AddAColumnTransformer".equals(rootTransformerName)) {
+      assertQ(
+          "There should be 1 document with a transformer-added column per person in the database: "
+              + totalPeople(), req("AddAColumn_s:Added"), "//*[@numFound='"
+              + (totalPeople()) + "']");
+    }
+    if (countryEntity) {
+      if (personNameExists("Jayden")) {
+        String npName = countryNameByCode("NP");
+        if (npName != null && npName.length() > 0) {
+          assertQ(req("NAME_mult_s:Jayden"), "//*[@numFound='1']",
+              "//doc/str[@name='COUNTRY_NAME_s']='" + npName + "'");
+        }
+      }
+      String nrName = countryNameByCode("NR");
+      int num = numberPeopleByCountryCode("NR");
+      if (nrName != null && num > 0) {
+        assertQ(req("COUNTRY_CODES_mult_s:NR"), "//*[@numFound='" + num + "']",
+            "//doc/str[@name='COUNTRY_NAME_s']='" + nrName + "'");
+      }
+      if (countryTransformer && !underlyingDataModified) {
+        assertQ(req("countryAdded_s:country_added"), "//*[@numFound='"
+            + totalPeople() + "']");
+      }
+    }
+    if (sportsEntity) {
+      if (!underlyingDataModified) {
+        assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']");
+      }
+      String michaelsName = personNameById(3);
+      String[] michaelsSports = sportNamesByPersonId(3);
+      if (michaelsName != null && michaelsSports.length > 0) {
+        String[] xpath = new String[michaelsSports.length + 1];
+        xpath[0] = "//*[@numFound='1']";
+        int i = 1;
+        for (String ms : michaelsSports) {
+          xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='"
+              + ms + "'";
+          i++;
+        }
+        assertQ(req("NAME_mult_s:" + michaelsName.replaceAll("\\W", "\\\\$0")),
+            xpath);
+      }
+      if (!underlyingDataModified && sportsTransformer) {
+        assertQ(req("sportsAdded_s:sport_added"), "//*[@numFound='"
+            + (totalPeople()) + "']");
+      }
+    }
+    if (checkDatabaseRequests) {
+      Assert.assertTrue("Expecting " + numDatabaseRequests
+          + " database calls, but DIH reported " + totalDatabaseRequests(),
+          totalDatabaseRequests() == numDatabaseRequests);
+    }
+  }
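+
+  // Rough model of the expected-request arithmetic used above (illustrative,
+  // not part of the patch): an uncached child entity issues one query per
+  // parent row, while a cached child issues a single query that is then
+  // probed in memory.
+  //
+  //   requests = 1 /* root entity */
+  //       + (countryEntity ? (countryCached ? 1 : totalPeople()) : 0)
+  //       + (sportsEntity ? (sportsCached ? 1 : totalPeople()) : 0);
+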
"Nauru" + .equals(countryNameByCode("NR")))) { + assertQ(req("NAME_mult_s:Samantha"), "//*[@numFound='1']", + "//doc/str[@name='COUNTRY_NAME_s']='Nauru'"); + } + if (!underlyingDataModified) { + assertQ(req("COUNTRY_CODES_mult_s:NR"), "//*[@numFound='2']", + "//doc/str[@name='COUNTRY_NAME_s']='Nauru'"); + assertQ(req("SPORT_NAME_mult_s:Sailing"), "//*[@numFound='2']"); + } + String[] michaelsSports = sportNamesByPersonId(3); + if (!underlyingDataModified || michaelsSports.length > 0) { + String[] xpath = new String[michaelsSports.length + 1]; + xpath[0] = "//*[@numFound='1']"; + int i = 1; + for (String ms : michaelsSports) { + xpath[i] = "//doc/arr[@name='SPORT_NAME_mult_s']/str[" + i + "]='" + ms + + "'"; + i++; + } + assertQ(req("NAME_mult_s:Michael"), xpath); + } + if (checkDatabaseRequests) { + Assert.assertTrue("Expecting more than " + dbRequestsMoreThan + + " database calls, but DIH reported " + totalDatabaseRequests(), + totalDatabaseRequests() > dbRequestsMoreThan); + Assert.assertTrue("Expecting fewer than " + dbRequestsLessThan + + " database calls, but DIH reported " + totalDatabaseRequests(), + totalDatabaseRequests() < dbRequestsLessThan); + } + } + + private int getIntFromQuery(String query) throws Exception { + Connection conn = null; + Statement s = null; + ResultSet rs = null; + try { + conn = newConnection(); + s = conn.createStatement(); + rs = s.executeQuery(query); + if (rs.next()) { + return rs.getInt(1); + } + return 0; + } catch (SQLException e) { + throw e; + } finally { + try { + rs.close(); + } catch (Exception ex) {} + try { + s.close(); + } catch (Exception ex) {} + try { + conn.close(); + } catch (Exception ex) {} + } + } + + private String[] getStringsFromQuery(String query) throws Exception { + Connection conn = null; + Statement s = null; + ResultSet rs = null; + try { + conn = newConnection(); + s = conn.createStatement(); + rs = s.executeQuery(query); + List results = new ArrayList(); + while (rs.next()) { + results.add(rs.getString(1)); + } + return results.toArray(new String[results.size()]); + } catch (SQLException e) { + throw e; + } finally { + try { + rs.close(); + } catch (Exception ex) {} + try { + s.close(); + } catch (Exception ex) {} + try { + conn.close(); + } catch (Exception ex) {} + } + } + + public int totalCountries() throws Exception { + return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' "); + } + + public int totalPeople() throws Exception { + return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' "); + } + + public boolean countryCodeExists(String cc) throws Exception { + return getIntFromQuery("SELECT COUNT(1) country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + + cc + "'") > 0; + } + + public String countryNameByCode(String cc) throws Exception { + String[] s = getStringsFromQuery("SELECT country_name FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='" + + cc + "'"); + return s.length == 0 ? 
+  public int totalCountries() throws Exception {
+    return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' ");
+  }
+
+  public int totalPeople() throws Exception {
+    return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' ");
+  }
+
+  public boolean countryCodeExists(String cc) throws Exception {
+    return getIntFromQuery("SELECT COUNT(1) FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='"
+        + cc + "'") > 0;
+  }
+
+  public String countryNameByCode(String cc) throws Exception {
+    String[] s = getStringsFromQuery("SELECT COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='"
+        + cc + "'");
+    return s.length == 0 ? null : s[0];
+  }
+
+  public int numberPeopleByCountryCode(String cc) throws Exception {
+    return getIntFromQuery("SELECT COUNT(1) "
+        + "FROM PEOPLE p "
+        + "INNER JOIN COUNTRIES c ON p.COUNTRY_CODE=c.CODE "
+        + "WHERE p.DELETED != 'Y' AND c.DELETED != 'Y' AND c.CODE='" + cc + "'");
+  }
+
+  public String[] sportNamesByPersonId(int personId) throws Exception {
+    return getStringsFromQuery("SELECT ps.SPORT_NAME "
+        + "FROM PEOPLE_SPORTS ps "
+        + "INNER JOIN PEOPLE p ON p.ID = ps.PERSON_ID "
+        + "WHERE ps.DELETED != 'Y' AND p.DELETED != 'Y' "
+        + "AND ps.PERSON_ID=" + personId + " "
+        + "ORDER BY ps.ID");
+  }
+
+  public boolean personNameExists(String pn) throws Exception {
+    return getIntFromQuery("SELECT COUNT(1) FROM PEOPLE WHERE DELETED != 'Y' AND NAME='"
+        + pn + "'") > 0;
+  }
+
+  public String personNameById(int id) throws Exception {
+    String[] nameArr = getStringsFromQuery("SELECT NAME FROM PEOPLE WHERE ID="
+        + id);
+    if (nameArr.length == 0) {
+      return null;
+    }
+    return nameArr[0];
+  }
+
+  public IntChanges modifySomePeople() throws Exception {
+    underlyingDataModified = true;
+    int numberToChange = random().nextInt(people.length + 1);
+    Set<Integer> changeSet = new HashSet<Integer>();
+    Set<Integer> deleteSet = new HashSet<Integer>();
+    Set<Integer> addSet = new HashSet<Integer>();
+    Connection conn = null;
+    PreparedStatement change = null;
+    PreparedStatement delete = null;
+    PreparedStatement add = null;
+    // One second in the future ensures a change time after the last import
+    // (DIH uses second precision only)
+    Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
+    try {
+      conn = newConnection();
+      change = conn
+          .prepareStatement("update people set name=?, last_modified=? where id=?");
+      delete = conn
+          .prepareStatement("update people set deleted='Y', last_modified=? where id=?");
+      add = conn
+          .prepareStatement("insert into people (id,name,country_code,last_modified) values (?,?,'ZZ',?)");
+      for (int i = 0; i < numberToChange; i++) {
+        int tryIndex = random().nextInt(people.length);
+        Integer id = (Integer) people[tryIndex][0];
+        if (!changeSet.contains(id) && !deleteSet.contains(id)) {
+          boolean changeDontDelete = random().nextBoolean();
+          if (changeDontDelete) {
+            changeSet.add(id);
+            change.setString(1, "MODIFIED " + people[tryIndex][1]);
+            change.setTimestamp(2, theTime);
+            change.setInt(3, id);
+            Assert.assertEquals(1, change.executeUpdate());
+          } else {
+            deleteSet.add(id);
+            delete.setTimestamp(1, theTime);
+            delete.setInt(2, id);
+            Assert.assertEquals(1, delete.executeUpdate());
+          }
+        }
+      }
+      int numberToAdd = random().nextInt(3);
+      for (int i = 0; i < numberToAdd; i++) {
+        int tryIndex = random().nextInt(people.length);
+        Integer id = (Integer) people[tryIndex][0];
+        Integer newId = id + 1000;
+        String newDesc = "ADDED " + people[tryIndex][1];
+        if (!addSet.contains(newId)) {
+          addSet.add(newId);
+          add.setInt(1, newId);
+          add.setString(2, newDesc);
+          add.setTimestamp(3, theTime);
+          Assert.assertEquals(1, add.executeUpdate());
+        }
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      throw e;
+    } finally {
+      try {
+        change.close();
+      } catch (Exception ex) {}
+      try {
+        delete.close();
+      } catch (Exception ex) {}
+      try {
+        add.close();
+      } catch (Exception ex) {}
+      try {
+        conn.close();
+      } catch (Exception ex) {}
+    }
+    IntChanges c = new IntChanges();
+    c.changedKeys = changeSet.toArray(new Integer[changeSet.size()]);
+    c.deletedKeys = deleteSet.toArray(new Integer[deleteSet.size()]);
+    c.addedKeys = addSet.toArray(new Integer[addSet.size()]);
+    return c;
+  }
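+
+  // The delta tests pair these mutators with a re-import, roughly
+  // (see TestSqlEntityProcessorDelta below):
+  //
+  //   IntChanges c = modifySomePeople();         // update, delete and add rows
+  //   h.query("/dataimport", generateRequest()); // delta-import picks them up
+  //   // ...then assert on c.changedKeys / c.deletedKeys / c.addedKeys
+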
+  public String[] modifySomeCountries() throws Exception {
+    underlyingDataModified = true;
+    int numberToChange = random().nextInt(countries.length + 1);
+    Set<String> changeSet = new HashSet<String>();
+    Connection conn = null;
+    PreparedStatement change = null;
+    // One second in the future ensures a change time after the last import
+    // (DIH uses second precision only)
+    Timestamp theTime = new Timestamp(System.currentTimeMillis() + 1000);
+    try {
+      conn = newConnection();
+      change = conn
+          .prepareStatement("update countries set country_name=?, last_modified=? where code=?");
+      for (int i = 0; i < numberToChange; i++) {
+        int tryIndex = random().nextInt(countries.length);
+        String code = countries[tryIndex][0];
+        if (!changeSet.contains(code)) {
+          changeSet.add(code);
+          change.setString(1, "MODIFIED " + countries[tryIndex][1]);
+          change.setTimestamp(2, theTime);
+          change.setString(3, code);
+          Assert.assertEquals(1, change.executeUpdate());
+        }
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      throw e;
+    } finally {
+      try {
+        change.close();
+      } catch (Exception ex) {}
+      try {
+        conn.close();
+      } catch (Exception ex) {}
+    }
+    return changeSet.toArray(new String[changeSet.size()]);
+  }
+
+  class IntChanges {
+    public Integer[] changedKeys;
+    public Integer[] deletedKeys;
+    public Integer[] addedKeys;
+  }
+
+  @Override
+  protected String generateConfig() {
+    String ds = null;
+    if (dbToUse == Database.DERBY) {
+      ds = "derby";
+    } else if (dbToUse == Database.HSQLDB) {
+      ds = "hsqldb";
+    } else {
+      throw new AssertionError("Invalid database to use: " + dbToUse);
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append(" \n");
+    sb.append(" \n");
+    sb.append(" \n");
+    sb.append(" \n");
+    sb.append(" \n");
+
+    sb.append(" \n");
+    sb.append(" \n");
+
+    if (countryEntity) {
+      sb.append("\n");
+      } else {
+        sb.append(random().nextBoolean() ? "cacheKey=''CODE'' cacheLookup=''People.COUNTRY_CODE'' "
+            : "where=''CODE=People.COUNTRY_CODE'' ");
+        sb.append("query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES'' ");
+        sb.append("> \n");
+      }
+    } else {
+      sb.append("processor=''SqlEntityProcessor'' query=''SELECT CODE, COUNTRY_NAME FROM COUNTRIES WHERE DELETED != 'Y' AND CODE='${People.COUNTRY_CODE}' '' ");
+      sb.append(deltaQueriesCountryTable());
+      sb.append("> \n");
+    }
+    sb.append(" \n");
+    sb.append(" \n");
+    sb.append(" \n");
+    }
+    if (sportsEntity) {
+      sb.append(" \n");
+      sb.append(" \n");
+      sb.append(" \n");
+      sb.append(" \n");
+    }
+
+    sb.append(" \n");
+    sb.append(" \n");
+    sb.append(" \n");
+    String config = sb.toString().replaceAll("[']{2}", "\"");
+    log.debug(config);
+    return config;
+  }
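+
+  // For orientation, the data-config assembled above has this general shape;
+  // attribute choices here are illustrative, not the literal test config:
+  //
+  //   <dataConfig>
+  //     <dataSource name="derby" driver="..." url="..."/>
+  //     <document>
+  //       <entity name="People" processor="SqlEntityProcessor" dataSource="derby"
+  //           query="SELECT ID, NAME, COUNTRY_CODE FROM PEOPLE WHERE DELETED != 'Y'">
+  //         <entity name="Countries" processor="SqlEntityProcessor"
+  //             cacheKey="CODE" cacheLookup="People.COUNTRY_CODE"
+  //             query="SELECT CODE, COUNTRY_NAME FROM COUNTRIES"/>
+  //       </entity>
+  //     </document>
+  //   </dataConfig>
+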
+  @Override
+  protected void populateData(Connection conn) throws Exception {
+    Statement s = null;
+    PreparedStatement ps = null;
+    Timestamp theTime = new Timestamp(System.currentTimeMillis() - 10000); // 10 seconds ago
+    try {
+      s = conn.createStatement();
+      s.executeUpdate("create table countries(code varchar(3) not null primary key, country_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
+      s.executeUpdate("create table people(id int not null primary key, name varchar(50), country_code char(2), deleted char(1) default 'N', last_modified timestamp not null)");
+      s.executeUpdate("create table people_sports(id int not null primary key, person_id int, sport_name varchar(50), deleted char(1) default 'N', last_modified timestamp not null)");
+
+      ps = conn
+          .prepareStatement("insert into countries (code, country_name, last_modified) values (?,?,?)");
+      for (String[] country : countries) {
+        ps.setString(1, country[0]);
+        ps.setString(2, country[1]);
+        ps.setTimestamp(3, theTime);
+        Assert.assertEquals(1, ps.executeUpdate());
+      }
+      ps.close();
+
+      ps = conn
+          .prepareStatement("insert into people (id, name, country_code, last_modified) values (?,?,?,?)");
+      for (Object[] person : people) {
+        ps.setInt(1, (Integer) person[0]);
+        ps.setString(2, (String) person[1]);
+        ps.setString(3, (String) person[2]);
+        ps.setTimestamp(4, theTime);
+        Assert.assertEquals(1, ps.executeUpdate());
+      }
+      ps.close();
+
+      ps = conn
+          .prepareStatement("insert into people_sports (id, person_id, sport_name, last_modified) values (?,?,?,?)");
+      for (Object[] sport : people_sports) {
+        ps.setInt(1, (Integer) sport[0]);
+        ps.setInt(2, (Integer) sport[1]);
+        ps.setString(3, (String) sport[2]);
+        ps.setTimestamp(4, theTime);
+        Assert.assertEquals(1, ps.executeUpdate());
+      }
+      ps.close();
+      conn.commit();
+      conn.close();
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      try {
+        ps.close();
+      } catch (Exception ex) {}
+      try {
+        s.close();
+      } catch (Exception ex) {}
+      try {
+        conn.close();
+      } catch (Exception ex) {}
+    }
+  }
+
+  public static final String[][] countries = {
+      {"NA", "Namibia"},
+      {"NC", "New Caledonia"},
+      {"NE", "Niger"},
+      {"NF", "Norfolk Island"},
+      {"NG", "Nigeria"},
+      {"NI", "Nicaragua"},
+      {"NL", "Netherlands"},
+      {"NO", "Norway"},
+      {"NP", "Nepal"},
+      {"NR", "Nauru"},
+      {"NU", "Niue"},
+      {"NZ", "New Zealand"}
+  };
+
+  public static final Object[][] people = {
+      {1,"Jacob","NZ"},
+      {2,"Ethan","NU"},
+      {3,"Michael","NR"},
+      {4,"Jayden","NP"},
+      {5,"William","NO"},
+      {6,"Alexander","NL"},
+      {7,"Noah","NI"},
+      {8,"Daniel","NG"},
+      {9,"Aiden","NF"},
+      {10,"Anthony","NE"},
+      {11,"Emma","NL"},
+      {12,"Grace","NI"},
+      {13,"Hailey","NG"},
+      {14,"Isabella","NF"},
+      {15,"Lily","NE"},
+      {16,"Madison","NC"},
+      {17,"Mia","NA"},
+      {18,"Natalie","NZ"},
+      {19,"Olivia","NU"},
+      {20,"Samantha","NR"}
+  };
+
+  public static final Object[][] people_sports = {
+      {100, 1, "Swimming"},
+      {200, 2, "Triathlon"},
+      {300, 3, "Water polo"},
+      {310, 3, "Underwater rugby"},
+      {320, 3, "Kayaking"},
+      {400, 4, "Snorkeling"},
+      {500, 5, "Synchronized diving"},
+      {600, 6, "Underwater rugby"},
+      {700, 7, "Boating"},
+      {800, 8, "Bodyboarding"},
+      {900, 9, "Canoeing"},
+      {1000, 10, "Fishing"},
+      {1100, 11, "Jet Ski"},
+      {1110, 11, "Rowing"},
+      {1120, 11, "Sailing"},
+      {1200, 12, "Kayaking"},
+      {1210, 12, "Canoeing"},
+      {1300, 13, "Kite surfing"},
+      {1400, 14, "Parasailing"},
+      {1500, 15, "Rafting"},
+      {1600, 16, "Rowing"},
+      {1700, 17, "Sailing"},
+      {1800, 18, "White Water Rafting"},
+      {1900, 19, "Water skiing"},
+      {2000, 20, "Windsurfing"}
+  };
+}
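The abstract delta-query hooks above are implemented by the concrete delta test; a sketch of the kind of attribute fragment a subclass returns (mirroring the delta test at the end of this patch; the doubled single quotes are post-processed into double quotes by generateConfig):

    protected String deltaQueriesPersonTable() {
      return "deletedPkQuery=''SELECT ID FROM PEOPLE WHERE DELETED='Y' AND last_modified >='${dih.last_index_time}' '' ";
    }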
diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSimplePropertiesWriter.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSimplePropertiesWriter.java
new file mode 100644
index 00000000000..4e052934312
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSimplePropertiesWriter.java
@@ -0,0 +1,127 @@
+package org.apache.solr.handler.dataimport;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TestSimplePropertiesWriter extends AbstractDIHJdbcTestCase {
+
+  private boolean useJdbcEscapeSyntax;
+  private String dateFormat;
+  private String fileLocation;
+  private String fileName;
+
+  @Before
+  public void spwBefore() throws Exception {
+    File tmpdir = File.createTempFile("test", "tmp", TEMP_DIR);
+    tmpdir.delete();
+    tmpdir.mkdir();
+    fileLocation = tmpdir.getPath();
+    fileName = "the.properties";
+  }
+
+  @After
+  public void spwAfter() throws Exception {
+    new File(fileLocation + File.separatorChar + fileName).delete();
+    new File(fileLocation).delete();
+  }
+
+  @Test
+  public void testSimplePropertiesWriter() throws Exception {
+
+    SimpleDateFormat errMsgFormat = new SimpleDateFormat(
+        "yyyy-MM-dd HH:mm:ss.SSSSSS", Locale.ROOT);
+
+    String[] d = {
+        "{'ts' ''yyyy-MM-dd HH:mm:ss.SSSSSS''}",
+        "{'ts' ''yyyy-MM-dd HH:mm:ss''}",
+        "yyyy-MM-dd HH:mm:ss",
+        "yyyy-MM-dd HH:mm:ss.SSSSSS"
+    };
+    for (int i = 0; i < d.length; i++) {
+      // the first two patterns use the JDBC {ts '...'} escape syntax
+      useJdbcEscapeSyntax = (i < 2);
+      dateFormat = d[i];
+      SimpleDateFormat df = new SimpleDateFormat(dateFormat, Locale.ROOT);
+      Date oneSecondAgo = new Date(System.currentTimeMillis() - 1000);
+
+      Map<String,String> init = new HashMap<String,String>();
+      init.put("dateFormat", dateFormat);
+      init.put("filename", fileName);
+      init.put("directory", fileLocation);
+      SimplePropertiesWriter spw = new SimplePropertiesWriter();
+      spw.init(new DataImporter(), init);
+      Map<String,Object> props = new HashMap<String,Object>();
+      props.put("SomeDates.last_index_time", oneSecondAgo);
+      props.put("last_index_time", oneSecondAgo);
+      spw.persist(props);
+
+      h.query("/dataimport", generateRequest());
+      props = spw.readIndexerProperties();
+      Date entityDate = df.parse((String) props.get("SomeDates.last_index_time"));
+      Date docDate = df.parse((String) props.get("last_index_time"));
+      Calendar c = new GregorianCalendar(TimeZone.getTimeZone("GMT"), Locale.ROOT);
+      c.setTime(docDate);
+      int year = c.get(Calendar.YEAR);
+
+      Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo)
+          + " should be prior to the document date: "
+          + errMsgFormat.format(docDate),
+          docDate.getTime() - oneSecondAgo.getTime() > 0);
+      Assert.assertTrue("This date: " + errMsgFormat.format(oneSecondAgo)
+          + " should be prior to the entity date: "
+          + errMsgFormat.format(entityDate),
+          entityDate.getTime() - oneSecondAgo.getTime() > 0);
+      assertQ(req("*:*"), "//*[@numFound='1']",
+          "//doc/str[@name=\"ayear_s\"]=\"" + year + "\"");
+    }
+  }
+
+  @Override
+  protected Database setAllowedDatabases() {
+    return Database.DERBY;
+  }
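+
+  // How the four dateFormat patterns above serialize a timestamp
+  // (illustrative values):
+  //
+  //   "{'ts' ''yyyy-MM-dd HH:mm:ss''}"  ->  {ts '2012-11-13 18:30:51'}  (JDBC escape)
+  //   "yyyy-MM-dd HH:mm:ss"             ->  2012-11-13 18:30:51
+  //
+  // useJdbcEscapeSyntax tracks the first two patterns and presumably controls
+  // whether generateConfig() below wraps ${dih.last_index_time} in quotes.
+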
"" : "'"; + sb.append(" \n"); + sb.append("\n"); + sb.append(" \n"); + sb.append(" \n"); + sb.append("\n"); + sb.append(" \n"); + sb.append("\n"); + sb.append(" \n"); + sb.append(" \n"); + String config = sb.toString(); + log.debug(config); + return config; + } + +} diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java index 550b58cc74c..aac642c5446 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor.java @@ -23,7 +23,7 @@ import org.junit.Test; /** * Test with various combinations of parameters, child entities, caches, transformers. */ -public class TestSqlEntityProcessor extends AbstractDIHJdbcTestCase { +public class TestSqlEntityProcessor extends AbstractSqlEntityProcessorTestCase { @Test public void testSingleEntity() throws Exception { diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java index cd408139ede..9d5351714fe 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessorDelta.java @@ -1,11 +1,6 @@ package org.apache.solr.handler.dataimport; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; - import org.apache.solr.request.LocalSolrQueryRequest; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -29,20 +24,12 @@ import org.junit.Test; /** * Test with various combinations of parameters, child entites, transformers. 
  */
-public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
+public class TestSqlEntityProcessorDelta extends AbstractSqlEntityProcessorTestCase {
   private boolean delta = false;
   private boolean useParentDeltaQueryParam = false;
   private IntChanges personChanges = null;
   private String[] countryChanges = null;
-
-  //TODO: remove this on fixing SOLR-4051 / SOLR-1916
-  private void assumeIncomaptibleLocale() {
-    Date d = new Date();
-    String badDateFormat = DataImporter.DATE_TIME_FORMAT.get().format(d);
-    String betterDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(d);
-    Assume.assumeTrue(badDateFormat.equals(betterDateFormat));
-  }
-
   @Before
   public void setupDeltaTest() {
     delta = false;
@@ -51,7 +38,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
   }
   @Test
   public void testSingleEntity() throws Exception {
-    assumeIncomaptibleLocale();
     singleEntity(1);
     changeStuff();
     int c = calculateDatabaseCalls();
@@ -60,7 +46,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
   }
   @Test
   public void testWithSimpleTransformer() throws Exception {
-    assumeIncomaptibleLocale();
     simpleTransform(1);
     changeStuff();
     simpleTransform(calculateDatabaseCalls());
@@ -68,7 +53,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
   }
   @Test
   public void testWithComplexTransformer() throws Exception {
-    assumeIncomaptibleLocale();
     complexTransform(1, 0);
     changeStuff();
     complexTransform(calculateDatabaseCalls(), personChanges.deletedKeys.length);
@@ -76,7 +60,6 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
   }
   @Test
   public void testChildEntities() throws Exception {
-    assumeIncomaptibleLocale();
     useParentDeltaQueryParam = random().nextBoolean();
     withChildEntities(false, true);
     changeStuff();
@@ -137,12 +120,13 @@ public class TestSqlEntityProcessorDelta extends AbstractDIHJdbcTestCase {
       personChanges = modifySomePeople();
     }
     delta = true;
-  }
+  }
+  @Override
   protected LocalSolrQueryRequest generateRequest() {
     return lrf.makeRequest("command", (delta ? "delta-import" : "full-import"),
         "dataConfig", generateConfig(), "clean", (delta ? "false" : "true"),
         "commit", "true", "synchronous", "true", "indent", "true");
   }
-
+  @Override
   protected String deltaQueriesPersonTable() {
     return "deletedPkQuery=''SELECT ID FROM PEOPLE WHERE DELETED='Y' AND last_modified >='${dih.last_index_time}' '' " +