SOLR-1539

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@882852 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2009-11-21 07:56:14 +00:00
parent ff1ceb4aa8
commit 4d65be52cc
2 changed files with 159 additions and 36 deletions

View File

@ -51,6 +51,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class XPathEntityProcessor extends EntityProcessorBase { public class XPathEntityProcessor extends EntityProcessorBase {
private static final Logger LOG = LoggerFactory.getLogger(XPathEntityProcessor.class); private static final Logger LOG = LoggerFactory.getLogger(XPathEntityProcessor.class);
private static final Map<String, Object> END_MARKER = new HashMap<String, Object>();
protected List<String> placeHolderVariables; protected List<String> placeHolderVariables;
protected List<String> commonFields; protected List<String> commonFields;
@ -67,8 +69,17 @@ public class XPathEntityProcessor extends EntityProcessorBase {
protected boolean streamRows = false; protected boolean streamRows = false;
private int batchSz = 1000; // Amount of time to block reading/writing to queue when streaming
protected int blockingQueueTimeOut = 10;
// Units for pumpTimeOut
protected TimeUnit blockingQueueTimeOutUnits = TimeUnit.SECONDS;
// Number of rows to queue for asynchronous processing
protected int blockingQueueSize = 1000;
protected Thread publisherThread;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void init(Context context) { public void init(Context context) {
super.init(context); super.init(context);
@ -85,8 +96,11 @@ public class XPathEntityProcessor extends EntityProcessorBase {
.getEntityAttribute(USE_SOLR_ADD_SCHEMA)); .getEntityAttribute(USE_SOLR_ADD_SCHEMA));
streamRows = Boolean.parseBoolean(context streamRows = Boolean.parseBoolean(context
.getEntityAttribute(STREAM)); .getEntityAttribute(STREAM));
if (context.getEntityAttribute("batchSize") != null) { if (context.getResolvedEntityAttribute("batchSize") != null) {
batchSz = Integer.parseInt(context.getEntityAttribute("batchSize")); blockingQueueSize = Integer.parseInt(context.getEntityAttribute("batchSize"));
}
if (context.getResolvedEntityAttribute("readTimeOut") != null) {
blockingQueueTimeOut = Integer.parseInt(context.getEntityAttribute("readTimeOut"));
} }
String xslt = context.getEntityAttribute(XSL); String xslt = context.getEntityAttribute(XSL);
if (xslt != null) { if (xslt != null) {
@ -316,7 +330,7 @@ public class XPathEntityProcessor extends EntityProcessorBase {
} }
} }
private Map<String, Object> readRow(Map<String, Object> record, String xpath) { protected Map<String, Object> readRow(Map<String, Object> record, String xpath) {
if (useSolrAddXml) { if (useSolrAddXml) {
List<String> names = (List<String>) record.get("name"); List<String> names = (List<String>) record.get("name");
List<String> values = (List<String>) record.get("value"); List<String> values = (List<String>) record.get("value");
@ -381,33 +395,58 @@ public class XPathEntityProcessor extends EntityProcessorBase {
private Iterator<Map<String, Object>> getRowIterator(final Reader data, final String s) { private Iterator<Map<String, Object>> getRowIterator(final Reader data, final String s) {
//nothing atomic about it. I just needed a StongReference //nothing atomic about it. I just needed a StongReference
final AtomicReference<Exception> exp = new AtomicReference<Exception>(); final AtomicReference<Exception> exp = new AtomicReference<Exception>();
final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(batchSz); final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(blockingQueueSize);
final AtomicBoolean isEnd = new AtomicBoolean(false); final AtomicBoolean isEnd = new AtomicBoolean(false);
new Thread() { final AtomicBoolean throwExp = new AtomicBoolean(true);
publisherThread = new Thread() {
public void run() { public void run() {
try { try {
xpathReader.streamRecords(data, new XPathRecordReader.Handler() { xpathReader.streamRecords(data, new XPathRecordReader.Handler() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void handle(Map<String, Object> record, String xpath) { public void handle(Map<String, Object> record, String xpath) {
if (isEnd.get()) return; if (isEnd.get()) {
throwExp.set(false);
//To end the streaming . otherwise the parsing will go on forever
//though consumer has gone away
throw new RuntimeException("BREAK");
}
Map<String, Object> row;
try { try {
blockingQueue.offer(readRow(record, xpath), 10, TimeUnit.SECONDS); row = readRow(record, xpath);
} catch (Exception e) { } catch (Exception e) {
isEnd.set(true); isEnd.set(true);
return;
} }
offer(row);
} }
}); });
} catch (Exception e) { } catch (Exception e) {
exp.set(e); if(throwExp.get()) exp.set(e);
} finally { } finally {
closeIt(data); closeIt(data);
try { if (!isEnd.get()) {
blockingQueue.offer(Collections.EMPTY_MAP, 10, TimeUnit.SECONDS); offer(END_MARKER);
} catch (Exception e) {
} }
} }
} }
}.start();
private void offer(Map<String, Object> row) {
try {
while (!blockingQueue.offer(row, blockingQueueTimeOut, blockingQueueTimeOutUnits)) {
if (isEnd.get()) return;
LOG.debug("Timeout elapsed writing records. Perhaps buffer size should be increased.");
}
} catch (InterruptedException e) {
return;
} finally {
synchronized (this) {
notifyAll();
}
}
}
};
publisherThread.start();
return new Iterator<Map<String, Object>>() { return new Iterator<Map<String, Object>>() {
private Map<String, Object> lastRow; private Map<String, Object> lastRow;
@ -418,29 +457,38 @@ public class XPathEntityProcessor extends EntityProcessorBase {
} }
public Map<String, Object> next() { public Map<String, Object> next() {
try { Map<String, Object> row;
Map<String, Object> row = blockingQueue.poll(10, TimeUnit.SECONDS);
if (row == null || row == Collections.EMPTY_MAP) { do {
isEnd.set(true); try {
if (exp.get() != null) { row = blockingQueue.poll(blockingQueueTimeOut, blockingQueueTimeOutUnits);
String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count; if (row == null) {
if (lastRow != null) msg += " last row in this xml:" + lastRow; LOG.debug("Timeout elapsed reading records.");
if (ABORT.equals(onError)) {
wrapAndThrow(SEVERE, exp.get(), msg);
} else if (SKIP.equals(onError)) {
wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
} else {
LOG.warn(msg, exp.get());
}
} }
} catch (InterruptedException e) {
LOG.debug("Caught InterruptedException while waiting for row. Aborting.");
isEnd.set(true);
return null; return null;
} }
count++; } while (row == null);
return lastRow = row;
} catch (InterruptedException e) { if (row == END_MARKER) {
isEnd.set(true); isEnd.set(true);
if (exp.get() != null) {
String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
if (lastRow != null) msg += " last row in this xml:" + lastRow;
if (ABORT.equals(onError)) {
wrapAndThrow(SEVERE, exp.get(), msg);
} else if (SKIP.equals(onError)) {
wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
} else {
LOG.warn(msg, exp.get());
}
}
return null; return null;
} }
count++;
return lastRow = row;
} }
public void remove() { public void remove() {

View File

@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
/** /**
* <p> * <p>
@ -37,6 +38,10 @@ import java.util.Properties;
* @since solr 1.3 * @since solr 1.3
*/ */
public class TestXPathEntityProcessor { public class TestXPathEntityProcessor {
boolean simulateSlowReader;
boolean simulateSlowResultProcessor;
int rowsToRead = -1;
@Test @Test
public void withFieldsAndXpath() throws Exception { public void withFieldsAndXpath() throws Exception {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@ -110,6 +115,9 @@ public class TestXPathEntityProcessor {
@Test @Test
public void withFieldsAndXpathStream() throws Exception { public void withFieldsAndXpathStream() throws Exception {
final Object monitor = new Object();
final boolean[] done = new boolean[1];
Map entityAttrs = createMap("name", "e", "url", "cd.xml", Map entityAttrs = createMap("name", "e", "url", "cd.xml",
XPathEntityProcessor.FOR_EACH, "/catalog/cd", "stream", "true", "batchSize","1"); XPathEntityProcessor.FOR_EACH, "/catalog/cd", "stream", "true", "batchSize","1");
List fields = new ArrayList(); List fields = new ArrayList();
@ -118,21 +126,88 @@ public class TestXPathEntityProcessor {
fields.add(createMap("column", "year", "xpath", "/catalog/cd/year")); fields.add(createMap("column", "year", "xpath", "/catalog/cd/year"));
Context c = AbstractDataImportHandlerTest.getContext(null, Context c = AbstractDataImportHandlerTest.getContext(null,
new VariableResolverImpl(), getDataSource(cdData), Context.FULL_DUMP, fields, entityAttrs); new VariableResolverImpl(), getDataSource(cdData), Context.FULL_DUMP, fields, entityAttrs);
XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor(); XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor() {
private int count;
@Override
protected Map<String, Object> readRow(Map<String, Object> record,
String xpath) {
synchronized (monitor) {
if (simulateSlowReader && !done[0]) {
try {
monitor.wait(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
return super.readRow(record, xpath);
}
};
if (simulateSlowResultProcessor) {
xPathEntityProcessor.blockingQueueSize = 1;
}
xPathEntityProcessor.blockingQueueTimeOut = 1;
xPathEntityProcessor.blockingQueueTimeOutUnits = TimeUnit.MICROSECONDS;
xPathEntityProcessor.init(c); xPathEntityProcessor.init(c);
List<Map<String, Object>> result = new ArrayList<Map<String, Object>>(); List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
while (true) { while (true) {
if (rowsToRead >= 0 && result.size() >= rowsToRead) {
Thread.currentThread().interrupt();
}
Map<String, Object> row = xPathEntityProcessor.nextRow(); Map<String, Object> row = xPathEntityProcessor.nextRow();
if (row == null) if (row == null)
break; break;
result.add(row); result.add(row);
if (simulateSlowResultProcessor) {
synchronized (xPathEntityProcessor.publisherThread) {
if (xPathEntityProcessor.publisherThread.isAlive()) {
xPathEntityProcessor.publisherThread.wait(1000);
}
}
}
}
synchronized (monitor) {
done[0] = true;
monitor.notify();
}
// confirm that publisher thread stops.
xPathEntityProcessor.publisherThread.join(1000);
Assert.assertEquals("Expected thread to stop", false, xPathEntityProcessor.publisherThread.isAlive());
Assert.assertEquals(rowsToRead < 0 ? 3 : rowsToRead, result.size());
if (rowsToRead < 0) {
Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
Assert.assertEquals("1982", result.get(2).get("year"));
} }
Assert.assertEquals(3, result.size());
Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
Assert.assertEquals("1982", result.get(2).get("year"));
} }
@Test
public void withFieldsAndXpathStreamContinuesOnTimeout() throws Exception {
simulateSlowReader = true;
withFieldsAndXpathStream();
}
@Test
public void streamWritesMessageAfterBlockedAttempt() throws Exception {
simulateSlowResultProcessor = true;
withFieldsAndXpathStream();
}
@Test
public void streamStopsAfterInterrupt() throws Exception {
simulateSlowResultProcessor = true;
rowsToRead = 1;
withFieldsAndXpathStream();
}
@Test @Test
public void withDefaultSolrAndXsl() throws Exception { public void withDefaultSolrAndXsl() throws Exception {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();