SOLR-2382 Properties writer abstracted

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1148653 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2011-07-20 08:49:32 +00:00
parent f484aebb03
commit ab8f8f994f
9 changed files with 212 additions and 96 deletions

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/test" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="lib2" level="project" />
<orderEntry type="library" name="lib1" level="project" />
<orderEntry type="library" name="lib" level="project" />
<orderEntry type="library" name="lib8" level="project" />
<orderEntry type="module" module-name="Solrj" />
</component>
</module>

View File

@ -188,7 +188,7 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
Boolean commitCalled; Boolean commitCalled;
public SolrWriterImpl() { public SolrWriterImpl() {
super(null, ".", null); super(null, null);
} }
@Override @Override

View File

@ -0,0 +1,34 @@
package org.apache.solr.handler.dataimport;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.util.Properties;
/**
*
*/
public interface DIHPropertiesWriter {
public void init(DataImporter dataImporter);
public boolean isWritable();
public void persist(Properties props);
public Properties readIndexerProperties();
}

View File

@ -113,7 +113,7 @@ public class DataImportHandler extends RequestHandlerBase implements
final InputSource is = new InputSource(core.getResourceLoader().openConfig(configLoc)); final InputSource is = new InputSource(core.getResourceLoader().openConfig(configLoc));
is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc)); is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc));
importer = new DataImporter(is, core, importer = new DataImporter(is, core,
dataSources, coreScopeSession); dataSources, coreScopeSession, myName);
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -165,7 +165,7 @@ public class DataImportHandler extends RequestHandlerBase implements
try { try {
processConfiguration((NamedList) initArgs.get("defaults")); processConfiguration((NamedList) initArgs.get("defaults"));
importer = new DataImporter(new InputSource(new StringReader(requestParams.dataConfig)), req.getCore() importer = new DataImporter(new InputSource(new StringReader(requestParams.dataConfig)), req.getCore()
, dataSources, coreScopeSession); , dataSources, coreScopeSession, myName);
} catch (RuntimeException e) { } catch (RuntimeException e) {
rsp.add("exception", DebugLogger.getStacktraceString(e)); rsp.add("exception", DebugLogger.getStacktraceString(e));
importer = null; importer = null;
@ -280,7 +280,7 @@ public class DataImportHandler extends RequestHandlerBase implements
private SolrWriter getSolrWriter(final UpdateRequestProcessor processor, private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
final SolrResourceLoader loader, final DataImporter.RequestParams requestParams, SolrQueryRequest req) { final SolrResourceLoader loader, final DataImporter.RequestParams requestParams, SolrQueryRequest req) {
return new SolrWriter(processor, loader.getConfigDir(), myName, req) { return new SolrWriter(processor, req) {
@Override @Override
public boolean upload(SolrInputDocument document) { public boolean upload(SolrInputDocument document) {

View File

@ -39,7 +39,6 @@ import org.apache.commons.io.IOUtils;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.StringReader; import java.io.StringReader;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
@ -81,25 +80,34 @@ public class DataImporter {
private SolrCore core; private SolrCore core;
private DIHPropertiesWriter propWriter;
private ReentrantLock importLock = new ReentrantLock(); private ReentrantLock importLock = new ReentrantLock();
private final Map<String , Object> coreScopeSession; private final Map<String , Object> coreScopeSession;
private boolean isDeltaImportSupported = false; private boolean isDeltaImportSupported = false;
private final String handlerName;
/** /**
* Only for testing purposes * Only for testing purposes
*/ */
DataImporter() { DataImporter() {
coreScopeSession = new ConcurrentHashMap<String, Object>(); coreScopeSession = new ConcurrentHashMap<String, Object>();
this.propWriter = new SimplePropertiesWriter();
propWriter.init(this);
this.handlerName = "dataimport" ;
} }
DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session) { DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
this.handlerName = handlerName;
if (dataConfig == null) if (dataConfig == null)
throw new DataImportHandlerException(SEVERE, throw new DataImportHandlerException(SEVERE,
"Configuration not found"); "Configuration not found");
this.core = core; this.core = core;
this.schema = core.getSchema(); this.schema = core.getSchema();
this.propWriter = new SimplePropertiesWriter();
propWriter.init(this);
dataSourceProps = ds; dataSourceProps = ds;
if (session == null) if (session == null)
session = new HashMap<String, Object>(); session = new HashMap<String, Object>();
@ -120,6 +128,10 @@ public class DataImporter {
} }
} }
public String getHandlerName() {
return handlerName;
}
private void verifyWithSchema(Map<String, DataConfig.Field> fields) { private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
Map<String, SchemaField> schemaFields = schema.getFields(); Map<String, SchemaField> schemaFields = schema.getFields();
for (Map.Entry<String, SchemaField> entry : schemaFields.entrySet()) { for (Map.Entry<String, SchemaField> entry : schemaFields.entrySet()) {
@ -353,7 +365,7 @@ public class DataImporter {
setIndexStartTime(new Date()); setIndexStartTime(new Date());
try { try {
docBuilder = new DocBuilder(this, writer, requestParams); docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer); checkWritablePersistFile(writer);
docBuilder.execute(); docBuilder.execute();
if (!requestParams.debug) if (!requestParams.debug)
@ -370,11 +382,11 @@ public class DataImporter {
} }
private void checkWritablePersistFile(SolrWriter writer) { private void checkWritablePersistFile(SolrWriter writer) {
File persistFile = writer.getPersistFile(); // File persistFile = propWriter.getPersistFile();
boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite(); // boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
if (isDeltaImportSupported && !isWritable) { if (isDeltaImportSupported && !propWriter.isWritable()) {
throw new DataImportHandlerException(SEVERE, persistFile.getAbsolutePath() + throw new DataImportHandlerException(SEVERE,
" is not writable. Delta imports are supported by data config but will not work."); "Properties is not writable. Delta imports are supported by data config but will not work.");
} }
} }
@ -384,7 +396,7 @@ public class DataImporter {
try { try {
setIndexStartTime(new Date()); setIndexStartTime(new Date());
docBuilder = new DocBuilder(this, writer, requestParams); docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
checkWritablePersistFile(writer); checkWritablePersistFile(writer);
docBuilder.execute(); docBuilder.execute();
if (!requestParams.debug) if (!requestParams.debug)

View File

@ -68,15 +68,18 @@ public class DocBuilder {
Map<String, Object> functionsNamespace; Map<String, Object> functionsNamespace;
private Properties persistedProperties; private Properties persistedProperties;
public DocBuilder(DataImporter dataImporter, SolrWriter writer, DataImporter.RequestParams reqParams) { private DIHPropertiesWriter propWriter;
public DocBuilder(DataImporter dataImporter, SolrWriter writer, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
INSTANCE.set(this); INSTANCE.set(this);
this.dataImporter = dataImporter; this.dataImporter = dataImporter;
this.writer = writer; this.writer = writer;
this.propWriter = propWriter;
DataImporter.QUERY_COUNT.set(importStatistics.queryCount); DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
requestParameters = reqParams; requestParameters = reqParams;
verboseDebug = requestParameters.debug && requestParameters.verbose; verboseDebug = requestParameters.debug && requestParameters.verbose;
functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this); functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
persistedProperties = writer.readIndexerProperties(); persistedProperties = propWriter.readIndexerProperties();
} }
public VariableResolverImpl getVariableResolver() { public VariableResolverImpl getVariableResolver() {
@ -238,7 +241,7 @@ public class DocBuilder {
addStatusMessage("Optimized"); addStatusMessage("Optimized");
} }
try { try {
writer.persist(lastIndexTimeProps); propWriter.persist(lastIndexTimeProps);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not write property file", e); LOG.error("Could not write property file", e);
statusMessages.put("error", "Could not write property file. Delta imports will not work. " + statusMessages.put("error", "Could not write property file. Delta imports will not work. " +

View File

@ -0,0 +1,123 @@
package org.apache.solr.handler.dataimport;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimplePropertiesWriter implements DIHPropertiesWriter {
private static final Logger log = LoggerFactory.getLogger(SimplePropertiesWriter.class);
static final String IMPORTER_PROPERTIES = "dataimport.properties";
static final String LAST_INDEX_KEY = "last_index_time";
private String persistFilename = IMPORTER_PROPERTIES;
private String configDir = null;
public void init(DataImporter dataImporter) {
SolrCore core = dataImporter.getCore();
String configDir = core ==null ? ".": core.getResourceLoader().getConfigDir();
String persistFileName = dataImporter.getHandlerName();
this.configDir = configDir;
if(persistFileName != null){
persistFilename = persistFileName + ".properties";
}
}
private File getPersistFile() {
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator))
filePath += File.separator;
filePath += persistFilename;
return new File(filePath);
}
public boolean isWritable() {
File persistFile = getPersistFile();
return persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
}
@Override
public void persist(Properties p) {
OutputStream propOutput = null;
Properties props = readIndexerProperties();
try {
props.putAll(p);
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator))
filePath += File.separator;
filePath += persistFilename;
propOutput = new FileOutputStream(filePath);
props.store(propOutput, null);
log.info("Wrote last indexed time to " + persistFilename);
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to persist Index Start Time", e);
} finally {
try {
if (propOutput != null)
propOutput.close();
} catch (IOException e) {
propOutput = null;
}
}
}
@Override
public Properties readIndexerProperties() {
Properties props = new Properties();
InputStream propInput = null;
try {
propInput = new FileInputStream(configDir + persistFilename);
props.load(propInput);
log.info("Read " + persistFilename);
} catch (Exception e) {
log.warn("Unable to read: " + persistFilename);
} finally {
try {
if (propInput != null)
propInput.close();
} catch (IOException e) {
propInput = null;
}
}
return props;
}
}

View File

@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.*; import java.io.*;
import java.util.Properties;
/** /**
* <p> Writes documents to SOLR as well as provides methods for loading and persisting last index time. </p> * <p> Writes documents to SOLR as well as provides methods for loading and persisting last index time. </p>
@ -39,34 +38,20 @@ import java.util.Properties;
public class SolrWriter { public class SolrWriter {
private static final Logger log = LoggerFactory.getLogger(SolrWriter.class); private static final Logger log = LoggerFactory.getLogger(SolrWriter.class);
static final String IMPORTER_PROPERTIES = "dataimport.properties";
static final String LAST_INDEX_KEY = "last_index_time"; static final String LAST_INDEX_KEY = "last_index_time";
private final UpdateRequestProcessor processor; private final UpdateRequestProcessor processor;
private final String configDir;
private String persistFilename = IMPORTER_PROPERTIES;
DebugLogger debugLogger; DebugLogger debugLogger;
SolrQueryRequest req; SolrQueryRequest req;
public SolrWriter(UpdateRequestProcessor processor, String confDir, SolrQueryRequest req) { public SolrWriter(UpdateRequestProcessor processor, SolrQueryRequest req) {
this.processor = processor; this.processor = processor;
configDir = confDir;
this.req = req;
}
public SolrWriter(UpdateRequestProcessor processor, String confDir, String filePrefix, SolrQueryRequest req) {
this.processor = processor;
configDir = confDir;
if(filePrefix != null){
persistFilename = filePrefix+".properties";
}
this.req = req; this.req = req;
} }
public boolean upload(SolrInputDocument d) { public boolean upload(SolrInputDocument d) {
try { try {
AddUpdateCommand command = new AddUpdateCommand(req); AddUpdateCommand command = new AddUpdateCommand(req);
@ -91,42 +76,6 @@ public class SolrWriter {
} }
} }
void persist(Properties p) {
OutputStream propOutput = null;
Properties props = readIndexerProperties();
try {
props.putAll(p);
File persistFile = getPersistFile();
propOutput = new FileOutputStream(persistFile);
props.store(propOutput, null);
log.info("Wrote last indexed time to " + persistFile.getAbsolutePath());
} catch (FileNotFoundException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to persist Index Start Time", e);
} catch (IOException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Unable to persist Index Start Time", e);
} finally {
try {
if (propOutput != null)
propOutput.close();
} catch (IOException e) {
propOutput = null;
}
}
}
File getPersistFile() {
String filePath = configDir;
if (configDir != null && !configDir.endsWith(File.separator))
filePath += File.separator;
filePath += persistFilename;
return new File(filePath);
}
void finish() { void finish() {
try { try {
processor.finish(); processor.finish();
@ -136,29 +85,6 @@ public class SolrWriter {
} }
} }
Properties readIndexerProperties() {
Properties props = new Properties();
InputStream propInput = null;
try {
propInput = new FileInputStream(configDir
+ persistFilename);
props.load(propInput);
log.info("Read " + persistFilename);
} catch (Exception e) {
log.warn("Unable to read: " + persistFilename);
} finally {
try {
if (propInput != null)
propInput.close();
} catch (IOException e) {
propInput = null;
}
}
return props;
}
public void deleteByQuery(String query) { public void deleteByQuery(String query) {
try { try {
log.info("Deleting documents from Solr with query: " + query); log.info("Deleting documents from Solr with query: " + query);

View File

@ -198,7 +198,7 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
Boolean finishCalled = Boolean.FALSE; Boolean finishCalled = Boolean.FALSE;
public SolrWriterImpl() { public SolrWriterImpl() {
super(null, ".",null); super(null, null);
} }
@Override @Override