SOLR-469 -- Support for streaming xpath parsing and solr add xml (the lost code between DIH patches)

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@682383 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2008-08-04 13:36:55 +00:00
parent cdc38d0811
commit 3f20de6ba4
2 changed files with 153 additions and 61 deletions

View File

@ -24,10 +24,11 @@ import javax.xml.transform.stream.StreamSource;
import java.io.CharArrayReader; import java.io.CharArrayReader;
import java.io.CharArrayWriter; import java.io.CharArrayWriter;
import java.io.Reader; import java.io.Reader;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue;
import java.util.List; import java.util.concurrent.BlockingQueue;
import java.util.Map; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
/** /**
@ -65,6 +66,12 @@ public class XPathEntityProcessor extends EntityProcessorBase {
protected javax.xml.transform.Transformer xslTransformer; protected javax.xml.transform.Transformer xslTransformer;
protected boolean useSolrAddXml = false;
protected boolean streamRows = false;
private int batchSz = 1000;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void init(Context context) { public void init(Context context) {
super.init(context); super.init(context);
@ -76,8 +83,13 @@ public class XPathEntityProcessor extends EntityProcessorBase {
} }
private void initXpathReader() { private void initXpathReader() {
boolean useSolrAddXml = Boolean.parseBoolean(context useSolrAddXml = Boolean.parseBoolean(context
.getEntityAttribute(USE_SOLR_ADD_SCHEMA)); .getEntityAttribute(USE_SOLR_ADD_SCHEMA));
streamRows = Boolean.parseBoolean(context
.getEntityAttribute(STREAM));
if(context.getEntityAttribute("batchSize") != null){
batchSz = Integer.parseInt(context.getEntityAttribute("batchSize"));
}
String xslt = context.getEntityAttribute(XSL); String xslt = context.getEntityAttribute(XSL);
if (xslt != null) { if (xslt != null) {
xslt = resolver.replaceTokens(xslt); xslt = resolver.replaceTokens(xslt);
@ -208,18 +220,36 @@ public class XPathEntityProcessor extends EntityProcessorBase {
"Exception in applying XSL Transformeation", e); "Exception in applying XSL Transformeation", e);
} }
} }
final List<Map<String, Object>> solrDocs = new ArrayList<Map<String, Object>>(); if(streamRows ){
final boolean useSolrAddXml = Boolean.parseBoolean(context rowIterator = getRowIterator(data);
.getEntityAttribute(USE_SOLR_ADD_SCHEMA)); } else {
xpathReader.streamRecords(data, new XPathRecordReader.Handler() { xpathReader.streamRecords(data, new XPathRecordReader.Handler() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void handle(Map<String, Object> record, String xpath) { public void handle(Map<String, Object> record, String xpath) {
rows.add(readRow(record, xpath));
}
});
rowIterator = rows.iterator();
}
} finally {
if (!streamRows) {
closeIt(data);
}
}
}
private void closeIt(Reader data) {
try {
data.close();
} catch (Exception e) { /* Ignore */
}
}
private Map<String, Object> readRow(Map<String, Object> record, String xpath) {
if (useSolrAddXml) { if (useSolrAddXml) {
List<String> names = (List<String>) record.get("name"); List<String> names = (List<String>) record.get("name");
List<String> values = (List<String>) record.get("value"); List<String> values = (List<String>) record.get("value");
Map<String, Object> row = new HashMap<String, Object>(); Map<String, Object> row = new HashMap<String, Object>();
for (int i = 0; i < names.size(); i++) { for (int i = 0; i < names.size(); i++) {
if (row.containsKey(names.get(i))) { if (row.containsKey(names.get(i))) {
Object existing = row.get(names.get(i)); Object existing = row.get(names.get(i));
@ -236,27 +266,13 @@ public class XPathEntityProcessor extends EntityProcessorBase {
row.put(names.get(i), values.get(i)); row.put(names.get(i), values.get(i));
} }
} }
return row;
solrDocs.add(row);
} else { } else {
record.put(XPATH_FIELD_NAME, xpath); record.put(XPATH_FIELD_NAME, xpath);
rows.add(record); return record;
} }
} }
});
if (useSolrAddXml) {
rowIterator = solrDocs.iterator();
} else {
rowIterator = rows.iterator();
}
} finally {
try {
data.close();
} catch (Exception e) { /* Ignore */
}
}
}
private static class SimpleCharArrayReader extends CharArrayWriter { private static class SimpleCharArrayReader extends CharArrayWriter {
public Reader getReader() { public Reader getReader() {
@ -296,6 +312,55 @@ public class XPathEntityProcessor extends EntityProcessorBase {
} }
return r; return r;
}
private Iterator<Map<String ,Object>> getRowIterator(final Reader data){
final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(batchSz);
final AtomicBoolean isEnd = new AtomicBoolean(false);
new Thread() {
public void run() {
try {
xpathReader.streamRecords(data, new XPathRecordReader.Handler() {
@SuppressWarnings("unchecked")
public void handle(Map<String, Object> record, String xpath) {
if(isEnd.get()) return ;
try {
blockingQueue.offer(readRow(record, xpath), 10, TimeUnit.SECONDS);
} catch (Exception e) {
isEnd.set(true);
}
}
});
} finally {
closeIt(data);
try {
blockingQueue.offer(Collections.EMPTY_MAP, 10, TimeUnit.SECONDS);
} catch (Exception e) { }
}
}
}.start();
return new Iterator<Map<String, Object>>() {
public boolean hasNext() {
return !isEnd.get();
}
public Map<String, Object> next() {
try {
Map<String, Object> row = blockingQueue.poll(10, TimeUnit.SECONDS);
if (row == null || row == Collections.EMPTY_MAP) {
isEnd.set(true);
return null;
}
return row;
} catch (InterruptedException e) {
isEnd.set(true);
return null;
}
}
public void remove() {
/*no op*/
}
};
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -326,4 +391,6 @@ public class XPathEntityProcessor extends EntityProcessorBase {
public static final String XSL = "xsl"; public static final String XSL = "xsl";
public static final String STREAM = "stream";
} }

View File

@ -68,6 +68,31 @@ public class TestXPathEntityProcessor {
Assert.assertEquals("1982", result.get(2).get("year")); Assert.assertEquals("1982", result.get(2).get("year"));
} }
@Test
public void withFieldsAndXpathStream() throws Exception {
Map entityAttrs = createMap("name", "e", "url", "cd.xml",
XPathEntityProcessor.FOR_EACH, "/catalog/cd", "stream", "true", "batchSize","1");
List fields = new ArrayList();
fields.add(createMap("column", "title", "xpath", "/catalog/cd/title"));
fields.add(createMap("column", "artist", "xpath", "/catalog/cd/artist"));
fields.add(createMap("column", "year", "xpath", "/catalog/cd/year"));
Context c = AbstractDataImportHandlerTest.getContext(null,
new VariableResolverImpl(), getds(), 0, fields, entityAttrs);
XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor();
xPathEntityProcessor.init(c);
List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
while (true) {
Map<String, Object> row = xPathEntityProcessor.nextRow();
if (row == null)
break;
result.add(row);
}
Assert.assertEquals(3, result.size());
Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
Assert.assertEquals("1982", result.get(2).get("year"));
}
@Test @Test
public void withDefaultSolrAndXsl() throws Exception { public void withDefaultSolrAndXsl() throws Exception {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();