() {
+ @Override public void onResponse(BulkResponse bulkResponse) {
+ onGoingBulks.decrementAndGet();
+ }
+
+ @Override public void onFailure(Throwable e) {
+ onGoingBulks.decrementAndGet();
+ logger.warn("failed to execute bulk", e);
+ }
+ });
+ } catch (Exception e) {
+ logger.warn("failed to process bulk", e);
+ }
+ }
+ currentRequest = client.prepareBulk();
+ }
+ }
+ }
+
+
+ private String stripTitle(String title) {
+ sb.setLength(0);
+ sb.append(title);
+ while (sb.length() > 0 && (sb.charAt(sb.length() - 1) == '\n' || (sb.charAt(sb.length() - 1) == ' '))) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+}
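Note: the hunk above shows only the tail of WikipediaRiver's indexing loop. A minimal sketch of the throttled-bulk pattern it implements follows; `onGoingBulks`, `currentRequest`, `client`, and `logger` mirror names visible in the hunk, while `bulkSize` and `dropThreshold` are assumed surrounding fields.

    // Sketch, not part of the diff: cap the number of in-flight bulks and
    // recycle the request builder, as the hunk above does.
    private void processBulkIfNeeded() {
        if (currentRequest.numberOfActions() >= bulkSize) {
            // only dispatch if we are under the in-flight cap, otherwise drop
            if (onGoingBulks.incrementAndGet() > dropThreshold) {
                onGoingBulks.decrementAndGet();
                logger.warn("dropping bulk, [{}] crossed threshold [{}]", onGoingBulks.get(), dropThreshold);
            } else {
                try {
                    currentRequest.execute(new ActionListener<BulkResponse>() {
                        @Override public void onResponse(BulkResponse bulkResponse) {
                            onGoingBulks.decrementAndGet();
                        }

                        @Override public void onFailure(Throwable e) {
                            onGoingBulks.decrementAndGet();
                            logger.warn("failed to execute bulk", e);
                        }
                    });
                } catch (Exception e) {
                    logger.warn("failed to process bulk", e);
                }
            }
            currentRequest = client.prepareBulk();
        }
    }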
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java
new file mode 100644
index 00000000000..e430908dc91
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.river.River;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class WikipediaRiverModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(River.class).to(WikipediaRiver.class).asEagerSingleton();
+ }
+}
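The module binds River to WikipediaRiver; for that binding to be reachable, a plugin class (presumably defined elsewhere in this change) registers the module under the river type name. A hedged sketch of the conventional registration:

    // Sketch, assuming the usual river plugin shape; the name and
    // description strings are illustrative, not taken from this diff.
    public class WikipediaRiverPlugin extends AbstractPlugin {

        @Override public String name() {
            return "river-wikipedia";
        }

        @Override public String description() {
            return "River Wikipedia Plugin";
        }

        public void onModule(RiversModule module) {
            module.registerRiver("wikipedia", WikipediaRiverModule.class);
        }
    }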
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java
new file mode 100644
index 00000000000..38e9902a206
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+/**
+ * A class abstracting a Wiki infobox
+ *
+ * @author Delip Rao
+ */
+public class InfoBox {
+ String infoBoxWikiText = null;
+
+ InfoBox(String infoBoxWikiText) {
+ this.infoBoxWikiText = infoBoxWikiText;
+ }
+
+ public String dumpRaw() {
+ return infoBoxWikiText;
+ }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java
new file mode 100644
index 00000000000..d9b320d967a
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+public class IteratorHandler implements PageCallbackHandler {
+
+ private WikiXMLParser parser = null;
+
+ public IteratorHandler(WikiXMLParser myParser) {
+ parser = myParser;
+ }
+
+ public void process(WikiPage page) {
+ parser.notifyPage(page);
+ }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java
new file mode 100644
index 00000000000..2f6b2a640a2
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java
@@ -0,0 +1,26 @@
+package org.elasticsearch.river.wikipedia.support;
+
+/**
+ * Interface to allow streamed processing of pages.
+ * This allows SAX-style processing of Wikipedia XML files.
+ * The registered callback is executed on each page
+ * element in the XML file.
+ *
+ * Using callbacks consumes less memory, a useful feature for large
+ * dumps like English and German.
+ *
+ * @author Delip Rao
+ * @see WikiXMLParser
+ * @see WikiPage
+ */
+
+public interface PageCallbackHandler {
+ /**
+ * This is the callback method that should be implemented before
+ * registering with a WikiXMLParser
+ *
+ * @param page a wikipedia page object
+ * @see WikiPage
+ */
+ public void process(WikiPage page);
+}
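A minimal custom handler, to illustrate the callback contract (the class name is hypothetical; the WikiPage accessors it uses are defined later in this change):

    package org.elasticsearch.river.wikipedia.support;

    // Hypothetical example: print the title of every regular article,
    // skipping redirects and "special" pages such as Category: entries.
    public class TitlePrintingHandler implements PageCallbackHandler {

        public void process(WikiPage page) {
            if (!page.isRedirect() && !page.isSpecialPage()) {
                System.out.println(page.getTitle());
            }
        }
    }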
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java
new file mode 100644
index 00000000000..263d362ca9a
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java
@@ -0,0 +1,60 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * A Wrapper class for the PageCallbackHandler
+ *
+ * @author Jason Smith
+ */
+public class SAXPageCallbackHandler extends DefaultHandler {
+
+ private PageCallbackHandler pageHandler;
+ private WikiPage currentPage;
+ private String currentTag;
+
+ private String currentWikitext;
+ private String currentTitle;
+ private String currentID;
+
+ public SAXPageCallbackHandler(PageCallbackHandler ph) {
+ pageHandler = ph;
+ }
+
+ public void startElement(String uri, String name, String qName, Attributes attr) {
+ currentTag = qName;
+ if (qName.equals("page")) {
+ currentPage = new WikiPage();
+ currentWikitext = "";
+ currentTitle = "";
+ currentID = "";
+ }
+ }
+
+ public void endElement(String uri, String name, String qName) {
+ if (qName.equals("page")) {
+ currentPage.setTitle(currentTitle);
+ currentPage.setID(currentID);
+ currentPage.setWikiText(currentWikitext);
+ pageHandler.process(currentPage);
+ }
+ if (qName.equals("mediawiki")) {
+ // TODO hasMoreElements() should now return false
+ }
+ }
+
+ public void characters(char[] ch, int start, int length) {
+ if (currentTag.equals("title")) {
+ currentTitle = currentTitle.concat(new String(ch, start, length));
+ }
+ // TODO: To avoid looking at the revision ID, only the first ID is taken.
+ // I'm not sure how big the block size is in each call to characters(),
+ // so this may be unsafe.
+ else if ((currentTag.equals("id")) && (currentID.length() == 0)) {
+ currentID = new String(ch, start, length);
+ } else if (currentTag.equals("text")) {
+ currentWikitext = currentWikitext.concat(new String(ch, start, length));
+ }
+ }
+}
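One caveat with the handler above: SAX may deliver the contents of a single element across several characters() calls (hence the concat calls and the first-id-only TODO), and String.concat recopies the accumulated prefix each time, which is quadratic on large text elements. A sketch of the same accumulation using StringBuilder fields, as a fragment replacing the corresponding members above:

    private final StringBuilder titleBuf = new StringBuilder();
    private final StringBuilder textBuf = new StringBuilder();

    public void characters(char[] ch, int start, int length) {
        if ("title".equals(currentTag)) {
            titleBuf.append(ch, start, length);   // no copy of the accumulated prefix
        } else if ("text".equals(currentTag)) {
            textBuf.append(ch, start, length);
        }
    }

    // in endElement("page"): use titleBuf.toString() / textBuf.toString(),
    // then titleBuf.setLength(0) and textBuf.setLength(0) to reset for the next page.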
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java
new file mode 100644
index 00000000000..4943bd7c278
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.List;
+
+/**
+ * Data structures for a wikipedia page.
+ *
+ * @author Delip Rao
+ */
+public class WikiPage {
+
+ private String title = null;
+ private WikiTextParser wikiTextParser = null;
+ private String id = null;
+
+ /**
+ * Set the page title. This is not intended for direct use.
+ *
+ * @param title
+ */
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ /**
+ * Set the wiki text associated with this page.
+ * This setter constructs a WikiTextParser as a side effect, so it is not intended for direct use.
+ *
+ * @param wtext wiki-formatted text
+ */
+ public void setWikiText(String wtext) {
+ wikiTextParser = new WikiTextParser(wtext);
+ }
+
+ /**
+ * @return a string containing the page title.
+ */
+ public String getTitle() {
+ return title;
+ }
+
+ /**
+ * @param languageCode
+ * @return a string containing the title translated
+ * in the given languageCode.
+ */
+ public String getTranslatedTitle(String languageCode) {
+ return wikiTextParser.getTranslatedTitle(languageCode);
+ }
+
+ /**
+ * @return true if this is a disambiguation page.
+ */
+ public boolean isDisambiguationPage() {
+ return title.contains("(disambiguation)") ||
+ wikiTextParser.isDisambiguationPage();
+ }
+
+ /**
+ * @return true for "special pages" -- like Category:, Wikipedia:, etc
+ */
+ public boolean isSpecialPage() {
+ return title.contains(":");
+ }
+
+ /**
+ * Use this method to get the wiki text associated with this page.
+ * Useful for custom processing the wiki text.
+ *
+ * @return a string containing the wiki text.
+ */
+ public String getWikiText() {
+ return wikiTextParser.getText();
+ }
+
+ /**
+ * @return true if this is a redirection page
+ */
+ public boolean isRedirect() {
+ return wikiTextParser.isRedirect();
+ }
+
+ /**
+ * @return true if this is a stub page
+ */
+ public boolean isStub() {
+ return wikiTextParser.isStub();
+ }
+
+ /**
+ * @return the title of the page being redirected to.
+ */
+ public String getRedirectPage() {
+ return wikiTextParser.getRedirectText();
+ }
+
+ /**
+ * @return plain text stripped of all wiki formatting.
+ */
+ public String getText() {
+ return wikiTextParser.getPlainText();
+ }
+
+ /**
+ * @return a list of categories the page belongs to, null if this is a redirection/disambiguation page
+ */
+ public List<String> getCategories() {
+ return wikiTextParser.getCategories();
+ }
+
+ /**
+ * @return a list of links contained in the page
+ */
+ public List<String> getLinks() {
+ return wikiTextParser.getLinks();
+ }
+
+ public void setID(String id) {
+ this.id = id;
+ }
+
+ public InfoBox getInfoBox() {
+ return wikiTextParser.getInfoBox();
+ }
+
+ public String getID() {
+ return id;
+ }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java
new file mode 100644
index 00000000000..0a793d5c45d
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java
@@ -0,0 +1,47 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.Vector;
+
+/**
+ * A class to iterate the pages after the wikipedia XML file has been parsed with a {@link WikiXMLParser}.
+ *
+ * @author Delip Rao
+ * @see WikiXMLParser
+ */
+public class WikiPageIterator {
+
+ private int currentPage = 0;
+ private int lastPage = 0;
+ Vector<WikiPage> pageList = null;
+
+ public WikiPageIterator(Vector<WikiPage> list) {
+ pageList = list;
+ if (pageList != null)
+ lastPage = pageList.size();
+ }
+
+ /**
+ * @return true if there are more pages to be read
+ */
+ public boolean hasMorePages() {
+ return (currentPage < lastPage);
+ }
+
+ /**
+ * Reset the iterator.
+ */
+ public void reset() {
+ currentPage = 0;
+ }
+
+ /**
+ * Advances the iterator by one position.
+ *
+ * @return a {@link WikiPage}
+ */
+ public WikiPage nextPage() {
+ if (hasMorePages())
+ return pageList.elementAt(currentPage++);
+ return null;
+ }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java
new file mode 100644
index 00000000000..0459b936838
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * For internal use only -- Used by the {@link WikiPage} class.
+ * Can also be used as a stand alone class to parse wiki formatted text.
+ *
+ * @author Delip Rao
+ */
+public class WikiTextParser {
+
+ private String wikiText = null;
+ private ArrayList<String> pageCats = null;
+ private ArrayList<String> pageLinks = null;
+ private boolean redirect = false;
+ private String redirectString = null;
+ private static Pattern redirectPattern =
+ Pattern.compile("#REDIRECT\\s+\\[\\[(.*?)\\]\\]");
+ private boolean stub = false;
+ private boolean disambiguation = false;
+ private static Pattern stubPattern = Pattern.compile("\\-stub\\}\\}");
+ private static Pattern disambCatPattern = Pattern.compile("\\{\\{disambig\\}\\}");
+ private InfoBox infoBox = null;
+
+ public WikiTextParser(String wtext) {
+ wikiText = wtext;
+ Matcher matcher = redirectPattern.matcher(wikiText);
+ if (matcher.find()) {
+ redirect = true;
+ if (matcher.groupCount() == 1)
+ redirectString = matcher.group(1);
+ }
+ matcher = stubPattern.matcher(wikiText);
+ stub = matcher.find();
+ matcher = disambCatPattern.matcher(wikiText);
+ disambiguation = matcher.find();
+ }
+
+ public boolean isRedirect() {
+ return redirect;
+ }
+
+ public boolean isStub() {
+ return stub;
+ }
+
+ public String getRedirectText() {
+ return redirectString;
+ }
+
+ public String getText() {
+ return wikiText;
+ }
+
+ public ArrayList<String> getCategories() {
+ if (pageCats == null) parseCategories();
+ return pageCats;
+ }
+
+ public ArrayList<String> getLinks() {
+ if (pageLinks == null) parseLinks();
+ return pageLinks;
+ }
+
+ private void parseCategories() {
+ pageCats = new ArrayList<String>();
+ Pattern catPattern = Pattern.compile("\\[\\[Category:(.*?)\\]\\]", Pattern.MULTILINE);
+ Matcher matcher = catPattern.matcher(wikiText);
+ while (matcher.find()) {
+ String[] temp = matcher.group(1).split("\\|");
+ pageCats.add(temp[0]);
+ }
+ }
+
+ private void parseLinks() {
+ pageLinks = new ArrayList<String>();
+
+ Pattern catPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE);
+ Matcher matcher = catPattern.matcher(wikiText);
+ while (matcher.find()) {
+ String[] temp = matcher.group(1).split("\\|");
+ if (temp == null || temp.length == 0) continue;
+ String link = temp[0];
+ if (link.contains(":") == false) {
+ pageLinks.add(link);
+ }
+ }
+ }
+
+ public String getPlainText() {
+ String text = wikiText.replaceAll("&gt;", ">");
+ text = text.replaceAll("&lt;", "<");
+ text = text.replaceAll("<ref>.*?</ref>", " ");
+ text = text.replaceAll("</?.*?>", " ");
+ text = text.replaceAll("\\{\\{.*?\\}\\}", " ");
+ text = text.replaceAll("\\[\\[.*?:.*?\\]\\]", " ");
+ text = text.replaceAll("\\[\\[(.*?)\\]\\]", "$1");
+ text = text.replaceAll("\\s(.*?)\\|(\\w+\\s)", " $2");
+ text = text.replaceAll("\\[.*?\\]", " ");
+ text = text.replaceAll("\\'+", "");
+ return text;
+ }
+
+ public InfoBox getInfoBox() {
+ //parseInfoBox is expensive. Doing it only once like other parse* methods
+ if (infoBox == null)
+ infoBox = parseInfoBox();
+ return infoBox;
+ }
+
+ private InfoBox parseInfoBox() {
+ String INFOBOX_CONST_STR = "{{Infobox";
+ int startPos = wikiText.indexOf(INFOBOX_CONST_STR);
+ if (startPos < 0) return null;
+ int bracketCount = 2;
+ int endPos = startPos + INFOBOX_CONST_STR.length();
+ for (; endPos < wikiText.length(); endPos++) {
+ switch (wikiText.charAt(endPos)) {
+ case '}':
+ bracketCount--;
+ break;
+ case '{':
+ bracketCount++;
+ break;
+ default:
+ }
+ if (bracketCount == 0) break;
+ }
+ String infoBoxText = wikiText.substring(startPos, endPos + 1);
+ infoBoxText = stripCite(infoBoxText); // strip clumsy {{cite}} tags
+ // strip any html formatting
+ infoBoxText = infoBoxText.replaceAll("&gt;", ">");
+ infoBoxText = infoBoxText.replaceAll("&lt;", "<");
+ infoBoxText = infoBoxText.replaceAll("<ref.*?>.*?</ref>", " ");
+ infoBoxText = infoBoxText.replaceAll("</?.*?>", " ");
+ return new InfoBox(infoBoxText);
+ }
+
+ private String stripCite(String text) {
+ String CITE_CONST_STR = "{{cite";
+ int startPos = text.indexOf(CITE_CONST_STR);
+ if (startPos < 0) return text;
+ int bracketCount = 2;
+ int endPos = startPos + CITE_CONST_STR.length();
+ for (; endPos < text.length(); endPos++) {
+ switch (text.charAt(endPos)) {
+ case '}':
+ bracketCount--;
+ break;
+ case '{':
+ bracketCount++;
+ break;
+ default:
+ }
+ if (bracketCount == 0) break;
+ }
+ // excise the {{cite ...}} block including its closing braces; guard both ends
+ text = text.substring(0, startPos) + text.substring(Math.min(endPos + 1, text.length()));
+ return stripCite(text);
+ }
+
+ public boolean isDisambiguationPage() {
+ return disambiguation;
+ }
+
+ public String getTranslatedTitle(String languageCode) {
+ Pattern pattern = Pattern.compile("^\\[\\[" + languageCode + ":(.*?)\\]\\]$", Pattern.MULTILINE);
+ Matcher matcher = pattern.matcher(wikiText);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+ return null;
+ }
+
+}
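As the class javadoc notes, WikiTextParser can also be used stand-alone. A small illustration (the markup string is made up):

    String markup = "'''Anarchism''' is a [[political philosophy]]. [[Category:Political ideologies]]";
    WikiTextParser parser = new WikiTextParser(markup);
    System.out.println(parser.getPlainText());   // "Anarchism is a political philosophy. "
    System.out.println(parser.getCategories());  // [Political ideologies]
    System.out.println(parser.getLinks());       // [political philosophy] -- links containing ":" are skipped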
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java
new file mode 100644
index 00000000000..253a0fc3dd2
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import org.elasticsearch.common.compress.bzip2.CBZip2InputStream;
+import org.xml.sax.InputSource;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * @author Delip Rao
+ * @author Jason Smith
+ */
+public abstract class WikiXMLParser {
+
+ private URL wikiXMLFile = null;
+ protected WikiPage currentPage = null;
+
+ public WikiXMLParser(URL fileName) {
+ wikiXMLFile = fileName;
+ }
+
+ /**
+ * Set a callback handler. The callback is executed every time a
+ * page instance is detected in the stream. Custom handlers are
+ * implementations of {@link PageCallbackHandler}
+ *
+ * @param handler
+ * @throws Exception
+ */
+ public abstract void setPageCallback(PageCallbackHandler handler) throws Exception;
+
+ /**
+ * The main parse method.
+ *
+ * @throws Exception
+ */
+ public abstract void parse() throws Exception;
+
+ /**
+ * @return an iterator to the list of pages
+ * @throws Exception
+ */
+ public abstract WikiPageIterator getIterator() throws Exception;
+
+ /**
+ * @return An InputSource created from wikiXMLFile
+ * @throws Exception
+ */
+ protected InputSource getInputSource() throws Exception {
+ BufferedReader br = null;
+
+ if (wikiXMLFile.toExternalForm().endsWith(".gz")) {
+ br = new BufferedReader(new InputStreamReader(new GZIPInputStream(wikiXMLFile.openStream()), "UTF-8"));
+ } else if (wikiXMLFile.toExternalForm().endsWith(".bz2")) {
+ InputStream fis = wikiXMLFile.openStream();
+ byte[] ignoreBytes = new byte[2];
+ fis.read(ignoreBytes); //"B", "Z" bytes from commandline tools
+ br = new BufferedReader(new InputStreamReader(new CBZip2InputStream(fis), "UTF-8"));
+ } else {
+ br = new BufferedReader(new InputStreamReader(wikiXMLFile.openStream(), "UTF-8"));
+ }
+
+ return new InputSource(br);
+ }
+
+ protected void notifyPage(WikiPage page) {
+ currentPage = page;
+
+ }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java
new file mode 100644
index 00000000000..a7aeb60156c
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java
@@ -0,0 +1,14 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import java.net.URL;
+
+/**
+ * @author Delip Rao
+ */
+public class WikiXMLParserFactory {
+
+ public static WikiXMLParser getSAXParser(URL fileName) {
+ return new WikiXMLSAXParser(fileName);
+ }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java
new file mode 100644
index 00000000000..3f64715f466
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import java.net.URL;
+
+/**
+ * A SAX Parser for Wikipedia XML dumps.
+ *
+ * @author Jason Smith
+ */
+public class WikiXMLSAXParser extends WikiXMLParser {
+
+ private XMLReader xmlReader;
+ private PageCallbackHandler pageHandler = null;
+
+ public WikiXMLSAXParser(URL fileName) {
+ super(fileName);
+ try {
+ xmlReader = XMLReaderFactory.createXMLReader();
+ pageHandler = new IteratorHandler(this);
+ } catch (SAXException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Set a callback handler. The callback is executed every time a
+ * page instance is detected in the stream. Custom handlers are
+ * implementations of {@link PageCallbackHandler}
+ *
+ * @param handler
+ * @throws Exception
+ */
+ public void setPageCallback(PageCallbackHandler handler) throws Exception {
+ pageHandler = handler;
+ }
+
+ /**
+ * The main parse method.
+ *
+ * @throws Exception
+ */
+ public void parse() throws Exception {
+ xmlReader.setContentHandler(new SAXPageCallbackHandler(pageHandler));
+ xmlReader.parse(getInputSource());
+ }
+
+ /**
+ * This parser is event driven, so it
+ * can't provide a page iterator.
+ */
+ @Override
+ public WikiPageIterator getIterator() throws Exception {
+ if (!(pageHandler instanceof IteratorHandler)) {
+ throw new Exception("Custom page callback found. Will not iterate.");
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * A convenience method for the Wikipedia SAX interface
+ *
+ * @param dumpFile - path to the Wikipedia dump
+ * @param handler - callback handler used for parsing
+ * @throws Exception
+ */
+ public static void parseWikipediaDump(URL dumpFile,
+ PageCallbackHandler handler) throws Exception {
+ WikiXMLParser wxsp = WikiXMLParserFactory.getSAXParser(dumpFile);
+ wxsp.setPageCallback(handler);
+ wxsp.parse();
+ }
+
+}
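End to end, the pieces above combine as follows; the dump URL is illustrative, and any gzip, bzip2, or plain XML dump reachable by URL should work given getInputSource() in WikiXMLParser:

    import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
    import org.elasticsearch.river.wikipedia.support.WikiPage;
    import org.elasticsearch.river.wikipedia.support.WikiXMLSAXParser;

    import java.net.URL;

    public class DumpTitles {
        public static void main(String[] args) throws Exception {
            URL dump = new URL("http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2");
            WikiXMLSAXParser.parseWikipediaDump(dump, new PageCallbackHandler() {
                public void process(WikiPage page) {
                    System.out.println(page.getTitle());
                }
            });
        }
    }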
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java
new file mode 100644
index 00000000000..9f41cbee375
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * Copied from wikixmlj on 2010-10-03.
+ *
+ * Changed from File handling to URL handling, and removed the DOM parser.
+ */
+package org.elasticsearch.river.wikipedia.support;
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index d122a1c63c6..a0461387349 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,6 +21,7 @@ include 'plugins-transport-memcached'
include 'plugins-transport-thrift'
include 'plugins-river-twitter'
+include 'plugins-river-wikipedia'
include 'plugins-river-rabbitmq'
include 'plugins-river-couchdb'