SOLR-1060 -- A LineEntityProcessor which can stream lines of text from a given file to be indexed directly or for processing with transformers and child entities.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@766638 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2009-04-20 10:12:50 +00:00
parent a336753fdc
commit e2a020b6c6
7 changed files with 554 additions and 96 deletions

View File: contrib/dataimporthandler/CHANGES.txt

@@ -132,6 +132,10 @@ New Features
 29. SOLR-934: A MailEntityProcessor to enable indexing mails from POP/IMAP sources into a solr index.
     (Preetam Rao, shalin)

+30. SOLR-1060: A LineEntityProcessor which can stream lines of text from a given file to be indexed directly or
+    for processing with transformers and child entities.
+    (Fergus McMenemie, Noble Paul, shalin)
+
 Optimizations
 ----------------------
 1. SOLR-846: Reduce memory consumption during delta import by removing keys when used

View File: org/apache/solr/handler/dataimport/AbstractDataImportHandlerTest.java

@@ -123,7 +123,7 @@ public abstract class AbstractDataImportHandlerTest extends
    public String getResolvedEntityAttribute(String name) {
      return entityAttrs == null ? delegate.getResolvedEntityAttribute(name) :
-             delegate.getResolvedEntityAttribute(entityAttrs.get(name));
+             delegate.getVariableResolver().replaceTokens(entityAttrs.get(name));
    }
public List<Map<String, String>> getAllEntityFields() {

View File: org/apache/solr/handler/dataimport/FileDataSource.java

@@ -18,6 +18,8 @@ package org.apache.solr.handler.dataimport;
 import java.io.*;
 import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -46,10 +48,12 @@ public class FileDataSource extends DataSource<Reader> {
   private String encoding = null;

+  private static final Logger LOG = LoggerFactory.getLogger(FileDataSource.class);
+
   public void init(Context context, Properties initProps) {
     basePath = initProps.getProperty(BASE_PATH);
-    if (initProps.get(HttpDataSource.ENCODING) != null)
-      encoding = initProps.getProperty(HttpDataSource.ENCODING);
+    if (initProps.get(URLDataSource.ENCODING) != null)
+      encoding = initProps.getProperty(URLDataSource.ENCODING);
   }
/**
@@ -75,10 +79,13 @@ public class FileDataSource extends DataSource<Reader> {
       file = new File(basePath + query);
       if (file.isFile() && file.canRead()) {
+        LOG.debug("Accessing File: " + file.toString());
         return openStream(file);
       } else if (file != file0)
-        if (file0.isFile() && file0.canRead())
+        if (file0.isFile() && file0.canRead()) {
+          LOG.debug("Accessing File0: " + file0.toString());
           return openStream(file0);
+        }

       throw new FileNotFoundException("Could not find file: " + query);
     } catch (UnsupportedEncodingException e) {

View File: org/apache/solr/handler/dataimport/HttpDataSource.java

@@ -43,97 +43,9 @@ import java.util.regex.Pattern;
 *
 * @version $Id$
 * @since solr 1.3
+* @deprecated use {@link org.apache.solr.handler.dataimport.URLDataSource} instead
 */
-public class HttpDataSource extends DataSource<Reader> {
-  Logger LOG = LoggerFactory.getLogger(HttpDataSource.class);
-  private String baseUrl;
-  private String encoding;
-  private int connectionTimeout = CONNECTION_TIMEOUT;
-  private int readTimeout = READ_TIMEOUT;
-
-  public HttpDataSource() {
-  }
-
-  public void init(Context context, Properties initProps) {
-    baseUrl = initProps.getProperty(BASE_URL);
-    if (initProps.get(ENCODING) != null)
-      encoding = initProps.getProperty(ENCODING);
-    String cTimeout = initProps.getProperty(CONNECTION_TIMEOUT_FIELD_NAME);
-    String rTimeout = initProps.getProperty(READ_TIMEOUT_FIELD_NAME);
-    if (cTimeout != null) {
-      try {
-        connectionTimeout = Integer.parseInt(cTimeout);
-      } catch (NumberFormatException e) {
-        LOG.warn("Invalid connection timeout: " + cTimeout);
-      }
-    }
-    if (rTimeout != null) {
-      try {
-        readTimeout = Integer.parseInt(rTimeout);
-      } catch (NumberFormatException e) {
-        LOG.warn("Invalid read timeout: " + rTimeout);
-      }
-    }
-  }
-
-  public Reader getData(String query) {
-    URL url = null;
-    try {
-      if (query.startsWith("http:")) {
-        url = new URL(query);
-      } else {
-        url = new URL(baseUrl + query);
-      }
-      LOG.info("Created URL to: " + url.toString());
-      URLConnection conn = url.openConnection();
-      conn.setConnectTimeout(connectionTimeout);
-      conn.setReadTimeout(readTimeout);
-      InputStream in = conn.getInputStream();
-      String enc = encoding;
-      if (enc == null) {
-        String cType = conn.getContentType();
-        if (cType != null) {
-          Matcher m = CHARSET_PATTERN.matcher(cType);
-          if (m.find()) {
-            enc = m.group(1);
-          }
-        }
-      }
-      if (enc == null)
-        enc = UTF_8;
-      DataImporter.QUERY_COUNT.get().incrementAndGet();
-      return new InputStreamReader(in, enc);
-    } catch (Exception e) {
-      LOG.error("Exception thrown while getting data", e);
-      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
-              "Exception in invoking url " + url, e);
-    }
-  }
-
-  public void close() {
-  }
-
-  private static final Pattern CHARSET_PATTERN = Pattern.compile(
-          ".*?charset=(.*)$", Pattern.CASE_INSENSITIVE);
-
-  public static final String ENCODING = "encoding";
-  public static final String BASE_URL = "baseUrl";
-  public static final String UTF_8 = "UTF-8";
-  public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout";
-  public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout";
-  public static final int CONNECTION_TIMEOUT = 5000;
-  public static final int READ_TIMEOUT = 10000;
-
+@Deprecated
+public class HttpDataSource extends URLDataSource {
 }

View File: org/apache/solr/handler/dataimport/LineEntityProcessor.java

@@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import java.io.*;
import java.util.*;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
/**
* <p>
* An EntityProcessor instance which can stream lines of text read from a
* datasource. Options allow lines to be explicitly skipped or included in the index.
* </p>
* <p/>
* <p>
* Attribute summary
* <ul>
* <li>url is the required location of the input file. If this value is
* relative, it is assumed to be relative to baseLoc.</li>
* <li>acceptLineRegex is an optional attribute; if present, any line which
* does not match the regex is discarded.</li>
* <li>skipLineRegex is an optional attribute that is applied after any
* acceptLineRegex; any line which matches this regex is discarded.</li>
* </ul>
* </p><p>
* Although envisioned for reading lines from a file or url, LineEntityProcessor may also be useful
* for dealing with change lists, where each line contains filenames which can be used by subsequent entities
* to parse content from those files.
* <p/>
* <p>
* Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
* for more details.
* </p>
* <p/>
* <b>This API is experimental and may change in the future.</b>
*
* @version $Id$
* @since solr 1.4
*/
public class LineEntityProcessor extends EntityProcessorBase {
private Pattern acceptLineRegex, skipLineRegex;
private String url;
private BufferedReader reader;
/**
* Parses each of the entity attributes.
*/
public void init(Context context) {
super.init(context);
String s;
// init a regex to locate files from the input we want to index
s = context.getResolvedEntityAttribute(ACCEPT_LINE_REGEX);
if (s != null) {
acceptLineRegex = Pattern.compile(s);
}
// init a regex to locate files from the input to be skipped
s = context.getResolvedEntityAttribute(SKIP_LINE_REGEX);
if (s != null) {
skipLineRegex = Pattern.compile(s);
}
// the FileName is required.
url = context.getResolvedEntityAttribute(URL);
if (url == null) throw
new DataImportHandlerException(DataImportHandlerException.SEVERE,
"'"+ URL +"' is a required attribute");
}
/**
 * Reads lines from the url until it finds a line that matches the
 * optional acceptLineRegex and does not match the optional skipLineRegex.
 *
 * @return A row containing a minimum of one field "rawLine", or null to signal
 *         end of file. rawLine is the line exactly as returned by readLine()
 *         from the url; transformers can be used to create as
 *         many other fields as required.
 */
public Map<String, Object> nextRow() {
if (reader == null) {
reader = new BufferedReader((Reader) context.getDataSource().getData(url));
}
String line;
while ( true ) {
// read a line from the input file
try {
line = reader.readLine();
}
catch (IOException exp) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Problem reading from input", exp);
}
if (line == null) return null; // end of input
// First scan whole line to see if we want it
if (acceptLineRegex != null && ! acceptLineRegex.matcher(line).find()) continue;
if (skipLineRegex != null && skipLineRegex.matcher(line).find()) continue;
// Construct the 'row' of fields
Map<String, Object> row = new HashMap<String, Object>();
row.put("rawLine", line);
return row;
}
}
@Override
public void destroy() {
if (reader != null)
IOUtils.closeQuietly(reader);
super.destroy();
}
/**
* Holds the name of entity attribute that will be parsed to obtain
* the filename containing the changelist.
*/
public static final String URL = "url";
/**
* Holds the name of entity attribute that will be parsed to obtain
* the pattern to be used when checking to see if a line should
* be returned.
*/
public static final String ACCEPT_LINE_REGEX = "acceptLineRegex";
/**
* Holds the name of entity attribute that will be parsed to obtain
* the pattern to be used when checking to see if a line should
* be ignored.
*/
public static final String SKIP_LINE_REGEX = "skipLineRegex";
}
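The class javadoc above notes that transformers can expand rawLine into as many fields as needed. As a hedged sketch (not part of this commit; the class and the filePath field name are invented), a companion Transformer could pull a file path out of 'find . -ls' style lines like those in the test data further down:

package org.apache.solr.handler.dataimport;

import java.util.Map;

// Hypothetical transformer: copies the last whitespace-separated column of
// "rawLine" (the path in 'find . -ls' output) into a "filePath" field.
public class RawLineToPathTransformer extends Transformer {
  public Object transformRow(Map<String, Object> row, Context context) {
    String rawLine = (String) row.get("rawLine");
    if (rawLine != null) {
      String[] cols = rawLine.trim().split("\\s+");
      if (cols.length > 0) row.put("filePath", cols[cols.length - 1]);
    }
    return row;
  }
}

A child entity could then reference ${list_all_files.filePath} to fetch and parse each named file.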

View File: org/apache/solr/handler/dataimport/URLDataSource.java

@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* <p> A data source implementation which can be used to read character files using HTTP. </p> <p/> <p> Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
* details. </p>
* <p/>
* <b>This API is experimental and may change in the future.</b>
*
* @version $Id$
* @since solr 1.4
*/
public class URLDataSource extends DataSource<Reader> {
Logger LOG = LoggerFactory.getLogger(URLDataSource.class);
private String baseUrl;
private String encoding;
private int connectionTimeout = CONNECTION_TIMEOUT;
private int readTimeout = READ_TIMEOUT;
public URLDataSource() {
}
public void init(Context context, Properties initProps) {
baseUrl = initProps.getProperty(BASE_URL);
if (initProps.get(ENCODING) != null)
encoding = initProps.getProperty(ENCODING);
String cTimeout = initProps.getProperty(CONNECTION_TIMEOUT_FIELD_NAME);
String rTimeout = initProps.getProperty(READ_TIMEOUT_FIELD_NAME);
if (cTimeout != null) {
try {
connectionTimeout = Integer.parseInt(cTimeout);
} catch (NumberFormatException e) {
LOG.warn("Invalid connection timeout: " + cTimeout);
}
}
if (rTimeout != null) {
try {
readTimeout = Integer.parseInt(rTimeout);
} catch (NumberFormatException e) {
LOG.warn("Invalid read timeout: " + rTimeout);
}
}
}
public Reader getData(String query) {
URL url = null;
try {
if (URIMETHOD.matcher(query).find()) url = new URL(query);
else url = new URL(baseUrl + query);
LOG.debug("Accessing URL: " + url.toString());
URLConnection conn = url.openConnection();
conn.setConnectTimeout(connectionTimeout);
conn.setReadTimeout(readTimeout);
InputStream in = conn.getInputStream();
String enc = encoding;
if (enc == null) {
String cType = conn.getContentType();
if (cType != null) {
Matcher m = CHARSET_PATTERN.matcher(cType);
if (m.find()) {
enc = m.group(1);
}
}
}
if (enc == null)
enc = UTF_8;
DataImporter.QUERY_COUNT.get().incrementAndGet();
return new InputStreamReader(in, enc);
} catch (Exception e) {
LOG.error("Exception thrown while getting data", e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Exception in invoking url " + url, e);
}
}
public void close() {
}
private static final Pattern URIMETHOD = Pattern.compile("\\w{3,}:/");
private static final Pattern CHARSET_PATTERN = Pattern.compile(".*?charset=(.*)$", Pattern.CASE_INSENSITIVE);
public static final String ENCODING = "encoding";
public static final String BASE_URL = "baseUrl";
public static final String UTF_8 = "UTF-8";
public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout";
public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout";
public static final int CONNECTION_TIMEOUT = 5000;
public static final int READ_TIMEOUT = 10000;
}
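getData() above settles on an encoding in three steps: an explicit 'encoding' property wins, otherwise the charset parameter of the HTTP Content-Type header, otherwise UTF-8. A self-contained sketch of that fallback using the same CHARSET_PATTERN (the class and helper names are invented):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CharsetFallbackSketch {
  private static final Pattern CHARSET_PATTERN =
      Pattern.compile(".*?charset=(.*)$", Pattern.CASE_INSENSITIVE);

  static String pickEncoding(String configured, String contentType) {
    if (configured != null) return configured;          // 1. 'encoding' property
    if (contentType != null) {
      Matcher m = CHARSET_PATTERN.matcher(contentType); // 2. header charset
      if (m.find()) return m.group(1);
    }
    return "UTF-8";                                     // 3. default
  }

  public static void main(String[] args) {
    System.out.println(pickEncoding(null, "text/xml; charset=ISO-8859-1")); // ISO-8859-1
    System.out.println(pickEncoding(null, "text/plain"));                   // UTF-8
    System.out.println(pickEncoding("UTF-16", null));                       // UTF-16
  }
}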

View File: org/apache/solr/handler/dataimport/TestLineEntityProcessor.java

@@ -0,0 +1,250 @@
package org.apache.solr.handler.dataimport;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.*;
/**
* <p> Test for LineEntityProcessor </p>
*
* @version $Id$
* @since solr 1.4
*/
public class TestLineEntityProcessor {
@Test
/************************************************************************/
public void simple() throws IOException {
/* we want to create the equiv of :-
 *  <entity name="list_all_files"
 *          processor="LineEntityProcessor"
 *          url="dummy.lis"
 *          />
 */
Map attrs = AbstractDataImportHandlerTest.createMap(
LineEntityProcessor.URL, "dummy.lis",
LineEntityProcessor.ACCEPT_LINE_REGEX, null,
LineEntityProcessor.SKIP_LINE_REGEX, null
);
Context c = AbstractDataImportHandlerTest.getContext(
null, //parentEntity
new VariableResolverImpl(), //resolver
getDataSource(filecontents), //parentDataSource
Context.FULL_DUMP, //currProcess
Collections.EMPTY_LIST, //entityFields
attrs //entityAttrs
);
LineEntityProcessor ep = new LineEntityProcessor();
ep.init(c);
// call the entity processor to get the list of lines
System.out.print("\n");
List<String> fList = new ArrayList<String>();
while (true) {
Map<String, Object> f = ep.nextRow();
if (f == null) break;
fList.add((String) f.get("rawLine"));
System.out.print(" rawLine='" + f.get("rawLine") + "'\n");
}
Assert.assertEquals(24, fList.size());
}
@Test
/************************************************************************/
public void only_xml_files() throws IOException {
/* we want to create the equiv of :-
 *  <entity name="list_all_files"
 *          processor="LineEntityProcessor"
 *          url="dummy.lis"
 *          acceptLineRegex="xml"
 *          />
 */
Map attrs = AbstractDataImportHandlerTest.createMap(
LineEntityProcessor.URL, "dummy.lis",
LineEntityProcessor.ACCEPT_LINE_REGEX, "xml",
LineEntityProcessor.SKIP_LINE_REGEX, null
);
Context c = AbstractDataImportHandlerTest.getContext(
null, //parentEntity
new VariableResolverImpl(), //resolver
getDataSource(filecontents), //parentDataSource
Context.FULL_DUMP, //currProcess
Collections.EMPTY_LIST, //entityFields
attrs //entityAttrs
);
LineEntityProcessor ep = new LineEntityProcessor();
ep.init(c);
// call the entity processor to get the list of lines
List<String> fList = new ArrayList<String>();
while (true) {
Map<String, Object> f = ep.nextRow();
if (f == null) break;
fList.add((String) f.get("rawLine"));
}
Assert.assertEquals(5, fList.size());
}
@Test
/************************************************************************/
public void only_xml_files_no_xsd() throws IOException {
/* we want to create the equiv of :-
 *  <entity name="list_all_files"
 *          processor="LineEntityProcessor"
 *          url="dummy.lis"
 *          acceptLineRegex="\\.xml"
 *          skipLineRegex="\\.xsd"
 *          />
 */
Map attrs = AbstractDataImportHandlerTest.createMap(
LineEntityProcessor.URL, "dummy.lis",
LineEntityProcessor.ACCEPT_LINE_REGEX, "\\.xml",
LineEntityProcessor.SKIP_LINE_REGEX, "\\.xsd"
);
Context c = AbstractDataImportHandlerTest.getContext(
null, //parentEntity
new VariableResolverImpl(), //resolver
getDataSource(filecontents), //parentDataSource
Context.FULL_DUMP, //currProcess
Collections.EMPTY_LIST, //entityFields
attrs //entityAttrs
);
LineEntityProcessor ep = new LineEntityProcessor();
ep.init(c);
// call the entity processor to get the matching lines
List<String> fList = new ArrayList<String>();
while (true) {
Map<String, Object> f = ep.nextRow();
if (f == null) break;
fList.add((String) f.get("rawLine"));
}
Assert.assertEquals(4, fList.size());
}
@Test
/************************************************************************/
public void no_xsd_files() throws IOException {
/* we want to create the equiv of :-
 *  <entity name="list_all_files"
 *          processor="LineEntityProcessor"
 *          url="dummy.lis"
 *          skipLineRegex="\\.xsd"
 *          />
 */
Map attrs = AbstractDataImportHandlerTest.createMap(
LineEntityProcessor.URL, "dummy.lis",
LineEntityProcessor.SKIP_LINE_REGEX, "\\.xsd"
);
Context c = AbstractDataImportHandlerTest.getContext(
null, //parentEntity
new VariableResolverImpl(), //resolver
getDataSource(filecontents), //parentDataSource
Context.FULL_DUMP, //currProcess
Collections.EMPTY_LIST, //entityFields
attrs //entityAttrs
);
LineEntityProcessor ep = new LineEntityProcessor();
ep.init(c);
// call the entity processor to get the matching lines
List<String> fList = new ArrayList<String>();
while (true) {
Map<String, Object> f = ep.nextRow();
if (f == null) break;
fList.add((String) f.get("rawLine"));
}
Assert.assertEquals(18, fList.size());
}
/**
* ********************************************************************
*/
public static Map<String, String> createField(
String col, // DIH column name
String type, // field type from schema.xml
String srcCol, // DIH transformer attribute 'sourceColName'
String re, // DIH regex attribute 'regex'
String rw, // DIH regex attribute 'replaceWith'
String gn // DIH regex attribute 'groupNames'
) {
HashMap<String, String> vals = new HashMap<String, String>();
vals.put("column", col);
vals.put("type", type);
vals.put("sourceColName", srcCol);
vals.put("regex", re);
vals.put("replaceWith", rw);
vals.put("groupNames", gn);
return vals;
}
private DataSource<Reader> getDataSource(final String xml) {
return new DataSource<Reader>() {
public void init(Context context, Properties initProps) {
}
public void close() {
}
public Reader getData(String query) {
return new StringReader(xml);
}
};
}
private static final String filecontents =
"\n" +
"# this is what the output from 'find . -ls; looks like, athough the format\n" +
"# of the time stamp varies depending on the age of the file and your LANG \n" +
"# env setting\n" +
"412577 0 drwxr-xr-x 6 user group 204 1 Apr 10:53 /Volumes/spare/ts\n" +
"412582 0 drwxr-xr-x 13 user group 442 1 Apr 10:18 /Volumes/spare/ts/config\n" +
"412583 24 -rwxr-xr-x 1 user group 8318 1 Apr 11:10 /Volumes/spare/ts/config/dc.xsd\n" +
"412584 32 -rwxr-xr-x 1 user group 12847 1 Apr 11:10 /Volumes/spare/ts/config/dcterms.xsd\n" +
"412585 8 -rwxr-xr-x 1 user group 3156 1 Apr 11:10 /Volumes/spare/ts/config/s-deliver.css\n" +
"412586 192 -rwxr-xr-x 1 user group 97764 1 Apr 11:10 /Volumes/spare/ts/config/s-deliver.xsl\n" +
"412587 224 -rwxr-xr-x 1 user group 112700 1 Apr 11:10 /Volumes/spare/ts/config/sml-delivery-2.1.xsd\n" +
"412588 208 -rwxr-xr-x 1 user group 103419 1 Apr 11:10 /Volumes/spare/ts/config/sml-delivery-norm-2.0.dtd\n" +
"412589 248 -rwxr-xr-x 1 user group 125296 1 Apr 11:10 /Volumes/spare/ts/config/sml-delivery-norm-2.1.dtd\n" +
"412590 72 -rwxr-xr-x 1 user group 36256 1 Apr 11:10 /Volumes/spare/ts/config/jm.xsd\n" +
"412591 8 -rwxr-xr-x 1 user group 990 1 Apr 11:10 /Volumes/spare/ts/config/video.gif\n" +
"412592 8 -rwxr-xr-x 1 user group 1498 1 Apr 11:10 /Volumes/spare/ts/config/xlink.xsd\n" +
"412593 8 -rwxr-xr-x 1 user group 1155 1 Apr 11:10 /Volumes/spare/ts/config/xml.xsd\n" +
"412594 0 drwxr-xr-x 4 user group 136 1 Apr 10:18 /Volumes/spare/ts/acm19\n" +
"412621 0 drwxr-xr-x 57 user group 1938 1 Apr 10:18 /Volumes/spare/ts/acm19/data\n" +
"412622 24 -rwxr-xr-x 1 user group 8894 1 Apr 11:09 /Volumes/spare/ts/acm19/data/00000510.xml\n" +
"412623 32 -rwxr-xr-x 1 user group 14124 1 Apr 11:09 /Volumes/spare/ts/acm19/data/00000603.xml\n" +
"412624 24 -rwxr-xr-x 1 user group 11976 1 Apr 11:09 /Volumes/spare/ts/acm19/data/00001292.xml\n" +
"# tacked on an extra line to cause a file to be deleted.\n" +
"DELETE /Volumes/spare/ts/acm19/data/00001292old.xml\n" +
"";
}